2 changes: 2 additions & 0 deletions contributing/AUTOSCALING.md
@@ -11,6 +11,8 @@
- STEP 7: `scale_run_replicas` terminates or starts replicas.
    - `SUBMITTED` and `PROVISIONING` replicas get terminated before `RUNNING`.
    - Replicas are terminated by descending `replica_num` and launched by ascending `replica_num`.
    - For services with `replica_groups`, only groups with autoscaling ranges (min != max) participate in scaling.
    - Scale operations respect per-group minimum and maximum constraints.
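
The two `replica_groups` rules above can be illustrated with a minimal sketch. The `GroupRange` class and both helper functions are hypothetical stand-ins written for this example; they are not taken from the dstack codebase.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class GroupRange:
    """Hypothetical stand-in for a replica group's `replicas` range."""

    name: str
    min: int
    max: int


def scalable_groups(groups: List[GroupRange]) -> List[GroupRange]:
    # Groups with a fixed size (min == max) never take part in scaling.
    return [g for g in groups if g.min != g.max]


def clamp_desired(group: GroupRange, desired: int) -> int:
    # Keep the requested replica count inside the group's own bounds.
    return max(group.min, min(group.max, desired))


groups = [GroupRange("primary", 1, 1), GroupRange("overflow", 0, 5)]
print([g.name for g in scalable_groups(groups)])
print(clamp_desired(groups[1], 8))
```

Under these assumptions, a fixed group such as `primary` is simply skipped, and any desired count for `overflow` is clamped to its range before replicas are started or terminated.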

## RPSAutoscaler

2 changes: 1 addition & 1 deletion contributing/RUNS-AND-JOBS.md
@@ -13,7 +13,7 @@ Runs are created from run configurations. There are three types of run configura
2. `task` — runs the user's bash script until completion.
3. `service` — runs the user's bash script and exposes a port through [dstack-proxy](PROXY.md).

A run can spawn one or multiple jobs, depending on the configuration. A task that specifies multiple `nodes` spawns a job for every node (a multi-node task). A service that specifies multiple `replicas` spawns a job for every replica. A job submission is always assigned to one particular instance. If a job fails and the configuration allows retrying, the server creates a new job submission for the job.
A run can spawn one or multiple jobs, depending on the configuration. A task that specifies multiple `nodes` spawns a job for every node (a multi-node task). A service that specifies multiple `replicas` or `replica_groups` spawns a job for every replica. Each job in a replica group is tagged with `replica_group_name` to track which group it belongs to. A job submission is always assigned to one particular instance. If a job fails and the configuration allows retrying, the server creates a new job submission for the job.
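
As a rough illustration of the tagging described above, the sketch below expands per-group replica counts into one job per replica. `JobSpecSketch` and `jobs_for_groups` are hypothetical names, and numbering replicas consecutively across groups is an assumption, not something this document confirms.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class JobSpecSketch:
    """Hypothetical, reduced stand-in for a job spec."""

    replica_num: int
    replica_group_name: Optional[str]


def jobs_for_groups(initial_counts: Dict[str, int]) -> List[JobSpecSketch]:
    # One job per replica; each job remembers the group it came from.
    # Assumption: replica numbers run consecutively across all groups.
    jobs: List[JobSpecSketch] = []
    replica_num = 0
    for group_name, count in initial_counts.items():
        for _ in range(count):
            jobs.append(JobSpecSketch(replica_num, group_name))
            replica_num += 1
    return jobs


jobs = jobs_for_groups({"primary": 1, "overflow": 2})
for job in jobs:
    print(job.replica_num, job.replica_group_name)
```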

## Run's Lifecycle

60 changes: 60 additions & 0 deletions docs/docs/concepts/services.md
@@ -160,6 +160,66 @@ Setting the minimum number of replicas to `0` allows the service to scale down t

> The `scaling` property requires creating a [gateway](gateways.md).

### Replica Groups (Advanced)

For advanced use cases, you can define multiple **replica groups** with different instance types, resources, and configurations within a single service. This is useful when you want to:

- Run different GPU types in the same service (e.g., H100 for primary, RTX5090 for overflow)
- Configure different backends or regions per replica type
- Set different autoscaling behavior per group

<div editor-title="service.dstack.yml">

```yaml
type: service
name: llama31-service

python: 3.12
env:
  - HF_TOKEN
commands:
  - uv pip install vllm
  - vllm serve meta-llama/Meta-Llama-3.1-8B-Instruct --max-model-len 4096
port: 8000

# Define multiple replica groups with different configurations
replica_groups:
  - name: primary
    replicas: 1  # Always 1 H100 (fixed)
    resources:
      gpu: H100:1
    backends: [aws]
    regions: [us-west-2]

  - name: overflow
    replicas: 0..5  # Autoscales 0-5 RTX5090s
    resources:
      gpu: RTX5090:1
    backends: [runpod]

scaling:
  metric: rps
  target: 10
```

</div>

In this example:

- The `primary` group always runs 1 H100 replica on AWS (fixed, never scaled)
- The `overflow` group scales 0-5 RTX5090 replicas on RunPod based on load
- Scale operations only affect groups with autoscaling ranges (min != max)

Each replica group can override any [profile parameter](../reference/profiles.yml.md) including `backends`, `regions`, `instance_types`, `spot_policy`, etc. Group-level settings override service-level settings.
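
The override rule can be pictured as a dictionary merge. This is only a sketch: `effective_settings` is a hypothetical helper, and treating `None` as "not set at group level" is an assumption made for the example.

```python
from typing import Any, Dict


def effective_settings(
    service_level: Dict[str, Any], group_level: Dict[str, Any]
) -> Dict[str, Any]:
    # Start from the service-level values, then let every value that the
    # group actually sets (assumed here: anything that is not None) win.
    merged = dict(service_level)
    merged.update({k: v for k, v in group_level.items() if v is not None})
    return merged


service = {"backends": ["aws"], "regions": ["us-west-2"], "spot_policy": "auto"}
overflow = {"backends": ["runpod"], "regions": None}
print(effective_settings(service, overflow))
```

In this sketch the `overflow` group ends up with its own `backends` while still using the service-level `regions` and `spot_policy`.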

> **Note:** When using `replica_groups`, you cannot use the simple `replicas` field. They are mutually exclusive.

**When to use replica groups:**

- You need different GPU types in the same service
- Different replicas should run in different regions or clouds
- Some replicas should be fixed while others autoscale

### Model

If the service is running a chat model with an OpenAI-compatible interface,
16 changes: 16 additions & 0 deletions docs/docs/reference/dstack.yml/service.md
@@ -10,6 +10,22 @@ The `service` configuration type allows running [services](../../concepts/servic
      type:
        required: true

### `replica_groups`

Define multiple replica groups with different configurations within a single service.

> **Note:** Cannot be used together with `replicas`.
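
A check for this mutual exclusion might look like the sketch below. It operates on a plain dictionary and uses a hypothetical function name; the validation in this PR lives in the configuration model and may be implemented differently.

```python
from typing import Any, Dict


def check_replica_fields(config: Dict[str, Any]) -> None:
    # Reject configurations that set both fields at the same time.
    if config.get("replicas") is not None and config.get("replica_groups") is not None:
        raise ValueError("`replicas` and `replica_groups` are mutually exclusive")


check_replica_fields({"replicas": "1..3"})  # passes
check_replica_fields({"replica_groups": [{"name": "primary"}]})  # passes
```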

#### `replica_groups[n]`

#SCHEMA# dstack._internal.core.models.configurations.ReplicaGroup
    overrides:
      show_root_heading: false
      type:
        required: true

Each replica group inherits from [ProfileParams](../profiles.yml.md) and can override any profile parameter including `backends`, `regions`, `instance_types`, `spot_policy`, etc.

### `model` { data-toc-label="model" }

=== "OpenAI"
256 changes: 218 additions & 38 deletions src/dstack/_internal/cli/utils/run.py
@@ -119,9 +119,55 @@ def th(s: str) -> str:
    if include_run_properties:
        props.add_row(th("Configuration"), run_spec.configuration_path)
        props.add_row(th("Type"), run_spec.configuration.type)
    props.add_row(th("Resources"), pretty_req)
    props.add_row(th("Spot policy"), spot_policy)
    props.add_row(th("Max price"), max_price)

    from dstack._internal.core.models.configurations import ServiceConfiguration

    has_replica_groups = (
        include_run_properties
        and isinstance(run_spec.configuration, ServiceConfiguration)
        and run_spec.configuration.replica_groups
    )

    if has_replica_groups:
        groups_info = []
        for group in run_spec.configuration.replica_groups:
            group_parts = [f"[cyan]{group.name}[/cyan]"]

            # Replica count
            if group.replicas.min == group.replicas.max:
                group_parts.append(f"×{group.replicas.max}")
            else:
                group_parts.append(f"×{group.replicas.min}..{group.replicas.max}")
                group_parts.append("[dim](autoscalable)[/dim]")

            # Resources
            group_parts.append(f"[dim]({group.resources.pretty_format()})[/dim]")

            # Group-specific overrides
            overrides = []
            if group.spot_policy is not None:
                overrides.append(f"spot={group.spot_policy.value}")
            if group.regions:
                regions_str = ",".join(group.regions[:2])  # Show first 2
                if len(group.regions) > 2:
                    regions_str += f",+{len(group.regions) - 2}"
                overrides.append(f"regions={regions_str}")
            if group.backends:
                backends_str = ",".join([b.value for b in group.backends[:2]])
                if len(group.backends) > 2:
                    backends_str += f",+{len(group.backends) - 2}"
                overrides.append(f"backends={backends_str}")

            if overrides:
                group_parts.append(f"[dim]({'; '.join(overrides)})[/dim]")

            groups_info.append(" ".join(group_parts))

        props.add_row(th("Replica groups"), "\n".join(groups_info))
    else:
        props.add_row(th("Resources"), pretty_req)
        props.add_row(th("Spot policy"), spot_policy)
        props.add_row(th("Max price"), max_price)
    if include_run_properties:
        props.add_row(th("Retry policy"), retry)
        props.add_row(th("Creation policy"), creation_policy)
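
The regions and backends branches in this hunk repeat the same "show the first two, then `+N`" truncation, and the replica count label is built inline. A small sketch of how both could be expressed as helpers follows; `shorten` and `replica_range_label` are hypothetical names and are not part of this PR, and the Rich markup tags are left out for brevity.

```python
from typing import List


def shorten(items: List[str], limit: int = 2) -> str:
    # Join the first `limit` items and summarize the rest as "+N".
    shown = ",".join(items[:limit])
    if len(items) > limit:
        shown += f",+{len(items) - limit}"
    return shown


def replica_range_label(min_replicas: int, max_replicas: int) -> str:
    # Fixed groups show a single count; ranges are marked as autoscalable.
    if min_replicas == max_replicas:
        return f"×{max_replicas}"
    return f"×{min_replicas}..{max_replicas} (autoscalable)"


print(shorten(["us-west-2", "us-east-1", "eu-west-1"]))
print(replica_range_label(0, 5))
```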
@@ -139,44 +185,172 @@ def th(s: str) -> str:
    offers.add_column("PRICE", style="grey58", ratio=1)
    offers.add_column()

    job_plan.offers = job_plan.offers[:max_offers] if max_offers else job_plan.offers

    for i, offer in enumerate(job_plan.offers, start=1):
        r = offer.instance.resources

        availability = ""
        if offer.availability in {
            InstanceAvailability.NOT_AVAILABLE,
            InstanceAvailability.NO_QUOTA,
            InstanceAvailability.IDLE,
            InstanceAvailability.BUSY,
        }:
            availability = offer.availability.value.replace("_", " ").lower()
        instance = offer.instance.name
        if offer.total_blocks > 1:
            instance += f" ({offer.blocks}/{offer.total_blocks})"
        offers.add_row(
            f"{i}",
            offer.backend.replace("remote", "ssh") + " (" + offer.region + ")",
            r.pretty_format(include_spot=True),
            instance,
            f"${offer.price:.4f}".rstrip("0").rstrip("."),
            availability,
            style=None if i == 1 or not include_run_properties else "secondary",
        )
    if job_plan.total_offers > len(job_plan.offers):
        offers.add_row("", "...", style="secondary")
    # For replica groups, show offers from all job plans
    if len(run_plan.job_plans) > 1:
        # Multiple jobs - ensure fair representation of all groups
        groups_with_no_offers = []
        groups_with_offers = {}
        total_offers_count = 0

        # Collect offers per group
        for jp in run_plan.job_plans:
            group_name = jp.job_spec.replica_group_name or "default"
            if jp.total_offers == 0:
                groups_with_no_offers.append(group_name)
            else:
                groups_with_offers[group_name] = jp.offers
                total_offers_count += jp.total_offers

        # Strategy: Show at least min_per_group offers from each group, then fill with cheapest
        num_groups = len(groups_with_offers)
        if num_groups > 0 and max_offers:
            min_per_group = max(
                1, max_offers // (num_groups * 2)
            )  # At least 1, aim for ~half distribution
            remaining_slots = max_offers
        else:
            min_per_group = None
            remaining_slots = None

        selected_offers = []

        # First pass: Take min_per_group from each group (cheapest from each)
        if min_per_group:
            for group_name, group_offers in groups_with_offers.items():
                sorted_group_offers = sorted(group_offers, key=lambda x: x.price)
                take_count = min(min_per_group, len(sorted_group_offers), remaining_slots)
                for offer in sorted_group_offers[:take_count]:
                    selected_offers.append((group_name, offer))
                remaining_slots -= take_count

        # Second pass: Fill remaining slots with cheapest offers globally
Review comment (Collaborator):

If you feel the urge to sum up a piece of code with a comment, consider refactoring it into a function with a descriptive name instead; that usually reads better.

        if remaining_slots and remaining_slots > 0:
            all_remaining = []
            for group_name, group_offers in groups_with_offers.items():
                sorted_group_offers = sorted(group_offers, key=lambda x: x.price)
                # Skip offers already selected
                for offer in sorted_group_offers[min_per_group:]:
                    all_remaining.append((group_name, offer))

            # Sort remaining by price and take the cheapest
            all_remaining.sort(key=lambda x: x[1].price)
            selected_offers.extend(all_remaining[:remaining_slots])

        # If no max_offers limit, show all
        if not max_offers:
            selected_offers = []
            for group_name, group_offers in groups_with_offers.items():
                for offer in group_offers:
                    selected_offers.append((group_name, offer))

        # Sort final selection by price for display
        selected_offers.sort(key=lambda x: x[1].price)

        # Show groups with no offers FIRST
        for group_name in groups_with_no_offers:
            offers.add_row(
                "",
                f"[cyan]{group_name}[/cyan]:",
                "[red]No matching instance offers available.[/red]\n"
                "Possible reasons: https://dstack.ai/docs/guides/troubleshooting/#no-offers",
                "",
                "",
                "",
                style="secondary",
            )

        # Then show selected offers
        for i, (group_name, offer) in enumerate(selected_offers, start=1):
            r = offer.instance.resources

            availability = ""
            if offer.availability in {
                InstanceAvailability.NOT_AVAILABLE,
                InstanceAvailability.NO_QUOTA,
                InstanceAvailability.IDLE,
                InstanceAvailability.BUSY,
            }:
                availability = offer.availability.value.replace("_", " ").lower()
            instance = offer.instance.name
            if offer.total_blocks > 1:
                instance += f" ({offer.blocks}/{offer.total_blocks})"

            # Add group name prefix for multi-group display
            backend_display = f"[cyan]{group_name}[/cyan]: {offer.backend.replace('remote', 'ssh')} ({offer.region})"

            offers.add_row(
                f"{i}",
                backend_display,
                r.pretty_format(include_spot=True),
                instance,
                f"${offer.price:.4f}".rstrip("0").rstrip("."),
                availability,
                style=None if i == 1 or not include_run_properties else "secondary",
            )

        if total_offers_count > len(selected_offers):
            offers.add_row("", "...", style="secondary")
    else:
        # Single job - original logic
        job_plan.offers = job_plan.offers[:max_offers] if max_offers else job_plan.offers

        for i, offer in enumerate(job_plan.offers, start=1):
            r = offer.instance.resources

            availability = ""
            if offer.availability in {
                InstanceAvailability.NOT_AVAILABLE,
                InstanceAvailability.NO_QUOTA,
                InstanceAvailability.IDLE,
                InstanceAvailability.BUSY,
            }:
                availability = offer.availability.value.replace("_", " ").lower()
            instance = offer.instance.name
            if offer.total_blocks > 1:
                instance += f" ({offer.blocks}/{offer.total_blocks})"
            offers.add_row(
                f"{i}",
                offer.backend.replace("remote", "ssh") + " (" + offer.region + ")",
                r.pretty_format(include_spot=True),
                instance,
                f"${offer.price:.4f}".rstrip("0").rstrip("."),
                availability,
                style=None if i == 1 or not include_run_properties else "secondary",
            )
        if job_plan.total_offers > len(job_plan.offers):
            offers.add_row("", "...", style="secondary")

    console.print(props)
    console.print()
    if len(job_plan.offers) > 0:

    # Check if we have offers to display
    has_offers = False
    if len(run_plan.job_plans) > 1:
        has_offers = any(len(jp.offers) > 0 for jp in run_plan.job_plans)
    else:
        has_offers = len(job_plan.offers) > 0

    if has_offers:
        console.print(offers)
        if job_plan.total_offers > len(job_plan.offers):
            console.print(
                f"[secondary] Shown {len(job_plan.offers)} of {job_plan.total_offers} offers, "
                f"${job_plan.max_price:3f}".rstrip("0").rstrip(".")
                + "max[/]"
            )
        # Show summary for multi-job plans
        if len(run_plan.job_plans) > 1:
            if total_offers_count > len(selected_offers):
                max_price_overall = max(
                    (jp.max_price for jp in run_plan.job_plans if jp.max_price), default=None
                )
                if max_price_overall:
                    console.print(
                        f"[secondary] Shown {len(selected_offers)} of {total_offers_count} offers, "
                        f"${max_price_overall:3f}".rstrip("0").rstrip(".")
                        + " max[/]"
                    )
        else:
            if job_plan.total_offers > len(job_plan.offers):
                console.print(
                    f"[secondary] Shown {len(job_plan.offers)} of {job_plan.total_offers} offers, "
                    f"${job_plan.max_price:3f}".rstrip("0").rstrip(".")
                    + " max[/]"
                )
        console.print()
    else:
        console.print(NO_OFFERS_WARNING)
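
One possible way to act on the review comment in this hunk is to pull the two-pass selection into a named function. The sketch below is not the PR's code: `select_offers` is a hypothetical name, offers are reduced to `(instance name, price)` tuples, and the leftovers are taken from `offers[take:]` rather than `offers[min_per_group:]`, so that no offer is dropped when the first pass takes fewer than `min_per_group` from a group.

```python
from typing import Dict, List, Optional, Tuple

Offer = Tuple[str, float]  # (instance name, price); simplified stand-in
Tagged = Tuple[str, Offer]  # (group name, offer)


def select_offers(groups: Dict[str, List[Offer]], max_offers: Optional[int]) -> List[Tagged]:
    # Sort each non-empty group by price so "cheapest first" holds per group.
    ranked = {
        name: sorted(offers, key=lambda o: o[1]) for name, offers in groups.items() if offers
    }
    if not max_offers:
        # No limit: show everything, cheapest first.
        everything = [(name, o) for name, offers in ranked.items() for o in offers]
        return sorted(everything, key=lambda item: item[1][1])

    min_per_group = max(1, max_offers // (len(ranked) * 2)) if ranked else 0
    selected: List[Tagged] = []
    leftovers: List[Tagged] = []
    # First pass: guarantee each group a few of its cheapest offers.
    for name, offers in ranked.items():
        take = min(min_per_group, len(offers), max_offers - len(selected))
        selected.extend((name, o) for o in offers[:take])
        leftovers.extend((name, o) for o in offers[take:])
    # Second pass: fill the remaining slots with the cheapest leftovers overall.
    leftovers.sort(key=lambda item: item[1][1])
    selected.extend(leftovers[: max_offers - len(selected)])
    return sorted(selected, key=lambda item: item[1][1])


example = {
    "primary": [("h100", 3.0), ("h100-b", 4.0)],
    "overflow": [("a", 0.5), ("b", 0.6), ("c", 0.7)],
}
for group_name, (instance, price) in select_offers(example, max_offers=3):
    print(group_name, instance, price)
```

With a limit of three, the sketch keeps one `primary` offer even though every `overflow` offer is cheaper, which is the fairness property the hunk's comments describe.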
@@ -233,8 +407,14 @@ def get_runs_table(
            if verbose and latest_job_submission.inactivity_secs:
                inactive_for = format_duration_multiunit(latest_job_submission.inactivity_secs)
                status += f" (inactive for {inactive_for})"

            job_name_parts = [f" replica={job.job_spec.replica_num}"]
            if job.job_spec.replica_group_name:
                job_name_parts.append(f"[cyan]group={job.job_spec.replica_group_name}[/cyan]")
            job_name_parts.append(f"job={job.job_spec.job_num}")

            job_row: Dict[Union[str, int], Any] = {
                "NAME": f" replica={job.job_spec.replica_num} job={job.job_spec.job_num}"
                "NAME": " ".join(job_name_parts)
                + (
                    f" deployment={latest_job_submission.deployment_num}"
                    if show_deployment_num
3 changes: 3 additions & 0 deletions src/dstack/_internal/core/compatibility/runs.py
@@ -151,6 +151,9 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType:
        configuration_excludes["schedule"] = True
    if profile is not None and profile.schedule is None:
        profile_excludes.add("schedule")
    # Exclude replica_groups for backward compatibility with older servers
    if isinstance(configuration, ServiceConfiguration) and configuration.replica_groups is None:
        configuration_excludes["replica_groups"] = True
    configuration_excludes["repos"] = True

    if configuration_excludes:
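
The idea behind the `replica_groups` exclude can be sketched with plain dictionaries: when the field is unset, it is left out of the serialized configuration, so a server that does not know the field never sees it. `build_excludes` and `serialize` are hypothetical helpers for illustration; the real code builds an exclude mapping for the model's own serialization.

```python
from typing import Any, Dict


def build_excludes(configuration: Dict[str, Any]) -> Dict[str, bool]:
    # Only drop the field when it is unset; a configured value must be sent.
    excludes: Dict[str, bool] = {}
    if configuration.get("type") == "service" and configuration.get("replica_groups") is None:
        excludes["replica_groups"] = True
    return excludes


def serialize(configuration: Dict[str, Any], excludes: Dict[str, bool]) -> Dict[str, Any]:
    # Emit every key that is not marked for exclusion.
    return {k: v for k, v in configuration.items() if not excludes.get(k, False)}


plain = {"type": "service", "replicas": 1, "replica_groups": None}
grouped = {"type": "service", "replicas": None, "replica_groups": [{"name": "primary"}]}
print(serialize(plain, build_excludes(plain)))
print(serialize(grouped, build_excludes(grouped)))
```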