diff --git a/src/guidellm/__main__.py b/src/guidellm/__main__.py
index 7cba6a7c..ff67fbf1 100644
--- a/src/guidellm/__main__.py
+++ b/src/guidellm/__main__.py
@@ -160,9 +160,27 @@ def benchmark():
         "For rate-type=sweep, this is the number of benchmarks it runs in the sweep. "
         "For rate-type=concurrent, this is the number of concurrent requests. "
         "For rate-type=async,constant,poisson, this is the rate requests per second. "
-        "For rate-type=synchronous,throughput, this must not be set."
+        "For rate-type=synchronous,throughput,steps, this must not be set."
     ),
 )
+@click.option(
+    "--steps-duration",
+    type=str,
+    default=None,
+    help=(
+        "Comma-separated list of durations for each step in seconds. "
+        "Only used with --rate-type=steps."
+    ),
+)
+@click.option(
+    "--steps-rate",
+    type=str,
+    default=None,
+    help=(
+        "Comma-separated list of rates for each step in requests per second. "
+        "Only used with --rate-type=steps."
+    ),
+)
 @click.option(
     "--max-seconds",
     type=float,
@@ -260,6 +278,8 @@ def run(
     data_sampler,
     rate_type,
     rate,
+    steps_duration,
+    steps_rate,
     max_seconds,
     max_requests,
     warmup_percent,
@@ -287,6 +307,8 @@ def run(
         data_sampler=data_sampler,
         rate_type=rate_type,
         rate=rate,
+        steps_duration=steps_duration,
+        steps_rate=steps_rate,
         max_seconds=max_seconds,
         max_requests=max_requests,
         warmup_percent=warmup_percent,
diff --git a/src/guidellm/benchmark/entrypoints.py b/src/guidellm/benchmark/entrypoints.py
index 2ef85c3e..8562bcd4 100644
--- a/src/guidellm/benchmark/entrypoints.py
+++ b/src/guidellm/benchmark/entrypoints.py
@@ -51,6 +51,8 @@ async def benchmark_generative_text(
     data_sampler: Optional[Literal["random"]],
     rate_type: Union[StrategyType, ProfileType],
     rate: Optional[Union[float, list[float]]],
+    steps_duration: Optional[list[int]],
+    steps_rate: Optional[list[float]],
     max_seconds: Optional[float],
     max_requests: Optional[int],
     warmup_percent: Optional[float],
@@ -97,7 +99,13 @@ async def benchmark_generative_text(
         else f"Created loader with unknown number unique requests from {data}.\n\n"
     )
 
-    profile = create_profile(rate_type=rate_type, rate=rate)
+    profile = create_profile(
+        rate_type=rate_type,
+        rate=rate,
+        steps_duration=steps_duration,
+        steps_rate=steps_rate,
+        random_seed=random_seed,
+    )
     benchmarker = GenerativeBenchmarker(
         backend=backend,
         request_loader=request_loader,
diff --git a/src/guidellm/benchmark/profile.py b/src/guidellm/benchmark/profile.py
index 642cb7a8..1229b403 100644
--- a/src/guidellm/benchmark/profile.py
+++ b/src/guidellm/benchmark/profile.py
@@ -11,6 +11,7 @@
     AsyncPoissonStrategy,
     ConcurrentStrategy,
     SchedulingStrategy,
+    StepsStrategy,
     StrategyType,
     SynchronousStrategy,
     ThroughputStrategy,
@@ -24,10 +25,13 @@
     "SweepProfile",
     "SynchronousProfile",
     "ThroughputProfile",
+    "StepsProfile",
     "create_profile",
 ]
 
-ProfileType = Literal["synchronous", "concurrent", "throughput", "async", "sweep"]
+ProfileType = Literal[
+    "synchronous", "concurrent", "throughput", "async", "sweep", "steps"
+]
 
 
 class Profile(StandardBaseModel):
@@ -363,9 +367,60 @@ def from_standard_args(  # type: ignore[override]
         return SweepProfile(sweep_size=int(rate), random_seed=random_seed, **kwargs)
 
 
+class StepsProfile(Profile):
+    type_: Literal["steps"] = "steps"  # type: ignore[assignment]
+
+    steps_duration: list[int]
+    steps_rate: list[float]
+
+    @property
+    def strategy_types(self) -> list[StrategyType]:
+        return [self.type_]
+
+    def next_strategy(self) -> Optional[SchedulingStrategy]:
+        if self.completed_strategies >= 1:
+            return None
+
+        return StepsStrategy(
+            steps_duration=self.steps_duration,
+            steps_rate=self.steps_rate,
+        )
+
+    @staticmethod
+    def from_standard_args(
+        rate_type: Union[StrategyType, ProfileType],
+        rate: Optional[Union[float, Sequence[float]]],
+        steps_duration: Optional[list[int]],
+        steps_rate: Optional[list[float]],
+        **kwargs,
+    ) -> "StepsProfile":
+        if rate_type != "steps":
+            raise ValueError("Rate type must be 'steps' for steps profile.")
+        if rate is not None:
+            raise ValueError(
+                "Rate does not apply to steps profile, it must be set to None."
+            )
+        if not steps_duration:
+            raise ValueError("steps_duration must be provided for steps profile.")
+        if not steps_rate:
+            raise ValueError("steps_rate must be provided for steps profile.")
+        if len(steps_duration) != len(steps_rate):
+            raise ValueError("steps_duration and steps_rate must have the same length.")
+
+        if kwargs:
+            raise ValueError("No additional arguments are allowed for steps profile.")
+
+        return StepsProfile(
+            steps_duration=steps_duration,
+            steps_rate=steps_rate,
+        )
+
+
 def create_profile(
     rate_type: Union[StrategyType, ProfileType],
     rate: Optional[Union[float, Sequence[float]]],
+    steps_duration: Optional[list[int]] = None,
+    steps_rate: Optional[list[float]] = None,
     random_seed: int = 42,
     **kwargs,
 ) -> "Profile":
@@ -406,4 +461,13 @@ def create_profile(
             **kwargs,
         )
 
+    if rate_type == "steps":
+        return StepsProfile.from_standard_args(
+            rate_type=rate_type,
+            rate=rate,
+            steps_duration=steps_duration,
+            steps_rate=steps_rate,
+            **kwargs,
+        )
+
     raise ValueError(f"Invalid profile type: {rate_type}")
diff --git a/src/guidellm/benchmark/scenario.py b/src/guidellm/benchmark/scenario.py
index af43e426..c3a680ef 100644
--- a/src/guidellm/benchmark/scenario.py
+++ b/src/guidellm/benchmark/scenario.py
@@ -4,7 +4,14 @@
 from typing import Annotated, Any, Literal, Optional, TypeVar, Union
 
 from datasets import Dataset, DatasetDict, IterableDataset, IterableDatasetDict
-from pydantic import BeforeValidator, Field, NonNegativeInt, PositiveFloat, PositiveInt
+from pydantic import (
+    BeforeValidator,
+    Field,
+    NonNegativeInt,
+    PositiveFloat,
+    PositiveInt,
+    model_validator,
+)
 from transformers.tokenization_utils_base import (  # type: ignore[import]
     PreTrainedTokenizerBase,
 )
@@ -46,6 +53,40 @@ def parse_float_list(value: Union[str, float, list[float]]) -> list[float]:
         ) from err
 
 
+def parse_int_list(
+    value: Optional[Union[str, float, list[float]]],
+) -> Optional[list[int]]:
+    """
+    Parse a comma separated string to a list of int,
+    convert a single number to a list of one, or
+    convert a list of numbers element-wise.
+
+    None is passed through so the field can stay unset. Values with a
+    fractional part are rejected instead of being silently truncated.
+    """
+    if value is None:
+        return None
+
+    if isinstance(value, (int, float)):
+        values: list[Any] = [value]
+    elif isinstance(value, list):
+        values = value
+    else:
+        values = value.split(",")
+
+    try:
+        numbers = [float(val) for val in values]
+    except (TypeError, ValueError) as err:
+        raise ValueError(
+            "must be an integer or comma-separated list of integers."
+        ) from err
+
+    if not all(number.is_integer() for number in numbers):
+        raise ValueError("must be an integer or comma-separated list of integers.")
+
+    return [int(number) for number in numbers]
+
+
 T = TypeVar("T", bound="Scenario")
 
 
@@ -96,9 +137,31 @@ class Config:
     rate: Annotated[
         Optional[list[PositiveFloat]], BeforeValidator(parse_float_list)
     ] = None
+    steps_duration: Annotated[
+        Optional[list[PositiveInt]], BeforeValidator(parse_int_list)
+    ] = None
+    steps_rate: Annotated[
+        Optional[list[PositiveFloat]], BeforeValidator(parse_float_list)
+    ] = None
     max_seconds: Optional[PositiveFloat] = None
     max_requests: Optional[PositiveInt] = None
     warmup_percent: Annotated[Optional[float], Field(gt=0, le=1)] = None
     cooldown_percent: Annotated[Optional[float], Field(gt=0, le=1)] = None
     output_sampling: Optional[NonNegativeInt] = None
     random_seed: int = 42
+
+    @model_validator(mode="after")
+    def validate_steps(self) -> "GenerativeTextScenario":
+        if self.rate_type == "steps":
+            if not self.steps_duration:
+                raise ValueError("steps_duration is required for rate_type 'steps'")
+            if not self.steps_rate:
+                raise ValueError("steps_rate is required for rate_type 'steps'")
+            if len(self.steps_duration) != len(self.steps_rate):
+                raise ValueError(
+                    "steps_duration and steps_rate must have the same length"
+                )
+
+            self.max_seconds = sum(self.steps_duration)
+
+        return self
diff --git a/src/guidellm/logger.py b/src/guidellm/logger.py
index 527d66ff..e4a2d8c8 100644
--- a/src/guidellm/logger.py
+++ b/src/guidellm/logger.py
@@ -72,7 +72,7 @@ def configure_logger(config: LoggingSettings = settings.logging):
         sys.stdout,
         level=config.console_log_level.upper(),
         format="{time:YY-MM-DD HH:mm:ss}|{level: <8} \
-        |{name}:{function}:{line} - {message}"
+        |{name}:{function}:{line} - {message}",
     )
 
     if config.log_file or config.log_file_level:
diff --git a/src/guidellm/scheduler/__init__.py b/src/guidellm/scheduler/__init__.py
index d3aa0aab..35870a4d 100644
--- a/src/guidellm/scheduler/__init__.py
+++ b/src/guidellm/scheduler/__init__.py
@@ -11,6 +11,7 @@
     ConcurrentStrategy,
     SchedulingStrategy,
+    StepsStrategy,
     StrategyType,
     SynchronousStrategy,
     ThroughputStrategy,
     strategy_display_str,
@@ -39,6 +40,7 @@
     "SchedulerRunInfo",
     "SchedulingStrategy",
     "StrategyType",
+    "StepsStrategy",
     "SynchronousStrategy",
     "ThroughputStrategy",
     "WorkerDescription",
diff --git a/src/guidellm/scheduler/strategy.py b/src/guidellm/scheduler/strategy.py
index 74d19266..02d5e2e6 100644
--- a/src/guidellm/scheduler/strategy.py
+++ b/src/guidellm/scheduler/strategy.py
@@ -21,11 +21,14 @@
     "StrategyType",
     "SynchronousStrategy",
     "ThroughputStrategy",
+    "StepsStrategy",
     "strategy_display_str",
 ]
 
 
-StrategyType = Literal["synchronous", "concurrent", "throughput", "constant", "poisson"]
+StrategyType = Literal[
+    "synchronous", "concurrent", "throughput", "constant", "poisson", "steps"
+]
 
 
 class SchedulingStrategy(StandardBaseModel):
@@ -481,6 +484,64 @@ def request_times(self) -> Generator[float, None, None]:
             yield init_time
 
 
+class StepsStrategy(ThroughputStrategy):
+    """
+    A class representing a steps scheduling strategy.
+    This strategy schedules requests asynchronously at different rates
+    for different durations.
+
+    :param type_: The steps StrategyType to schedule requests asynchronously.
+    :param steps_duration: A list of durations for each step in seconds.
+    :param steps_rate: A list of rates for each step in requests per second.
+    """
+
+    type_: Literal["steps"] = "steps"  # type: ignore[assignment]
+
+    steps_duration: list[int] = Field(
+        description="A list of durations for each step in seconds.",
+    )
+    steps_rate: list[float] = Field(
+        description="A list of rates for each step in requests per second.",
+    )
+
+    def request_times(self) -> Generator[float, None, None]:
+        """
+        A generator that yields timestamps for when requests should be sent.
+        This method schedules requests asynchronously at different rates
+        for different durations.
+
+        :return: A generator that yields timestamps for request scheduling.
+        """
+        step_start_time = self.start_time
+
+        for duration, rate in zip(self.steps_duration, self.steps_rate):
+            step_end_time = step_start_time + duration
+
+            if rate <= 0:
+                # A non-positive rate is an idle step: no requests are yielded
+                # and the schedule simply advances by the step duration.
+                step_start_time = step_end_time
+                continue
+
+            increment = 1.0 / rate
+
+            # initial burst for the step
+            burst_count = math.floor(rate)
+            for _ in range(burst_count):
+                yield step_start_time
+
+            request_time = step_start_time + increment
+            counter = 0
+            while True:
+                next_request_time = request_time + increment * counter
+                if next_request_time >= step_end_time:
+                    break
+                yield next_request_time
+                counter += 1
+
+            step_start_time = step_end_time
+
+
 def strategy_display_str(strategy: Union[StrategyType, SchedulingStrategy]) -> str:
     strategy_type = strategy if isinstance(strategy, str) else strategy.type_
     strategy_instance = strategy if isinstance(strategy, SchedulingStrategy) else None