diff --git a/baybe/acquisition/__init__.py b/baybe/acquisition/__init__.py index baf1f15ad0..b19b74893f 100644 --- a/baybe/acquisition/__init__.py +++ b/baybe/acquisition/__init__.py @@ -3,6 +3,7 @@ from baybe.acquisition.acqfs import ( ExpectedImprovement, LogExpectedImprovement, + MultiFidelityUpperConfidenceBound, PosteriorMean, PosteriorStandardDeviation, ProbabilityOfImprovement, @@ -13,6 +14,7 @@ qLogNoisyExpectedHypervolumeImprovement, qLogNoisyExpectedImprovement, qLogNParEGO, + qMultiFidelityKnowledgeGradient, qNegIntegratedPosteriorVariance, qNoisyExpectedHypervolumeImprovement, qNoisyExpectedImprovement, @@ -30,6 +32,7 @@ EI = ExpectedImprovement qEI = qExpectedImprovement qKG = qKnowledgeGradient +qMFKG = qMultiFidelityKnowledgeGradient LogEI = LogExpectedImprovement qLogEI = qLogExpectedImprovement qNEI = qNoisyExpectedImprovement @@ -38,6 +41,7 @@ PI = ProbabilityOfImprovement qPI = qProbabilityOfImprovement UCB = UpperConfidenceBound +MFUCB = MultiFidelityUpperConfidenceBound qUCB = qUpperConfidenceBound qTS = qThompsonSampling qNEHVI = qNoisyExpectedHypervolumeImprovement @@ -47,6 +51,7 @@ ######################### Acquisition functions # Knowledge Gradient "qKnowledgeGradient", + "qMultiFidelityKnowledgeGradient", # Posterior Statistics "PosteriorMean", "PosteriorStandardDeviation", @@ -67,6 +72,7 @@ # Upper Confidence Bound "UpperConfidenceBound", "qUpperConfidenceBound", + "MultiFidelityUpperConfidenceBound", # Thompson Sampling "qThompsonSampling", # Hypervolume Improvement @@ -77,6 +83,7 @@ ######################### Abbreviations # Knowledge Gradient "qKG", + "qMFKG", # Posterior Statistics "PM", "PSTD", @@ -97,6 +104,7 @@ # Upper Confidence Bound "UCB", "qUCB", + "MFUCB", # Thompson Sampling "qTS", # Hypervolume Improvement diff --git a/baybe/acquisition/_builder.py b/baybe/acquisition/_builder.py index 8c92185752..d34303b24c 100644 --- a/baybe/acquisition/_builder.py +++ b/baybe/acquisition/_builder.py @@ -11,7 +11,13 @@ from attrs import Attribute, asdict, define, field, fields from attrs.validators import instance_of, optional from botorch.acquisition import AcquisitionFunction as BoAcquisitionFunction -from botorch.acquisition.monte_carlo import MCAcquisitionObjective +from botorch.acquisition.analytic import AnalyticAcquisitionFunction +from botorch.acquisition.base import MultiObjectiveMCAcquisitionFunction +from botorch.acquisition.cost_aware import CostAwareUtility +from botorch.acquisition.monte_carlo import ( + MCAcquisitionFunction, + MCAcquisitionObjective, +) from botorch.acquisition.objective import PosteriorTransform from botorch.models.model import Model from botorch.utils.multi_objective.box_decompositions.box_decomposition import ( @@ -22,12 +28,19 @@ from baybe.acquisition.acqfs import ( _ExpectedHypervolumeImprovement, qExpectedHypervolumeImprovement, + qKnowledgeGradient, qLogExpectedHypervolumeImprovement, + qMultiFidelityKnowledgeGradient, qNegIntegratedPosteriorVariance, qThompsonSampling, ) from baybe.acquisition.base import AcquisitionFunction, _get_botorch_acqf_class -from baybe.acquisition.utils import make_partitioning +from baybe.acquisition.cost_aware_utils import ( + make_cost_aware_utility, + wrap_cost_aware_abjective, +) +from baybe.acquisition.custom_acqfs import MultiFidelityUpperConfidenceBound +from baybe.acquisition.utils import make_MFUCB_dicts, make_partitioning from baybe.exceptions import ( IncompatibilityError, IncompleteMeasurementsError, @@ -36,7 +49,7 @@ from baybe.objectives.base import Objective from baybe.objectives.desirability import DesirabilityObjective from baybe.objectives.single import SingleTargetObjective -from baybe.searchspace.core import SearchSpace +from baybe.searchspace.core import SearchSpace, SearchSpaceCostType, SearchSpaceTaskType from baybe.surrogates.base import SurrogateProtocol from baybe.targets.binary import BinaryTarget from baybe.targets.numerical import NumericalTarget @@ -75,16 +88,22 @@ class BotorchAcquisitionArgs: # Optional, depending on the specific acquisition function being used best_f: float | None = _OPT_FIELD beta: float | None = _OPT_FIELD + costs_dict: dict[Any, tuple[float, ...]] = _OPT_FIELD + cost_aware_utility: CostAwareUtility | None = _OPT_FIELD + current_value: Tensor | None = _OPT_FIELD + fidelities_dict: dict[Any, tuple[Any, ...]] = _OPT_FIELD maximize: bool | None = _OPT_FIELD mc_points: Tensor | None = _OPT_FIELD num_fantasies: int | None = _OPT_FIELD objective: MCAcquisitionObjective | None = _OPT_FIELD partitioning: BoxDecomposition | None = _OPT_FIELD posterior_transform: PosteriorTransform | None = _OPT_FIELD + project: Callable[[Tensor], Tensor] | None = _OPT_FIELD prune_baseline: bool | None = _OPT_FIELD ref_point: Tensor | None = _OPT_FIELD X_baseline: Tensor | None = _OPT_FIELD X_pending: Tensor | None = _OPT_FIELD + zetas_dict: dict[Any, tuple[float, ...]] = _OPT_FIELD def collect(self) -> dict[str, Any]: """Collect the assigned arguments into a dictionary.""" @@ -202,6 +221,10 @@ def build(self) -> BoAcquisitionFunction: self._set_mc_points() self._set_ref_point() self._set_partitioning() + self._set_current_value() + self._set_projection() + self._set_MFUCB_dicts() + self._set_cost_aware_wrapper() botorch_acqf = self._botorch_acqf_cls(**self._args.collect()) self.set_default_sample_shape(botorch_acqf) @@ -264,6 +287,149 @@ def _set_best_f(self) -> None: case _: raise NotImplementedError("This line should be impossible to reach.") + def _set_current_value(self) -> None: + """Set current value maximising posterior mean in qMFKG.""" + if not isinstance( + self.acqf, (qMultiFidelityKnowledgeGradient, qKnowledgeGradient) + ): + return + + from botorch.optim import optimize_acqf_mixed + + if isinstance(self.acqf, qMultiFidelityKnowledgeGradient): + from botorch.acquisition import PosteriorMean + from botorch.acquisition.fixed_feature import ( + FixedFeatureAcquisitionFunction, + ) + + curr_val_acqf = FixedFeatureAcquisitionFunction( + acq_function=PosteriorMean(self._botorch_surrogate), + d=len(self.searchspace.parameters), + columns=[ + self.searchspace.fidelity_idx, + ], + values=[ + 1.0, + ], + ) + + elif isinstance(self.acqf, qKnowledgeGradient): + curr_val_acqf = PosteriorMean(self._botorch_surrogate) + + # Jordan MHS NOTE for discussion: This is a potentially fast-and-loose use of + # mixed space optimization. + candidates_comp = self.searchspace.discrete.comp_rep + num_comp_columns = len(candidates_comp.columns) + candidates_comp.columns = list(range(num_comp_columns)) # type: ignore + candidates_comp_dict = candidates_comp.to_dict("records") + + # Possible TODO. Align num_restarts and raw_samples with that defined by the + # user for the main acquisition function. + _, current_value = optimize_acqf_mixed( + acq_function=curr_val_acqf, + bounds=torch.from_numpy(self.searchspace.comp_rep_bounds.values), + fixed_features_list=candidates_comp_dict, # type: ignore[arg-type] + q=1, + num_restarts=10, + raw_samples=64, + ) + + self._args.current_value = current_value + + def _set_projection(self) -> None: + """Set projection to the target fidelity for qMFKG.""" + if not isinstance(self.acqf, (qMultiFidelityKnowledgeGradient)): + return + + # qMFKG for cost aware, single fidelity knowledge gradient + if self.searchspace.task_type != SearchSpaceTaskType.NUMERICALFIDELITY: + return + + assert self.searchspace.fidelity_idx is not None # for mypy + + # qMFKG for multi fidelity + target_fidelities = {self.searchspace.fidelity_idx: 1.0} + + num_dims = len(self.searchspace.parameters) + + def target_fidelity_projection(X: Tensor) -> Tensor: + from botorch.acquisition.utils import project_to_target_fidelity + + return project_to_target_fidelity(X, target_fidelities, num_dims) + + self._args.project = target_fidelity_projection + + def _set_MFUCB_dicts(self) -> None: + """Set value, fidelities and cost dictionaries for MFUCB.""" + if not isinstance(self.acqf, MultiFidelityUpperConfidenceBound): + return + + fidelities_dict, costs_dict, zetas_dict = make_MFUCB_dicts(self.searchspace) + + self._args.fidelities_dict = fidelities_dict + self._args.costs_dict = costs_dict + self._args.zetas_dict = zetas_dict + + def _set_cost_aware_wrapper(self) -> None: + """Set a cost aware wrapper for acquisition with costly design choices. + + Cost aware wrappers act either at the MC sample level for most Monte-Carlo + approaches or at the fantasy level for knowledge gradient. + + Note that qKnowledgeGradient is incompatible with fantasy-level cost adjustment. + For this reason, qMultiFidelityKnowledgeGradient should be used for + single-fidelity cost aware optimization. + + Raises: + NotImplementedError: cost awareness attempted for a multi objective acqf. + NotImplementedError: cost awareness attempted for an analytic acqf. + ValueError: acquisition function is qKnowledgeGradient. + ValueError: acquisition function not considered for cost awareness. + """ + # Possible TODO: train a learned cost posterior as a cost model option. + # By using objective and cost_aware_utility cost division is done at the sample + # or fantasy level so this would be a valid approach. + if self.searchspace.cost_type == SearchSpaceCostType.NOCOSTADJUSTMENT: + return + + if issubclass(self._botorch_acqf_cls, MultiObjectiveMCAcquisitionFunction): + raise NotImplementedError( + "Cost aware multi-objective Bayesian optimization is currently not " + "supported." + ) + + # Possible TODO: implement cost awareness for analytic acquisition function. + elif issubclass(self._botorch_acqf_cls, AnalyticAcquisitionFunction): + raise NotImplementedError( + "Cost aware Bayesian optimization is currently not supported for " + "analytic acquisiton functions, please use MC-based acquisition " + "functions, e.g., qEI instead of EI." + ) + + elif isinstance(self.acqf, qKnowledgeGradient): + raise ValueError( + "qKnowledgeGradient cannot be used for cost-aware " + " acquisition. Instead set the acquisition function to " + "qMultiFidelityKnowledgeGradient." + ) + + elif issubclass(self._botorch_acqf_cls, MCAcquisitionFunction): + self._args.objective = wrap_cost_aware_abjective( + self._args.objective, self._args.self.searchspace + ) + + elif isinstance(self.acqf, qMultiFidelityKnowledgeGradient): + if self._args.cost_aware_utility is None: + self._args.cost_aware_utility = make_cost_aware_utility( + self.searchspace + ) + + else: + raise ValueError( + f"Cost aware wrapping is not implemented for acquisiton function class" + f"{self._botorch_acqf_cls}." + ) + def set_default_sample_shape(self, acqf: BoAcquisitionFunction, /): """Apply temporary workaround for Thompson sampling.""" # TODO: Needs redesign once bandits are supported more generally diff --git a/baybe/acquisition/acqfs.py b/baybe/acquisition/acqfs.py index 5e3b08b1cf..eb854d98b8 100644 --- a/baybe/acquisition/acqfs.py +++ b/baybe/acquisition/acqfs.py @@ -13,7 +13,7 @@ from attr.converters import optional as optional_c from attr.validators import optional as optional_v from attrs import AttrsInstance, define, field, fields -from attrs.validators import gt, instance_of, le +from attrs.validators import ge, gt, instance_of, le from typing_extensions import override from baybe.acquisition.base import AcquisitionFunction @@ -156,6 +156,22 @@ class qKnowledgeGradient(AcquisitionFunction): memory footprint and wall time.""" +@define(frozen=True) +class qMultiFidelityKnowledgeGradient(AcquisitionFunction): + """Monte Carlo based knowledge gradient. + + This acquisition function currently only supports purely continuous spaces. + """ + + abbreviation: ClassVar[str] = "qMFKG" + + num_fantasies: int = field(validator=[instance_of(int), gt(0)], default=128) + """Number of fantasies to draw for approximating the knowledge gradient. + + More samples result in a better approximation, at the expense of both increased + memory footprint and wall time.""" + + ######################################################################################## ### Posterior Statistics @define(frozen=True) @@ -289,6 +305,31 @@ class qUpperConfidenceBound(AcquisitionFunction): """See :paramref:`UpperConfidenceBound.beta`.""" +@define(frozen=True) +class MultiFidelityUpperConfidenceBound(AcquisitionFunction): + """Two stage acquisition function of Kandasamy et al (2016). + + Stage 1: Choose design features based on argmax_x (softmin_m (UCB_m(x) + zeta_m)). + + Stage 2: Choose cheapest fidelity satisfying a cost-aware informativeness threshold. + """ + + abbreviation: ClassVar[str] = "MFUCB" + + softmin_temperature: float = field( + converter=float, validator=[finite_float, ge(0.0)], default=1e-2 + ) + """Softmin smoothing parameter.""" + + beta: float = field(converter=float, validator=finite_float, default=0.2) + """See :paramref:`UpperConfidenceBound.beta`.""" + + @override + @classproperty + def supports_batching(cls) -> bool: + return False + + ######################################################################################## ### ThompsonSampling @define(frozen=True) diff --git a/baybe/acquisition/base.py b/baybe/acquisition/base.py index 115ef1d551..013337f494 100644 --- a/baybe/acquisition/base.py +++ b/baybe/acquisition/base.py @@ -165,11 +165,14 @@ def _get_botorch_acqf_class( """Extract the BoTorch acquisition class for the given BayBE acquisition class.""" import botorch + from baybe.acquisition import custom_acqfs + for cls in baybe_acqf_cls.mro(): if ( acqf_cls := getattr(botorch.acquisition, cls.__name__, False) or getattr(botorch.acquisition.multi_objective, cls.__name__, False) or getattr(botorch.acquisition.multi_objective.parego, cls.__name__, False) + or getattr(custom_acqfs, cls.__name__, False) ): if is_abstract(acqf_cls): continue diff --git a/baybe/acquisition/cost_aware_utils.py b/baybe/acquisition/cost_aware_utils.py new file mode 100644 index 0000000000..fad0545faf --- /dev/null +++ b/baybe/acquisition/cost_aware_utils.py @@ -0,0 +1,144 @@ +"""Cost aware wrapping utilities for single-fidelity MC-based acquisiton functions.""" + +from typing import TYPE_CHECKING + +from attrs import field +from attrs.validators import instance_of +from attrs.validators import optional as optional_v +from botorch.acquisition.cost_aware import InverseCostWeightedUtility +from botorch.acquisition.monte_carlo import MCAcquisitionObjective +from botorch.models.deterministic import DeterministicModel +from typing_extensions import override + +from baybe.parameters.fidelity import NumericalDiscreteFidelityParameter +from baybe.searchspace.core import SearchSpace, SearchSpaceCostType + +if TYPE_CHECKING: + from torch import Tensor + + +# Jordan MHS TODO: typing for fidelities_dict awkward since integer values in +# comp_df not explicitly typed. Seek help here. +def make_cost_tensor(searchspace: SearchSpace, /) -> tuple[Tensor, Tensor]: + """Construct column indices, comp_df values and costs of costly parameters.""" + import torch + + if searchspace.cost_type == SearchSpaceCostType.MULTIFIDELITY: + params = ( + p + for p in searchspace.parameters + if isinstance(p, NumericalDiscreteFidelityParameter) + ) + + elif searchspace.cost_type == SearchSpaceCostType.FIXEDDISCRETECOSTS: + params = (p for p in searchspace.parameters if p.is_costly) + + values_dict = {i: tuple(p.comp_df.iloc[:, 0]) for i, p in enumerate(params)} + + costs_dict = { + i: p.costs + if getattr(p, "costs", None) is not None + else tuple(0 for _ in p.values) + for i, p in enumerate(params) + } + + num_params = len(params) + max_len = max(len(p.comp_df.iloc[:, 0]) for p in params) + + costly_indices = values_dict.keys() + + values = torch.full((num_params, max_len), float("nan")) + costs = torch.zeros((num_params, max_len)) + + for i in range(num_params): + v = torch.tensor(values_dict[i]) + c = torch.tensor(costs_dict[i]) + + values[i, : len(v)] = v + costs[i, : len(c)] = c + + return costly_indices, values, costs + + +class discrete_cost_model(DeterministicModel): + """A fixed cost model which matches discrete parameter values to their costs. + + A sum is taken over all costly parameters. + """ + + # Possible TODO: add validation that values and costs have the + # same shape and are compatible with costly_indices. + # Then move discrete_cost_model to its own file for custom + # post-search space definition discrete cost models. + costly_indices: Tensor = field(validator=instance_of(Tensor)) + """Computational representation of the parameters which are costly.""" + + values: Tensor = field(validator=instance_of(Tensor)) + """Values for each costly parameter, padded with repeats to make rectangular.""" + + costs: Tensor = field(validator=instance_of(Tensor)) + """Costs for each parameter, padded with repeats to make rectangular.""" + + @override + def foward(self, X: Tensor) -> Tensor: + """Compute costs over a set of candidate values. + + Args: + X: A `(b1 x ... bk) x q x d`-dim tensor of `d`-dim design. + + Returns: + A `(b1 x ... bk) x q`-dim tensor of of evaluated costs. + """ + X_sub = X.squeeze(-2)[..., self.costly_indices] + + matches = X_sub.unsqueeze(-1) == self.values + + cost_per_dim = (matches * self.costs).sum(dim=-1) + + return cost_per_dim.sum(dim=-1, keepdim=True) + + +class InverseCostWeightedAcquisitionObjective(MCAcquisitionObjective): + """Wrapper for computing cost aware acquisiton values at the MC sample level.""" + + base_objective: MCAcquisitionObjective | None = field( + validator=optional_v(instance_of(MCAcquisitionObjective)), default=None + ) + """Base objective to be wrapped with cost adjustment.""" + + cost_model: DeterministicModel = field( + validator=[instance_of(DeterministicModel)], + ) + """Deterministic model of design choice.""" + + def forward(self, samples: Tensor, X: Tensor | None = None) -> Tensor: + """Compute cost aware acquisition value within MC sampling.""" + cost = self.cost_model(samples) + + if self.base_objective is None: + return cost + else: + base = self.base_objective(samples, X) + return base / cost + + +def make_inverse_cost_utility( + searchspace: SearchSpace, / +) -> InverseCostWeightedUtility: + """Make an inverse cost wrapping utility from a searchspace, suitable for qMFKG.""" + costly_indices, values, costs = make_cost_tensor(searchspace) + + cost_model = discrete_cost_model(costly_indices, values, costs) + + return InverseCostWeightedUtility(cost_model=cost_model) + + +def wrap_cost_aware_objective( + objective: MCAcquisitionObjective, searchspace: SearchSpace +): + """Make an inverse cost-wrapped acquisition objective.""" + costly_indices, values, costs = make_cost_tensor(searchspace) + + cost_model = discrete_cost_model(costly_indices, values, costs) + + return InverseCostWeightedAcquisitionObjective(objective, cost_model) diff --git a/baybe/acquisition/custom_acqfs/__init__.py b/baybe/acquisition/custom_acqfs/__init__.py new file mode 100644 index 0000000000..43a27e0c2c --- /dev/null +++ b/baybe/acquisition/custom_acqfs/__init__.py @@ -0,0 +1,10 @@ +"""Custom acquisition functions.""" + +from baybe.acquisition.custom_acqfs.two_stage import ( + MultiFidelityUpperConfidenceBound, +) + +__all__ = [ + # Multi fidelity acquisition functions + "MultiFidelityUpperConfidenceBound", +] diff --git a/baybe/acquisition/custom_acqfs/mfucb.py b/baybe/acquisition/custom_acqfs/mfucb.py new file mode 100644 index 0000000000..faf5e661a1 --- /dev/null +++ b/baybe/acquisition/custom_acqfs/mfucb.py @@ -0,0 +1,282 @@ +"""Custom Botorch AnalyticAcquisitionFunction for multi-fidelity optimization.""" + +from __future__ import annotations + +from collections.abc import Callable, Mapping +from itertools import pairwise as iter_pairwise +from itertools import product as iter_product +from typing import Any + +import torch +from attrs import Attribute, define, field, fields_dict +from attrs.validators import deep_iterable, deep_mapping, ge, instance_of, or_ +from botorch.acquisition.analytic import AnalyticAcquisitionFunction +from botorch.acquisition.objective import PosteriorTransform +from botorch.models.model import Model +from botorch.utils.transforms import ( + average_over_ensemble_models, + t_batch_mode_transform, +) +from gpytorch.likelihoods import GaussianLikelihood +from torch import Tensor +from typing_extensions import override + +from baybe.parameters.validation import validate_contains_exactly_one +from baybe.utils.validation import finite_float + +_neg_inv_sqrt2 = -0.7071067811865476 +_log_sqrt_pi_div_2 = 0.2257913526447274 + + +def validate_dict_shape( + reference_name: str, / +) -> Callable[[Any, Attribute, Mapping[Any, Any]], None]: + """Make validator to check attribute keys/lengths against a reference attribute.""" + + def validator(obj: Any, attribute: Attribute, value: Mapping[Any, Any]) -> None: # noqa: DOC101, DOC103 + """Validate that the input has the same keys/lengths as the reference attribute. + + Raises: + ValueError: If the keys of the two attributes mismatch. + ValueError: If the tuple lengths of the two attributes mismatch at any key. + """ + other_attr = fields_dict(type(obj))[reference_name] + other_instance = getattr(obj, reference_name) + + if not ( + different_keys := set(value.keys()).symmetric_difference( + set(other_instance.keys()) + ) + ): + raise ValueError( + f"{attribute.name} and {other_attr.alias} differ in keys in " + f"{obj.name}, with the following {different_keys} in only one." + ) + + for k, tup in value.items(): + other_tup = other_instance[k] + + if len(tup) != len(other_tup): + raise ValueError( + f"The lengths of the attributes '{other_attr.alias}' and " + f"'{attribute.alias}' do not match for '{obj.name}' at the key {k}." + f"Length of '{other_attr.alias}' at key {k}: {len(other_tup)}. " + f"Length of '{attribute.alias}' at key {k}: {len(tup)}." + ) + + return validator + + +@define +class MultiFidelityUpperConfidenceBound(AnalyticAcquisitionFunction): + r"""Two-stage Multi Fidelity Upper Confidence Bound (UCB). + + First stage selects the design parameter choice through a discrepancy-parameter + adjusted upper confidence bound. Selection is done by gradient-based optimization + of a softmin over each fidelity-adjusted UCB. + Second stage makes a cost-aware decision of the fidelity parameter to be queried, by + searching through each fidelity at the chosen design parameter, which balances cost + of querying with fidelity-specific UCB. + + Only supports the case of `q=1` (i.e. greedy, non-batch + selection of design points). The model must be single-outcome. + """ + + # Declaring attribute types for variables defined via _register_buffer. + fidelity_columns: Tensor + fidelity_combinations: Tensor + zetas_comb: Tensor + costs_comb: Tensor + + model: Model = field(validator=instance_of(Model)) + """A fitted single-outcome GP model.""" + + beta: float | Tensor = field(validator=or_(instance_of(float), instance_of(Tensor))) + """Trade-off parameter between mean and covariance.""" + + fidelities: dict[int, tuple[float, ...]] = field( + validator=deep_mapping( + key_validator=instance_of(int), + value_validator=deep_iterable( + member_validator=instance_of(float), + iterable_validator=instance_of(tuple), + ), + mapping_validator=instance_of(dict), + ) + ) + """Computational representation of fidelity values.""" + + costs: dict[int, tuple[float, ...]] = field( + validator=deep_mapping( + key_validator=instance_of(int), + value_validator=deep_iterable( + member_validator=(instance_of(float), ge(0.0)), + iterable_validator=( + instance_of(tuple), + validate_contains_exactly_one(0.0), + ), + ), + mapping_validator=(instance_of(dict), validate_dict_shape("fidelities")), + ) + ) + """Cost of querying each fidelity parameter at each fidelity. Costs between + fidelity parameters are summed. + """ + + zetas: dict[int, tuple[float, ...]] = field( + validator=deep_mapping( + key_validator=instance_of(int), + value_validator=deep_iterable( + member_validator=(instance_of(float), ge(0.0)), + iterable_validator=( + instance_of(tuple), + validate_contains_exactly_one(0.0), + ), + ), + mapping_validator=(instance_of(dict), validate_dict_shape("fidelities")), + ) + ) + """Maximum absolute discrepancy between each fidelity and the + highest fidelity output. + """ + + softmin_temperature: float = field( + converter=float, validator=[finite_float, ge(0.0)], default=1e-2 + ) + """Smoothing parameter for gradient-based optimization of the design.""" + + posterior_transform: PosteriorTransform | None = field(default=None) + """PosteriorTransform used to convert multi-output posteriors to + single-output posteriors if necessary. + """ + + maximize: bool = field(default=True) + """If True, treat the problem as a maximization problem.""" + + def __post_attrs_init__(self) -> None: + super().__init__(model=self.model, posterior_transform=self.posterior_transform) + + self.register_buffer("beta", torch.as_tensor(self.beta)) + + self.register_buffer( + "softmin_temperature", torch.as_tensor(self.softmin_temperature) + ) + + self.register_buffer( + "fidelity_columns", + torch.tensor(list(self.fidelities.keys()), dtype=torch.long), + ) + + self.register_buffer( + "fidelity_combinations", + torch.tensor( + list(iter_product(*self.fidelities.values())), dtype=torch.double + ), + ) + + self.register_buffer( + "zetas_comb", + torch.tensor(list(iter_product(*self.zetas.values())), dtype=torch.double), + ) + + self.register_buffer( + "costs_comb", + torch.tensor(list(iter_product(*self.costs.values())), dtype=torch.double), + ) + + # Jordan MHS NOTE: mypy typing errors for these decorators with on + # subclasses of AcquistionFunction appear in Botorch as well as here. + @override + @t_batch_mode_transform(expected_q=1) # type: ignore + @average_over_ensemble_models # type: ignore + def forward(self, X: Tensor) -> Tensor: + r"""First optimization stage: choose optimal design design to query. + + Args: + X: A `(b1 x ... bk) x 1 x d`-dim tensor of `d`-dim design/fidelity points. + + Returns: + A `(b1 x ... bk)`-dim tensor of Upper Confidence Bound values at the + given design and fidelity points `X`. + """ + batch_size, q, d = X.shape + + n_comb, k = self.fidelity_combinations.shape + + X_extended = X.clone().unsqueeze(1).repeat(1, n_comb, 1, 1) + X_extended[..., :, self.fidelity_columns] = self.fidelity_combinations.view( + 1, n_comb, 1, k + ) + + zetas_comb_sum = self.zetas_comb.sum(dim=-1) + zetas_comb_sum = zetas_comb_sum.view(1, n_comb, 1, 1) + zetas_extended = zetas_comb_sum.expand(batch_size, n_comb, q, 1) + + X_eval = X_extended.reshape(batch_size * n_comb, q, d) + means, sigmas = self._mean_and_sigma(X_eval) + + means = means.view(batch_size, n_comb, q, 1) + # Jordan MHS NOTE: typing workaround to ignore possibility for botorch + # AnalyticAcquisitionFunction _mean_and_sigma to have compute_sigma=False. + sigmas = sigmas.view(batch_size, n_comb, q, 1) # type: ignore + + sign = 1 if self.maximize else -1 + indiv_ucbs = sign * means + (self.beta**0.5) * sigmas + zetas_extended + + ucb_mins, _ = indiv_ucbs.min(dim=1, keepdim=True) + + T = self.softmin_temperature + + acq_values = ( + ( + -T + * torch.log(torch.sum(torch.exp(-(indiv_ucbs - ucb_mins) / T), dim=1)) + + ucb_mins.squeeze(-1) + ) + .squeeze(-1) + .squeeze(-1) + ) + + return acq_values + + def optimize_stage_two(self, X: Tensor) -> Tensor: + r"""Second optimisation stage: choose optimal fidelity to query.""" + if isinstance(self.model.likelihood, GaussianLikelihood): + aleatoric_uncertainty = torch.sqrt(self.model.likelihood.noise) + else: + aleatoric_uncertainty = torch.tensor(0.0) + + found_suitable_lower_fid = False + + total_costs_comb = self.costs_comb.sum(dim=-1) + increasing_cost_order = torch.argsort(total_costs_comb) + + for prev_i, curr_i in iter_pairwise(increasing_cost_order): + prev_fid = self.fidelity_combinations[prev_i].clone() + prev_cost = self.costs_comb.sum(dim=-1)[prev_i] + curr_cost = self.costs_comb.sum(dim=-1)[curr_i] + prev_zeta = self.zetas_comb.sum(dim=-1)[prev_i] + + X_prev_fid = X.clone() + X_prev_fid[:, self.fidelity_columns] = prev_fid + + _, prev_posterior_uncertainty = self._mean_and_sigma(X_prev_fid) + + # Jordan MHS NOTE: workaround poor typing in Botorch. + # _mean_and_sigma always returns two values unless the argument + # compute_sigma is set to False. + assert prev_posterior_uncertainty is not None, "This shouldn't be accesible" + + if (self.beta**0.5) * prev_posterior_uncertainty >= ( + aleatoric_uncertainty + prev_zeta + ) * torch.sqrt(prev_cost / curr_cost): + found_suitable_lower_fid = True + optimal_X = X_prev_fid + break + + if not found_suitable_lower_fid: + optimal_X = X.clone() + last_fid = self.fidelity_combinations[curr_i].clone() + optimal_X[:, self.fidelity_columns] = last_fid + + return optimal_X diff --git a/baybe/acquisition/utils.py b/baybe/acquisition/utils.py index 5c504fa389..e3c9eb40b3 100644 --- a/baybe/acquisition/utils.py +++ b/baybe/acquisition/utils.py @@ -2,9 +2,10 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from baybe.acquisition.base import AcquisitionFunction +from baybe.parameters import CategoricalFidelityParameter if TYPE_CHECKING: from botorch.utils.multi_objective.box_decompositions.box_decomposition import ( @@ -12,6 +13,8 @@ ) from torch import Tensor + from baybe.searchspace import SearchSpace + def str_to_acqf(name: str, /) -> AcquisitionFunction: """Create an ACQF object from a given ACQF name.""" @@ -82,3 +85,38 @@ def make_partitioning( return FastNondominatedPartitioning(ref_point=ref_point, Y=predictions) return NondominatedPartitioning(ref_point=ref_point, Y=predictions, alpha=alpha) + + +# Jordan MHS TODO: typing for fidelities_dict awkward since integer values in +# comp_df not explicitly typed. Seek help here. +def make_MFUCB_dicts( + searchspace: SearchSpace, / +) -> tuple[ + dict[Any, tuple[Any, ...]], + dict[int, tuple[float, ...]], + dict[int, tuple[float, ...]], +]: + """Construct column indices and values of costs, fidelities and values for MFUCB.""" + fidelity_params = ( + p for p in searchspace.parameters if isinstance(p, CategoricalFidelityParameter) + ) + + fidelities_dict = { + i: tuple(p.comp_df.iloc[:, 0]) for i, p in enumerate(fidelity_params) + } + + costs_dict = { + i: p.costs + if getattr(p, "costs", None) is not None + else tuple(0 for _ in p.values) + for i, p in enumerate(fidelity_params) + } + + zetas_dict = { + i: p.zeta + if getattr(p, "zeta", None) is not None + else tuple(0 for _ in p.values) + for i, p in enumerate(fidelity_params) + } + + return fidelities_dict, costs_dict, zetas_dict diff --git a/baybe/kernels/__init__.py b/baybe/kernels/__init__.py index 9323a2b631..4e50c5728c 100644 --- a/baybe/kernels/__init__.py +++ b/baybe/kernels/__init__.py @@ -5,6 +5,7 @@ """ from baybe.kernels.basic import ( + IndexKernel, LinearKernel, MaternKernel, PeriodicKernel, @@ -18,6 +19,7 @@ __all__ = [ "AdditiveKernel", + "IndexKernel", "LinearKernel", "MaternKernel", "PeriodicKernel", diff --git a/baybe/parameters/base.py b/baybe/parameters/base.py index 8fbd489a41..7e454df52e 100644 --- a/baybe/parameters/base.py +++ b/baybe/parameters/base.py @@ -85,6 +85,14 @@ def _kind(self) -> _ParameterKind: return _ParameterKind.from_parameter(self) + @property + def is_costly(self) -> bool: + """Does the parameter have costs specified for each value. + + Used only for design parameters, not fidelity parameters. + """ + return False + @property @abstractmethod def comp_rep_columns(self) -> tuple[str, ...]: diff --git a/baybe/parameters/categorical.py b/baybe/parameters/categorical.py index bffecd5682..1cdaf6f8df 100644 --- a/baybe/parameters/categorical.py +++ b/baybe/parameters/categorical.py @@ -3,15 +3,22 @@ import gc from functools import cached_property +import cattrs import numpy as np import pandas as pd from attrs import Converter, define, field -from attrs.validators import deep_iterable, instance_of, min_len +from attrs.converters import optional as optional_c +from attrs.validators import deep_iterable, ge, instance_of, min_len +from attrs.validators import optional as optional_v from typing_extensions import override from baybe.parameters.base import _DiscreteLabelLikeParameter from baybe.parameters.enum import CategoricalEncoding -from baybe.parameters.validation import validate_unique_values +from baybe.parameters.validation import ( + validate_equal_length, + validate_is_finite, + validate_unique_values, +) from baybe.settings import active_settings from baybe.utils.conversion import nonstring_to_tuple @@ -54,12 +61,30 @@ class CategoricalParameter(_DiscreteLabelLikeParameter): ) # See base class. + costs: tuple[float, ...] | None = field( + converter=optional_c(lambda x: cattrs.structure(x, tuple[float, ...])), + validator=optional_v( + [ + validate_equal_length("_values"), + validate_is_finite, + deep_iterable(member_validator=ge(0.0)), + ] + ), + ) + """Fixed cost of querying the parameter at each value.""" + @override @property def values(self) -> tuple: """The values of the parameter.""" return self._values + @override + @property + def is_costly(self) -> bool: + """Does the parameter have costs specified for each value.""" + return self.costs is not None + @override @cached_property def comp_df(self) -> pd.DataFrame: diff --git a/baybe/parameters/fidelity.py b/baybe/parameters/fidelity.py index c230e940fd..dbb17292d8 100644 --- a/baybe/parameters/fidelity.py +++ b/baybe/parameters/fidelity.py @@ -88,7 +88,7 @@ class CategoricalFidelityParameter(_DiscreteLabelLikeParameter): discrepancy ``zeta``, 2 * ``zeta``, and so on.""" def __attrs_post_init__(self) -> None: - """Sort attribute values according to lexographic fidelity values.""" + """Sort attribute values according to lexicographic fidelity values.""" # Because categories can be str or bool, we sort by (type, value) idx = sorted( range(len(self._values)), @@ -103,6 +103,28 @@ def __attrs_post_init__(self) -> None: def values(self) -> tuple[str | bool, ...]: return self._values + @property + def highest_fidelity(self) -> str: + """The fidelity with discrepancy value of zero.""" + highest_fid = next( + value for value, zeta in zip(self.values, self.zeta) if zeta == 0 + ) + + assert isinstance(highest_fid, str) # for mypy + + return highest_fid + + @property + def highest_fidelity_cost(self) -> int: + """Cost of querying the fidelity with discrepancy value of zero.""" + highest_cost = next( + cost for cost, zeta in zip(self.costs, self.zeta) if zeta == 0 + ) + + assert isinstance(highest_cost, int) # for mypy + + return highest_cost + @override @cached_property def comp_df(self) -> pd.DataFrame: diff --git a/baybe/parameters/numerical.py b/baybe/parameters/numerical.py index 2032bc6eb3..8b14d8d68e 100644 --- a/baybe/parameters/numerical.py +++ b/baybe/parameters/numerical.py @@ -8,12 +8,18 @@ import numpy as np import pandas as pd from attrs import define, field -from attrs.validators import min_len +from attrs.converters import optional as optional_c +from attrs.validators import deep_iterable, ge, min_len +from attrs.validators import optional as optional_v from typing_extensions import override from baybe.exceptions import NumericalUnderflowError from baybe.parameters.base import ContinuousParameter, DiscreteParameter -from baybe.parameters.validation import validate_is_finite, validate_unique_values +from baybe.parameters.validation import ( + validate_equal_length, + validate_is_finite, + validate_unique_values, +) from baybe.settings import active_settings from baybe.utils.interval import InfiniteIntervalError, Interval @@ -41,6 +47,18 @@ class NumericalDiscreteParameter(DiscreteParameter): ) """The values the parameter can take.""" + costs: tuple[float, ...] | None = field( + converter=optional_c(lambda x: cattrs.structure(x, tuple[float, ...])), + validator=optional_v( + [ + validate_equal_length("_values"), + validate_is_finite, + deep_iterable(member_validator=ge(0.0)), + ] + ), + ) + """Fixed cost of querying the parameter at each value.""" + tolerance: float = field(default=0.0) """The absolute tolerance used for deciding whether a value is in range. A tolerance larger than half the minimum distance between parameter values is not allowed @@ -89,6 +107,12 @@ def _validate_tolerance( # noqa: DOC101, DOC103 def values(self) -> tuple: return tuple(active_settings.DTypeFloatNumpy(itm) for itm in self._values) + @override + @property + def is_costly(self) -> bool: + """Does the parameter have costs specified for each value.""" + return self.costs is not None + @override @cached_property def comp_df(self) -> pd.DataFrame: diff --git a/baybe/recommenders/pure/bayesian/base.py b/baybe/recommenders/pure/bayesian/base.py index db163a8556..dca2afad44 100644 --- a/baybe/recommenders/pure/bayesian/base.py +++ b/baybe/recommenders/pure/bayesian/base.py @@ -11,7 +11,7 @@ from attrs.converters import optional from typing_extensions import override -from baybe.acquisition import qLogEI, qLogNEHVI +from baybe.acquisition import MFUCB, qLogEI, qLogNEHVI, qMFKG from baybe.acquisition.base import AcquisitionFunction from baybe.acquisition.utils import convert_acqf from baybe.exceptions import ( @@ -19,7 +19,8 @@ ) from baybe.objectives.base import Objective from baybe.recommenders.pure.base import PureRecommender -from baybe.searchspace import SearchSpace +from baybe.recommenders.pure.bayesian.utils import restricted_fidelity_searchspace +from baybe.searchspace import SearchSpace, SearchSpaceTaskType from baybe.settings import Settings from baybe.surrogates import GaussianProcessSurrogate from baybe.surrogates.base import ( @@ -67,9 +68,17 @@ class BayesianRecommender(PureRecommender, ABC): _botorch_acqf = field(default=None, init=False, eq=False) """The induced BoTorch acquisition function.""" - def _get_acquisition_function(self, objective: Objective) -> AcquisitionFunction: + def _get_acquisition_function( + self, objective: Objective, searchspace: SearchSpace + ) -> AcquisitionFunction: """Select the appropriate default acquisition function for the given context.""" if self.acquisition_function is None: + if searchspace.task_type == SearchSpaceTaskType.NUMERICALFIDELITY: + return qMFKG() + + elif searchspace.task_type == SearchSpaceTaskType.CATEGORICALFIDELITY: + return MFUCB() + return qLogNEHVI() if objective.is_multi_output else qLogEI() return self.acquisition_function @@ -93,7 +102,7 @@ def _setup_botorch_acqf( ) -> None: """Create the acquisition function for the current training data.""" # noqa: E501 self._objective = objective - acqf = self._get_acquisition_function(objective) + acqf = self._get_acquisition_function(objective, searchspace) if objective.is_multi_output and not acqf.supports_multi_output: raise IncompatibleAcquisitionFunctionError( @@ -166,10 +175,14 @@ def recommend( self._setup_botorch_acqf( searchspace, objective, measurements, pending_experiments ) + acqf = self._get_acquisition_function(objective, searchspace) try: with Settings(preprocess_dataframes=False): - return super().recommend( + if isinstance(acqf, MFUCB): + searchspace = restricted_fidelity_searchspace(searchspace) + + recommendation = super().recommend( batch_size=batch_size, searchspace=searchspace, objective=objective, @@ -196,6 +209,12 @@ def recommend( else: raise + return ( + recommendation + if not isinstance(acqf, MFUCB) + else self._botorch_acqf.optimize_stage_two(recommendation) + ) + def acquisition_values( self, candidates: pd.DataFrame, @@ -225,7 +244,9 @@ def acquisition_values( A series of individual acquisition values, one for each candidate. """ surrogate = self.get_surrogate(searchspace, objective, measurements) - acqf = acquisition_function or self._get_acquisition_function(objective) + acqf = acquisition_function or self._get_acquisition_function( + objective, searchspace + ) return acqf.evaluate( candidates, surrogate, @@ -253,7 +274,9 @@ def joint_acquisition_value( # noqa: DOC101, DOC103 The joint acquisition value of the batch. """ surrogate = self.get_surrogate(searchspace, objective, measurements) - acqf = acquisition_function or self._get_acquisition_function(objective) + acqf = acquisition_function or self._get_acquisition_function( + objective, searchspace + ) return acqf.evaluate( candidates, surrogate, diff --git a/baybe/recommenders/pure/bayesian/botorch/core.py b/baybe/recommenders/pure/bayesian/botorch/core.py index 7953d5ca74..eb5290cfcb 100644 --- a/baybe/recommenders/pure/bayesian/botorch/core.py +++ b/baybe/recommenders/pure/bayesian/botorch/core.py @@ -203,10 +203,13 @@ def _recommend_continuous( Returns: A dataframe containing the recommendations as individual rows. """ + searchspace = SearchSpace(continuous=subspace_continuous) assert self._objective is not None if ( batch_size > 1 - and not self._get_acquisition_function(self._objective).supports_batching + and not self._get_acquisition_function( + self._objective, searchspace + ).supports_batching ): raise IncompatibleAcquisitionFunctionError( f"The '{self.__class__.__name__}' only works with Monte Carlo " diff --git a/baybe/recommenders/pure/bayesian/botorch/discrete.py b/baybe/recommenders/pure/bayesian/botorch/discrete.py index a5f92d04d0..572ae63782 100644 --- a/baybe/recommenders/pure/bayesian/botorch/discrete.py +++ b/baybe/recommenders/pure/bayesian/botorch/discrete.py @@ -9,7 +9,7 @@ import numpy.typing as npt import pandas as pd -from baybe.searchspace import SubspaceDiscrete +from baybe.searchspace import SearchSpace, SubspaceDiscrete from baybe.utils.dataframe import to_tensor if TYPE_CHECKING: @@ -106,7 +106,8 @@ def recommend_discrete_without_subsets( ) assert recommender._objective is not None - acqf = recommender._get_acquisition_function(recommender._objective) + searchspace = SearchSpace(discrete=subspace_discrete) + acqf = recommender._get_acquisition_function(recommender._objective, searchspace) if batch_size > 1 and not acqf.supports_batching: raise IncompatibleAcquisitionFunctionError( f"The '{recommender.__class__.__name__}' only works with Monte Carlo " diff --git a/baybe/recommenders/pure/bayesian/botorch/hybrid.py b/baybe/recommenders/pure/bayesian/botorch/hybrid.py index 0b017a92dd..c08bea8ca0 100644 --- a/baybe/recommenders/pure/bayesian/botorch/hybrid.py +++ b/baybe/recommenders/pure/bayesian/botorch/hybrid.py @@ -71,7 +71,7 @@ def recommend_hybrid_without_subsets( if ( batch_size > 1 and not recommender._get_acquisition_function( - recommender._objective + recommender._objective, searchspace ).supports_batching ): raise IncompatibleAcquisitionFunctionError( diff --git a/baybe/recommenders/pure/bayesian/utils.py b/baybe/recommenders/pure/bayesian/utils.py new file mode 100644 index 0000000000..be71e7014a --- /dev/null +++ b/baybe/recommenders/pure/bayesian/utils.py @@ -0,0 +1,31 @@ +"""Utils for Bayesian recommenders.""" + +from attrs import evolve + +from baybe.parameters import CategoricalFidelityParameter +from baybe.searchspace import SearchSpace + + +def restricted_fidelity_searchspace(searchspace: SearchSpace, /) -> SearchSpace: + """Evolve a multi-fidelity searchspace so the fidelity is fixed to the highest.""" + discrete_parameters_fixed_fidelities = tuple( + evolve( + p, + values=(p.highest_fidelity,), + costs=(p.highest_fidelity_cost,), + zeta=(0.0,), + ) + if isinstance(p, CategoricalFidelityParameter) + else p + for p in searchspace.discrete.parameters + ) + + discrete_subspace_fixed_fidelities = evolve( + searchspace.discrete, parameters=discrete_parameters_fixed_fidelities + ) + + fixed_fidelity_searchspace = evolve( + searchspace, discrete=discrete_subspace_fixed_fidelities + ) + + return fixed_fidelity_searchspace diff --git a/baybe/searchspace/__init__.py b/baybe/searchspace/__init__.py index d78f7fafee..3f5d61fa9f 100644 --- a/baybe/searchspace/__init__.py +++ b/baybe/searchspace/__init__.py @@ -3,6 +3,7 @@ from baybe.searchspace.continuous import SubspaceContinuous from baybe.searchspace.core import ( SearchSpace, + SearchSpaceTaskType, SearchSpaceType, validate_searchspace_from_config, ) @@ -11,6 +12,7 @@ __all__ = [ "validate_searchspace_from_config", "SearchSpace", + "SearchSpaceTaskType", "SearchSpaceType", "SubspaceDiscrete", "SubspaceContinuous", diff --git a/baybe/searchspace/core.py b/baybe/searchspace/core.py index 8d75cae927..fc340080b6 100644 --- a/baybe/searchspace/core.py +++ b/baybe/searchspace/core.py @@ -19,6 +19,10 @@ from baybe.exceptions import InfeasibilityError from baybe.parameters import TaskParameter from baybe.parameters.base import Parameter +from baybe.parameters.fidelity import ( + CategoricalFidelityParameter, + NumericalDiscreteFidelityParameter, +) from baybe.searchspace.continuous import SubspaceContinuous from baybe.searchspace.discrete import ( MemorySize, @@ -55,6 +59,42 @@ class SearchSpaceType(Enum): """Flag for hybrid search spaces resp. compatibility with hybrid search spaces.""" +class SearchSpaceTaskType(Enum): + """Enum class for different types of task and/or fidelity subspaces.""" + + SINGLETASK = "SINGLETASK" + """Flag for search spaces with a single task, meaning no task parameter.""" + + CATEGORICALMULTITASK = "CATEGORICALMULTITASK" + """Flag for search spaces with a categorical task parameter.""" + + +class SearchSpaceFidelityType(Enum): + """Enum class for different types of task and/or fidelity subspaces.""" + + SINGLEFIDELITY = "SINGLEFIDELITY" + """Flag for search spaces with a single fidelity, meaning no fidelity parameter.""" + + NUMERICALDISCRETEMULTIFIDELITY = "NUMERICALDISCRETEMULTIFIDELITY" + """Flag for search spaces with a discrete numerical (ordered) fidelity parameter.""" + + CATEGORICALMULTIFIDELITY = "CATEGORICALMULTIFIDELITY" + """Flag for search spaces with a categorical (unordered) fidelity parameter.""" + + +class SearchSpaceCostType(Enum): + """Enum class for different types of task and/or fidelity subspaces.""" + + NOCOSTADJUSTMENT = "NOCOSTADJUSTMENT" + """Flag for searchspace with no cost model adjustment.""" + + FIXEDDISCRETECOSTS = "FIXEDDISCRETECOSTS" + """Flag for single-fidelity searchspace with discrete parameters of known cost.""" + + MULTIFIDELITY = "MULTIFIDELITY" + """Flag for searchspaces with a fidelity parameter.""" + + @define class SearchSpace(SerialMixin): """Class for managing the overall search space. @@ -268,12 +308,31 @@ def _task_parameter(self) -> TaskParameter | None: assert len(params) == 1 # currently ensured by parameter validation step return params[0] + @property + def _fidelity_parameter( + self, + ) -> NumericalDiscreteFidelityParameter | CategoricalFidelityParameter | None: + """The (single) fidelity parameter of the space, if it exists.""" + # Currently private, see comment above + fidelity_parameters = ( + NumericalDiscreteFidelityParameter, + CategoricalFidelityParameter, + ) + + params = [p for p in self.parameters if isinstance(p, fidelity_parameters)] + + if not params: + return None + + assert len(params) == 1 # currently ensured by parameter validation step + return params[0] + @property def task_idx(self) -> int | None: - """The column index of the task parameter in computational representation.""" + """Column index of the task parameter in computational representation.""" if (task_param := self._task_parameter) is None: return None - # TODO[11611]: The current approach has three limitations: + # TODO [11611]: The current approach has three limitations: # 1. It matches by column name and thus assumes that the parameter name # is used as the column name. # 2. It relies on the current implementation detail that discrete parameters @@ -282,6 +341,14 @@ def task_idx(self) -> int | None: # --> Fix this when refactoring the data return cast(int, self.discrete.comp_rep.columns.get_loc(task_param.name)) + @property + def fidelity_idx(self) -> int | None: + """Column index of the fidelity parameter in computational representation.""" + if (fidelity_param := self._fidelity_parameter) is None: + return None + # See TODO [11611] above + return cast(int, self.discrete.comp_rep.columns.get_loc(fidelity_param.name)) + @property def n_tasks(self) -> int: """The number of tasks encoded in the search space.""" @@ -294,6 +361,61 @@ def n_tasks(self) -> int: return 1 return len(task_param.values) + @property + def n_fidelities(self) -> int: + """The number of fidelities encoded in the search space.""" + # See TODO [16932] above + if (fidelity_param := self._fidelity_parameter) is None: + # When there are no task parameters, we effectively have a single task + return 1 + return len(fidelity_param.values) + + @property + def is_design_cost_aware(self): + """Boolean indicating whether cost-aware parameters present.""" + params = [p for p in self.parameters if p.is_costly] + + return params is not None + + @property + def task_type(self) -> SearchSpaceTaskType: + """Return the task type of the search space.""" + if self._task_parameter is None: + return SearchSpaceTaskType.SINGLETASK + return SearchSpaceTaskType.CATEGORICALMULTITASK + + @property + def fidelity_type(self) -> SearchSpaceFidelityType: + """Return the fidelity type of the search space.""" + if (fidelity_param := self._fidelity_parameter) is None: + return SearchSpaceFidelityType.SINGLEFIDELITY + if isinstance(fidelity_param, CategoricalFidelityParameter): + return SearchSpaceFidelityType.CATEGORICALMULTIFIDELITY + if isinstance(fidelity_param, NumericalDiscreteFidelityParameter): + return SearchSpaceFidelityType.NUMERICALDISCRETEMULTIFIDELITY + raise RuntimeError("This line should be impossible to reach.") + + def cost_type(self): + """Type of cost adjustment associated with the SearchSpace.""" + fidelity_task_types = ( + SearchSpaceTaskType.CATEGORICALFIDELITY, + SearchSpaceTaskType.NUMERICALFIDELITY, + ) + + if self.task_type in fidelity_task_types: + if self.is_design_cost_aware: + raise ValueError( + "Search space must not contain costly design" + "parameters and fidelity parameters." + ) + else: + return SearchSpaceCostType.MULTIFIDELITY + + if self.is_design_cost_aware: + return SearchSpaceCostType.FIXEDDISCRETECOSTS + else: + return SearchSpaceCostType.NOCOSTADJUSTMENT + @property def n_subsets(self) -> int: """Total number of subset configurations. @@ -499,7 +621,7 @@ def transform( @property def constraints_augmentable(self) -> tuple[Constraint, ...]: - """The searchspace constraints that can be considered during augmentation.""" + """The search space constraints that can be considered during augmentation.""" return tuple(c for c in self.constraints if c.eval_during_augmentation) def get_parameters_by_name(self, names: Sequence[str]) -> tuple[Parameter, ...]: diff --git a/baybe/surrogates/bandit.py b/baybe/surrogates/bandit.py index ad6563cc43..f003f44c64 100644 --- a/baybe/surrogates/bandit.py +++ b/baybe/surrogates/bandit.py @@ -3,7 +3,7 @@ from __future__ import annotations import gc -from typing import TYPE_CHECKING, Any, ClassVar +from typing import TYPE_CHECKING, Any from attrs import define, field from typing_extensions import override @@ -29,9 +29,6 @@ class BetaBernoulliMultiArmedBanditSurrogate(Surrogate): """A multi-armed bandit model with Bernoulli likelihood and beta prior.""" - supports_transfer_learning: ClassVar[bool] = False - # See base class. - prior: BetaPrior = field(factory=lambda: BetaPrior(1, 1)) """The beta prior for the win rates of the bandit arms. Uniform by default.""" diff --git a/baybe/surrogates/base.py b/baybe/surrogates/base.py index 597f22a140..8812c8211c 100644 --- a/baybe/surrogates/base.py +++ b/baybe/surrogates/base.py @@ -82,10 +82,14 @@ def to_botorch(self) -> Model: class Surrogate(ABC, SurrogateProtocol, SerialMixin): """Abstract base class for all surrogate models.""" - supports_transfer_learning: ClassVar[bool] + supports_transfer_learning: ClassVar[bool] = False """Class variable encoding whether or not the surrogate supports transfer learning.""" + supports_multi_fidelity: ClassVar[bool] = False + """Class variable encoding whether or not the surrogate supports multi fidelity + Bayesian optimization.""" + supports_multi_output: ClassVar[bool] = False """Class variable encoding whether or not the surrogate is multi-output compatible.""" @@ -431,6 +435,14 @@ def fit( f"support transfer learning." ) + # Check if multi fidelity capabilities are needed + if (searchspace.n_fidelities > 1) and (not self.supports_multi_fidelity): + raise ValueError( + f"The search space contains fidelity parameters but the selected " + f"surrogate model type ({self.__class__.__name__}) does not " + f"support multi fidelity Bayesian optimisation." + ) + # Block partial measurements handle_missing_values(measurements, [t.name for t in objective.targets]) @@ -476,6 +488,11 @@ def __str__(self) -> str: self.supports_transfer_learning, single_line=True, ), + to_string( + "Supports Multi Fidelity", + self.supports_multi_fidelity, + single_line=True, + ), ] return to_string(self.__class__.__name__, *fields) diff --git a/baybe/surrogates/custom.py b/baybe/surrogates/custom.py index 83ad0b4df4..6fbb481690 100644 --- a/baybe/surrogates/custom.py +++ b/baybe/surrogates/custom.py @@ -11,7 +11,7 @@ from __future__ import annotations import gc -from typing import TYPE_CHECKING, Any, ClassVar +from typing import TYPE_CHECKING, Any import cattrs from attrs import define, field, validators @@ -57,9 +57,6 @@ class CustomONNXSurrogate(IndependentGaussianSurrogate): Note that these surrogates cannot be retrained. """ - supports_transfer_learning: ClassVar[bool] = False - # See base class. - onnx_input_name: str = field(validator=validators.instance_of(str)) """The input name used for constructing the ONNX str.""" diff --git a/baybe/surrogates/gaussian_process/components/kernel.py b/baybe/surrogates/gaussian_process/components/kernel.py index 1ea78aae58..ecde69299b 100644 --- a/baybe/surrogates/gaussian_process/components/kernel.py +++ b/baybe/surrogates/gaussian_process/components/kernel.py @@ -19,6 +19,7 @@ from baybe.objectives.base import Objective from baybe.parameters.categorical import TaskParameter from baybe.parameters.enum import _ParameterKind +from baybe.parameters.fidelity import CategoricalFidelityParameter from baybe.parameters.selectors import ( ParameterSelectorProtocol, TypeSelector, @@ -264,7 +265,10 @@ def _default_base_kernel_factory(self) -> KernelFactoryProtocol: ) return _BayBENumericalKernelFactory( - TypeSelector((TaskParameter,), exclude=True) + TypeSelector( + (TaskParameter, CategoricalFidelityParameter), + exclude=True, + ) ) @task_kernel_factory.default @@ -273,7 +277,9 @@ def _default_task_kernel_factory(self) -> KernelFactoryProtocol: _BayBETaskKernelFactory, ) - return _BayBETaskKernelFactory() + return _BayBETaskKernelFactory( + TypeSelector((TaskParameter, CategoricalFidelityParameter)) + ) @base_kernel_factory.validator def _validate_base_kernel_factory(self, _, factory: KernelFactoryProtocol): diff --git a/baybe/surrogates/gaussian_process/core.py b/baybe/surrogates/gaussian_process/core.py index cec34a32a5..52dec213de 100644 --- a/baybe/surrogates/gaussian_process/core.py +++ b/baybe/surrogates/gaussian_process/core.py @@ -8,18 +8,15 @@ from functools import partial from typing import TYPE_CHECKING, ClassVar -import pandas as pd from attrs import Converter, define, field from attrs.converters import pipe -from attrs.validators import instance_of, is_callable +from attrs.validators import is_callable from typing_extensions import Self, override from baybe.exceptions import DeprecationError from baybe.kernels.base import Kernel -from baybe.objectives.base import Objective from baybe.parameters.base import Parameter from baybe.parameters.categorical import TaskParameter -from baybe.searchspace.core import SearchSpace from baybe.surrogates.base import Surrogate from baybe.surrogates.gaussian_process.components.fit_criterion import ( FitCriterion, @@ -46,6 +43,7 @@ BayBELikelihoodFactory, BayBEMeanFactory, ) +from baybe.surrogates.gaussian_process.utils import _ModelContext from baybe.utils.boolean import strtobool from baybe.utils.conversion import to_string @@ -60,57 +58,6 @@ from torch import Tensor -@define -class _ModelContext: - """Model context for :class:`GaussianProcessSurrogate`.""" - - searchspace: SearchSpace = field(validator=instance_of(SearchSpace)) - """The search space the model is trained on.""" - - objective: Objective = field(validator=instance_of(Objective)) - """The objective for which the model is trained.""" - - measurements: pd.DataFrame = field(validator=instance_of(pd.DataFrame)) - """The training data in experimental representation.""" - - @property - def task_idx(self) -> int | None: - """The computational column index of the task parameter, if available.""" - return self.searchspace.task_idx - - @property - def is_multitask(self) -> bool: - """Indicates if model is to be operated in a multi-task context.""" - return self.n_task_dimensions > 0 - - @property - def n_task_dimensions(self) -> int: - """The number of task dimensions.""" - # TODO: Generalize to multiple task parameters - return 1 if self.task_idx is not None else 0 - - @property - def n_tasks(self) -> int: - """The number of tasks.""" - return self.searchspace.n_tasks - - @property - def parameter_bounds(self) -> Tensor: - """Get the search space parameter bounds in BoTorch Format.""" - import torch - - return torch.from_numpy(self.searchspace.scaling_bounds.to_numpy(copy=True)) - - @property - def numerical_indices(self) -> list[int]: - """The indices of the regular numerical model inputs.""" - return [ - i - for i in range(len(self.searchspace.comp_rep_columns)) - if i != self.task_idx - ] - - def _mark_custom_kernel( value: Kernel | KernelFactoryProtocol, self: GaussianProcessSurrogate ) -> Kernel | KernelFactoryProtocol: @@ -143,6 +90,9 @@ class GaussianProcessSurrogate(Surrogate): supports_transfer_learning: ClassVar[bool] = True # See base class. + supports_multi_fidelity = True + # See base class. + _custom_kernel: bool = field(init=False, default=False, repr=False, eq=False) # For deprecation only! diff --git a/baybe/surrogates/gaussian_process/multi_fidelity.py b/baybe/surrogates/gaussian_process/multi_fidelity.py new file mode 100644 index 0000000000..6e5fa3cdef --- /dev/null +++ b/baybe/surrogates/gaussian_process/multi_fidelity.py @@ -0,0 +1,101 @@ +"""Multi-fidelity Gaussian process surrogates.""" + +from __future__ import annotations + +import gc +from typing import TYPE_CHECKING, ClassVar + +from attrs import define, field +from typing_extensions import override + +from baybe.parameters.base import Parameter +from baybe.surrogates.base import Surrogate +from baybe.surrogates.gaussian_process.utils import _ModelContext + +if TYPE_CHECKING: + from botorch.models.gpytorch import GPyTorchModel + from botorch.models.transforms.input import InputTransform + from botorch.models.transforms.outcome import OutcomeTransform + from botorch.posteriors import Posterior + from torch import Tensor + + +@define +class GaussianProcessSurrogateSTMF(Surrogate): + """Botorch's single task multi fidelity Gaussian process.""" + + supports_multi_fidelity: ClassVar[bool] = True + # See base class. + + _model = field(init=False, default=None, eq=False) + """The actual model.""" + + @override + def to_botorch(self) -> GPyTorchModel: + return self._model + + @override + @staticmethod + def _make_parameter_scaler_factory( + parameter: Parameter, + ) -> type[InputTransform] | None: + return None + + @override + @staticmethod + def _make_target_scaler_factory() -> type[OutcomeTransform] | None: + # For GPs, we let botorch handle the scaling. See [Scaling Workaround] above. + return None + + @override + def _posterior(self, candidates_comp_scaled: Tensor, /) -> Posterior: + return self._model.posterior(candidates_comp_scaled) + + @override + def _fit(self, train_x: Tensor, train_y: Tensor) -> None: + import botorch + import gpytorch + + assert self._searchspace is not None + + context = _ModelContext(self._searchspace) + + assert context.n_fidelity_dimensions > 0, ( + f"{self.__class__.__name__} can only be fit on multi fidelity searchspaces." + ) + + # For GPs, we let botorch handle the scaling. See [Scaling Workaround] above. + input_transform = botorch.models.transforms.Normalize( # type: ignore[attr-defined] + train_x.shape[-1], + bounds=context.parameter_bounds, + indices=context.numerical_indices, + ) + outcome_transform = botorch.models.transforms.Standardize(train_y.shape[-1]) # type: ignore[attr-defined] + + # construct and fit the Gaussian process + self._model = botorch.models.SingleTaskMultiFidelityGP( + train_x, + train_y, + input_transform=input_transform, + outcome_transform=outcome_transform, + data_fidelities=None + if context.fidelity_idx is None + else (context.fidelity_idx,), + ) + + mll = gpytorch.ExactMarginalLogLikelihood(self._model.likelihood, self._model) + + botorch.fit.fit_gpytorch_mll(mll) + + @override + def __str__(self) -> str: + return ( + "Wrapper for a" + ":class:`~botorch.models.gp_regression_fidelity.SingleTaskMultiFidelityGP`," + "used as the default GP for discrete numerical fidelity parameters in," + "e.g., multi fidelity knowledge gradient." + ) + + +# Collect leftover original slotted classes processed by `attrs.define` +gc.collect() diff --git a/baybe/surrogates/gaussian_process/utils.py b/baybe/surrogates/gaussian_process/utils.py new file mode 100644 index 0000000000..ca20037e88 --- /dev/null +++ b/baybe/surrogates/gaussian_process/utils.py @@ -0,0 +1,79 @@ +"""Gaussian process utilities.""" + +from __future__ import annotations + +import gc +from typing import TYPE_CHECKING + +from attrs import define, field +from attrs.validators import instance_of + +from baybe.searchspace.core import SearchSpace + +if TYPE_CHECKING: + from torch import Tensor + + +@define +class _ModelContext: + """Model context for Gaussian process surrogates.""" + + searchspace: SearchSpace = field(validator=instance_of(SearchSpace)) + """The search space the model is trained on.""" + + @property + def task_idx(self) -> int | None: + """The computational column index of the task parameter, if available.""" + return self.searchspace.task_idx + + @property + def is_multitask(self) -> bool: + """Indicates if model is to be operated in a multi-task context.""" + return self.n_task_dimensions > 0 + + @property + def n_task_dimensions(self) -> int: + """The number of task dimensions.""" + # TODO: Generalize to multiple task parameters + return 1 if self.task_idx is not None else 0 + + @property + def n_tasks(self) -> int: + """The number of tasks.""" + return self.searchspace.n_tasks + + @property + def n_fidelity_dimensions(self) -> int: + """The number of fidelity dimensions.""" + # TODO: Generalize to multiple fidelity parameters + return 1 if self.searchspace.fidelity_idx is not None else 0 + + @property + def fidelity_idx(self) -> int | None: + """The computational column index of the fidelity parameter, if available.""" + return self.searchspace.fidelity_idx + + @property + def n_fidelities(self) -> int: + """The number of fidelities.""" + return self.searchspace.n_fidelities + + @property + def parameter_bounds(self) -> Tensor: + """Get the search space parameter bounds in BoTorch Format.""" + import torch + + return torch.from_numpy(self.searchspace.scaling_bounds.values) + + @property + def numerical_indices(self) -> list[int]: + """The indices of the regular numerical model inputs.""" + return [ + i + for i in range(len(self.searchspace.comp_rep_columns)) + if i not in (self.task_idx, self.fidelity_idx) + ] + + +# Collect leftover original slotted classes processed by `attrs.define` +gc.collect() diff --git a/baybe/surrogates/linear.py b/baybe/surrogates/linear.py index 99d847f0cd..adea6d7e61 100644 --- a/baybe/surrogates/linear.py +++ b/baybe/surrogates/linear.py @@ -3,7 +3,7 @@ from __future__ import annotations import gc -from typing import TYPE_CHECKING, ClassVar, TypedDict +from typing import TYPE_CHECKING, TypedDict from attrs import define, field from typing_extensions import override @@ -41,9 +41,6 @@ class _ARDRegressionParams(TypedDict, total=False): class BayesianLinearSurrogate(IndependentGaussianSurrogate): """A Bayesian linear regression surrogate model.""" - supports_transfer_learning: ClassVar[bool] = False - # See base class. - model_params: _ARDRegressionParams = field( factory=dict, converter=dict, diff --git a/baybe/surrogates/naive.py b/baybe/surrogates/naive.py index 3912c6b128..ef7d624c5b 100644 --- a/baybe/surrogates/naive.py +++ b/baybe/surrogates/naive.py @@ -3,7 +3,7 @@ from __future__ import annotations import gc -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING from attrs import define, field from typing_extensions import override @@ -23,9 +23,6 @@ class MeanPredictionSurrogate(IndependentGaussianSurrogate): as posterior mean and a (data-independent) constant posterior variance. """ - supports_transfer_learning: ClassVar[bool] = False - # See base class. - _model: float | None = field(init=False, default=None, eq=False) """The estimated posterior mean value of the training targets.""" diff --git a/baybe/surrogates/ngboost.py b/baybe/surrogates/ngboost.py index 5d3c6dcc3b..fc6df88755 100644 --- a/baybe/surrogates/ngboost.py +++ b/baybe/surrogates/ngboost.py @@ -46,9 +46,6 @@ class _NGBRegressorParams(TypedDict, total=False): class NGBoostSurrogate(IndependentGaussianSurrogate): """A natural-gradient-boosting surrogate model.""" - supports_transfer_learning: ClassVar[bool] = False - # See base class. - _default_model_params: ClassVar[dict] = {"n_estimators": 25, "verbose": False} """Class variable encoding the default model parameters.""" diff --git a/baybe/surrogates/random_forest.py b/baybe/surrogates/random_forest.py index 69f8588a0d..1c2f4da069 100644 --- a/baybe/surrogates/random_forest.py +++ b/baybe/surrogates/random_forest.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections.abc import Collection -from typing import TYPE_CHECKING, ClassVar, Literal, Protocol, TypedDict +from typing import TYPE_CHECKING, Literal, Protocol, TypedDict import numpy as np import numpy.typing as npt @@ -61,9 +61,6 @@ def predict(self, x: np.ndarray, /) -> np.ndarray: ... class RandomForestSurrogate(Surrogate): """A random forest surrogate model.""" - supports_transfer_learning: ClassVar[bool] = False - # See base class. - model_params: _RandomForestRegressorParams = field( factory=dict, converter=dict,