vllm.v1.worker.ec_connector_model_runner_mixin

Define the encoder cache (EC) connector functionality mixin for model runners.

logger module-attribute

logger = init_logger(__name__)

ECConnectorModelRunnerMixin

Source code in vllm/v1/worker/ec_connector_model_runner_mixin.py
class ECConnectorModelRunnerMixin:
    @staticmethod
    def maybe_save_ec_to_connector(
        encoder_cache: dict[str, torch.Tensor],
        mm_hash: str,
    ):
        if not has_ec_transfer():
            logger.debug("Not have ec transfer please check")
            return
        connector = get_ec_transfer()
        connector.save_caches(encoder_cache=encoder_cache, mm_hash=mm_hash)

    @staticmethod
    def get_finished_ec_transfers(
        scheduler_output: "SchedulerOutput",
    ) -> tuple[set[str] | None, set[str] | None]:
        if has_ec_transfer():
            return get_ec_transfer().get_finished(scheduler_output.finished_req_ids)
        return None, None

    @staticmethod
    def maybe_get_ec_connector_output(
        scheduler_output: "SchedulerOutput",
        encoder_cache: dict[str, torch.Tensor],
        **kwargs,
    ) -> AbstractContextManager[ECConnectorOutput | None]:
        return (
            ECConnectorModelRunnerMixin._get_ec_connector_output(
                scheduler_output, encoder_cache, **kwargs
            )
            if has_ec_transfer()
            else nullcontext()
        )

    # This context manager must be used within an active forward context.
    # It encapsulates the entire EC connector lifecycle within execute_model.
    @staticmethod
    @contextmanager
    def _get_ec_connector_output(
        scheduler_output: "SchedulerOutput",
        encoder_cache: dict[str, torch.Tensor],
        **kwargs,
    ) -> Generator[ECConnectorOutput, None, None]:
        output = ECConnectorOutput()

        ec_connector = get_ec_transfer()
        assert isinstance(ec_connector, ECConnectorBase)
        assert scheduler_output.ec_connector_metadata is not None
        ec_connector.bind_connector_metadata(scheduler_output.ec_connector_metadata)

        if not ec_connector.is_producer:
            ec_connector.start_load_caches(encoder_cache, **kwargs)

        try:
            yield output
        finally:
            output.finished_sending, output.finished_recving = (
                ec_connector.get_finished(scheduler_output.finished_req_ids)
            )

            ec_connector.clear_connector_metadata()
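
A minimal sketch of how a model runner could wire this mixin into its forward path. Apart from the mixin methods themselves, everything below (the runner class, its encoder_cache attribute, and the _run_forward / _record_finished_transfers helpers) is hypothetical and only illustrates the intended call pattern. As the source comment notes, the managed block must run within an active forward context.

class MultiModalModelRunner(ECConnectorModelRunnerMixin):
    def execute_model(self, scheduler_output: "SchedulerOutput"):
        # Wrap the forward pass so the connector lifecycle (bind metadata,
        # start async cache loads, collect finished transfers, clear metadata)
        # is handled; this degrades to a no-op context manager when no EC
        # connector is configured.
        with self.maybe_get_ec_connector_output(
            scheduler_output, self.encoder_cache
        ) as ec_output:
            model_output = self._run_forward(scheduler_output)  # hypothetical helper

        if ec_output is not None:
            # Report which asynchronous sends/receives completed this step.
            self._record_finished_transfers(  # hypothetical helper
                ec_output.finished_sending, ec_output.finished_recving
            )
        return model_output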

_get_ec_connector_output staticmethod

_get_ec_connector_output(
    scheduler_output: SchedulerOutput,
    encoder_cache: dict[str, Tensor],
    **kwargs,
) -> Generator[ECConnectorOutput, None, None]
Source code in vllm/v1/worker/ec_connector_model_runner_mixin.py
@staticmethod
@contextmanager
def _get_ec_connector_output(
    scheduler_output: "SchedulerOutput",
    encoder_cache: dict[str, torch.Tensor],
    **kwargs,
) -> Generator[ECConnectorOutput, None, None]:
    output = ECConnectorOutput()

    ec_connector = get_ec_transfer()
    assert isinstance(ec_connector, ECConnectorBase)
    assert scheduler_output.ec_connector_metadata is not None
    ec_connector.bind_connector_metadata(scheduler_output.ec_connector_metadata)

    if not ec_connector.is_producer:
        ec_connector.start_load_caches(encoder_cache, **kwargs)

    try:
        yield output
    finally:
        output.finished_sending, output.finished_recving = (
            ec_connector.get_finished(scheduler_output.finished_req_ids)
        )

        ec_connector.clear_connector_metadata()

get_finished_ec_transfers staticmethod

get_finished_ec_transfers(
    scheduler_output: SchedulerOutput,
) -> tuple[set[str] | None, set[str] | None]
Source code in vllm/v1/worker/ec_connector_model_runner_mixin.py
@staticmethod
def get_finished_ec_transfers(
    scheduler_output: "SchedulerOutput",
) -> tuple[set[str] | None, set[str] | None]:
    if has_ec_transfer():
        return get_ec_transfer().get_finished(scheduler_output.finished_req_ids)
    return None, None
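
A minimal sketch of polling for completed transfers on a step where the runner does not enter the connector context manager (for example, when there is no forward pass to run); report_finished_transfers is a hypothetical stand-in for however the runner hands the sets back.

# Returns (None, None) when no EC connector is configured.
finished_sending, finished_recving = (
    ECConnectorModelRunnerMixin.get_finished_ec_transfers(scheduler_output)
)
if finished_sending or finished_recving:
    report_finished_transfers(finished_sending, finished_recving)  # hypothetical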

maybe_get_ec_connector_output staticmethod

maybe_get_ec_connector_output(
    scheduler_output: SchedulerOutput,
    encoder_cache: dict[str, Tensor],
    **kwargs,
) -> AbstractContextManager[ECConnectorOutput | None]
Source code in vllm/v1/worker/ec_connector_model_runner_mixin.py
@staticmethod
def maybe_get_ec_connector_output(
    scheduler_output: "SchedulerOutput",
    encoder_cache: dict[str, torch.Tensor],
    **kwargs,
) -> AbstractContextManager[ECConnectorOutput | None]:
    return (
        ECConnectorModelRunnerMixin._get_ec_connector_output(
            scheduler_output, encoder_cache, **kwargs
        )
        if has_ec_transfer()
        else nullcontext()
    )

maybe_save_ec_to_connector staticmethod

maybe_save_ec_to_connector(
    encoder_cache: dict[str, Tensor], mm_hash: str
)
Source code in vllm/v1/worker/ec_connector_model_runner_mixin.py
@staticmethod
def maybe_save_ec_to_connector(
    encoder_cache: dict[str, torch.Tensor],
    mm_hash: str,
):
    if not has_ec_transfer():
        logger.debug("Not have ec transfer please check")
        return
    connector = get_ec_transfer()
    connector.save_caches(encoder_cache=encoder_cache, mm_hash=mm_hash)
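
A minimal sketch of the producer side saving freshly computed encoder embeddings; scheduled_mm_inputs and vision_encoder are hypothetical stand-ins for the runner's own scheduled multimodal inputs and encoder.

encoder_cache: dict[str, torch.Tensor] = {}
for mm_hash, mm_input in scheduled_mm_inputs.items():
    # Cache the embedding under its multimodal-input hash, then hand the cache
    # to the connector; the call simply returns when no EC connector is configured.
    encoder_cache[mm_hash] = vision_encoder(mm_input)
    ECConnectorModelRunnerMixin.maybe_save_ec_to_connector(encoder_cache, mm_hash)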