Merged
6 changes: 2 additions & 4 deletions tests/basic_correctness/test_basic_correctness.py
@@ -157,11 +157,9 @@ def test_models_distributed(
and distributed_executor_backend == "ray"
and attention_backend == ""
and test_suite == "L4"
and enable_prompt_embeds
): # noqa
if enable_prompt_embeds:
pytest.skip("enable_prompt_embeds does not work with ray compiled dag.")
monkeypatch_context.setenv("VLLM_USE_RAY_SPMD_WORKER", "1")
monkeypatch_context.setenv("VLLM_USE_RAY_COMPILED_DAG", "1")
pytest.skip("enable_prompt_embeds does not work with ray compiled dag.")

if attention_backend:
monkeypatch_context.setenv(
2 changes: 1 addition & 1 deletion tests/distributed/test_multi_node_assignment.py
@@ -18,8 +18,8 @@

from vllm import initialize_ray_cluster
from vllm.config import ParallelConfig
from vllm.executor.ray_utils import _wait_until_pg_removed
from vllm.utils.network_utils import get_ip
from vllm.v1.executor.ray_utils import _wait_until_pg_removed

VLLM_MULTI_NODE = os.getenv("VLLM_MULTI_NODE", "0") == "1"

4 changes: 1 addition & 3 deletions tests/distributed/test_pipeline_parallel.py
@@ -305,10 +305,8 @@ def _compare_tp(
common_args.extend(["--max-num-seqs", f"{max_num_seqs}"])

if distributed_backend == "ray":
# For V1, test Ray Compiled Graph for all the tests
# Test Ray Compiled Graph for all the tests
pp_env = {
"VLLM_USE_RAY_COMPILED_DAG": "1",
"VLLM_USE_RAY_SPMD_WORKER": "1",
"VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL": "1",
}
# Temporary. Currently when zeromq + SPMD is used, it does not properly
@@ -9,7 +9,7 @@
from vllm.model_executor.model_loader import tensorizer as tensorizer_mod
from vllm.model_executor.model_loader.tensorizer import TensorizerConfig
from vllm.utils.network_utils import get_distributed_init_method, get_ip, get_open_port
from vllm.v1.executor.abstract import UniProcExecutor
from vllm.v1.executor import UniProcExecutor
from vllm.v1.worker.worker_base import WorkerWrapperBase

MODEL_REF = "facebook/opt-125m"
3 changes: 2 additions & 1 deletion tests/v1/engine/test_engine_core.py
@@ -15,7 +15,8 @@
from vllm.utils.torch_utils import set_default_torch_num_threads
from vllm.v1.engine import EngineCoreRequest
from vllm.v1.engine.core import EngineCore
from vllm.v1.executor.abstract import Executor, UniProcExecutor
from vllm.v1.executor.abstract import Executor
from vllm.v1.executor.uniproc_executor import UniProcExecutor
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.outputs import ModelRunnerOutput

8 changes: 4 additions & 4 deletions tools/pre_commit/check_pickle_imports.py
@@ -17,8 +17,6 @@
# add to this list if absolutely necessary and after careful security review.
ALLOWED_FILES = {
# pickle
"vllm/v1/serial_utils.py",
"vllm/v1/executor/multiproc_executor.py",
"vllm/multimodal/hasher.py",
"vllm/transformers_utils/config.py",
"vllm/model_executor/models/registry.py",
@@ -38,11 +36,13 @@
"benchmarks/cutlass_benchmarks/w8a8_benchmarks.py",
"benchmarks/cutlass_benchmarks/sparse_benchmarks.py",
# cloudpickle
"vllm/executor/mp_distributed_executor.py",
"vllm/executor/ray_distributed_executor.py",
"vllm/v1/executor/multiproc_executor.py",
"vllm/v1/executor/ray_executor.py",
"vllm/entrypoints/llm.py",
"vllm/utils/__init__.py",
"tests/utils.py",
# pickle and cloudpickle
"vllm/v1/serial_utils.py",
}

PICKLE_RE = re.compile(
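For context, a minimal sketch of the kind of check this pre-commit hook performs, assuming a simplified regex and a tiny allow-list; the real `PICKLE_RE` pattern and file traversal are not shown in this diff:

```python
# Illustrative sketch only; the actual tool's regex and traversal logic differ.
import re

# Hypothetical pattern: flag direct pickle/cloudpickle imports.
PICKLE_RE_SKETCH = re.compile(
    r"^\s*(?:import|from)\s+(?:pickle|cloudpickle)\b", re.MULTILINE
)

ALLOWED_FILES_SKETCH = {"vllm/v1/serial_utils.py"}  # subset taken from the diff above


def needs_review(path: str, source: str) -> bool:
    """Return True if a non-allow-listed file imports pickle or cloudpickle."""
    return path not in ALLOWED_FILES_SKETCH and bool(PICKLE_RE_SKETCH.search(source))


if __name__ == "__main__":
    print(needs_review("vllm/foo.py", "import pickle\n"))              # True
    print(needs_review("vllm/v1/serial_utils.py", "import pickle\n"))  # False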
4 changes: 2 additions & 2 deletions vllm/__init__.py
@@ -21,7 +21,7 @@
"AsyncLLMEngine": ".engine.async_llm_engine:AsyncLLMEngine",
"LLMEngine": ".engine.llm_engine:LLMEngine",
"LLM": ".entrypoints.llm:LLM",
"initialize_ray_cluster": ".executor.ray_utils:initialize_ray_cluster",
"initialize_ray_cluster": ".v1.executor.ray_utils:initialize_ray_cluster",
"PromptType": ".inputs:PromptType",
"TextPrompt": ".inputs:TextPrompt",
"TokensPrompt": ".inputs:TokensPrompt",
@@ -45,7 +45,6 @@
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.engine.llm_engine import LLMEngine
from vllm.entrypoints.llm import LLM
from vllm.executor.ray_utils import initialize_ray_cluster
from vllm.inputs import PromptType, TextPrompt, TokensPrompt
from vllm.model_executor.models import ModelRegistry
from vllm.outputs import (
@@ -62,6 +61,7 @@
)
from vllm.pooling_params import PoolingParams
from vllm.sampling_params import SamplingParams
from vllm.v1.executor.ray_utils import initialize_ray_cluster

from ._bc_linter import bc_linter_include, bc_linter_skip
else:
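The entry updated above lives in a lazy-import table that maps public names to `".relative.module:attribute"` strings. A rough sketch of how such an entry can be resolved (vLLM's actual lazy-import machinery may differ; the helper name here is hypothetical):

```python
# Hypothetical resolver for ".module.path:attr" entries; for illustration only.
import importlib


def resolve_lazy(entry: str, package: str = "vllm"):
    """Split 'module:attr', import the module relative to `package`, return the attr."""
    module_path, _, attr = entry.partition(":")
    module = importlib.import_module(module_path, package=package)
    return getattr(module, attr)


# After this PR, the same public symbol resolves from its V1 location:
initialize_ray_cluster = resolve_lazy(".v1.executor.ray_utils:initialize_ray_cluster")
```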
22 changes: 14 additions & 8 deletions vllm/config/parallel.py
@@ -25,11 +25,11 @@
from ray.runtime_env import RuntimeEnv
from ray.util.placement_group import PlacementGroup

from vllm.executor.executor_base import ExecutorBase
from vllm.v1.executor import Executor
else:
RuntimeEnv = Any
PlacementGroup = Any
ExecutorBase = Any
Executor = Any

logger = init_logger(__name__)

@@ -189,7 +189,7 @@ class ParallelConfig:
"""ray distributed model workers placement group."""

distributed_executor_backend: (
str | DistributedExecutorBackend | type[ExecutorBase] | None
str | DistributedExecutorBackend | type[Executor] | None
) = None
"""Backend to use for distributed model
workers, either "ray" or "mp" (multiprocessing). If the product
@@ -511,7 +511,7 @@ def __post_init__(self) -> None:
# We use multiprocessing by default if world_size fits on the
# current node and we aren't in a ray placement group.

from vllm.executor import ray_utils
from vllm.v1.executor import ray_utils

backend: DistributedExecutorBackend = "mp"
ray_found = ray_utils.ray_is_available()
@@ -553,6 +553,12 @@ def __post_init__(self) -> None:
if self.distributed_executor_backend is None and self.world_size == 1:
self.distributed_executor_backend = "uni"

if self.max_parallel_loading_workers is not None:
logger.warning(
"max_parallel_loading_workers is currently "
"not supported and will be ignored."
)

@property
def use_ray(self) -> bool:
return self.distributed_executor_backend == "ray" or (
@@ -563,7 +569,7 @@ def use_ray(self) -> bool:
@model_validator(mode="after")
def _verify_args(self) -> Self:
# Lazy import to avoid circular import
from vllm.executor.executor_base import ExecutorBase
from vllm.v1.executor import Executor

# Enable batch invariance settings if requested
if vllm_is_batch_invariant():
@@ -574,17 +580,17 @@
and not isinstance(self.distributed_executor_backend, str)
and not (
isinstance(self.distributed_executor_backend, type)
and issubclass(self.distributed_executor_backend, ExecutorBase)
and issubclass(self.distributed_executor_backend, Executor)
)
):
raise ValueError(
"Unrecognized distributed executor backend "
f"{self.distributed_executor_backend}. Supported "
"values are 'ray', 'mp' 'uni', 'external_launcher', "
" custom ExecutorBase subclass or its import path."
" custom Executor subclass or its import path."
Member: Presumably no backwards compat is needed for custom subclasses?

Member (Author): Yes, ExecutorBase was the superclass for the V0 and V1 executors; now that V0 is gone, anything that extended ExecutorBase directly wouldn't work anyhow.
)
if self.use_ray:
from vllm.executor import ray_utils
from vllm.v1.executor import ray_utils

ray_utils.assert_ray_available()

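As the review thread above notes, `ExecutorBase` is gone and custom backends must now derive from the V1 `Executor`. A hedged sketch of what passing a custom subclass (or its import path) might look like after this change; the stub below is illustrative and does not implement the V1 `Executor` abstract interface, which a working subclass must do:

```python
# Sketch only: a custom distributed executor after the ExecutorBase removal.
# A real subclass must implement vllm.v1.executor.Executor's abstract API;
# this placeholder does not, so instantiating it would fail at runtime.
from vllm import LLM
from vllm.v1.executor import Executor


class MyExecutor(Executor):
    """Placeholder custom executor extending the V1 base class."""
    ...


# Per the error message above, either the class itself or its import path
# string is accepted as distributed_executor_backend.
llm = LLM(
    model="facebook/opt-125m",
    distributed_executor_backend=MyExecutor,
)
```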
6 changes: 0 additions & 6 deletions vllm/config/scheduler.py
@@ -107,12 +107,6 @@ class SchedulerConfig:
NOTE: This is not currently configurable. It will be overridden by
max_num_batched_tokens in case max multimodal embedding size is larger."""

send_delta_data: bool = False
"""Private API. If used, scheduler sends delta data to
workers instead of an entire data. It should be enabled only
when SPMD worker architecture is enabled. I.e.,
VLLM_USE_RAY_SPMD_WORKER=1"""

policy: SchedulerPolicy = "fcfs"
"""The scheduling policy to use:\n
- "fcfs" means first come first served, i.e. requests are handled in order
2 changes: 1 addition & 1 deletion vllm/distributed/device_communicators/tpu_communicator.py
@@ -31,7 +31,7 @@
)

if USE_RAY:
from vllm.executor import ray_utils
from vllm.v1.executor import ray_utils


class TpuCommunicator(DeviceCommunicatorBase):
7 changes: 3 additions & 4 deletions vllm/engine/arg_utils.py
@@ -88,12 +88,12 @@
from vllm.v1.sample.logits_processor import LogitsProcessor

if TYPE_CHECKING:
from vllm.executor.executor_base import ExecutorBase
from vllm.model_executor.layers.quantization import QuantizationMethods
from vllm.model_executor.model_loader import LoadFormats
from vllm.usage.usage_lib import UsageContext
from vllm.v1.executor import Executor
else:
ExecutorBase = Any
Executor = Any
QuantizationMethods = Any
LoadFormats = Any
UsageContext = Any
@@ -369,7 +369,7 @@ class EngineArgs:
# is intended for expert use only. The API may change without
# notice.
distributed_executor_backend: (
str | DistributedExecutorBackend | type[ExecutorBase] | None
str | DistributedExecutorBackend | type[Executor] | None
) = ParallelConfig.distributed_executor_backend
# number of P/D disaggregation (or other disaggregation) workers
pipeline_parallel_size: int = ParallelConfig.pipeline_parallel_size
@@ -1549,7 +1549,6 @@ def create_engine_config(
disable_chunked_mm_input=self.disable_chunked_mm_input,
is_multimodal_model=model_config.is_multimodal_model,
is_encoder_decoder=model_config.is_encoder_decoder,
send_delta_data=(envs.VLLM_USE_RAY_SPMD_WORKER and parallel_config.use_ray),
policy=self.scheduling_policy,
scheduler_cls=self.scheduler_cls,
max_num_partial_prefills=self.max_num_partial_prefills,
2 changes: 1 addition & 1 deletion vllm/entrypoints/cli/serve.py
@@ -26,7 +26,7 @@
from vllm.utils.network_utils import get_tcp_uri
from vllm.v1.engine.core import EngineCoreProc
from vllm.v1.engine.utils import CoreEngineProcManager, launch_core_engines
from vllm.v1.executor.abstract import Executor
from vllm.v1.executor import Executor
from vllm.v1.metrics.prometheus import setup_multiprocess_prometheus
from vllm.v1.utils import APIServerProcessManager, wait_for_completion_or_failure

23 changes: 1 addition & 22 deletions vllm/envs.py
@@ -56,8 +56,6 @@
VLLM_XLA_CHECK_RECOMPILATION: bool = False
VLLM_FUSED_MOE_CHUNK_SIZE: int = 64 * 1024
VLLM_ENABLE_FUSED_MOE_ACTIVATION_CHUNKING: bool = True
VLLM_USE_RAY_SPMD_WORKER: bool = False
VLLM_USE_RAY_COMPILED_DAG: bool = False
VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: Literal["auto", "nccl", "shm"] = "auto"
VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM: bool = False
VLLM_USE_RAY_WRAPPED_PP_COMM: bool = True
@@ -623,43 +621,24 @@ def get_vllm_port() -> int | None:
"VLLM_CPU_MOE_PREPACK": lambda: bool(int(os.getenv("VLLM_CPU_MOE_PREPACK", "1"))),
# (CPU backend only) whether to use SGL kernels, optimized for small batch.
"VLLM_CPU_SGL_KERNEL": lambda: bool(int(os.getenv("VLLM_CPU_SGL_KERNEL", "0"))),
# If the env var is set, then all workers will execute as separate
# processes from the engine, and we use the same mechanism to trigger
# execution on all workers.
# Run vLLM with VLLM_USE_RAY_SPMD_WORKER=1 to enable it.
"VLLM_USE_RAY_SPMD_WORKER": lambda: bool(
int(os.getenv("VLLM_USE_RAY_SPMD_WORKER", "0"))
),
# If the env var is set, it uses the Ray's Compiled Graph
# (previously known as ADAG) API which optimizes the
# control plane overhead.
# Run vLLM with VLLM_USE_RAY_COMPILED_DAG=1 to enable it.
# Note that this variable is set to 1 in V1 by default
# when ray distributed executor is used.
"VLLM_USE_RAY_COMPILED_DAG": lambda: bool(
int(os.getenv("VLLM_USE_RAY_COMPILED_DAG", "0"))
),
# If the env var is set, Ray Compiled Graph uses the specified
# channel type to communicate between workers belonging to
# different pipeline-parallel stages.
# Available options:
# - "auto": use the default channel type
# - "nccl": use NCCL for communication
# - "shm": use shared memory and gRPC for communication
# This flag is ignored if VLLM_USE_RAY_COMPILED_DAG is not set.
"VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE": env_with_choices(
"VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE", "auto", ["auto", "nccl", "shm"]
),
# If the env var is set, it enables GPU communication overlap
# (experimental feature) in Ray's Compiled Graph. This flag is ignored if
# VLLM_USE_RAY_COMPILED_DAG is not set.
# (experimental feature) in Ray's Compiled Graph.
"VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM": lambda: bool(
int(os.getenv("VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM", "0"))
),
# If the env var is set, it uses a Ray Communicator wrapping
# vLLM's pipeline parallelism communicator to interact with Ray's
# Compiled Graph. Otherwise, it uses Ray's NCCL communicator.
# This flag is ignored if VLLM_USE_RAY_COMPILED_DAG is not set.
"VLLM_USE_RAY_WRAPPED_PP_COMM": lambda: bool(
int(os.getenv("VLLM_USE_RAY_WRAPPED_PP_COMM", "1"))
),
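With `VLLM_USE_RAY_SPMD_WORKER` and `VLLM_USE_RAY_COMPILED_DAG` removed, the remaining Ray compiled-graph knobs shown in this hunk are read directly from the environment. A small illustrative snippet, using only names and values documented above:

```python
# Illustration only: tune the remaining compiled-graph settings via env vars.
import os

# Channel type between pipeline-parallel stages: "auto", "nccl", or "shm".
os.environ["VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE"] = "nccl"
# Experimental GPU communication overlap in Ray's Compiled Graph.
os.environ["VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM"] = "1"
# Wrap vLLM's pipeline-parallel communicator for Ray (enabled by default per this diff).
os.environ["VLLM_USE_RAY_WRAPPED_PP_COMM"] = "1"
```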
Empty file removed vllm/executor/__init__.py