
vllm.model_executor.models.deepseek_v2

Inference-only DeepseekV2/DeepseekV3 model.

logger module-attribute

logger = init_logger(__name__)

DeepseekAttention

Bases: Module

Normal MHA implementation used by Deepseek v1.

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekAttention(nn.Module):
    """Normal MHA implementation used by Deepseek v1."""

    def __init__(
        self,
        vllm_config: VllmConfig,
        config: DeepseekV2Config | DeepseekV3Config,
        hidden_size: int,
        num_heads: int,
        rope_theta: float = 10000,
        rope_parameters: dict[str, Any] | None = None,
        max_position_embeddings: int = 8192,
        cache_config: CacheConfig | None = None,
        quant_config: QuantizationConfig | None = None,
        prefix: str = "",
        **kwargs,
    ) -> None:
        super().__init__()
        self.hidden_size = hidden_size
        tp_size = get_tensor_model_parallel_world_size()
        self.total_num_heads = num_heads
        assert self.total_num_heads % tp_size == 0
        self.num_heads = self.total_num_heads // tp_size
        self.total_num_kv_heads = config.num_key_value_heads
        if self.total_num_kv_heads >= tp_size:
            # Number of KV heads is greater than or equal to TP size, so we
            # partition the KV heads across multiple tensor parallel GPUs.
            assert self.total_num_kv_heads % tp_size == 0
        else:
            # Number of KV heads is less than TP size, so we replicate
            # the KV heads across multiple tensor parallel GPUs.
            assert tp_size % self.total_num_kv_heads == 0
        self.num_kv_heads = max(1, self.total_num_kv_heads // tp_size)
        self.head_dim = hidden_size // self.total_num_heads
        self.q_size = self.num_heads * self.head_dim
        self.kv_size = self.num_kv_heads * self.head_dim
        self.scaling = self.head_dim**-0.5
        self.rope_theta = rope_theta
        self.max_position_embeddings = max_position_embeddings

        self.qkv_proj = QKVParallelLinear(
            hidden_size,
            self.head_dim,
            self.total_num_heads,
            self.total_num_kv_heads,
            bias=False,
            quant_config=quant_config,
        )

        self.o_proj = RowParallelLinear(
            self.total_num_heads * self.head_dim,
            hidden_size,
            bias=False,
            quant_config=quant_config,
        )

        self.rotary_emb = get_rope(
            self.head_dim,
            rotary_dim=self.head_dim,
            max_position=max_position_embeddings,
            base=rope_theta,
            rope_parameters=rope_parameters,
        )
        self.attn = Attention(
            self.num_heads,
            self.head_dim,
            self.scaling,
            num_kv_heads=self.num_kv_heads,
            cache_config=cache_config,
            quant_config=quant_config,
            prefix=f"{prefix}.attn",
        )

    def forward(
        self,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
    ) -> torch.Tensor:
        qkv, _ = self.qkv_proj(hidden_states)
        q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1)
        q, k = self.rotary_emb(positions, q, k)
        attn_output = self.attn(q, k, v)
        output, _ = self.o_proj(attn_output)
        return output
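
The constructor above splits the query heads evenly across tensor-parallel ranks and either partitions or replicates the KV heads, depending on whether there are at least as many KV heads as ranks. The following is a minimal, hypothetical sketch of that bookkeeping (the head counts are made up and do not come from any Deepseek config):

def per_rank_heads(total_num_heads: int, total_num_kv_heads: int, tp_size: int):
    # Query heads must divide evenly across ranks.
    assert total_num_heads % tp_size == 0
    num_heads = total_num_heads // tp_size
    if total_num_kv_heads >= tp_size:
        # Enough KV heads to shard them across ranks.
        assert total_num_kv_heads % tp_size == 0
    else:
        # Fewer KV heads than ranks: each KV head is replicated on
        # tp_size // total_num_kv_heads ranks.
        assert tp_size % total_num_kv_heads == 0
    num_kv_heads = max(1, total_num_kv_heads // tp_size)
    return num_heads, num_kv_heads

print(per_rank_heads(16, 4, 8))  # (2, 1): 2 query heads, 1 replicated KV head per rank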

attn instance-attribute

attn = Attention(
    num_heads,
    head_dim,
    scaling,
    num_kv_heads=num_kv_heads,
    cache_config=cache_config,
    quant_config=quant_config,
    prefix=f"{prefix}.attn",
)

head_dim instance-attribute

head_dim = hidden_size // total_num_heads

hidden_size instance-attribute

hidden_size = hidden_size

kv_size instance-attribute

kv_size = num_kv_heads * head_dim

max_position_embeddings instance-attribute

max_position_embeddings = max_position_embeddings

num_heads instance-attribute

num_heads = total_num_heads // tp_size

num_kv_heads instance-attribute

num_kv_heads = max(1, total_num_kv_heads // tp_size)

o_proj instance-attribute

o_proj = RowParallelLinear(
    total_num_heads * head_dim,
    hidden_size,
    bias=False,
    quant_config=quant_config,
)

q_size instance-attribute

q_size = num_heads * head_dim

qkv_proj instance-attribute

qkv_proj = QKVParallelLinear(
    hidden_size,
    head_dim,
    total_num_heads,
    total_num_kv_heads,
    bias=False,
    quant_config=quant_config,
)

rope_theta instance-attribute

rope_theta = rope_theta

rotary_emb instance-attribute

rotary_emb = get_rope(
    head_dim,
    rotary_dim=head_dim,
    max_position=max_position_embeddings,
    base=rope_theta,
    rope_parameters=rope_parameters,
)

scaling instance-attribute

scaling = head_dim ** -0.5

total_num_heads instance-attribute

total_num_heads = num_heads

total_num_kv_heads instance-attribute

total_num_kv_heads = num_key_value_heads

__init__

__init__(
    vllm_config: VllmConfig,
    config: DeepseekV2Config | DeepseekV3Config,
    hidden_size: int,
    num_heads: int,
    rope_theta: float = 10000,
    rope_parameters: dict[str, Any] | None = None,
    max_position_embeddings: int = 8192,
    cache_config: CacheConfig | None = None,
    quant_config: QuantizationConfig | None = None,
    prefix: str = "",
    **kwargs,
) -> None
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(
    self,
    vllm_config: VllmConfig,
    config: DeepseekV2Config | DeepseekV3Config,
    hidden_size: int,
    num_heads: int,
    rope_theta: float = 10000,
    rope_parameters: dict[str, Any] | None = None,
    max_position_embeddings: int = 8192,
    cache_config: CacheConfig | None = None,
    quant_config: QuantizationConfig | None = None,
    prefix: str = "",
    **kwargs,
) -> None:
    super().__init__()
    self.hidden_size = hidden_size
    tp_size = get_tensor_model_parallel_world_size()
    self.total_num_heads = num_heads
    assert self.total_num_heads % tp_size == 0
    self.num_heads = self.total_num_heads // tp_size
    self.total_num_kv_heads = config.num_key_value_heads
    if self.total_num_kv_heads >= tp_size:
        # Number of KV heads is greater than or equal to TP size, so we
        # partition the KV heads across multiple tensor parallel GPUs.
        assert self.total_num_kv_heads % tp_size == 0
    else:
        # Number of KV heads is less than TP size, so we replicate
        # the KV heads across multiple tensor parallel GPUs.
        assert tp_size % self.total_num_kv_heads == 0
    self.num_kv_heads = max(1, self.total_num_kv_heads // tp_size)
    self.head_dim = hidden_size // self.total_num_heads
    self.q_size = self.num_heads * self.head_dim
    self.kv_size = self.num_kv_heads * self.head_dim
    self.scaling = self.head_dim**-0.5
    self.rope_theta = rope_theta
    self.max_position_embeddings = max_position_embeddings

    self.qkv_proj = QKVParallelLinear(
        hidden_size,
        self.head_dim,
        self.total_num_heads,
        self.total_num_kv_heads,
        bias=False,
        quant_config=quant_config,
    )

    self.o_proj = RowParallelLinear(
        self.total_num_heads * self.head_dim,
        hidden_size,
        bias=False,
        quant_config=quant_config,
    )

    self.rotary_emb = get_rope(
        self.head_dim,
        rotary_dim=self.head_dim,
        max_position=max_position_embeddings,
        base=rope_theta,
        rope_parameters=rope_parameters,
    )
    self.attn = Attention(
        self.num_heads,
        self.head_dim,
        self.scaling,
        num_kv_heads=self.num_kv_heads,
        cache_config=cache_config,
        quant_config=quant_config,
        prefix=f"{prefix}.attn",
    )

forward

forward(positions: Tensor, hidden_states: Tensor) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(
    self,
    positions: torch.Tensor,
    hidden_states: torch.Tensor,
) -> torch.Tensor:
    qkv, _ = self.qkv_proj(hidden_states)
    q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1)
    q, k = self.rotary_emb(positions, q, k)
    attn_output = self.attn(q, k, v)
    output, _ = self.o_proj(attn_output)
    return output

DeepseekForCausalLM

Bases: DeepseekV2ForCausalLM

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekForCausalLM(DeepseekV2ForCausalLM):
    pass

DeepseekV2Attention

Bases: Module

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekV2Attention(nn.Module):
    def __init__(
        self,
        vllm_config: VllmConfig,
        config: DeepseekV2Config | DeepseekV3Config,
        hidden_size: int,
        num_heads: int,
        qk_nope_head_dim: int,
        qk_rope_head_dim: int,
        v_head_dim: int,
        q_lora_rank: int,
        kv_lora_rank: int,
        rope_theta: float = 10000,
        rope_parameters: dict[str, Any] | None = None,
        max_position_embeddings: int = 8192,
        cache_config: CacheConfig | None = None,
        quant_config: QuantizationConfig | None = None,
        topk_indices_buffer: torch.Tensor | None = None,
        prefix: str = "",
    ) -> None:
        super().__init__()
        self.hidden_size = hidden_size
        self.qk_nope_head_dim = qk_nope_head_dim
        self.qk_rope_head_dim = qk_rope_head_dim
        self.qk_head_dim = qk_nope_head_dim + qk_rope_head_dim
        self.v_head_dim = v_head_dim
        self.q_lora_rank = q_lora_rank
        self.kv_lora_rank = kv_lora_rank
        self.num_heads = num_heads
        tp_size = get_tensor_model_parallel_world_size()
        assert num_heads % tp_size == 0
        self.num_local_heads = num_heads // tp_size
        self.scaling = self.qk_head_dim**-0.5
        self.rope_theta = rope_theta
        self.max_position_embeddings = max_position_embeddings
        assert topk_indices_buffer is None, (
            "topk_indices_buffer is not supported for DeepseekV2Attention"
        )

        if self.q_lora_rank is not None:
            self.q_a_proj = ReplicatedLinear(
                self.hidden_size,
                self.q_lora_rank,
                bias=False,
                quant_config=quant_config,
                prefix=f"{prefix}.q_a_proj",
            )
            self.q_a_layernorm = RMSNorm(self.q_lora_rank, eps=config.rms_norm_eps)
            self.q_b_proj = ColumnParallelLinear(
                q_lora_rank,
                self.num_heads * self.qk_head_dim,
                bias=False,
                quant_config=quant_config,
                prefix=f"{prefix}.q_b_proj",
            )
        else:
            self.q_proj = ColumnParallelLinear(
                self.hidden_size,
                self.num_heads * self.qk_head_dim,
                bias=False,
                quant_config=quant_config,
                prefix=f"{prefix}.q_proj",
            )

        self.kv_a_proj_with_mqa = ReplicatedLinear(
            self.hidden_size,
            self.kv_lora_rank + self.qk_rope_head_dim,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.kv_a_proj_with_mqa",
        )
        self.kv_a_layernorm = RMSNorm(self.kv_lora_rank, eps=config.rms_norm_eps)
        self.kv_b_proj = ColumnParallelLinear(
            self.kv_lora_rank,
            self.num_heads * (self.qk_nope_head_dim + self.v_head_dim),
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.kv_b_proj",
        )
        # O projection.
        self.o_proj = RowParallelLinear(
            self.num_heads * self.v_head_dim,
            self.hidden_size,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.o_proj",
        )
        if rope_parameters:
            rope_parameters["rope_type"] = "deepseek_yarn"

        self.rotary_emb = get_rope(
            qk_rope_head_dim,
            rotary_dim=qk_rope_head_dim,
            max_position=max_position_embeddings,
            base=rope_theta,
            rope_parameters=rope_parameters,
            is_neox_style=False,
        )

        if rope_parameters:
            mscale_all_dim = rope_parameters.get("mscale_all_dim", False)
            scaling_factor = rope_parameters["factor"]
            mscale = yarn_get_mscale(scaling_factor, float(mscale_all_dim))
            self.scaling = self.scaling * mscale * mscale

        self.attn = Attention(
            self.num_local_heads,
            self.qk_head_dim,
            self.scaling,
            num_kv_heads=self.num_local_heads,
            cache_config=cache_config,
            quant_config=quant_config,
            prefix=f"{prefix}.attn",
        )

    def forward(
        self,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
    ) -> torch.Tensor:
        if self.q_lora_rank is not None:
            q = self.q_a_proj(hidden_states)[0]
            q = self.q_a_layernorm(q)
            q = self.q_b_proj(q)[0].view(-1, self.num_local_heads, self.qk_head_dim)
        else:
            q = self.q_proj(hidden_states)[0].view(
                -1, self.num_local_heads, self.qk_head_dim
            )
        q_nope, q_pe = q.split([self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1)
        latent_cache = self.kv_a_proj_with_mqa(hidden_states)[0]
        kv_a, _ = latent_cache.split([self.kv_lora_rank, self.qk_rope_head_dim], dim=-1)
        latent_cache = latent_cache.unsqueeze(1)
        kv_a = self.kv_a_layernorm(kv_a)
        kv = self.kv_b_proj(kv_a)[0]
        kv = kv.view(-1, self.num_local_heads, self.qk_nope_head_dim + self.v_head_dim)
        k_nope, v = kv.split([self.qk_nope_head_dim, self.v_head_dim], dim=-1)
        k_pe = latent_cache[:, :, self.kv_lora_rank :]

        q_pe, k_pe = self.rotary_emb(positions, q_pe, k_pe)

        q[..., self.qk_nope_head_dim :] = q_pe
        k = torch.empty_like(q)
        k[..., : self.qk_nope_head_dim] = k_nope
        k[..., self.qk_nope_head_dim :] = k_pe
        # Pad v up to qk_head_dim so q, k, v share the same head size.
        v = torch.nn.functional.pad(
            v, [0, self.qk_head_dim - self.v_head_dim], value=0
        ).view(-1, self.num_local_heads * self.qk_head_dim)
        attn_output = self.attn(q, k, v)
        attn_output = attn_output.view(-1, self.num_local_heads, self.qk_head_dim)[
            ..., : self.v_head_dim
        ].reshape(-1, self.num_local_heads * self.v_head_dim)
        output, _ = self.o_proj(attn_output)
        return output
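
As an aid to reading forward, the comment block below traces the tensor shapes through the MLA projections. The head dimensions follow the published DeepSeek-V2 configuration (qk_nope_head_dim=128, qk_rope_head_dim=64, v_head_dim=128, kv_lora_rank=512); T is the number of tokens and H the number of local heads, both hypothetical:

# Illustrative shape trace for DeepseekV2Attention.forward on a single rank,
# assuming DeepSeek-V2-style dims: qk_head_dim = 128 + 64 = 192, v_head_dim = 128,
# kv_lora_rank = 512, T tokens, H local heads.
#
# q            : [T, H, 192]        after q_a_proj -> q_a_layernorm -> q_b_proj (or q_proj)
# q_nope, q_pe : [T, H, 128] and [T, H, 64]
# latent_cache : [T, 1, 512 + 64]   kv_a_proj_with_mqa output after unsqueeze(1)
# kv_a         : [T, 512]           normalized latent fed to kv_b_proj
# kv           : [T, H, 128 + 128]  split into k_nope [T, H, 128] and v [T, H, 128]
# k_pe         : [T, 1, 64]         shared rotary part, broadcast over all H heads
# k            : [T, H, 192]        k_nope concatenated with the broadcast k_pe
# v (padded)   : [T, H * 192]       zero-padded from 128 to 192 so q, k, v share a head size
# attn_output  : [T, H * 192]       sliced back to [T, H * 128] before o_proj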

attn instance-attribute

attn = Attention(
    num_local_heads,
    qk_head_dim,
    scaling,
    num_kv_heads=num_local_heads,
    cache_config=cache_config,
    quant_config=quant_config,
    prefix=f"{prefix}.attn",
)

hidden_size instance-attribute

hidden_size = hidden_size

kv_a_layernorm instance-attribute

kv_a_layernorm = RMSNorm(kv_lora_rank, eps=rms_norm_eps)

kv_a_proj_with_mqa instance-attribute

kv_a_proj_with_mqa = ReplicatedLinear(
    hidden_size,
    kv_lora_rank + qk_rope_head_dim,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.kv_a_proj_with_mqa",
)

kv_b_proj instance-attribute

kv_b_proj = ColumnParallelLinear(
    kv_lora_rank,
    num_heads * (qk_nope_head_dim + v_head_dim),
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.kv_b_proj",
)

kv_lora_rank instance-attribute

kv_lora_rank = kv_lora_rank

max_position_embeddings instance-attribute

max_position_embeddings = max_position_embeddings

num_heads instance-attribute

num_heads = num_heads

num_local_heads instance-attribute

num_local_heads = num_heads // tp_size

o_proj instance-attribute

o_proj = RowParallelLinear(
    num_heads * v_head_dim,
    hidden_size,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.o_proj",
)

q_a_layernorm instance-attribute

q_a_layernorm = RMSNorm(q_lora_rank, eps=rms_norm_eps)

q_a_proj instance-attribute

q_a_proj = ReplicatedLinear(
    hidden_size,
    q_lora_rank,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.q_a_proj",
)

q_b_proj instance-attribute

q_b_proj = ColumnParallelLinear(
    q_lora_rank,
    num_heads * qk_head_dim,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.q_b_proj",
)

q_lora_rank instance-attribute

q_lora_rank = q_lora_rank

q_proj instance-attribute

q_proj = ColumnParallelLinear(
    hidden_size,
    num_heads * qk_head_dim,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.q_proj",
)

qk_head_dim instance-attribute

qk_head_dim = qk_nope_head_dim + qk_rope_head_dim

qk_nope_head_dim instance-attribute

qk_nope_head_dim = qk_nope_head_dim

qk_rope_head_dim instance-attribute

qk_rope_head_dim = qk_rope_head_dim

rope_theta instance-attribute

rope_theta = rope_theta

rotary_emb instance-attribute

rotary_emb = get_rope(
    qk_rope_head_dim,
    rotary_dim=qk_rope_head_dim,
    max_position=max_position_embeddings,
    base=rope_theta,
    rope_parameters=rope_parameters,
    is_neox_style=False,
)

scaling instance-attribute

scaling = qk_head_dim ** -0.5

v_head_dim instance-attribute

v_head_dim = v_head_dim

__init__

__init__(
    vllm_config: VllmConfig,
    config: DeepseekV2Config | DeepseekV3Config,
    hidden_size: int,
    num_heads: int,
    qk_nope_head_dim: int,
    qk_rope_head_dim: int,
    v_head_dim: int,
    q_lora_rank: int,
    kv_lora_rank: int,
    rope_theta: float = 10000,
    rope_parameters: dict[str, Any] | None = None,
    max_position_embeddings: int = 8192,
    cache_config: CacheConfig | None = None,
    quant_config: QuantizationConfig | None = None,
    topk_indices_buffer: Tensor | None = None,
    prefix: str = "",
) -> None
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(
    self,
    vllm_config: VllmConfig,
    config: DeepseekV2Config | DeepseekV3Config,
    hidden_size: int,
    num_heads: int,
    qk_nope_head_dim: int,
    qk_rope_head_dim: int,
    v_head_dim: int,
    q_lora_rank: int,
    kv_lora_rank: int,
    rope_theta: float = 10000,
    rope_parameters: dict[str, Any] | None = None,
    max_position_embeddings: int = 8192,
    cache_config: CacheConfig | None = None,
    quant_config: QuantizationConfig | None = None,
    topk_indices_buffer: torch.Tensor | None = None,
    prefix: str = "",
) -> None:
    super().__init__()
    self.hidden_size = hidden_size
    self.qk_nope_head_dim = qk_nope_head_dim
    self.qk_rope_head_dim = qk_rope_head_dim
    self.qk_head_dim = qk_nope_head_dim + qk_rope_head_dim
    self.v_head_dim = v_head_dim
    self.q_lora_rank = q_lora_rank
    self.kv_lora_rank = kv_lora_rank
    self.num_heads = num_heads
    tp_size = get_tensor_model_parallel_world_size()
    assert num_heads % tp_size == 0
    self.num_local_heads = num_heads // tp_size
    self.scaling = self.qk_head_dim**-0.5
    self.rope_theta = rope_theta
    self.max_position_embeddings = max_position_embeddings
    assert topk_indices_buffer is None, (
        "topk_indices_buffer is not supported for DeepseekV2Attention"
    )

    if self.q_lora_rank is not None:
        self.q_a_proj = ReplicatedLinear(
            self.hidden_size,
            self.q_lora_rank,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.q_a_proj",
        )
        self.q_a_layernorm = RMSNorm(self.q_lora_rank, eps=config.rms_norm_eps)
        self.q_b_proj = ColumnParallelLinear(
            q_lora_rank,
            self.num_heads * self.qk_head_dim,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.q_b_proj",
        )
    else:
        self.q_proj = ColumnParallelLinear(
            self.hidden_size,
            self.num_heads * self.qk_head_dim,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.q_proj",
        )

    self.kv_a_proj_with_mqa = ReplicatedLinear(
        self.hidden_size,
        self.kv_lora_rank + self.qk_rope_head_dim,
        bias=False,
        quant_config=quant_config,
        prefix=f"{prefix}.kv_a_proj_with_mqa",
    )
    self.kv_a_layernorm = RMSNorm(self.kv_lora_rank, eps=config.rms_norm_eps)
    self.kv_b_proj = ColumnParallelLinear(
        self.kv_lora_rank,
        self.num_heads * (self.qk_nope_head_dim + self.v_head_dim),
        bias=False,
        quant_config=quant_config,
        prefix=f"{prefix}.kv_b_proj",
    )
    # O projection.
    self.o_proj = RowParallelLinear(
        self.num_heads * self.v_head_dim,
        self.hidden_size,
        bias=False,
        quant_config=quant_config,
        prefix=f"{prefix}.o_proj",
    )
    if rope_parameters:
        rope_parameters["rope_type"] = "deepseek_yarn"

    self.rotary_emb = get_rope(
        qk_rope_head_dim,
        rotary_dim=qk_rope_head_dim,
        max_position=max_position_embeddings,
        base=rope_theta,
        rope_parameters=rope_parameters,
        is_neox_style=False,
    )

    if rope_parameters:
        mscale_all_dim = rope_parameters.get("mscale_all_dim", False)
        scaling_factor = rope_parameters["factor"]
        mscale = yarn_get_mscale(scaling_factor, float(mscale_all_dim))
        self.scaling = self.scaling * mscale * mscale

    self.attn = Attention(
        self.num_local_heads,
        self.qk_head_dim,
        self.scaling,
        num_kv_heads=self.num_local_heads,
        cache_config=cache_config,
        quant_config=quant_config,
        prefix=f"{prefix}.attn",
    )
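
When rope_parameters are present, the attention scale is multiplied by mscale squared. A minimal sketch of that adjustment, assuming yarn_get_mscale follows the standard DeepSeek YaRN formula 0.1 * mscale_all_dim * ln(factor) + 1 for factor > 1 (the rope parameter values below are invented for illustration):

import math

def yarn_mscale(scaling_factor: float, mscale_all_dim: float) -> float:
    # Assumed to match yarn_get_mscale: identity for factor <= 1, log-based boost otherwise.
    if scaling_factor <= 1:
        return 1.0
    return 0.1 * mscale_all_dim * math.log(scaling_factor) + 1.0

qk_head_dim = 192                    # qk_nope_head_dim + qk_rope_head_dim
base_scaling = qk_head_dim ** -0.5
m = yarn_mscale(scaling_factor=40.0, mscale_all_dim=1.0)  # hypothetical rope_parameters
scaling = base_scaling * m * m       # what self.scaling becomes after the block above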

forward

forward(positions: Tensor, hidden_states: Tensor) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(
    self,
    positions: torch.Tensor,
    hidden_states: torch.Tensor,
) -> torch.Tensor:
    if self.q_lora_rank is not None:
        q = self.q_a_proj(hidden_states)[0]
        q = self.q_a_layernorm(q)
        q = self.q_b_proj(q)[0].view(-1, self.num_local_heads, self.qk_head_dim)
    else:
        q = self.q_proj(hidden_states)[0].view(
            -1, self.num_local_heads, self.qk_head_dim
        )
    q_nope, q_pe = q.split([self.qk_nope_head_dim, self.qk_rope_head_dim], dim=-1)
    latent_cache = self.kv_a_proj_with_mqa(hidden_states)[0]
    kv_a, _ = latent_cache.split([self.kv_lora_rank, self.qk_rope_head_dim], dim=-1)
    latent_cache = latent_cache.unsqueeze(1)
    kv_a = self.kv_a_layernorm(kv_a)
    kv = self.kv_b_proj(kv_a)[0]
    kv = kv.view(-1, self.num_local_heads, self.qk_nope_head_dim + self.v_head_dim)
    k_nope, v = kv.split([self.qk_nope_head_dim, self.v_head_dim], dim=-1)
    k_pe = latent_cache[:, :, self.kv_lora_rank :]

    q_pe, k_pe = self.rotary_emb(positions, q_pe, k_pe)

    q[..., self.qk_nope_head_dim :] = q_pe
    k = torch.empty_like(q)
    k[..., : self.qk_nope_head_dim] = k_nope
    k[..., self.qk_nope_head_dim :] = k_pe
    # Pad v up to qk_head_dim so q, k, v share the same head size.
    v = torch.nn.functional.pad(
        v, [0, self.qk_head_dim - self.v_head_dim], value=0
    ).view(-1, self.num_local_heads * self.qk_head_dim)
    attn_output = self.attn(q, k, v)
    attn_output = attn_output.view(-1, self.num_local_heads, self.qk_head_dim)[
        ..., : self.v_head_dim
    ].reshape(-1, self.num_local_heads * self.v_head_dim)
    output, _ = self.o_proj(attn_output)
    return output

DeepseekV2DecoderLayer

Bases: Module

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekV2DecoderLayer(nn.Module):
    def __init__(
        self,
        vllm_config: VllmConfig,
        prefix: str,
        config: DeepseekV2Config | None = None,
        topk_indices_buffer: torch.Tensor | None = None,
    ) -> None:
        super().__init__()

        if config is None:
            config = vllm_config.model_config.hf_config
        model_config = vllm_config.model_config
        cache_config = vllm_config.cache_config
        quant_config = vllm_config.quant_config
        parallel_config = vllm_config.parallel_config

        self.hidden_size = config.hidden_size
        max_position_embeddings = getattr(config, "max_position_embeddings", 8192)
        moe_layer_freq = getattr(config, "moe_layer_freq", 1)
        # DecoderLayers are created with `make_layers` which passes the prefix
        # with the layer's index.
        layer_idx = int(prefix.split(sep=".")[-1])
        self.layer_idx = layer_idx

        # verify MLA attention specific fields
        qk_nope_head_dim = getattr(config, "qk_nope_head_dim", 0)
        qk_rope_head_dim = getattr(config, "qk_rope_head_dim", 0)
        v_head_dim = getattr(config, "v_head_dim", 0)
        kv_lora_rank = getattr(config, "kv_lora_rank", 0)
        use_mha = config.model_type == "deepseek" or all(
            dim == 0 for dim in (qk_nope_head_dim, qk_rope_head_dim)
        )

        if use_mha:
            attn_cls = DeepseekAttention
        elif model_config.use_mla:
            attn_cls = DeepseekV2MLAAttention
        else:
            attn_cls = DeepseekV2Attention
        self.self_attn = attn_cls(
            vllm_config=vllm_config,
            config=config,
            hidden_size=self.hidden_size,
            num_heads=config.num_attention_heads,
            qk_nope_head_dim=qk_nope_head_dim,
            qk_rope_head_dim=qk_rope_head_dim,
            v_head_dim=v_head_dim,
            q_lora_rank=config.q_lora_rank if hasattr(config, "q_lora_rank") else None,
            kv_lora_rank=kv_lora_rank,
            rope_theta=config.rope_parameters["rope_theta"],
            rope_parameters=config.rope_parameters,
            max_position_embeddings=max_position_embeddings,
            cache_config=cache_config,
            quant_config=quant_config,
            prefix=f"{prefix}.self_attn",
            topk_indices_buffer=topk_indices_buffer,
        )

        if (
            config.n_routed_experts is not None
            and layer_idx >= config.first_k_dense_replace
            and layer_idx % moe_layer_freq == 0
        ):
            self.mlp = DeepseekV2MoE(
                config=config,
                parallel_config=parallel_config,
                quant_config=quant_config,
                prefix=f"{prefix}.mlp",
            )
        else:
            self.mlp = DeepseekV2MLP(
                hidden_size=config.hidden_size,
                intermediate_size=config.intermediate_size,
                hidden_act=config.hidden_act,
                quant_config=quant_config,
                prefix=f"{prefix}.mlp",
            )
        self.input_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.post_attention_layernorm = RMSNorm(
            config.hidden_size, eps=config.rms_norm_eps
        )
        self.routed_scaling_factor = getattr(config, "routed_scaling_factor", 1.0)

    def forward(
        self,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
        residual: torch.Tensor | None,
    ) -> torch.Tensor:
        # Self Attention
        if residual is None:
            residual = hidden_states.clone()
            hidden_states = self.input_layernorm(hidden_states)
        else:
            hidden_states, residual = self.input_layernorm(hidden_states, residual)
        hidden_states = self.self_attn(
            positions=positions,
            hidden_states=hidden_states,
        )

        if (
            not isinstance(self.self_attn, DeepseekAttention)
            and hidden_states.dtype == torch.float16
        ):
            # Fix FP16 overflow
            # We scale both hidden_states and residual before the RMSNorm;
            # the RMSNorm output is unaffected by a uniform scale.
            hidden_states *= 1.0 / self.routed_scaling_factor
            if self.layer_idx == 0:
                # The residual is shared by all layers, so we only scale it
                # on the first layer.
                residual *= 1.0 / self.routed_scaling_factor

        # Fully Connected
        hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)
        hidden_states = self.mlp(hidden_states)

        if isinstance(self.mlp, DeepseekV2MLP) and hidden_states.dtype == torch.float16:
            # Fix FP16 overflow
            # Scale the DeepseekV2MLP output, which is the input to the next
            # decoder layer's input_layernorm.
            # The DeepseekV2MoE output is scaled inside DeepseekV2MoE.forward.
            hidden_states *= 1.0 / self.routed_scaling_factor

        return hidden_states, residual
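
Whether a layer uses a routed DeepseekV2MoE block or a dense DeepseekV2MLP is decided purely from layer_idx, first_k_dense_replace, and moe_layer_freq, as in the condition above. A small, hypothetical illustration of that predicate (the config values are made up):

def is_moe_layer(layer_idx: int, n_routed_experts, first_k_dense_replace: int, moe_layer_freq: int) -> bool:
    # Mirrors the condition used above to choose between DeepseekV2MoE and DeepseekV2MLP.
    return (
        n_routed_experts is not None
        and layer_idx >= first_k_dense_replace
        and layer_idx % moe_layer_freq == 0
    )

# Hypothetical 8-layer model: the first layer stays dense, every later layer is MoE.
print([is_moe_layer(i, 64, first_k_dense_replace=1, moe_layer_freq=1) for i in range(8)])
# [False, True, True, True, True, True, True, True]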

hidden_size instance-attribute

hidden_size = hidden_size

input_layernorm instance-attribute

input_layernorm = RMSNorm(hidden_size, eps=rms_norm_eps)

layer_idx instance-attribute

layer_idx = layer_idx

mlp instance-attribute

mlp = DeepseekV2MoE(
    config=config,
    parallel_config=parallel_config,
    quant_config=quant_config,
    prefix=f"{prefix}.mlp",
)

post_attention_layernorm instance-attribute

post_attention_layernorm = RMSNorm(
    hidden_size, eps=rms_norm_eps
)

routed_scaling_factor instance-attribute

routed_scaling_factor = getattr(
    config, "routed_scaling_factor", 1.0
)

self_attn instance-attribute

self_attn = attn_cls(
    vllm_config=vllm_config,
    config=config,
    hidden_size=hidden_size,
    num_heads=num_attention_heads,
    qk_nope_head_dim=qk_nope_head_dim,
    qk_rope_head_dim=qk_rope_head_dim,
    v_head_dim=v_head_dim,
    q_lora_rank=q_lora_rank
    if hasattr(config, "q_lora_rank")
    else None,
    kv_lora_rank=kv_lora_rank,
    rope_theta=rope_parameters["rope_theta"],
    rope_parameters=rope_parameters,
    max_position_embeddings=max_position_embeddings,
    cache_config=cache_config,
    quant_config=quant_config,
    prefix=f"{prefix}.self_attn",
    topk_indices_buffer=topk_indices_buffer,
)

__init__

__init__(
    vllm_config: VllmConfig,
    prefix: str,
    config: DeepseekV2Config | None = None,
    topk_indices_buffer: Tensor | None = None,
) -> None
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(
    self,
    vllm_config: VllmConfig,
    prefix: str,
    config: DeepseekV2Config | None = None,
    topk_indices_buffer: torch.Tensor | None = None,
) -> None:
    super().__init__()

    if config is None:
        config = vllm_config.model_config.hf_config
    model_config = vllm_config.model_config
    cache_config = vllm_config.cache_config
    quant_config = vllm_config.quant_config
    parallel_config = vllm_config.parallel_config

    self.hidden_size = config.hidden_size
    max_position_embeddings = getattr(config, "max_position_embeddings", 8192)
    moe_layer_freq = getattr(config, "moe_layer_freq", 1)
    # DecoderLayers are created with `make_layers` which passes the prefix
    # with the layer's index.
    layer_idx = int(prefix.split(sep=".")[-1])
    self.layer_idx = layer_idx

    # verify MLA attention specific fields
    qk_nope_head_dim = getattr(config, "qk_nope_head_dim", 0)
    qk_rope_head_dim = getattr(config, "qk_rope_head_dim", 0)
    v_head_dim = getattr(config, "v_head_dim", 0)
    kv_lora_rank = getattr(config, "kv_lora_rank", 0)
    use_mha = config.model_type == "deepseek" or all(
        dim == 0 for dim in (qk_nope_head_dim, qk_rope_head_dim)
    )

    if use_mha:
        attn_cls = DeepseekAttention
    elif model_config.use_mla:
        attn_cls = DeepseekV2MLAAttention
    else:
        attn_cls = DeepseekV2Attention
    self.self_attn = attn_cls(
        vllm_config=vllm_config,
        config=config,
        hidden_size=self.hidden_size,
        num_heads=config.num_attention_heads,
        qk_nope_head_dim=qk_nope_head_dim,
        qk_rope_head_dim=qk_rope_head_dim,
        v_head_dim=v_head_dim,
        q_lora_rank=config.q_lora_rank if hasattr(config, "q_lora_rank") else None,
        kv_lora_rank=kv_lora_rank,
        rope_theta=config.rope_parameters["rope_theta"],
        rope_parameters=config.rope_parameters,
        max_position_embeddings=max_position_embeddings,
        cache_config=cache_config,
        quant_config=quant_config,
        prefix=f"{prefix}.self_attn",
        topk_indices_buffer=topk_indices_buffer,
    )

    if (
        config.n_routed_experts is not None
        and layer_idx >= config.first_k_dense_replace
        and layer_idx % moe_layer_freq == 0
    ):
        self.mlp = DeepseekV2MoE(
            config=config,
            parallel_config=parallel_config,
            quant_config=quant_config,
            prefix=f"{prefix}.mlp",
        )
    else:
        self.mlp = DeepseekV2MLP(
            hidden_size=config.hidden_size,
            intermediate_size=config.intermediate_size,
            hidden_act=config.hidden_act,
            quant_config=quant_config,
            prefix=f"{prefix}.mlp",
        )
    self.input_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
    self.post_attention_layernorm = RMSNorm(
        config.hidden_size, eps=config.rms_norm_eps
    )
    self.routed_scaling_factor = getattr(config, "routed_scaling_factor", 1.0)

forward

forward(
    positions: Tensor,
    hidden_states: Tensor,
    residual: Tensor | None,
) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(
    self,
    positions: torch.Tensor,
    hidden_states: torch.Tensor,
    residual: torch.Tensor | None,
) -> torch.Tensor:
    # Self Attention
    if residual is None:
        residual = hidden_states.clone()
        hidden_states = self.input_layernorm(hidden_states)
    else:
        hidden_states, residual = self.input_layernorm(hidden_states, residual)
    hidden_states = self.self_attn(
        positions=positions,
        hidden_states=hidden_states,
    )

    if (
        not isinstance(self.self_attn, DeepseekAttention)
        and hidden_states.dtype == torch.float16
    ):
        # Fix FP16 overflow
        # We scale both hidden_states and residual before the RMSNorm;
        # the RMSNorm output is unaffected by a uniform scale.
        hidden_states *= 1.0 / self.routed_scaling_factor
        if self.layer_idx == 0:
            # The residual is shared by all layers, so we only scale it
            # on the first layer.
            residual *= 1.0 / self.routed_scaling_factor

    # Fully Connected
    hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)
    hidden_states = self.mlp(hidden_states)

    if isinstance(self.mlp, DeepseekV2MLP) and hidden_states.dtype == torch.float16:
        # Fix FP16 overflow
        # Scaling the DeepseekV2MLP output, it is the input of
        # input_layernorm of next decoder layer.
        # The scaling of DeepseekV2MOE output would be done in the forward
        # of DeepseekV2MOE
        hidden_states *= 1.0 / self.routed_scaling_factor

    return hidden_states, residual
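
The FP16 branch above relies on RMSNorm being invariant to a uniform rescaling of its input: dividing both hidden_states and residual by routed_scaling_factor keeps the residual stream within FP16 range without changing the normalized output. A quick self-contained check of that property, using a plain reference RMSNorm rather than vLLM's fused kernel:

import torch

def rms_norm(x: torch.Tensor, weight: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
    # Reference RMSNorm for the demo only; not vLLM's fused add-and-norm kernel.
    variance = x.pow(2).mean(dim=-1, keepdim=True)
    return x * torch.rsqrt(variance + eps) * weight

torch.manual_seed(0)
hidden, residual = torch.randn(4, 16), torch.randn(4, 16)
weight = torch.ones(16)
scale = 2.5  # stands in for routed_scaling_factor

out_unscaled = rms_norm(hidden + residual, weight)
out_scaled = rms_norm(hidden / scale + residual / scale, weight)
print(torch.allclose(out_unscaled, out_scaled, atol=1e-5))  # True (up to eps effects)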

DeepseekV2ForCausalLM

Bases: Module, SupportsPP, DeepseekV2MixtureOfExperts, SupportsLoRA

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekV2ForCausalLM(
    nn.Module, SupportsPP, DeepseekV2MixtureOfExperts, SupportsLoRA
):
    packed_modules_mapping = {
        "gate_up_proj": ["gate_proj", "up_proj"],
    }

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()
        config = vllm_config.model_config.hf_config
        quant_config = vllm_config.quant_config
        self.config = config
        self.quant_config = quant_config

        qk_nope_head_dim = getattr(config, "qk_nope_head_dim", 0)
        qk_rope_head_dim = getattr(config, "qk_rope_head_dim", 0)
        self.use_mha = config.model_type == "deepseek" or all(
            dim == 0 for dim in (qk_nope_head_dim, qk_rope_head_dim)
        )

        if self.use_mha:
            self.packed_modules_mapping["qkv_proj"] = ["q_proj", "k_proj", "v_proj"]

        # `packed_modules_mapping` needs to be modified before
        # initializing DeepseekV2Model, as it is passed inplace to
        # quantization config init and may be used to select the
        # quant_method for relevant layers during initialization.
        self.fuse_qkv_a_proj = (
            hasattr(config, "q_lora_rank") and config.q_lora_rank is not None
        )
        if self.fuse_qkv_a_proj:
            self.packed_modules_mapping["fused_qkv_a_proj"] = [
                "q_a_proj",
                "kv_a_proj_with_mqa",
            ]

        self.model = DeepseekV2Model(
            vllm_config=vllm_config, prefix=maybe_prefix(prefix, "model")
        )
        if get_pp_group().is_last_rank:
            self.lm_head = ParallelLMHead(
                config.vocab_size,
                config.hidden_size,
                quant_config=quant_config,
                prefix=maybe_prefix(prefix, "lm_head"),
            )
        else:
            self.lm_head = PPMissingLayer()
        self.logits_processor = LogitsProcessor(config.vocab_size)
        self.make_empty_intermediate_tensors = (
            self.model.make_empty_intermediate_tensors
        )
        # Set MoE hyperparameters
        self.num_moe_layers = (
            self.config.num_hidden_layers - self.config.first_k_dense_replace
        )
        self.set_moe_parameters()

    def set_moe_parameters(self):
        self.expert_weights = []

        self.num_expert_groups = getattr(self.config, "n_group", 1)

        self.moe_layers = []
        self.moe_mlp_layers = []
        example_moe = None
        for layer in self.model.layers:
            if isinstance(layer, PPMissingLayer):
                continue

            assert isinstance(layer, DeepseekV2DecoderLayer)
            if isinstance(layer.mlp, DeepseekV2MoE):
                # Pick last one layer since the first ones may be dense layers.
                example_moe = layer.mlp
                self.moe_mlp_layers.append(layer.mlp)
                self.moe_layers.append(layer.mlp.experts)

        self.extract_moe_parameters(example_moe)

    def embed_input_ids(self, input_ids: torch.Tensor) -> torch.Tensor:
        return self.model.embed_input_ids(input_ids)

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        intermediate_tensors: IntermediateTensors | None = None,
        inputs_embeds: torch.Tensor | None = None,
    ) -> torch.Tensor | IntermediateTensors:
        hidden_states = self.model(
            input_ids, positions, intermediate_tensors, inputs_embeds
        )
        return hidden_states

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
    ) -> torch.Tensor | None:
        logits = self.logits_processor(self.lm_head, hidden_states)
        return logits

    def get_expert_mapping(self) -> list[tuple[str, str, int, str]]:
        # Params for weights, fp8 weight scales, fp8 activation scales
        # (param_name, weight_name, expert_id, shard_id)
        return SharedFusedMoE.make_expert_params_mapping(
            ckpt_gate_proj_name="gate_proj",
            ckpt_down_proj_name="down_proj",
            ckpt_up_proj_name="up_proj",
            num_experts=self.config.n_routed_experts,
            num_redundant_experts=0,
        )

    def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]:
        rocm_aiter_moe_shared_expert_enabled = (
            rocm_aiter_ops.is_fusion_moe_shared_experts_enabled()
        )
        stacked_params_mapping = [
            # (param_name, shard_name, shard_id)
            ("gate_up_proj", "gate_proj", 0),
            ("gate_up_proj", "up_proj", 1),
        ]
        mla_params_mapping = [
            ("fused_qkv_a_proj", "q_a_proj", 0),
            ("fused_qkv_a_proj", "kv_a_proj_with_mqa", 1),
        ]
        mha_params_mapping = [
            ("qkv_proj", "q_proj", "q"),
            ("qkv_proj", "k_proj", "k"),
            ("qkv_proj", "v_proj", "v"),
        ]
        if self.use_mha:
            stacked_params_mapping.extend(mha_params_mapping)
        else:
            stacked_params_mapping.extend(mla_params_mapping)

        # Params for weights, fp8 weight scales, fp8 activation scales
        # (param_name, weight_name, expert_id, shard_id)
        expert_params_mapping = SharedFusedMoE.make_expert_params_mapping(
            ckpt_gate_proj_name="gate_proj",
            ckpt_down_proj_name="down_proj",
            ckpt_up_proj_name="up_proj",
            num_experts=self.config.n_routed_experts
            + (
                self.config.n_shared_experts
                if rocm_aiter_moe_shared_expert_enabled
                else 0
            ),
            num_redundant_experts=self.num_redundant_experts,
        )

        params_dict = dict(self.named_parameters())
        loaded_params: set[str] = set()
        for name, loaded_weight in weights:
            if "rotary_emb.inv_freq" in name:
                continue

            spec_layer = get_spec_layer_idx_from_weight_name(self.config, name)
            if spec_layer is not None:
                continue  # skip spec decode layers for main model

            is_fuse_shared_experts_layer = rocm_aiter_moe_shared_expert_enabled and (
                "mlp.shared_experts" in name
            )

            for param_name, weight_name, shard_id in stacked_params_mapping:
                # Skip non-stacked layers and experts (experts handled below).
                if weight_name not in name:
                    continue
                # We have mlp.experts[0].gate_proj in the checkpoint.
                # Since we handle the experts below in expert_params_mapping,
                # we need to skip here BEFORE we update the name, otherwise
                # name will be updated to mlp.experts[0].gate_up_proj, which
                # will then be updated below in expert_params_mapping
                # for mlp.experts[0].gate_gate_up_proj, which breaks load.
                if ("mlp.experts." in name) and name not in params_dict:
                    continue
                if is_fuse_shared_experts_layer:
                    continue
                name_mapped = name.replace(weight_name, param_name)

                # QKV fusion is optional; fall back to normal weight loading
                # if it is not enabled. If the fusion option is used, update
                # the name.
                if (
                    param_name == "fused_qkv_a_proj"
                ) and name_mapped not in params_dict:
                    continue
                else:
                    name = name_mapped
                # Skip loading extra bias for GPTQ models.
                if name.endswith(".bias") and name not in params_dict:
                    continue

                if is_pp_missing_parameter(name, self):
                    continue

                param = params_dict[name]
                weight_loader = param.weight_loader
                weight_loader(param, loaded_weight, shard_id)
                break
            else:
                is_expert_weight = False

                # Special handling: when AITER fusion_shared_experts is enabled,
                # checkpoints may provide a single widened shared_experts tensor
                # without explicit expert indices
                # (e.g. ...mlp.shared_experts.gate_proj.weight).
                # For models with multiple shared experts, split that tensor
                # evenly into per-shared-expert slices and load them into
                # appended expert slots mlp.experts.{n_routed_experts + j}.*
                # accordingly.
                num_chunks = 1
                if is_fuse_shared_experts_layer:
                    num_chunks = getattr(self.config, "n_shared_experts", 1) or 1
                    # Determine split axis based on op type
                    # gate/up: ColumnParallel → split along dim 0
                    # down: RowParallel → split along dim 1
                    split_dim = 1 if "down_proj.weight" in name else 0
                    total = loaded_weight.shape[split_dim]
                    assert total % num_chunks == 0, (
                        f"Shared expert weight dim {total} "
                        f"not divisible by num_chunks {num_chunks}"
                    )
                    chunk_size = total // num_chunks

                for j in range(num_chunks):
                    chunk_name = name
                    weight_to_load = loaded_weight

                    if is_fuse_shared_experts_layer:
                        if split_dim == 0:
                            weight_to_load = loaded_weight[
                                j * chunk_size : (j + 1) * chunk_size, :
                            ]
                        else:
                            weight_to_load = loaded_weight[
                                :, j * chunk_size : (j + 1) * chunk_size
                            ]
                        # Synthesize an expert-style name so expert mapping
                        # can route it
                        chunk_name = name.replace(
                            "mlp.shared_experts",
                            f"mlp.experts.{self.config.n_routed_experts + j}",
                        )

                    # Use expert_params_mapping to locate the destination
                    # param and delegate to its expert-aware weight_loader
                    # with expert_id.
                    for mapping in expert_params_mapping:
                        param_name, weight_name, expert_id, shard_id = mapping
                        if weight_name not in chunk_name:
                            continue

                        # This is an expert weight; do not try to load it as
                        # a regular weight later.
                        is_expert_weight = True

                        # Do not modify `name` since the loop may continue here
                        # Instead, create a new variable
                        name_mapped = chunk_name.replace(weight_name, param_name)

                        if is_pp_missing_parameter(name_mapped, self):
                            continue

                        param = params_dict[name_mapped]
                        # We should ask the weight loader to return success or
                        # not here since otherwise we may skip experts with
                        # other available replicas.
                        weight_loader = typing.cast(
                            Callable[..., bool], param.weight_loader
                        )
                        success = weight_loader(
                            param,
                            weight_to_load,
                            name_mapped,
                            shard_id=shard_id,
                            expert_id=expert_id,
                            return_success=True,
                        )
                        if success:
                            if not is_fuse_shared_experts_layer:
                                name = name_mapped
                            else:
                                loaded_params.add(name_mapped)
                            break
                    else:
                        if is_expert_weight:
                            # We've checked that this is an expert weight
                            # However it's not mapped locally to this rank
                            # So we simply skip it
                            continue

                        # Skip loading extra bias for GPTQ models.
                        if name.endswith(".bias") and name not in params_dict:
                            continue

                        # Remapping the name of FP8 kv-scale.
                        name = maybe_remap_kv_scale_name(name, params_dict)
                        if name is None:
                            continue

                        if is_pp_missing_parameter(name, self):
                            continue

                        param = params_dict[name]
                        weight_loader = getattr(
                            param, "weight_loader", default_weight_loader
                        )
                        weight_loader(param, loaded_weight)
            if not is_fuse_shared_experts_layer:
                loaded_params.add(name)

        return loaded_params
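
The stacked-parameter mappings in load_weights rewrite checkpoint weight names onto the fused modules created at init time (gate_up_proj, plus either fused_qkv_a_proj for MLA checkpoints or qkv_proj for the MHA case). A hypothetical sketch of the renaming step only, with an invented checkpoint name; real loading also dispatches to each parameter's weight_loader with the shard id:

stacked_params_mapping = [
    # (param_name, shard_name, shard_id)
    ("gate_up_proj", "gate_proj", 0),
    ("gate_up_proj", "up_proj", 1),
    ("fused_qkv_a_proj", "q_a_proj", 0),
    ("fused_qkv_a_proj", "kv_a_proj_with_mqa", 1),
]

checkpoint_name = "model.layers.3.self_attn.kv_a_proj_with_mqa.weight"  # made-up example
for param_name, weight_name, shard_id in stacked_params_mapping:
    if weight_name in checkpoint_name:
        print(checkpoint_name.replace(weight_name, param_name), shard_id)
        # -> model.layers.3.self_attn.fused_qkv_a_proj.weight 1
        break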

config instance-attribute

config = config

fuse_qkv_a_proj instance-attribute

fuse_qkv_a_proj = (
    hasattr(config, "q_lora_rank")
    and q_lora_rank is not None
)

lm_head instance-attribute

lm_head = ParallelLMHead(
    vocab_size,
    hidden_size,
    quant_config=quant_config,
    prefix=maybe_prefix(prefix, "lm_head"),
)

logits_processor instance-attribute

logits_processor = LogitsProcessor(vocab_size)

make_empty_intermediate_tensors instance-attribute

make_empty_intermediate_tensors = (
    make_empty_intermediate_tensors
)

model instance-attribute

model = DeepseekV2Model(
    vllm_config=vllm_config,
    prefix=maybe_prefix(prefix, "model"),
)

num_moe_layers instance-attribute

num_moe_layers = num_hidden_layers - first_k_dense_replace

packed_modules_mapping class-attribute instance-attribute

packed_modules_mapping = {
    "gate_up_proj": ["gate_proj", "up_proj"]
}

quant_config instance-attribute

quant_config = quant_config

use_mha instance-attribute

use_mha = model_type == "deepseek" or all(
    (dim == 0)
    for dim in (qk_nope_head_dim, qk_rope_head_dim)
)

__init__

__init__(*, vllm_config: VllmConfig, prefix: str = '')
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
    super().__init__()
    config = vllm_config.model_config.hf_config
    quant_config = vllm_config.quant_config
    self.config = config
    self.quant_config = quant_config

    qk_nope_head_dim = getattr(config, "qk_nope_head_dim", 0)
    qk_rope_head_dim = getattr(config, "qk_rope_head_dim", 0)
    self.use_mha = config.model_type == "deepseek" or all(
        dim == 0 for dim in (qk_nope_head_dim, qk_rope_head_dim)
    )

    if self.use_mha:
        self.packed_modules_mapping["qkv_proj"] = ["q_proj", "k_proj", "v_proj"]

    # `packed_modules_mapping` needs to be modified before
    # initializing DeepseekV2Model, as it is passed inplace to
    # quantization config init and may be used to select the
    # quant_method for relevant layers during initialization.
    self.fuse_qkv_a_proj = (
        hasattr(config, "q_lora_rank") and config.q_lora_rank is not None
    )
    if self.fuse_qkv_a_proj:
        self.packed_modules_mapping["fused_qkv_a_proj"] = [
            "q_a_proj",
            "kv_a_proj_with_mqa",
        ]

    self.model = DeepseekV2Model(
        vllm_config=vllm_config, prefix=maybe_prefix(prefix, "model")
    )
    if get_pp_group().is_last_rank:
        self.lm_head = ParallelLMHead(
            config.vocab_size,
            config.hidden_size,
            quant_config=quant_config,
            prefix=maybe_prefix(prefix, "lm_head"),
        )
    else:
        self.lm_head = PPMissingLayer()
    self.logits_processor = LogitsProcessor(config.vocab_size)
    self.make_empty_intermediate_tensors = (
        self.model.make_empty_intermediate_tensors
    )
    # Set MoE hyperparameters
    self.num_moe_layers = (
        self.config.num_hidden_layers - self.config.first_k_dense_replace
    )
    self.set_moe_parameters()

compute_logits

compute_logits(hidden_states: Tensor) -> Tensor | None
Source code in vllm/model_executor/models/deepseek_v2.py
def compute_logits(
    self,
    hidden_states: torch.Tensor,
) -> torch.Tensor | None:
    logits = self.logits_processor(self.lm_head, hidden_states)
    return logits

embed_input_ids

embed_input_ids(input_ids: Tensor) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def embed_input_ids(self, input_ids: torch.Tensor) -> torch.Tensor:
    return self.model.embed_input_ids(input_ids)

forward

forward(
    input_ids: Tensor,
    positions: Tensor,
    intermediate_tensors: IntermediateTensors | None = None,
    inputs_embeds: Tensor | None = None,
) -> Tensor | IntermediateTensors
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(
    self,
    input_ids: torch.Tensor,
    positions: torch.Tensor,
    intermediate_tensors: IntermediateTensors | None = None,
    inputs_embeds: torch.Tensor | None = None,
) -> torch.Tensor | IntermediateTensors:
    hidden_states = self.model(
        input_ids, positions, intermediate_tensors, inputs_embeds
    )
    return hidden_states

get_expert_mapping

get_expert_mapping() -> list[tuple[str, str, int, str]]
Source code in vllm/model_executor/models/deepseek_v2.py
def get_expert_mapping(self) -> list[tuple[str, str, int, str]]:
    # Params for weights, fp8 weight scales, fp8 activation scales
    # (param_name, weight_name, expert_id, shard_id)
    return SharedFusedMoE.make_expert_params_mapping(
        ckpt_gate_proj_name="gate_proj",
        ckpt_down_proj_name="down_proj",
        ckpt_up_proj_name="up_proj",
        num_experts=self.config.n_routed_experts,
        num_redundant_experts=0,
    )

load_weights

load_weights(
    weights: Iterable[tuple[str, Tensor]],
) -> set[str]
Source code in vllm/model_executor/models/deepseek_v2.py
def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]:
    rocm_aiter_moe_shared_expert_enabled = (
        rocm_aiter_ops.is_fusion_moe_shared_experts_enabled()
    )
    stacked_params_mapping = [
        # (param_name, shard_name, shard_id)
        ("gate_up_proj", "gate_proj", 0),
        ("gate_up_proj", "up_proj", 1),
    ]
    mla_params_mapping = [
        ("fused_qkv_a_proj", "q_a_proj", 0),
        ("fused_qkv_a_proj", "kv_a_proj_with_mqa", 1),
    ]
    mha_params_mapping = [
        ("qkv_proj", "q_proj", "q"),
        ("qkv_proj", "k_proj", "k"),
        ("qkv_proj", "v_proj", "v"),
    ]
    if self.use_mha:
        stacked_params_mapping.extend(mha_params_mapping)
    else:
        stacked_params_mapping.extend(mla_params_mapping)

    # Params for weights, fp8 weight scales, fp8 activation scales
    # (param_name, weight_name, expert_id, shard_id)
    expert_params_mapping = SharedFusedMoE.make_expert_params_mapping(
        ckpt_gate_proj_name="gate_proj",
        ckpt_down_proj_name="down_proj",
        ckpt_up_proj_name="up_proj",
        num_experts=self.config.n_routed_experts
        + (
            self.config.n_shared_experts
            if rocm_aiter_moe_shared_expert_enabled
            else 0
        ),
        num_redundant_experts=self.num_redundant_experts,
    )

    params_dict = dict(self.named_parameters())
    loaded_params: set[str] = set()
    for name, loaded_weight in weights:
        if "rotary_emb.inv_freq" in name:
            continue

        spec_layer = get_spec_layer_idx_from_weight_name(self.config, name)
        if spec_layer is not None:
            continue  # skip spec decode layers for main model

        is_fuse_shared_experts_layer = rocm_aiter_moe_shared_expert_enabled and (
            "mlp.shared_experts" in name
        )

        for param_name, weight_name, shard_id in stacked_params_mapping:
            # Skip non-stacked layers and experts (experts handled below).
            if weight_name not in name:
                continue
            # We have mlp.experts[0].gate_proj in the checkpoint.
            # Since we handle the experts below in expert_params_mapping,
            # we need to skip here BEFORE we update the name, otherwise
            # name will be updated to mlp.experts[0].gate_up_proj, which
            # will then be updated below in expert_params_mapping
            # for mlp.experts[0].gate_gate_up_proj, which breaks load.
            if ("mlp.experts." in name) and name not in params_dict:
                continue
            if is_fuse_shared_experts_layer:
                continue
            name_mapped = name.replace(weight_name, param_name)

            # QKV fusion is optional: fall back to normal weight
            # loading if it is not enabled; when the fused path is
            # taken, rewrite the name to the fused parameter.
            if (
                param_name == "fused_qkv_a_proj"
            ) and name_mapped not in params_dict:
                continue
            else:
                name = name_mapped
            # Skip loading extra bias for GPTQ models.
            if name.endswith(".bias") and name not in params_dict:
                continue

            if is_pp_missing_parameter(name, self):
                continue

            param = params_dict[name]
            weight_loader = param.weight_loader
            weight_loader(param, loaded_weight, shard_id)
            break
        else:
            is_expert_weight = False

            # Special handling: when AITER fusion_shared_experts is enabled,
            # checkpoints may provide a single widened shared_experts tensor
            # without explicit expert indices
            # (e.g. ...mlp.shared_experts.gate_proj.weight).
            # For models with multiple shared experts, split that tensor
            # evenly into per-shared-expert slices and load them into
            # appended expert slots mlp.experts.{n_routed_experts + j}.*
            # accordingly.
            num_chunks = 1
            if is_fuse_shared_experts_layer:
                num_chunks = getattr(self.config, "n_shared_experts", 1) or 1
                # Determine split axis based on op type
                # gate/up: ColumnParallel → split along dim 0
                # down: RowParallel → split along dim 1
                split_dim = 1 if "down_proj.weight" in name else 0
                total = loaded_weight.shape[split_dim]
                assert total % num_chunks == 0, (
                    f"Shared expert weight dim {total} "
                    f"not divisible by num_chunks {num_chunks}"
                )
                chunk_size = total // num_chunks

            for j in range(num_chunks):
                chunk_name = name
                weight_to_load = loaded_weight

                if is_fuse_shared_experts_layer:
                    if split_dim == 0:
                        weight_to_load = loaded_weight[
                            j * chunk_size : (j + 1) * chunk_size, :
                        ]
                    else:
                        weight_to_load = loaded_weight[
                            :, j * chunk_size : (j + 1) * chunk_size
                        ]
                    # Synthesize an expert-style name so expert mapping
                    # can route it
                    chunk_name = name.replace(
                        "mlp.shared_experts",
                        f"mlp.experts.{self.config.n_routed_experts + j}",
                    )

                # Use expert_params_mapping to locate the destination
                # param and delegate to its expert-aware weight_loader
                # with expert_id.
                for mapping in expert_params_mapping:
                    param_name, weight_name, expert_id, shard_id = mapping
                    if weight_name not in chunk_name:
                        continue

                    # This is an expert weight; it must not be loaded
                    # again as a regular weight in the fallback below.
                    is_expert_weight = True

                    # Do not modify `name` since the loop may continue here
                    # Instead, create a new variable
                    name_mapped = chunk_name.replace(weight_name, param_name)

                    if is_pp_missing_parameter(name_mapped, self):
                        continue

                    param = params_dict[name_mapped]
                    # We should ask the weight loader to return success or
                    # not here since otherwise we may skip experts with
                    # other available replicas.
                    weight_loader = typing.cast(
                        Callable[..., bool], param.weight_loader
                    )
                    success = weight_loader(
                        param,
                        weight_to_load,
                        name_mapped,
                        shard_id=shard_id,
                        expert_id=expert_id,
                        return_success=True,
                    )
                    if success:
                        if not is_fuse_shared_experts_layer:
                            name = name_mapped
                        else:
                            loaded_params.add(name_mapped)
                        break
                else:
                    if is_expert_weight:
                        # We've checked that this is an expert weight
                        # However it's not mapped locally to this rank
                        # So we simply skip it
                        continue

                    # Skip loading extra bias for GPTQ models.
                    if name.endswith(".bias") and name not in params_dict:
                        continue

                    # Remapping the name of FP8 kv-scale.
                    name = maybe_remap_kv_scale_name(name, params_dict)
                    if name is None:
                        continue

                    if is_pp_missing_parameter(name, self):
                        continue

                    param = params_dict[name]
                    weight_loader = getattr(
                        param, "weight_loader", default_weight_loader
                    )
                    weight_loader(param, loaded_weight)
        if not is_fuse_shared_experts_layer:
            loaded_params.add(name)

    return loaded_params
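
When AITER fused shared experts are enabled, the loader above slices a single widened shared_experts tensor into per-expert chunks and renames each chunk into an appended expert slot. A standalone sketch of that slicing, assuming two shared experts and small illustrative shapes:

import torch

n_routed_experts = 4          # illustrative
n_shared_experts = 2          # illustrative
intermediate, hidden = 6, 8   # widened gate_proj is (n_shared * intermediate, hidden)

name = "model.layers.1.mlp.shared_experts.gate_proj.weight"
loaded_weight = torch.randn(n_shared_experts * intermediate, hidden)

# gate/up are column-parallel -> split along dim 0; down is row-parallel -> dim 1.
split_dim = 1 if "down_proj.weight" in name else 0
chunk_size = loaded_weight.shape[split_dim] // n_shared_experts

for j in range(n_shared_experts):
    chunk = loaded_weight.narrow(split_dim, j * chunk_size, chunk_size)
    # Synthesize an expert-style name so the expert mapping can route it.
    chunk_name = name.replace(
        "mlp.shared_experts", f"mlp.experts.{n_routed_experts + j}"
    )
    print(chunk_name, tuple(chunk.shape))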

set_moe_parameters

set_moe_parameters()
Source code in vllm/model_executor/models/deepseek_v2.py
def set_moe_parameters(self):
    self.expert_weights = []

    self.num_expert_groups = getattr(self.config, "n_group", 1)

    self.moe_layers = []
    self.moe_mlp_layers = []
    example_moe = None
    for layer in self.model.layers:
        if isinstance(layer, PPMissingLayer):
            continue

        assert isinstance(layer, DeepseekV2DecoderLayer)
        if isinstance(layer.mlp, DeepseekV2MoE):
            # Keep the last MoE layer seen; the first layers may be dense.
            example_moe = layer.mlp
            self.moe_mlp_layers.append(layer.mlp)
            self.moe_layers.append(layer.mlp.experts)

    self.extract_moe_parameters(example_moe)

DeepseekV2MLAAttention

Bases: Module

Main reference: DeepseekV2 paper, and FlashInfer Implementation (https://arxiv.org/abs/2405.04434 and https://github.com/flashinfer-ai/flashinfer/pull/551).

For more info see MLACommonImpl in:
vllm/v1/attention/backends/mla/utils.py
Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekV2MLAAttention(nn.Module):
    """
    Main reference: DeepseekV2 paper, and FlashInfer Implementation
    (https://arxiv.org/abs/2405.04434 and https://github.com/flashinfer-ai/flashinfer/pull/551).

        For more info see MLACommonImpl in:
        vllm/v1/attention/backends/mla/utils.py
    """

    def __init__(
        self,
        vllm_config: VllmConfig,
        config: DeepseekV2Config | DeepseekV3Config,
        hidden_size: int,
        num_heads: int,
        qk_nope_head_dim: int,
        qk_rope_head_dim: int,
        v_head_dim: int,
        q_lora_rank: int | None,
        kv_lora_rank: int,
        rope_theta: float = 10000,
        rope_parameters: dict[str, Any] | None = None,
        max_position_embeddings: int = 8192,
        cache_config: CacheConfig | None = None,
        quant_config: QuantizationConfig | None = None,
        prefix: str = "",
        topk_indices_buffer: torch.Tensor | None = None,
    ) -> None:
        super().__init__()
        self.hidden_size = hidden_size
        self.qk_nope_head_dim = qk_nope_head_dim
        self.qk_rope_head_dim = qk_rope_head_dim
        self.qk_head_dim = qk_nope_head_dim + qk_rope_head_dim
        self.v_head_dim = v_head_dim

        self.q_lora_rank = q_lora_rank
        self.kv_lora_rank = kv_lora_rank

        self.num_heads = num_heads
        tp_size = get_tensor_model_parallel_world_size()
        assert num_heads % tp_size == 0
        self.num_local_heads = num_heads // tp_size

        self.scaling = self.qk_head_dim**-0.5
        self.rope_theta = rope_theta
        self.max_position_embeddings = max_position_embeddings

        if self.q_lora_rank is not None:
            self.fused_qkv_a_proj = MergedColumnParallelLinear(
                self.hidden_size,
                [self.q_lora_rank, self.kv_lora_rank + self.qk_rope_head_dim],
                bias=False,
                quant_config=quant_config,
                prefix=f"{prefix}.fused_qkv_a_proj",
                disable_tp=True,
            )
        else:
            self.kv_a_proj_with_mqa = ReplicatedLinear(
                self.hidden_size,
                self.kv_lora_rank + self.qk_rope_head_dim,
                bias=False,
                quant_config=quant_config,
                prefix=f"{prefix}.kv_a_proj_with_mqa",
            )

        if self.q_lora_rank is not None:
            self.q_a_layernorm = RMSNorm(self.q_lora_rank, eps=config.rms_norm_eps)
            self.q_b_proj = ColumnParallelLinear(
                self.q_lora_rank,
                self.num_heads * self.qk_head_dim,
                bias=False,
                quant_config=quant_config,
                prefix=f"{prefix}.q_b_proj",
            )
        else:
            self.q_proj = ColumnParallelLinear(
                self.hidden_size,
                self.num_heads * self.qk_head_dim,
                bias=False,
                quant_config=quant_config,
                prefix=f"{prefix}.q_proj",
            )
        self.kv_a_layernorm = RMSNorm(self.kv_lora_rank, eps=config.rms_norm_eps)
        self.kv_b_proj = ColumnParallelLinear(
            self.kv_lora_rank,
            self.num_heads * (self.qk_nope_head_dim + self.v_head_dim),
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.kv_b_proj",
        )
        self.o_proj = RowParallelLinear(
            self.num_heads * self.v_head_dim,
            self.hidden_size,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.o_proj",
        )

        if rope_parameters:
            rope_parameters["rope_type"] = "deepseek_yarn"
        self.rotary_emb = get_rope(
            qk_rope_head_dim,
            rotary_dim=qk_rope_head_dim,
            max_position=max_position_embeddings,
            base=rope_theta,
            rope_parameters=rope_parameters,
            is_neox_style=False,
        )
        if rope_parameters:
            mscale_all_dim = rope_parameters.get("mscale_all_dim", False)
            scaling_factor = rope_parameters["factor"]
            mscale = yarn_get_mscale(scaling_factor, float(mscale_all_dim))
            self.scaling = self.scaling * mscale * mscale

        self.is_v32 = hasattr(config, "index_topk")

        if self.is_v32:
            self.indexer = Indexer(
                vllm_config,
                config,
                hidden_size,
                q_lora_rank,
                quant_config,
                cache_config,
                topk_indices_buffer,
                f"{prefix}.indexer",
            )
        else:
            self.indexer = None

        mla_modules = MLAModules(
            kv_a_layernorm=self.kv_a_layernorm,
            kv_b_proj=self.kv_b_proj,
            rotary_emb=self.rotary_emb,
            o_proj=self.o_proj,
            fused_qkv_a_proj=self.fused_qkv_a_proj
            if self.q_lora_rank is not None
            else None,
            kv_a_proj_with_mqa=self.kv_a_proj_with_mqa
            if self.q_lora_rank is None
            else None,
            q_a_layernorm=self.q_a_layernorm if self.q_lora_rank is not None else None,
            q_b_proj=self.q_b_proj if self.q_lora_rank is not None else None,
            q_proj=self.q_proj if self.q_lora_rank is None else None,
            indexer=self.indexer,
            is_sparse=self.is_v32,
            topk_indices_buffer=topk_indices_buffer,
        )

        self.mla_attn = MultiHeadLatentAttentionWrapper(
            self.hidden_size,
            self.num_local_heads,
            self.scaling,
            self.qk_nope_head_dim,
            self.qk_rope_head_dim,
            self.v_head_dim,
            self.q_lora_rank,
            self.kv_lora_rank,
            mla_modules,
            cache_config,
            quant_config,
            prefix,
        )

    def forward(
        self,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
    ) -> torch.Tensor:
        return self.mla_attn(positions, hidden_states)
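
The constructor above derives the attention scale from the combined query head dimension and then applies the squared YaRN mscale correction when rope scaling is configured. A minimal sketch of that arithmetic with illustrative DeepSeek-V2-style dimensions; yarn_get_mscale is re-implemented locally here and assumed to match the helper vLLM uses:

import math

qk_nope_head_dim = 128   # illustrative
qk_rope_head_dim = 64    # illustrative
qk_head_dim = qk_nope_head_dim + qk_rope_head_dim

scaling = qk_head_dim ** -0.5   # base softmax scale

def yarn_get_mscale(scale: float = 1.0, mscale: float = 1.0) -> float:
    # Assumed to mirror vLLM's yarn_get_mscale helper.
    if scale <= 1.0:
        return 1.0
    return 0.1 * mscale * math.log(scale) + 1.0

rope_parameters = {"factor": 40.0, "mscale_all_dim": 1.0}   # illustrative YaRN config
mscale = yarn_get_mscale(
    rope_parameters["factor"], float(rope_parameters["mscale_all_dim"])
)
scaling = scaling * mscale * mscale
print(f"qk_head_dim={qk_head_dim}, mscale={mscale:.4f}, scaling={scaling:.6f}")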

fused_qkv_a_proj instance-attribute

fused_qkv_a_proj = MergedColumnParallelLinear(
    hidden_size,
    [q_lora_rank, kv_lora_rank + qk_rope_head_dim],
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.fused_qkv_a_proj",
    disable_tp=True,
)

hidden_size instance-attribute

hidden_size = hidden_size

indexer instance-attribute

indexer = Indexer(
    vllm_config,
    config,
    hidden_size,
    q_lora_rank,
    quant_config,
    cache_config,
    topk_indices_buffer,
    f"{prefix}.indexer",
)

is_v32 instance-attribute

is_v32 = hasattr(config, 'index_topk')

kv_a_layernorm instance-attribute

kv_a_layernorm = RMSNorm(kv_lora_rank, eps=rms_norm_eps)

kv_a_proj_with_mqa instance-attribute

kv_a_proj_with_mqa = ReplicatedLinear(
    hidden_size,
    kv_lora_rank + qk_rope_head_dim,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.kv_a_proj_with_mqa",
)

kv_b_proj instance-attribute

kv_b_proj = ColumnParallelLinear(
    kv_lora_rank,
    num_heads * (qk_nope_head_dim + v_head_dim),
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.kv_b_proj",
)

kv_lora_rank instance-attribute

kv_lora_rank = kv_lora_rank

max_position_embeddings instance-attribute

max_position_embeddings = max_position_embeddings

mla_attn instance-attribute

mla_attn = MultiHeadLatentAttentionWrapper(
    hidden_size,
    num_local_heads,
    scaling,
    qk_nope_head_dim,
    qk_rope_head_dim,
    v_head_dim,
    q_lora_rank,
    kv_lora_rank,
    mla_modules,
    cache_config,
    quant_config,
    prefix,
)

num_heads instance-attribute

num_heads = num_heads

num_local_heads instance-attribute

num_local_heads = num_heads // tp_size

o_proj instance-attribute

o_proj = RowParallelLinear(
    num_heads * v_head_dim,
    hidden_size,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.o_proj",
)

q_a_layernorm instance-attribute

q_a_layernorm = RMSNorm(q_lora_rank, eps=rms_norm_eps)

q_b_proj instance-attribute

q_b_proj = ColumnParallelLinear(
    q_lora_rank,
    num_heads * qk_head_dim,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.q_b_proj",
)

q_lora_rank instance-attribute

q_lora_rank = q_lora_rank

q_proj instance-attribute

q_proj = ColumnParallelLinear(
    hidden_size,
    num_heads * qk_head_dim,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.q_proj",
)

qk_head_dim instance-attribute

qk_head_dim = qk_nope_head_dim + qk_rope_head_dim

qk_nope_head_dim instance-attribute

qk_nope_head_dim = qk_nope_head_dim

qk_rope_head_dim instance-attribute

qk_rope_head_dim = qk_rope_head_dim

rope_theta instance-attribute

rope_theta = rope_theta

rotary_emb instance-attribute

rotary_emb = get_rope(
    qk_rope_head_dim,
    rotary_dim=qk_rope_head_dim,
    max_position=max_position_embeddings,
    base=rope_theta,
    rope_parameters=rope_parameters,
    is_neox_style=False,
)

scaling instance-attribute

scaling = qk_head_dim ** -0.5

v_head_dim instance-attribute

v_head_dim = v_head_dim

__init__

__init__(
    vllm_config: VllmConfig,
    config: DeepseekV2Config | DeepseekV3Config,
    hidden_size: int,
    num_heads: int,
    qk_nope_head_dim: int,
    qk_rope_head_dim: int,
    v_head_dim: int,
    q_lora_rank: int | None,
    kv_lora_rank: int,
    rope_theta: float = 10000,
    rope_parameters: dict[str, Any] | None = None,
    max_position_embeddings: int = 8192,
    cache_config: CacheConfig | None = None,
    quant_config: QuantizationConfig | None = None,
    prefix: str = "",
    topk_indices_buffer: Tensor | None = None,
) -> None
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(
    self,
    vllm_config: VllmConfig,
    config: DeepseekV2Config | DeepseekV3Config,
    hidden_size: int,
    num_heads: int,
    qk_nope_head_dim: int,
    qk_rope_head_dim: int,
    v_head_dim: int,
    q_lora_rank: int | None,
    kv_lora_rank: int,
    rope_theta: float = 10000,
    rope_parameters: dict[str, Any] | None = None,
    max_position_embeddings: int = 8192,
    cache_config: CacheConfig | None = None,
    quant_config: QuantizationConfig | None = None,
    prefix: str = "",
    topk_indices_buffer: torch.Tensor | None = None,
) -> None:
    super().__init__()
    self.hidden_size = hidden_size
    self.qk_nope_head_dim = qk_nope_head_dim
    self.qk_rope_head_dim = qk_rope_head_dim
    self.qk_head_dim = qk_nope_head_dim + qk_rope_head_dim
    self.v_head_dim = v_head_dim

    self.q_lora_rank = q_lora_rank
    self.kv_lora_rank = kv_lora_rank

    self.num_heads = num_heads
    tp_size = get_tensor_model_parallel_world_size()
    assert num_heads % tp_size == 0
    self.num_local_heads = num_heads // tp_size

    self.scaling = self.qk_head_dim**-0.5
    self.rope_theta = rope_theta
    self.max_position_embeddings = max_position_embeddings

    if self.q_lora_rank is not None:
        self.fused_qkv_a_proj = MergedColumnParallelLinear(
            self.hidden_size,
            [self.q_lora_rank, self.kv_lora_rank + self.qk_rope_head_dim],
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.fused_qkv_a_proj",
            disable_tp=True,
        )
    else:
        self.kv_a_proj_with_mqa = ReplicatedLinear(
            self.hidden_size,
            self.kv_lora_rank + self.qk_rope_head_dim,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.kv_a_proj_with_mqa",
        )

    if self.q_lora_rank is not None:
        self.q_a_layernorm = RMSNorm(self.q_lora_rank, eps=config.rms_norm_eps)
        self.q_b_proj = ColumnParallelLinear(
            self.q_lora_rank,
            self.num_heads * self.qk_head_dim,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.q_b_proj",
        )
    else:
        self.q_proj = ColumnParallelLinear(
            self.hidden_size,
            self.num_heads * self.qk_head_dim,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.q_proj",
        )
    self.kv_a_layernorm = RMSNorm(self.kv_lora_rank, eps=config.rms_norm_eps)
    self.kv_b_proj = ColumnParallelLinear(
        self.kv_lora_rank,
        self.num_heads * (self.qk_nope_head_dim + self.v_head_dim),
        bias=False,
        quant_config=quant_config,
        prefix=f"{prefix}.kv_b_proj",
    )
    self.o_proj = RowParallelLinear(
        self.num_heads * self.v_head_dim,
        self.hidden_size,
        bias=False,
        quant_config=quant_config,
        prefix=f"{prefix}.o_proj",
    )

    if rope_parameters:
        rope_parameters["rope_type"] = "deepseek_yarn"
    self.rotary_emb = get_rope(
        qk_rope_head_dim,
        rotary_dim=qk_rope_head_dim,
        max_position=max_position_embeddings,
        base=rope_theta,
        rope_parameters=rope_parameters,
        is_neox_style=False,
    )
    if rope_parameters:
        mscale_all_dim = rope_parameters.get("mscale_all_dim", False)
        scaling_factor = rope_parameters["factor"]
        mscale = yarn_get_mscale(scaling_factor, float(mscale_all_dim))
        self.scaling = self.scaling * mscale * mscale

    self.is_v32 = hasattr(config, "index_topk")

    if self.is_v32:
        self.indexer = Indexer(
            vllm_config,
            config,
            hidden_size,
            q_lora_rank,
            quant_config,
            cache_config,
            topk_indices_buffer,
            f"{prefix}.indexer",
        )
    else:
        self.indexer = None

    mla_modules = MLAModules(
        kv_a_layernorm=self.kv_a_layernorm,
        kv_b_proj=self.kv_b_proj,
        rotary_emb=self.rotary_emb,
        o_proj=self.o_proj,
        fused_qkv_a_proj=self.fused_qkv_a_proj
        if self.q_lora_rank is not None
        else None,
        kv_a_proj_with_mqa=self.kv_a_proj_with_mqa
        if self.q_lora_rank is None
        else None,
        q_a_layernorm=self.q_a_layernorm if self.q_lora_rank is not None else None,
        q_b_proj=self.q_b_proj if self.q_lora_rank is not None else None,
        q_proj=self.q_proj if self.q_lora_rank is None else None,
        indexer=self.indexer,
        is_sparse=self.is_v32,
        topk_indices_buffer=topk_indices_buffer,
    )

    self.mla_attn = MultiHeadLatentAttentionWrapper(
        self.hidden_size,
        self.num_local_heads,
        self.scaling,
        self.qk_nope_head_dim,
        self.qk_rope_head_dim,
        self.v_head_dim,
        self.q_lora_rank,
        self.kv_lora_rank,
        mla_modules,
        cache_config,
        quant_config,
        prefix,
    )

forward

forward(positions: Tensor, hidden_states: Tensor) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(
    self,
    positions: torch.Tensor,
    hidden_states: torch.Tensor,
) -> torch.Tensor:
    return self.mla_attn(positions, hidden_states)

DeepseekV2MLP

Bases: Module

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekV2MLP(nn.Module):
    def __init__(
        self,
        hidden_size: int,
        intermediate_size: int,
        hidden_act: str,
        quant_config: QuantizationConfig | None = None,
        reduce_results: bool = True,
        is_sequence_parallel=False,
        prefix: str = "",
    ) -> None:
        super().__init__()

        # If is_sequence_parallel, the input and output tensors are sharded
        # across the ranks within the tp_group. In this case the weights are
        # replicated and no collective ops are needed.
        # Otherwise we use standard TP with an allreduce at the end.
        self.gate_up_proj = MergedColumnParallelLinear(
            hidden_size,
            [intermediate_size] * 2,
            bias=False,
            quant_config=quant_config,
            disable_tp=is_sequence_parallel,
            prefix=f"{prefix}.gate_up_proj",
        )
        self.down_proj = RowParallelLinear(
            intermediate_size,
            hidden_size,
            bias=False,
            quant_config=quant_config,
            reduce_results=reduce_results,
            disable_tp=is_sequence_parallel,
            prefix=f"{prefix}.down_proj",
        )
        if hidden_act != "silu":
            raise ValueError(
                f"Unsupported activation: {hidden_act}. Only silu is supported for now."
            )
        self.act_fn = SiluAndMul()

    def forward(self, x):
        gate_up, _ = self.gate_up_proj(x)
        x = self.act_fn(gate_up)
        x, _ = self.down_proj(x)
        return x
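
DeepseekV2MLP runs the gate and up projections as one fused matmul and then applies SiluAndMul to the concatenated result. A torch-only sketch of the same computation, with the activation re-implemented directly instead of using vLLM's SiluAndMul:

import torch
import torch.nn.functional as F

num_tokens, hidden_size, intermediate_size = 4, 16, 32   # illustrative sizes
x = torch.randn(num_tokens, hidden_size)

# One fused weight holding both the gate_proj and up_proj rows.
gate_up_weight = torch.randn(2 * intermediate_size, hidden_size)
down_weight = torch.randn(hidden_size, intermediate_size)

gate_up = x @ gate_up_weight.t()        # (num_tokens, 2 * intermediate_size)
gate, up = gate_up.chunk(2, dim=-1)     # split the fused output
activated = F.silu(gate) * up           # equivalent of SiluAndMul
out = activated @ down_weight.t()       # down projection back to hidden_size
print(out.shape)                        # torch.Size([4, 16])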

act_fn instance-attribute

act_fn = SiluAndMul()

down_proj instance-attribute

down_proj = RowParallelLinear(
    intermediate_size,
    hidden_size,
    bias=False,
    quant_config=quant_config,
    reduce_results=reduce_results,
    disable_tp=is_sequence_parallel,
    prefix=f"{prefix}.down_proj",
)

gate_up_proj instance-attribute

gate_up_proj = MergedColumnParallelLinear(
    hidden_size,
    [intermediate_size] * 2,
    bias=False,
    quant_config=quant_config,
    disable_tp=is_sequence_parallel,
    prefix=f"{prefix}.gate_up_proj",
)

__init__

__init__(
    hidden_size: int,
    intermediate_size: int,
    hidden_act: str,
    quant_config: QuantizationConfig | None = None,
    reduce_results: bool = True,
    is_sequence_parallel=False,
    prefix: str = "",
) -> None
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(
    self,
    hidden_size: int,
    intermediate_size: int,
    hidden_act: str,
    quant_config: QuantizationConfig | None = None,
    reduce_results: bool = True,
    is_sequence_parallel=False,
    prefix: str = "",
) -> None:
    super().__init__()

    # If is_sequence_parallel, the input and output tensors are sharded
    # across the ranks within the tp_group. In this case the weights are
    # replicated and no collective ops are needed.
    # Otherwise we use standard TP with an allreduce at the end.
    self.gate_up_proj = MergedColumnParallelLinear(
        hidden_size,
        [intermediate_size] * 2,
        bias=False,
        quant_config=quant_config,
        disable_tp=is_sequence_parallel,
        prefix=f"{prefix}.gate_up_proj",
    )
    self.down_proj = RowParallelLinear(
        intermediate_size,
        hidden_size,
        bias=False,
        quant_config=quant_config,
        reduce_results=reduce_results,
        disable_tp=is_sequence_parallel,
        prefix=f"{prefix}.down_proj",
    )
    if hidden_act != "silu":
        raise ValueError(
            f"Unsupported activation: {hidden_act}. Only silu is supported for now."
        )
    self.act_fn = SiluAndMul()

forward

forward(x)
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(self, x):
    gate_up, _ = self.gate_up_proj(x)
    x = self.act_fn(gate_up)
    x, _ = self.down_proj(x)
    return x

DeepseekV2MixtureOfExperts

Bases: MixtureOfExperts

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekV2MixtureOfExperts(MixtureOfExperts):
    moe_mlp_layers: list[DeepseekV2MoE]
    """
    List of MoE MLP layers in the model.
    """

    def extract_moe_parameters(self, example_moe: DeepseekV2MoE | None):
        if example_moe is None:
            self.num_moe_layers = 0
            self.num_expert_groups = 0
            self.num_logical_experts = 0
            self.num_physical_experts = 0
            self.num_local_physical_experts = 0
            self.num_routed_experts = 0
            self.num_shared_experts = 0
            self.num_redundant_experts = 0
            logger.warning("DeepSeekV2: No DeepseekV2MoE layer found in model.layers.")
        else:
            self.num_logical_experts = example_moe.n_logical_experts
            self.num_physical_experts = example_moe.n_physical_experts
            self.num_local_physical_experts = example_moe.n_local_physical_experts
            self.num_routed_experts = example_moe.n_routed_experts
            self.num_shared_experts = example_moe.n_shared_experts
            self.num_redundant_experts = example_moe.n_redundant_experts

    def update_physical_experts_metadata(
        self,
        num_physical_experts: int,
        num_local_physical_experts: int,
    ) -> None:
        assert self.num_local_physical_experts == num_local_physical_experts
        self.num_physical_experts = num_physical_experts
        self.num_local_physical_experts = num_local_physical_experts
        self.num_redundant_experts = num_physical_experts - self.num_logical_experts
        for moe in self.moe_mlp_layers:
            moe.n_local_physical_experts = num_local_physical_experts
            moe.n_physical_experts = num_physical_experts
            moe.n_redundant_experts = self.num_redundant_experts
            moe.experts.update_expert_map()

moe_mlp_layers instance-attribute

moe_mlp_layers: list[DeepseekV2MoE]

List of MoE MLP layers in the model.

extract_moe_parameters

extract_moe_parameters(example_moe: DeepseekV2MoE | None)
Source code in vllm/model_executor/models/deepseek_v2.py
def extract_moe_parameters(self, example_moe: DeepseekV2MoE | None):
    if example_moe is None:
        self.num_moe_layers = 0
        self.num_expert_groups = 0
        self.num_logical_experts = 0
        self.num_physical_experts = 0
        self.num_local_physical_experts = 0
        self.num_routed_experts = 0
        self.num_shared_experts = 0
        self.num_redundant_experts = 0
        logger.warning("DeepSeekV2: No DeepseekV2MoE layer found in model.layers.")
    else:
        self.num_logical_experts = example_moe.n_logical_experts
        self.num_physical_experts = example_moe.n_physical_experts
        self.num_local_physical_experts = example_moe.n_local_physical_experts
        self.num_routed_experts = example_moe.n_routed_experts
        self.num_shared_experts = example_moe.n_shared_experts
        self.num_redundant_experts = example_moe.n_redundant_experts

update_physical_experts_metadata

update_physical_experts_metadata(
    num_physical_experts: int,
    num_local_physical_experts: int,
) -> None
Source code in vllm/model_executor/models/deepseek_v2.py
def update_physical_experts_metadata(
    self,
    num_physical_experts: int,
    num_local_physical_experts: int,
) -> None:
    assert self.num_local_physical_experts == num_local_physical_experts
    self.num_physical_experts = num_physical_experts
    self.num_local_physical_experts = num_local_physical_experts
    self.num_redundant_experts = num_physical_experts - self.num_logical_experts
    for moe in self.moe_mlp_layers:
        moe.n_local_physical_experts = num_local_physical_experts
        moe.n_physical_experts = num_physical_experts
        moe.n_redundant_experts = self.num_redundant_experts
        moe.experts.update_expert_map()
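
The bookkeeping above keeps the logical, physical, and redundant expert counts consistent when EPLB places extra expert replicas. A small worked example with illustrative numbers:

# Illustrative counts; the real values come from the model config and EPLB config.
num_logical_experts = 256        # n_routed_experts
num_redundant_experts = 32       # extra replicas placed by EPLB
ep_size = 16                     # expert-parallel world size

num_physical_experts = num_logical_experts + num_redundant_experts   # 288
num_local_physical_experts = num_physical_experts // ep_size         # 18 per EP rank
recomputed_redundant = num_physical_experts - num_logical_experts    # 32, as expected

print(num_physical_experts, num_local_physical_experts, recomputed_redundant)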

DeepseekV2MoE

Bases: Module

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekV2MoE(nn.Module):
    def __init__(
        self,
        config: DeepseekV2Config | DeepseekV3Config,
        parallel_config: ParallelConfig,
        quant_config: QuantizationConfig | None = None,
        prefix: str = "",
    ):
        super().__init__()
        self.tp_size = get_tensor_model_parallel_world_size()
        self.tp_rank = get_tensor_model_parallel_rank()

        self.routed_scaling_factor = getattr(config, "routed_scaling_factor", 1.0)

        self.ep_group = get_ep_group().device_group
        self.ep_rank = get_ep_group().rank_in_group
        self.ep_size = self.ep_group.size()
        self.n_routed_experts: int = config.n_routed_experts
        self.n_shared_experts: int = config.n_shared_experts

        self.is_sequence_parallel = parallel_config.use_sequence_parallel_moe

        if config.hidden_act != "silu":
            raise ValueError(
                f"Unsupported activation: {config.hidden_act}. "
                "Only silu is supported for now."
            )

        self.gate = ReplicatedLinear(
            config.hidden_size,
            config.n_routed_experts,
            bias=False,
            quant_config=None,
            prefix=f"{prefix}.gate",
        )
        if getattr(config, "topk_method", None) == "noaux_tc":
            self.gate.e_score_correction_bias = nn.Parameter(
                torch.empty(config.n_routed_experts, dtype=torch.float32)
            )
        else:
            self.gate.e_score_correction_bias = None

        # Load balancing settings.
        eplb_config = parallel_config.eplb_config
        self.enable_eplb = parallel_config.enable_eplb

        self.n_redundant_experts = eplb_config.num_redundant_experts
        self.n_logical_experts = self.n_routed_experts
        self.n_physical_experts = self.n_logical_experts + self.n_redundant_experts
        self.n_local_physical_experts = self.n_physical_experts // self.ep_size

        self.physical_expert_start = self.ep_rank * self.n_local_physical_experts
        self.physical_expert_end = (
            self.physical_expert_start + self.n_local_physical_experts
        )

        self.is_rocm_aiter_moe_enabled = rocm_aiter_ops.is_fused_moe_enabled()
        if config.n_shared_experts is None or self.is_rocm_aiter_moe_enabled:
            self.shared_experts = None
        else:
            intermediate_size = config.moe_intermediate_size * config.n_shared_experts

            self.shared_experts = DeepseekV2MLP(
                hidden_size=config.hidden_size,
                intermediate_size=intermediate_size,
                hidden_act=config.hidden_act,
                quant_config=quant_config,
                is_sequence_parallel=self.is_sequence_parallel,
                reduce_results=False,
                prefix=f"{prefix}.shared_experts",
            )

        self.experts = SharedFusedMoE(
            shared_experts=self.shared_experts,
            gate=self.gate,
            num_experts=config.n_routed_experts,
            top_k=config.num_experts_per_tok,
            hidden_size=config.hidden_size,
            intermediate_size=config.moe_intermediate_size,
            reduce_results=False,
            renormalize=config.norm_topk_prob,
            quant_config=quant_config,
            use_grouped_topk=True,
            num_expert_group=getattr(config, "n_group", 1),
            topk_group=getattr(config, "topk_group", 1),
            prefix=f"{prefix}.experts",
            scoring_func=getattr(config, "scoring_func", "softmax"),
            # Scaling happens outside the fused MoE, so pass 1.0 to avoid a
            # double multiply; the AITER path applies routed_scaling_factor
            # inside the kernel, so keep the real factor there.
            routed_scaling_factor=1.0
            if not self.is_rocm_aiter_moe_enabled
            else self.routed_scaling_factor,
            e_score_correction_bias=self.gate.e_score_correction_bias,
            enable_eplb=self.enable_eplb,
            num_redundant_experts=self.n_redundant_experts,
            is_sequence_parallel=self.is_sequence_parallel,
            n_shared_experts=config.n_shared_experts
            if rocm_aiter_ops.is_fusion_moe_shared_experts_enabled()
            else None,
        )

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        num_tokens, hidden_dim = hidden_states.shape
        hidden_states = hidden_states.view(-1, hidden_dim)

        # Chunk the hidden states so they aren't replicated across TP ranks.
        # This avoids duplicate computation in self.experts.
        # TODO: We can replace the all_reduce at the end of attn with a
        # reduce_scatter instead of chunking here.
        if self.is_sequence_parallel:
            hidden_states = sequence_parallel_chunk(hidden_states)

        if self.experts.is_internal_router:
            # In this case, the gate/router runs inside the FusedMoE class
            fused_moe_out = self.experts(
                hidden_states=hidden_states, router_logits=hidden_states
            )
        else:
            # router_logits: (num_tokens, n_experts)
            router_logits, _ = self.gate(hidden_states)
            fused_moe_out = self.experts(
                hidden_states=hidden_states, router_logits=router_logits
            )

        shared_output, final_hidden_states = fused_moe_out
        if self.shared_experts is None:
            assert shared_output is None

        # Fix FP16 overflow
        # See DeepseekV2DecoderLayer for more details.
        if hidden_states.dtype != torch.float16:
            if not self.is_rocm_aiter_moe_enabled:
                final_hidden_states *= self.routed_scaling_factor
        elif self.shared_experts is not None:
            assert shared_output is not None
            shared_output *= 1.0 / self.routed_scaling_factor

        if self.shared_experts is not None:
            assert shared_output is not None
            final_hidden_states += shared_output

        if self.is_sequence_parallel:
            final_hidden_states = tensor_model_parallel_all_gather(
                final_hidden_states, 0
            )
            final_hidden_states = final_hidden_states[:num_tokens]
        elif self.tp_size > 1:
            final_hidden_states = self.experts.maybe_all_reduce_tensor_model_parallel(
                final_hidden_states
            )

        return final_hidden_states.view(num_tokens, hidden_dim)
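
The FP16 branch in forward avoids overflow by shrinking the shared-expert output by 1 / routed_scaling_factor instead of enlarging the routed output; the code comment defers the compensating logic to DeepseekV2DecoderLayer. A small sketch of which tensor each branch scales, using an illustrative factor:

import torch

routed_scaling_factor = 2.5                      # illustrative
final_hidden_states = torch.randn(4, 8)          # routed-expert output
shared_output = torch.randn(4, 8)                # shared-expert output
dtype = torch.float16                            # switch to bfloat16 for the other branch

if dtype != torch.float16:
    # Safe to scale the routed output directly.
    final_hidden_states = final_hidden_states * routed_scaling_factor
else:
    # FP16: leave the routed output unscaled and shrink the shared output;
    # per the comment above, DeepseekV2DecoderLayer handles the rest.
    shared_output = shared_output * (1.0 / routed_scaling_factor)

combined = final_hidden_states + shared_output
print(combined.shape)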

enable_eplb instance-attribute

enable_eplb = enable_eplb

ep_group instance-attribute

ep_group = device_group

ep_rank instance-attribute

ep_rank = rank_in_group

ep_size instance-attribute

ep_size = size()

experts instance-attribute

experts = SharedFusedMoE(
    shared_experts=shared_experts,
    gate=gate,
    num_experts=n_routed_experts,
    top_k=num_experts_per_tok,
    hidden_size=hidden_size,
    intermediate_size=moe_intermediate_size,
    reduce_results=False,
    renormalize=norm_topk_prob,
    quant_config=quant_config,
    use_grouped_topk=True,
    num_expert_group=getattr(config, "n_group", 1),
    topk_group=getattr(config, "topk_group", 1),
    prefix=f"{prefix}.experts",
    scoring_func=getattr(config, "scoring_func", "softmax"),
    routed_scaling_factor=1.0
    if not is_rocm_aiter_moe_enabled
    else routed_scaling_factor,
    e_score_correction_bias=e_score_correction_bias,
    enable_eplb=enable_eplb,
    num_redundant_experts=n_redundant_experts,
    is_sequence_parallel=is_sequence_parallel,
    n_shared_experts=n_shared_experts
    if is_fusion_moe_shared_experts_enabled()
    else None,
)

gate instance-attribute

gate = ReplicatedLinear(
    hidden_size,
    n_routed_experts,
    bias=False,
    quant_config=None,
    prefix=f"{prefix}.gate",
)

is_rocm_aiter_moe_enabled instance-attribute

is_rocm_aiter_moe_enabled = is_fused_moe_enabled()

is_sequence_parallel instance-attribute

is_sequence_parallel = use_sequence_parallel_moe

n_local_physical_experts instance-attribute

n_local_physical_experts = n_physical_experts // ep_size

n_logical_experts instance-attribute

n_logical_experts = n_routed_experts

n_physical_experts instance-attribute

n_physical_experts = n_logical_experts + n_redundant_experts

n_redundant_experts instance-attribute

n_redundant_experts = num_redundant_experts

n_routed_experts instance-attribute

n_routed_experts: int = n_routed_experts

n_shared_experts instance-attribute

n_shared_experts: int = n_shared_experts

physical_expert_end instance-attribute

physical_expert_end = (
    physical_expert_start + n_local_physical_experts
)

physical_expert_start instance-attribute

physical_expert_start = ep_rank * n_local_physical_experts

routed_scaling_factor instance-attribute

routed_scaling_factor = getattr(
    config, "routed_scaling_factor", 1.0
)

shared_experts instance-attribute

shared_experts = None

tp_rank instance-attribute

tp_rank = get_tensor_model_parallel_rank()

tp_size instance-attribute

tp_size = get_tensor_model_parallel_world_size()

__init__

__init__(
    config: DeepseekV2Config | DeepseekV3Config,
    parallel_config: ParallelConfig,
    quant_config: QuantizationConfig | None = None,
    prefix: str = "",
)
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(
    self,
    config: DeepseekV2Config | DeepseekV3Config,
    parallel_config: ParallelConfig,
    quant_config: QuantizationConfig | None = None,
    prefix: str = "",
):
    super().__init__()
    self.tp_size = get_tensor_model_parallel_world_size()
    self.tp_rank = get_tensor_model_parallel_rank()

    self.routed_scaling_factor = getattr(config, "routed_scaling_factor", 1.0)

    self.ep_group = get_ep_group().device_group
    self.ep_rank = get_ep_group().rank_in_group
    self.ep_size = self.ep_group.size()
    self.n_routed_experts: int = config.n_routed_experts
    self.n_shared_experts: int = config.n_shared_experts

    self.is_sequence_parallel = parallel_config.use_sequence_parallel_moe

    if config.hidden_act != "silu":
        raise ValueError(
            f"Unsupported activation: {config.hidden_act}. "
            "Only silu is supported for now."
        )

    self.gate = ReplicatedLinear(
        config.hidden_size,
        config.n_routed_experts,
        bias=False,
        quant_config=None,
        prefix=f"{prefix}.gate",
    )
    if getattr(config, "topk_method", None) == "noaux_tc":
        self.gate.e_score_correction_bias = nn.Parameter(
            torch.empty(config.n_routed_experts, dtype=torch.float32)
        )
    else:
        self.gate.e_score_correction_bias = None

    # Load balancing settings.
    eplb_config = parallel_config.eplb_config
    self.enable_eplb = parallel_config.enable_eplb

    self.n_redundant_experts = eplb_config.num_redundant_experts
    self.n_logical_experts = self.n_routed_experts
    self.n_physical_experts = self.n_logical_experts + self.n_redundant_experts
    self.n_local_physical_experts = self.n_physical_experts // self.ep_size

    self.physical_expert_start = self.ep_rank * self.n_local_physical_experts
    self.physical_expert_end = (
        self.physical_expert_start + self.n_local_physical_experts
    )

    self.is_rocm_aiter_moe_enabled = rocm_aiter_ops.is_fused_moe_enabled()
    if config.n_shared_experts is None or self.is_rocm_aiter_moe_enabled:
        self.shared_experts = None
    else:
        intermediate_size = config.moe_intermediate_size * config.n_shared_experts

        self.shared_experts = DeepseekV2MLP(
            hidden_size=config.hidden_size,
            intermediate_size=intermediate_size,
            hidden_act=config.hidden_act,
            quant_config=quant_config,
            is_sequence_parallel=self.is_sequence_parallel,
            reduce_results=False,
            prefix=f"{prefix}.shared_experts",
        )

    self.experts = SharedFusedMoE(
        shared_experts=self.shared_experts,
        gate=self.gate,
        num_experts=config.n_routed_experts,
        top_k=config.num_experts_per_tok,
        hidden_size=config.hidden_size,
        intermediate_size=config.moe_intermediate_size,
        reduce_results=False,
        renormalize=config.norm_topk_prob,
        quant_config=quant_config,
        use_grouped_topk=True,
        num_expert_group=getattr(config, "n_group", 1),
        topk_group=getattr(config, "topk_group", 1),
        prefix=f"{prefix}.experts",
        scoring_func=getattr(config, "scoring_func", "softmax"),
        # Scaling happens outside the fused MoE, so pass 1.0 to avoid a
        # double multiply; the AITER path applies routed_scaling_factor
        # inside the kernel, so keep the real factor there.
        routed_scaling_factor=1.0
        if not self.is_rocm_aiter_moe_enabled
        else self.routed_scaling_factor,
        e_score_correction_bias=self.gate.e_score_correction_bias,
        enable_eplb=self.enable_eplb,
        num_redundant_experts=self.n_redundant_experts,
        is_sequence_parallel=self.is_sequence_parallel,
        n_shared_experts=config.n_shared_experts
        if rocm_aiter_ops.is_fusion_moe_shared_experts_enabled()
        else None,
    )

forward

forward(hidden_states: Tensor) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
    num_tokens, hidden_dim = hidden_states.shape
    hidden_states = hidden_states.view(-1, hidden_dim)

    # Chunk the hidden states so they aren't replicated across TP ranks.
    # This avoids duplicate computation in self.experts.
    # TODO: We can replace the all_reduce at the end of attn with a
    # reduce_scatter instead of chunking here.
    if self.is_sequence_parallel:
        hidden_states = sequence_parallel_chunk(hidden_states)

    if self.experts.is_internal_router:
        # In this case, the gate/router runs inside the FusedMoE class
        fused_moe_out = self.experts(
            hidden_states=hidden_states, router_logits=hidden_states
        )
    else:
        # router_logits: (num_tokens, n_experts)
        router_logits, _ = self.gate(hidden_states)
        fused_moe_out = self.experts(
            hidden_states=hidden_states, router_logits=router_logits
        )

    shared_output, final_hidden_states = fused_moe_out
    if self.shared_experts is None:
        assert shared_output is None

    # Fix FP16 overflow
    # See DeepseekV2DecoderLayer for more details.
    if hidden_states.dtype != torch.float16:
        if not self.is_rocm_aiter_moe_enabled:
            final_hidden_states *= self.routed_scaling_factor
    elif self.shared_experts is not None:
        assert shared_output is not None
        shared_output *= 1.0 / self.routed_scaling_factor

    if self.shared_experts is not None:
        assert shared_output is not None
        final_hidden_states += shared_output

    if self.is_sequence_parallel:
        final_hidden_states = tensor_model_parallel_all_gather(
            final_hidden_states, 0
        )
        final_hidden_states = final_hidden_states[:num_tokens]
    elif self.tp_size > 1:
        final_hidden_states = self.experts.maybe_all_reduce_tensor_model_parallel(
            final_hidden_states
        )

    return final_hidden_states.view(num_tokens, hidden_dim)
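
With sequence-parallel MoE, each TP rank processes only its slice of the token dimension, and the slices are all-gathered along dim 0 and truncated back to num_tokens at the end. A torch-only sketch of the index arithmetic, simulating the ranks in a loop instead of using collective ops, and assuming sequence_parallel_chunk pads the token count to a multiple of tp_size:

import torch

tp_size = 4
num_tokens, hidden = 10, 8
hidden_states = torch.randn(num_tokens, hidden)

# Pad the token dimension to a multiple of tp_size, then give each rank one slice.
padded_len = (num_tokens + tp_size - 1) // tp_size * tp_size
padded = torch.zeros(padded_len, hidden)
padded[:num_tokens] = hidden_states
chunks = padded.chunk(tp_size, dim=0)            # one chunk per TP rank

# Each "rank" runs its experts on its chunk (identity here for illustration).
outputs = [chunk.clone() for chunk in chunks]

# all_gather along dim 0, then drop the padding rows.
gathered = torch.cat(outputs, dim=0)[:num_tokens]
assert torch.allclose(gathered, hidden_states)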

DeepseekV2Model

Bases: Module

Source code in vllm/model_executor/models/deepseek_v2.py
@support_torch_compile
class DeepseekV2Model(nn.Module):
    fall_back_to_pt_during_load = False

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()

        config = vllm_config.model_config.hf_config
        quant_config = vllm_config.quant_config
        self.config = config
        self.device = current_platform.device_type

        self.vocab_size = config.vocab_size
        self.is_v32 = hasattr(config, "index_topk")
        if self.is_v32:
            topk_tokens = config.index_topk
            topk_indices_buffer = torch.empty(
                vllm_config.scheduler_config.max_num_batched_tokens,
                topk_tokens,
                dtype=torch.int32,
                device=self.device,
            )
        else:
            topk_indices_buffer = None

        if get_pp_group().is_first_rank:
            self.embed_tokens = VocabParallelEmbedding(
                config.vocab_size,
                config.hidden_size,
                quant_config=quant_config,
                prefix=f"{prefix}.embed_tokens",
            )
        else:
            self.embed_tokens = PPMissingLayer()
        self.start_layer, self.end_layer, self.layers = make_layers(
            config.num_hidden_layers,
            lambda prefix: DeepseekV2DecoderLayer(
                vllm_config, prefix, topk_indices_buffer=topk_indices_buffer
            ),
            prefix=f"{prefix}.layers",
        )

        if get_pp_group().is_last_rank:
            self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        else:
            self.norm = PPMissingLayer()
        self.make_empty_intermediate_tensors = make_empty_intermediate_tensors_factory(
            ["hidden_states", "residual"], config.hidden_size
        )

    def embed_input_ids(self, input_ids: torch.Tensor) -> torch.Tensor:
        return self.embed_tokens(input_ids)

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        intermediate_tensors: IntermediateTensors | None,
        inputs_embeds: torch.Tensor | None = None,
    ) -> torch.Tensor | IntermediateTensors:
        if get_pp_group().is_first_rank:
            if inputs_embeds is not None:
                hidden_states = inputs_embeds
            else:
                hidden_states = self.embed_input_ids(input_ids)
            residual = None
        else:
            assert intermediate_tensors is not None
            hidden_states = intermediate_tensors["hidden_states"]
            residual = intermediate_tensors["residual"]

        for layer in islice(self.layers, self.start_layer, self.end_layer):
            hidden_states, residual = layer(positions, hidden_states, residual)

        if not get_pp_group().is_last_rank:
            return IntermediateTensors(
                {"hidden_states": hidden_states, "residual": residual}
            )

        hidden_states, _ = self.norm(hidden_states, residual)
        return hidden_states
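
For V3.2-style configs that define index_topk, the constructor preallocates one int32 row of top-k token indices per batched token and shares that buffer with every layer's indexer. A small sketch of the buffer size with illustrative scheduler and indexer settings:

import torch

max_num_batched_tokens = 8192   # illustrative scheduler limit
index_topk = 2048               # illustrative config.index_topk

topk_indices_buffer = torch.empty(
    max_num_batched_tokens, index_topk, dtype=torch.int32
)
size_mib = topk_indices_buffer.numel() * topk_indices_buffer.element_size() / 2**20
print(f"{size_mib:.0f} MiB")    # 8192 * 2048 * 4 bytes = 64 MiB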

config instance-attribute

config = config

device instance-attribute

device = device_type

embed_tokens instance-attribute

embed_tokens = VocabParallelEmbedding(
    vocab_size,
    hidden_size,
    quant_config=quant_config,
    prefix=f"{prefix}.embed_tokens",
)

fall_back_to_pt_during_load class-attribute instance-attribute

fall_back_to_pt_during_load = False

is_v32 instance-attribute

is_v32 = hasattr(config, 'index_topk')

make_empty_intermediate_tensors instance-attribute

make_empty_intermediate_tensors = (
    make_empty_intermediate_tensors_factory(
        ["hidden_states", "residual"], hidden_size
    )
)

norm instance-attribute

norm = RMSNorm(hidden_size, eps=rms_norm_eps)

vocab_size instance-attribute

vocab_size = vocab_size

__init__

__init__(*, vllm_config: VllmConfig, prefix: str = '')
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
    super().__init__()

    config = vllm_config.model_config.hf_config
    quant_config = vllm_config.quant_config
    self.config = config
    self.device = current_platform.device_type

    self.vocab_size = config.vocab_size
    self.is_v32 = hasattr(config, "index_topk")
    if self.is_v32:
        topk_tokens = config.index_topk
        topk_indices_buffer = torch.empty(
            vllm_config.scheduler_config.max_num_batched_tokens,
            topk_tokens,
            dtype=torch.int32,
            device=self.device,
        )
    else:
        topk_indices_buffer = None

    if get_pp_group().is_first_rank:
        self.embed_tokens = VocabParallelEmbedding(
            config.vocab_size,
            config.hidden_size,
            quant_config=quant_config,
            prefix=f"{prefix}.embed_tokens",
        )
    else:
        self.embed_tokens = PPMissingLayer()
    self.start_layer, self.end_layer, self.layers = make_layers(
        config.num_hidden_layers,
        lambda prefix: DeepseekV2DecoderLayer(
            vllm_config, prefix, topk_indices_buffer=topk_indices_buffer
        ),
        prefix=f"{prefix}.layers",
    )

    if get_pp_group().is_last_rank:
        self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
    else:
        self.norm = PPMissingLayer()
    self.make_empty_intermediate_tensors = make_empty_intermediate_tensors_factory(
        ["hidden_states", "residual"], config.hidden_size
    )

embed_input_ids

embed_input_ids(input_ids: Tensor) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def embed_input_ids(self, input_ids: torch.Tensor) -> torch.Tensor:
    return self.embed_tokens(input_ids)

forward

forward(
    input_ids: Tensor,
    positions: Tensor,
    intermediate_tensors: IntermediateTensors | None,
    inputs_embeds: Tensor | None = None,
) -> Tensor | IntermediateTensors
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(
    self,
    input_ids: torch.Tensor,
    positions: torch.Tensor,
    intermediate_tensors: IntermediateTensors | None,
    inputs_embeds: torch.Tensor | None = None,
) -> torch.Tensor | IntermediateTensors:
    if get_pp_group().is_first_rank:
        if inputs_embeds is not None:
            hidden_states = inputs_embeds
        else:
            hidden_states = self.embed_input_ids(input_ids)
        residual = None
    else:
        assert intermediate_tensors is not None
        hidden_states = intermediate_tensors["hidden_states"]
        residual = intermediate_tensors["residual"]

    for layer in islice(self.layers, self.start_layer, self.end_layer):
        hidden_states, residual = layer(positions, hidden_states, residual)

    if not get_pp_group().is_last_rank:
        return IntermediateTensors(
            {"hidden_states": hidden_states, "residual": residual}
        )

    hidden_states, _ = self.norm(hidden_states, residual)
    return hidden_states
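
Across pipeline stages, the model hands off both the running hidden states and the residual stream, and only the last rank folds the residual in through the final norm. A minimal sketch of the stage-boundary logic, with the pipeline-group checks replaced by plain booleans and the decoder layers elided:

import torch

hidden_size, num_tokens = 8, 4
is_first_rank, is_last_rank = False, False       # a middle pipeline stage

if is_first_rank:
    hidden_states = torch.randn(num_tokens, hidden_size)   # embeddings
    residual = None
else:
    # Received from the previous stage (IntermediateTensors-style mapping).
    received = {
        "hidden_states": torch.randn(num_tokens, hidden_size),
        "residual": torch.randn(num_tokens, hidden_size),
    }
    hidden_states, residual = received["hidden_states"], received["residual"]

# ... this stage's decoder layers would run here ...

if not is_last_rank:
    # Forward both tensors to the next stage.
    to_next_stage = {"hidden_states": hidden_states, "residual": residual}
    print(sorted(to_next_stage))
else:
    # Last rank: the final RMSNorm adds the residual (plain add here).
    hidden_states = hidden_states + residual
    print(hidden_states.shape)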

DeepseekV32IndexerCache

Bases: Module, AttentionLayerBase

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekV32IndexerCache(torch.nn.Module, AttentionLayerBase):
    def __init__(
        self, head_dim: int, dtype: torch.dtype, prefix: str, cache_config: CacheConfig
    ):
        super().__init__()
        self.kv_cache = [torch.tensor([])]
        self.head_dim = head_dim
        self.prefix = prefix
        self.cache_config = cache_config
        self.dtype = dtype
        compilation_config = get_current_vllm_config().compilation_config
        if prefix in compilation_config.static_forward_context:
            raise ValueError(f"Duplicate layer name: {prefix}")
        compilation_config.static_forward_context[prefix] = self

    def get_kv_cache_spec(self, vllm_config: VllmConfig) -> KVCacheSpec:
        return MLAAttentionSpec(  # Only has one vector instead of K + V
            block_size=self.cache_config.block_size,
            num_kv_heads=1,
            head_size=self.head_dim,
            dtype=self.dtype,
        )

    def forward(self): ...

    def get_attn_backend(self) -> AttentionBackend:
        return DeepseekV32IndexerBackend

cache_config instance-attribute

cache_config = cache_config

dtype instance-attribute

dtype = dtype

head_dim instance-attribute

head_dim = head_dim

kv_cache instance-attribute

kv_cache = [tensor([])]

prefix instance-attribute

prefix = prefix

__init__

__init__(
    head_dim: int,
    dtype: dtype,
    prefix: str,
    cache_config: CacheConfig,
)
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(
    self, head_dim: int, dtype: torch.dtype, prefix: str, cache_config: CacheConfig
):
    super().__init__()
    self.kv_cache = [torch.tensor([])]
    self.head_dim = head_dim
    self.prefix = prefix
    self.cache_config = cache_config
    self.dtype = dtype
    compilation_config = get_current_vllm_config().compilation_config
    if prefix in compilation_config.static_forward_context:
        raise ValueError(f"Duplicate layer name: {prefix}")
    compilation_config.static_forward_context[prefix] = self

forward

forward()
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(self): ...

get_attn_backend

get_attn_backend() -> AttentionBackend
Source code in vllm/model_executor/models/deepseek_v2.py
def get_attn_backend(self) -> AttentionBackend:
    return DeepseekV32IndexerBackend

get_kv_cache_spec

get_kv_cache_spec(vllm_config: VllmConfig) -> KVCacheSpec
Source code in vllm/model_executor/models/deepseek_v2.py
def get_kv_cache_spec(self, vllm_config: VllmConfig) -> KVCacheSpec:
    return MLAAttentionSpec(  # Only has one vector instead of K + V
        block_size=self.cache_config.block_size,
        num_kv_heads=1,
        head_size=self.head_dim,
        dtype=self.dtype,
    )

DeepseekV3ForCausalLM

Bases: DeepseekV2ForCausalLM

Source code in vllm/model_executor/models/deepseek_v2.py
class DeepseekV3ForCausalLM(DeepseekV2ForCausalLM):
    pass

Indexer

Bases: Module

Source code in vllm/model_executor/models/deepseek_v2.py
class Indexer(nn.Module):
    def __init__(
        self,
        vllm_config: VllmConfig,
        config: DeepseekV2Config | DeepseekV3Config,
        hidden_size: int,
        q_lora_rank: int,
        quant_config: QuantizationConfig | None,
        cache_config: CacheConfig | None,
        topk_indices_buffer: torch.Tensor | None,
        prefix: str = "",
    ):
        super().__init__()
        self.vllm_config = vllm_config
        self.config = config
        # self.indexer_cfg = config.attn_module_list_cfg[0]["attn_index"]
        self.topk_tokens = config.index_topk
        self.n_head = config.index_n_heads  # 64
        self.head_dim = config.index_head_dim  # 128
        self.rope_dim = config.qk_rope_head_dim  # 64
        self.q_lora_rank = q_lora_rank  # 1536
        # no tensor parallel, just replicated
        self.wq_b = ReplicatedLinear(
            self.q_lora_rank,
            self.head_dim * self.n_head,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.wq_b",
        )
        self.wk = ReplicatedLinear(
            hidden_size,
            self.head_dim,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.wk",
        )
        self.k_norm = LayerNorm(self.head_dim, eps=1e-6)
        self.weights_proj = ReplicatedLinear(
            hidden_size, self.n_head, quant_config=None, prefix=f"{prefix}.weights_proj"
        )
        self.softmax_scale = self.head_dim**-0.5

        self.scale_fmt = "ue8m0"
        self.quant_block_size = 128  # TODO: get from config
        self.topk_indices_buffer = topk_indices_buffer

        # NOTE: (zyongye) we use a naive fp8 cache,
        #       storing each value in fp8 plus one fp32 scale
        #       per self.quant_block_size elements
        self.k_cache = DeepseekV32IndexerCache(
            head_dim=self.head_dim + self.head_dim // self.quant_block_size * 4,
            dtype=torch.uint8,
            prefix=f"{prefix}.k_cache",
            cache_config=cache_config,
        )
        self.max_model_len = vllm_config.model_config.max_model_len
        self.prefix = prefix
        from vllm.v1.attention.backends.mla.indexer import get_max_prefill_buffer_size

        self.max_total_seq_len = get_max_prefill_buffer_size(vllm_config)

    def forward(
        self, hidden_states: torch.Tensor, qr: torch.Tensor, positions, rotary_emb
    ) -> torch.Tensor:
        q, _ = self.wq_b(qr)
        q = q.view(-1, self.n_head, self.head_dim)
        q_pe, q_nope = torch.split(
            q, [self.rope_dim, self.head_dim - self.rope_dim], dim=-1
        )

        k, _ = self.wk(hidden_states)
        k = self.k_norm(k)
        k_pe, k_nope = torch.split(
            k, [self.rope_dim, self.head_dim - self.rope_dim], dim=-1
        )

        q_pe, k_pe = rotary_emb(positions, q_pe, k_pe.unsqueeze(1))
        q = torch.cat([q_pe, q_nope], dim=-1)
        k = torch.cat([k_pe.squeeze(1), k_nope], dim=-1)

        # we only quant q here since k quant is fused with cache insertion
        q = q.view(-1, self.head_dim)
        q_fp8, q_scale = per_token_group_quant_fp8(
            q,
            self.quant_block_size,
            column_major_scales=False,
            use_ue8m0=self.scale_fmt is not None,
        )
        q_fp8 = q_fp8.view(-1, self.n_head, self.head_dim)
        q_scale = q_scale.view(-1, self.n_head, 1)

        weights, _ = self.weights_proj(hidden_states)
        weights = (
            weights.unsqueeze(-1) * q_scale * self.softmax_scale * self.n_head**-0.5
        )
        weights = weights.squeeze(-1)

        return torch.ops.vllm.sparse_attn_indexer(
            hidden_states,
            self.k_cache.prefix,
            self.k_cache.kv_cache[0],
            q_fp8,
            k,
            weights,
            self.quant_block_size,
            self.scale_fmt,
            self.topk_tokens,
            self.head_dim,
            self.max_model_len,
            self.max_total_seq_len,
            self.topk_indices_buffer,
        )
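
In forward, the per-head logits weight folds several factors into one multiplier: the weights_proj output, the per-(token, head) fp8 dequantization scale of q, the softmax scale head_dim ** -0.5, and a 1 / sqrt(n_head) normalization. A small torch-only sketch of that scale folding, using the config values noted in __init__ (n_head=64, head_dim=128) and a hypothetical token count T:

import torch

T, n_head, head_dim = 4, 64, 128
softmax_scale = head_dim ** -0.5

weights = torch.randn(T, n_head)        # output of weights_proj
q_scale = torch.rand(T, n_head, 1)      # per-(token, head) scale from per_token_group_quant_fp8
weights = weights.unsqueeze(-1) * q_scale * softmax_scale * n_head ** -0.5
weights = weights.squeeze(-1)           # back to [T, n_head], passed to the indexer op
print(weights.shape)                    # torch.Size([4, 64])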

config instance-attribute

config = config

head_dim instance-attribute

head_dim = index_head_dim

k_cache instance-attribute

k_cache = DeepseekV32IndexerCache(
    head_dim=head_dim + head_dim // quant_block_size * 4,
    dtype=uint8,
    prefix=f"{prefix}.k_cache",
    cache_config=cache_config,
)

k_norm instance-attribute

k_norm = LayerNorm(head_dim, eps=1e-06)

max_model_len instance-attribute

max_model_len = max_model_len

max_total_seq_len instance-attribute

max_total_seq_len = get_max_prefill_buffer_size(vllm_config)

n_head instance-attribute

n_head = index_n_heads

prefix instance-attribute

prefix = prefix

q_lora_rank instance-attribute

q_lora_rank = q_lora_rank

quant_block_size instance-attribute

quant_block_size = 128

rope_dim instance-attribute

rope_dim = qk_rope_head_dim

scale_fmt instance-attribute

scale_fmt = 'ue8m0'

softmax_scale instance-attribute

softmax_scale = head_dim ** -0.5

topk_indices_buffer instance-attribute

topk_indices_buffer = topk_indices_buffer

topk_tokens instance-attribute

topk_tokens = index_topk

vllm_config instance-attribute

vllm_config = vllm_config

weights_proj instance-attribute

weights_proj = ReplicatedLinear(
    hidden_size,
    n_head,
    quant_config=None,
    prefix=f"{prefix}.weights_proj",
)

wk instance-attribute

wk = ReplicatedLinear(
    hidden_size,
    head_dim,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.wk",
)

wq_b instance-attribute

wq_b = ReplicatedLinear(
    q_lora_rank,
    head_dim * n_head,
    bias=False,
    quant_config=quant_config,
    prefix=f"{prefix}.wq_b",
)

__init__

__init__(
    vllm_config: VllmConfig,
    config: DeepseekV2Config | DeepseekV3Config,
    hidden_size: int,
    q_lora_rank: int,
    quant_config: QuantizationConfig | None,
    cache_config: CacheConfig | None,
    topk_indices_buffer: Tensor | None,
    prefix: str = "",
)
Source code in vllm/model_executor/models/deepseek_v2.py
def __init__(
    self,
    vllm_config: VllmConfig,
    config: DeepseekV2Config | DeepseekV3Config,
    hidden_size: int,
    q_lora_rank: int,
    quant_config: QuantizationConfig | None,
    cache_config: CacheConfig | None,
    topk_indices_buffer: torch.Tensor | None,
    prefix: str = "",
):
    super().__init__()
    self.vllm_config = vllm_config
    self.config = config
    # self.indexer_cfg = config.attn_module_list_cfg[0]["attn_index"]
    self.topk_tokens = config.index_topk
    self.n_head = config.index_n_heads  # 64
    self.head_dim = config.index_head_dim  # 128
    self.rope_dim = config.qk_rope_head_dim  # 64
    self.q_lora_rank = q_lora_rank  # 1536
    # no tensor parallel, just replicated
    self.wq_b = ReplicatedLinear(
        self.q_lora_rank,
        self.head_dim * self.n_head,
        bias=False,
        quant_config=quant_config,
        prefix=f"{prefix}.wq_b",
    )
    self.wk = ReplicatedLinear(
        hidden_size,
        self.head_dim,
        bias=False,
        quant_config=quant_config,
        prefix=f"{prefix}.wk",
    )
    self.k_norm = LayerNorm(self.head_dim, eps=1e-6)
    self.weights_proj = ReplicatedLinear(
        hidden_size, self.n_head, quant_config=None, prefix=f"{prefix}.weights_proj"
    )
    self.softmax_scale = self.head_dim**-0.5

    self.scale_fmt = "ue8m0"
    self.quant_block_size = 128  # TODO: get from config
    self.topk_indices_buffer = topk_indices_buffer

    # NOTE: (zyongye) we use a naive fp8 cache,
    #       storing each value in fp8 plus one fp32 scale
    #       per self.quant_block_size elements
    self.k_cache = DeepseekV32IndexerCache(
        head_dim=self.head_dim + self.head_dim // self.quant_block_size * 4,
        dtype=torch.uint8,
        prefix=f"{prefix}.k_cache",
        cache_config=cache_config,
    )
    self.max_model_len = vllm_config.model_config.max_model_len
    self.prefix = prefix
    from vllm.v1.attention.backends.mla.indexer import get_max_prefill_buffer_size

    self.max_total_seq_len = get_max_prefill_buffer_size(vllm_config)
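
The k_cache head_dim packs each token's 128 fp8 key bytes together with its fp32 scale, reinterpreted as 4 extra uint8 bytes, giving 128 + 128 // 128 * 4 = 132 bytes per row. A simplified torch sketch of that layout (plain fp32 scales; the ue8m0 scale format and the fused cache-insertion kernel are not modelled):

import torch

head_dim, quant_block_size = 128, 128
k = torch.randn(3, head_dim)                                        # 3 hypothetical tokens
scale = k.abs().amax(dim=-1, keepdim=True).clamp(min=1e-12) / 448.0  # float8_e4m3fn max magnitude
k_fp8 = (k / scale).to(torch.float8_e4m3fn)
row = torch.cat([k_fp8.view(torch.uint8), scale.view(torch.uint8)], dim=-1)
assert row.shape[-1] == head_dim + head_dim // quant_block_size * 4  # 132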

forward

forward(
    hidden_states: Tensor, qr: Tensor, positions, rotary_emb
) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def forward(
    self, hidden_states: torch.Tensor, qr: torch.Tensor, positions, rotary_emb
) -> torch.Tensor:
    q, _ = self.wq_b(qr)
    q = q.view(-1, self.n_head, self.head_dim)
    q_pe, q_nope = torch.split(
        q, [self.rope_dim, self.head_dim - self.rope_dim], dim=-1
    )

    k, _ = self.wk(hidden_states)
    k = self.k_norm(k)
    k_pe, k_nope = torch.split(
        k, [self.rope_dim, self.head_dim - self.rope_dim], dim=-1
    )

    q_pe, k_pe = rotary_emb(positions, q_pe, k_pe.unsqueeze(1))
    q = torch.cat([q_pe, q_nope], dim=-1)
    k = torch.cat([k_pe.squeeze(1), k_nope], dim=-1)

    # we only quant q here since k quant is fused with cache insertion
    q = q.view(-1, self.head_dim)
    q_fp8, q_scale = per_token_group_quant_fp8(
        q,
        self.quant_block_size,
        column_major_scales=False,
        use_ue8m0=self.scale_fmt is not None,
    )
    q_fp8 = q_fp8.view(-1, self.n_head, self.head_dim)
    q_scale = q_scale.view(-1, self.n_head, 1)

    weights, _ = self.weights_proj(hidden_states)
    weights = (
        weights.unsqueeze(-1) * q_scale * self.softmax_scale * self.n_head**-0.5
    )
    weights = weights.squeeze(-1)

    return torch.ops.vllm.sparse_attn_indexer(
        hidden_states,
        self.k_cache.prefix,
        self.k_cache.kv_cache[0],
        q_fp8,
        k,
        weights,
        self.quant_block_size,
        self.scale_fmt,
        self.topk_tokens,
        self.head_dim,
        self.max_model_len,
        self.max_total_seq_len,
        self.topk_indices_buffer,
    )
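
per_token_group_quant_fp8 splits each flattened q row into groups of quant_block_size elements and emits one scale per group; with head_dim == quant_block_size == 128 that is exactly one scale per (token, head) pair, which is why q_scale reshapes to [-1, n_head, 1]. A naive torch-only reference of that grouping (an illustration, not vLLM's kernel, and without the ue8m0 scale rounding):

import torch

def naive_per_token_group_quant_fp8(x: torch.Tensor, group_size: int):
    num_rows, dim = x.shape                      # dim must divide evenly by group_size
    groups = x.view(num_rows, dim // group_size, group_size)
    scale = groups.abs().amax(dim=-1, keepdim=True).clamp(min=1e-12) / 448.0
    x_fp8 = (groups / scale).to(torch.float8_e4m3fn).view(num_rows, dim)
    return x_fp8, scale.squeeze(-1)              # scale: [num_rows, dim // group_size]

q = torch.randn(4 * 64, 128)                     # flattened (token, head) rows
q_fp8, q_scale = naive_per_token_group_quant_fp8(q, 128)
print(q_fp8.shape, q_scale.shape)                # [256, 128] and [256, 1]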

get_spec_layer_idx_from_weight_name

get_spec_layer_idx_from_weight_name(
    config: DeepseekV2Config | DeepseekV3Config,
    weight_name: str,
) -> int | None
Source code in vllm/model_executor/models/deepseek_v2.py
def get_spec_layer_idx_from_weight_name(
    config: DeepseekV2Config | DeepseekV3Config, weight_name: str
) -> int | None:
    if (
        hasattr(config, "num_nextn_predict_layers")
        and config.num_nextn_predict_layers > 0
    ):
        layer_idx = config.num_hidden_layers
        for i in range(config.num_nextn_predict_layers):
            if weight_name.startswith(f"model.layers.{layer_idx + i}."):
                return layer_idx + i
    return None
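
A hypothetical usage sketch: with a DeepSeek-V3-style config of 61 decoder layers plus one next-token-prediction (MTP) layer, weights of the extra layer resolve to index num_hidden_layers and above, while regular layers return None (the config values and weight names below are illustrative):

from types import SimpleNamespace

cfg = SimpleNamespace(num_hidden_layers=61, num_nextn_predict_layers=1)
get_spec_layer_idx_from_weight_name(cfg, "model.layers.61.shared_head.head.weight")  # -> 61
get_spec_layer_idx_from_weight_name(cfg, "model.layers.10.mlp.gate.weight")          # -> None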

sparse_attn_indexer

sparse_attn_indexer(
    hidden_states: Tensor,
    k_cache_prefix: str,
    kv_cache: Tensor,
    q_fp8: Tensor,
    k: Tensor,
    weights: Tensor,
    quant_block_size: int,
    scale_fmt: str | None,
    topk_tokens: int,
    head_dim: int,
    max_model_len: int,
    total_seq_lens: int,
    topk_indices_buffer: Tensor | None,
) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def sparse_attn_indexer(
    hidden_states: torch.Tensor,
    k_cache_prefix: str,
    kv_cache: torch.Tensor,
    q_fp8: torch.Tensor,
    k: torch.Tensor,
    weights: torch.Tensor,
    quant_block_size: int,
    scale_fmt: str | None,
    topk_tokens: int,
    head_dim: int,
    max_model_len: int,
    total_seq_lens: int,
    topk_indices_buffer: torch.Tensor | None,
) -> torch.Tensor:
    # careful! this will be None in dummy run
    attn_metadata = get_forward_context().attn_metadata
    # assert isinstance(attn_metadata, dict)
    if not isinstance(attn_metadata, dict):
        return sparse_attn_indexer_fake(
            hidden_states,
            k_cache_prefix,
            kv_cache,
            q_fp8,
            k,
            weights,
            quant_block_size,
            scale_fmt,
            topk_tokens,
            head_dim,
            max_model_len,
            total_seq_lens,
            topk_indices_buffer,
        )
    attn_metadata = attn_metadata[k_cache_prefix]
    assert isinstance(attn_metadata, DeepseekV32IndexerMetadata)
    slot_mapping = attn_metadata.slot_mapping
    has_decode = attn_metadata.num_decodes > 0
    has_prefill = attn_metadata.num_prefills > 0
    num_decode_tokens = attn_metadata.num_decode_tokens

    ops.indexer_k_quant_and_cache(
        k,
        kv_cache,
        slot_mapping,
        quant_block_size,
        scale_fmt,
    )

    topk_indices_buffer[: hidden_states.shape[0]] = -1
    if has_prefill:
        prefill_metadata = attn_metadata.prefill
        for chunk in prefill_metadata.chunks:
            k_fp8 = torch.empty(
                [chunk.total_seq_lens, head_dim],
                device=k.device,
                dtype=torch.float8_e4m3fn,
            )
            k_scale = torch.empty(
                [chunk.total_seq_lens, 4],
                device=k.device,
                dtype=torch.uint8,
            )
            ops.cp_gather_indexer_k_quant_cache(
                kv_cache,
                k_fp8,
                k_scale,
                chunk.block_table,
                chunk.cu_seq_lens,
            )
            logits = fp8_mqa_logits(
                q_fp8[chunk.token_start : chunk.token_end],
                (k_fp8, k_scale.view(torch.float32)),
                weights[chunk.token_start : chunk.token_end],
                chunk.cu_seqlen_ks,
                chunk.cu_seqlen_ke,
            )
            num_rows = logits.shape[0]
            assert topk_tokens == 2048, "top_k_per_row assumes size 2048"
            topk_indices = topk_indices_buffer[
                chunk.token_start : chunk.token_end, :topk_tokens
            ]
            torch.ops._C.top_k_per_row(
                logits,
                chunk.cu_seqlen_ks,
                chunk.cu_seqlen_ke,
                topk_indices,
                num_rows,
                logits.stride(0),
                logits.stride(1),
            )

    if has_decode:
        decode_metadata = attn_metadata.decode
        # the paged MQA logits kernel expects kv_cache of shape
        # [num_block, block_size, n_head, head_dim]; we only store
        # [num_block, block_size, head_dim], so add a singleton head dim
        kv_cache = kv_cache.unsqueeze(-2)
        decode_lens = decode_metadata.decode_lens
        if decode_metadata.requires_padding:
            # pad for the edge case where a short chunked-prefill length is
            # below decode_threshold, since prefill and decode are split
            # loosely by decode_threshold
            # (currently set to 1 + number of speculative tokens)
            padded_q_fp8_decode_tokens = pack_seq_triton(
                q_fp8[:num_decode_tokens], decode_lens
            )
        else:
            padded_q_fp8_decode_tokens = q_fp8[:num_decode_tokens].reshape(
                decode_lens.shape[0], -1, *q_fp8.shape[1:]
            )
        # TODO: move and optimize below logic with triton kernels
        batch_size = padded_q_fp8_decode_tokens.shape[0]
        next_n = padded_q_fp8_decode_tokens.shape[1]
        assert batch_size == decode_metadata.seq_lens.shape[0]
        num_padded_tokens = batch_size * next_n
        logits = fp8_paged_mqa_logits(
            padded_q_fp8_decode_tokens,
            kv_cache,
            weights[:num_padded_tokens],
            decode_metadata.seq_lens,
            decode_metadata.block_table,
            decode_metadata.schedule_metadata,
            max_model_len=max_model_len,
        )
        num_rows = logits.shape[0]
        assert topk_tokens == 2048, "top_k_per_row assumes size 2048"
        topk_indices = topk_indices_buffer[:num_decode_tokens, :topk_tokens]

        torch.ops._C.top_k_per_row_decode(
            logits,
            next_n,
            decode_metadata.seq_lens,
            topk_indices,
            num_rows,
            logits.stride(0),
            logits.stride(1),
        )
        if decode_metadata.requires_padding:
            # if padded, unpack the topk indices and drop the padded tokens
            topk_indices = unpack_seq_triton(
                topk_indices.reshape(batch_size, -1, topk_indices.shape[-1]),
                decode_lens,
            )
            topk_indices_buffer[:num_decode_tokens, : topk_indices.shape[-1]] = (
                topk_indices
            )

    return topk_indices_buffer
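
For each query row, top_k_per_row selects the topk_tokens highest-scoring key positions inside that row's visible range, and the buffer is pre-filled with -1 for unused slots. A naive torch-only reference of that selection, under the assumption that cu_seqlen_ks / cu_seqlen_ke bound the visible key columns of each row (the index conventions of the real CUDA kernel may differ):

import torch

def naive_top_k_per_row(logits, cu_seqlen_ks, cu_seqlen_ke, topk):
    num_rows = logits.shape[0]
    out = torch.full((num_rows, topk), -1, dtype=torch.int32, device=logits.device)
    for row in range(num_rows):
        ks, ke = int(cu_seqlen_ks[row]), int(cu_seqlen_ke[row])
        window = logits[row, ks:ke]
        k = min(topk, window.numel())
        if k > 0:
            out[row, :k] = (window.topk(k).indices + ks).to(torch.int32)
    return out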

sparse_attn_indexer_fake

sparse_attn_indexer_fake(
    hidden_states: Tensor,
    k_cache_prefix: str,
    kv_cache: Tensor,
    q_fp8: Tensor,
    k: Tensor,
    weights: Tensor,
    quant_block_size: int,
    scale_fmt: str | None,
    topk_tokens: int,
    head_dim: int,
    max_model_len: int,
    total_seq_lens: int,
    topk_indices_buffer: Tensor | None,
) -> Tensor
Source code in vllm/model_executor/models/deepseek_v2.py
def sparse_attn_indexer_fake(
    hidden_states: torch.Tensor,
    k_cache_prefix: str,
    kv_cache: torch.Tensor,
    q_fp8: torch.Tensor,
    k: torch.Tensor,
    weights: torch.Tensor,
    quant_block_size: int,
    scale_fmt: str | None,
    topk_tokens: int,
    head_dim: int,
    max_model_len: int,
    total_seq_lens: int,
    topk_indices_buffer: torch.Tensor | None,
) -> torch.Tensor:
    # profile run
    # NOTE(Chen): create the largest possible flattened_kv so that
    # profile_run can measure the correct peak memory usage.
    _flattened_kv = torch.empty(
        [total_seq_lens, head_dim + 4], device=k.device, dtype=torch.uint8
    )
    _k_fp8 = _flattened_kv[..., :head_dim].view(torch.float8_e4m3fn).contiguous()
    _k_scale = _flattened_kv[..., head_dim:].view(torch.float32).contiguous()
    return topk_indices_buffer
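
The dummy buffer mirrors the 132-byte-per-token layout of the indexer cache (head_dim fp8 bytes plus 4 scale bytes), so a rough sizing under a hypothetical max prefill buffer of 131072 tokens is:

total_seq_lens = 131_072                   # assumption; comes from get_max_prefill_buffer_size
print(total_seq_lens * (128 + 4) / 2**20)  # 16.5 MiB reserved during the profile run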

yarn_get_mscale

yarn_get_mscale(
    scale: float = 1, mscale: float = 1
) -> float
Source code in vllm/model_executor/models/deepseek_v2.py
def yarn_get_mscale(scale: float = 1, mscale: float = 1) -> float:
    import math

    if scale <= 1:
        return 1.0
    return 0.1 * mscale * math.log(scale) + 1.0
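
A worked example with hypothetical YaRN settings: for a context-extension factor scale=40 and mscale=1.0, the returned attention-scale multiplier is 0.1 * 1.0 * ln(40) + 1.0 ≈ 1.369, while any scale <= 1 leaves the multiplier at 1.0:

import math

print(0.1 * 1.0 * math.log(40) + 1.0)         # 1.3688879...
print(yarn_get_mscale(scale=40, mscale=1.0))  # same value
print(yarn_get_mscale(scale=1.0))             # 1.0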