From 35b09770d7272b3df472f4422fcbf75e27c76ad9 Mon Sep 17 00:00:00 2001 From: Omswastik-11 Date: Sun, 4 Jan 2026 17:51:30 +0530 Subject: [PATCH 1/4] reduce complexity of run_flow_on_task func --- openml/runs/functions.py | 358 +++++++++++++++++++++++++++++---------- 1 file changed, 271 insertions(+), 87 deletions(-) diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 666b75c37..f301c4ba5 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -7,7 +7,7 @@ from collections import OrderedDict from functools import partial from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, NamedTuple import numpy as np import pandas as pd @@ -54,6 +54,242 @@ ERROR_CODE = 512 +def _validate_flow_and_task_inputs( + flow: OpenMLFlow | OpenMLTask, + task: OpenMLTask | OpenMLFlow, + flow_tags: list[str] | None, + avoid_duplicate_runs: bool | None, +) -> tuple[OpenMLFlow, OpenMLTask, bool]: + """Validate and normalize inputs for flow and task execution. + + Parameters + ---------- + flow : OpenMLFlow or OpenMLTask + The flow object (may be swapped with task for backward compatibility). + task : OpenMLTask or OpenMLFlow + The task object (may be swapped with flow for backward compatibility). + flow_tags : List[str] or None + A list of tags that the flow should have at creation. + avoid_duplicate_runs : bool or None + Whether to check for duplicate runs on the server. + + Returns + ------- + Tuple[OpenMLFlow, OpenMLTask, bool] + The validated flow, task, and avoid_duplicate_runs flag. + + Raises + ------ + ValueError + If flow_tags is not a list or task is not published. + """ + if flow_tags is not None and not isinstance(flow_tags, list): + raise ValueError("flow_tags should be a list") + + if avoid_duplicate_runs is None: + avoid_duplicate_runs = openml.config.avoid_duplicate_runs + + # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018). 
+ # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019). + if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow): + # We want to allow either order of argument (to avoid confusion). + warnings.warn( + "The old argument order (Flow, model) is deprecated and " + "will not be supported in the future. Please use the " + "order (model, Flow).", + DeprecationWarning, + stacklevel=3, + ) + task, flow = flow, task + + if not isinstance(flow, OpenMLFlow): + raise TypeError("Flow must be OpenMLFlow after validation") + + if not isinstance(task, OpenMLTask): + raise TypeError("Task must be OpenMLTask after validation") + + if task.task_id is None: + raise ValueError("The task should be published at OpenML") + + return flow, task, avoid_duplicate_runs + + +def _sync_flow_with_server( + flow: OpenMLFlow, + task: OpenMLTask, + *, + upload_flow: bool, + avoid_duplicate_runs: bool, +) -> int | None: + """Synchronize flow with server and check for duplicate runs. + + Parameters + ---------- + flow : OpenMLFlow + The flow to synchronize. + task : OpenMLTask + The task to check for duplicate runs. + upload_flow : bool + Whether to upload the flow if it doesn't exist. + avoid_duplicate_runs : bool + Whether to check for duplicate runs. + + Returns + ------- + int or None + The flow_id if synced with server, None otherwise. + + Raises + ------ + PyOpenMLError + If flow_id mismatch or flow doesn't exist when expected. + OpenMLRunsExistError + If duplicate runs exist and avoid_duplicate_runs is True. + """ + # We only need to sync with the server right now if we want to upload the flow, + # or ensure no duplicate runs exist. Otherwise it can be synced at upload time. 
+ flow_id = None + if upload_flow or avoid_duplicate_runs: + flow_id = flow_exists(flow.name, flow.external_version) + if isinstance(flow.flow_id, int) and flow_id != flow.flow_id: + if flow_id is not False: + raise PyOpenMLError( + f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'", + ) + raise PyOpenMLError( + "Flow does not exist on the server, but 'flow.flow_id' is not None." + ) + if upload_flow and flow_id is False: + flow.publish() + flow_id = flow.flow_id + elif flow_id: + flow_from_server = get_flow(flow_id) + _copy_server_fields(flow_from_server, flow) + if avoid_duplicate_runs: + flow_from_server.model = flow.model + setup_id = setup_exists(flow_from_server) + task_id = task.task_id + if task_id is None: + raise ValueError("The task should be published at OpenML") + ids = run_exists(task_id, setup_id) + if ids: + error_message = ( + "One or more runs of this setup were already performed on the task." + ) + raise OpenMLRunsExistError(ids, error_message) + else: + # Flow does not exist on server and we do not want to upload it. + # No sync with the server happens. + flow_id = None + + return flow_id + + +def _prepare_run_environment(flow: OpenMLFlow) -> tuple[list[str], list[str]]: + """Prepare run environment information and tags. + + Parameters + ---------- + flow : OpenMLFlow + The flow to get version information from. + + Returns + ------- + Tuple[List[str], List[str]] + A tuple of (tags, run_environment). 
+ """ + run_environment = flow.extension.get_version_information() + tags = ["openml-python", run_environment[1]] + return tags, run_environment + + +class _RunResults(NamedTuple): + data_content: list[list] + trace: OpenMLRunTrace | None + fold_evaluations: OrderedDict[str, OrderedDict] + sample_evaluations: OrderedDict[str, OrderedDict] + + +def _create_run_from_results( # noqa: PLR0913 + *, + task: OpenMLTask, + flow: OpenMLFlow, + flow_id: int | None, + results: _RunResults, + tags: list[str], + run_environment: list[str], + upload_flow: bool, + avoid_duplicate_runs: bool, +) -> OpenMLRun: + """Create an OpenMLRun object from execution results. + + Parameters + ---------- + task : OpenMLTask + The task that was executed. + flow : OpenMLFlow + The flow that was executed. + flow_id : int or None + The flow ID if synced with server. + data_content : List[List] + The prediction data content. + trace : OpenMLRunTrace or None + The execution trace if available. + fold_evaluations : OrderedDict + The fold-based evaluation measures. + sample_evaluations : OrderedDict + The sample-based evaluation measures. + tags : List[str] + Tags to attach to the run. + run_environment : List[str] + Environment information. + upload_flow : bool + Whether the flow was uploaded. + avoid_duplicate_runs : bool + Whether duplicate runs were checked. + + Returns + ------- + OpenMLRun + The created run object. 
+ """ + dataset = task.get_dataset() + fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"] + generated_description = "\n".join(fields) + + task_id = task.task_id + if task_id is None: + raise ValueError("The task should be published at OpenML") + + run = OpenMLRun( + task_id=task_id, + flow_id=flow_id, + dataset_id=dataset.dataset_id, + model=flow.model, + flow_name=flow.name, + tags=tags, + trace=results.trace, + data_content=results.data_content, + flow=flow, + setup_string=flow.extension.create_setup_string(flow.model), + description_text=generated_description, + ) + + if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None: + # We only extract the parameter settings if a sync happened with the server. + # I.e. when the flow was uploaded or we found it in the avoid_duplicate check. + # Otherwise, we will do this at upload time. + run.parameter_settings = flow.extension.obtain_parameter_values(flow) + + # now we need to attach the detailed evaluations + if task.task_type_id == TaskType.LEARNING_CURVE: + run.sample_evaluations = results.sample_evaluations + else: + run.fold_evaluations = results.fold_evaluations + + return run + + # TODO(eddiebergman): Could potentially overload this but # it seems very big to do so def run_model_on_task( # noqa: PLR0913 @@ -175,7 +411,7 @@ def get_task_and_type_conversion(_task: int | str | OpenMLTask) -> OpenMLTask: return run -def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 +def run_flow_on_task( # noqa: PLR0913 flow: OpenMLFlow, task: OpenMLTask, avoid_duplicate_runs: bool | None = None, @@ -222,71 +458,28 @@ def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 run : OpenMLRun Result of the run. 
""" - if flow_tags is not None and not isinstance(flow_tags, list): - raise ValueError("flow_tags should be a list") - - if avoid_duplicate_runs is None: - avoid_duplicate_runs = openml.config.avoid_duplicate_runs - - # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018). - # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019). - if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow): - # We want to allow either order of argument (to avoid confusion). - warnings.warn( - "The old argument order (Flow, model) is deprecated and " - "will not be supported in the future. Please use the " - "order (model, Flow).", - DeprecationWarning, - stacklevel=2, - ) - task, flow = flow, task - - if task.task_id is None: - raise ValueError("The task should be published at OpenML") + # 1. Validate inputs + flow, task, avoid_duplicate_runs = _validate_flow_and_task_inputs( + flow, task, flow_tags, avoid_duplicate_runs + ) + # 2. Prepare the model if flow.model is None: flow.model = flow.extension.flow_to_model(flow) - flow.model = flow.extension.seed_model(flow.model, seed=seed) - # We only need to sync with the server right now if we want to upload the flow, - # or ensure no duplicate runs exist. Otherwise it can be synced at upload time. - flow_id = None - if upload_flow or avoid_duplicate_runs: - flow_id = flow_exists(flow.name, flow.external_version) - if isinstance(flow.flow_id, int) and flow_id != flow.flow_id: - if flow_id is not False: - raise PyOpenMLError( - f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'", - ) - raise PyOpenMLError( - "Flow does not exist on the server, but 'flow.flow_id' is not None." 
- ) - if upload_flow and flow_id is False: - flow.publish() - flow_id = flow.flow_id - elif flow_id: - flow_from_server = get_flow(flow_id) - _copy_server_fields(flow_from_server, flow) - if avoid_duplicate_runs: - flow_from_server.model = flow.model - setup_id = setup_exists(flow_from_server) - ids = run_exists(task.task_id, setup_id) - if ids: - error_message = ( - "One or more runs of this setup were already performed on the task." - ) - raise OpenMLRunsExistError(ids, error_message) - else: - # Flow does not exist on server and we do not want to upload it. - # No sync with the server happens. - flow_id = None - - dataset = task.get_dataset() + # 3. Sync with server and check for duplicates + flow_id = _sync_flow_with_server( + flow, + task, + upload_flow=upload_flow, + avoid_duplicate_runs=avoid_duplicate_runs, + ) - run_environment = flow.extension.get_version_information() - tags = ["openml-python", run_environment[1]] + # 4. Prepare run environment + tags, run_environment = _prepare_run_environment(flow) + # 5. Check if model is already fitted if flow.extension.check_if_model_fitted(flow.model): warnings.warn( "The model is already fitted! This might cause inconsistency in comparison of results.", @@ -294,8 +487,8 @@ def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 stacklevel=2, ) - # execute the run - res = _run_task_get_arffcontent( + # 6. 
Execute the run (parallel processing happens here) + data_content, trace, fold_evaluations, sample_evaluations = _run_task_get_arffcontent( model=flow.model, task=task, extension=flow.extension, @@ -303,35 +496,26 @@ def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913 n_jobs=n_jobs, ) - data_content, trace, fold_evaluations, sample_evaluations = res - fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"] - generated_description = "\n".join(fields) - run = OpenMLRun( - task_id=task.task_id, - flow_id=flow_id, - dataset_id=dataset.dataset_id, - model=flow.model, - flow_name=flow.name, - tags=tags, - trace=trace, + results = _RunResults( data_content=data_content, - flow=flow, - setup_string=flow.extension.create_setup_string(flow.model), - description_text=generated_description, + trace=trace, + fold_evaluations=fold_evaluations, + sample_evaluations=sample_evaluations, ) - if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None: - # We only extract the parameter settings if a sync happened with the server. - # I.e. when the flow was uploaded or we found it in the avoid_duplicate check. - # Otherwise, we will do this at upload time. - run.parameter_settings = flow.extension.obtain_parameter_values(flow) - - # now we need to attach the detailed evaluations - if task.task_type_id == TaskType.LEARNING_CURVE: - run.sample_evaluations = sample_evaluations - else: - run.fold_evaluations = fold_evaluations + # 7. Create run from results + run = _create_run_from_results( + task=task, + flow=flow, + flow_id=flow_id, + results=results, + tags=tags, + run_environment=run_environment, + upload_flow=upload_flow, + avoid_duplicate_runs=avoid_duplicate_runs, + ) + # 8. 
Log completion message if flow_id: message = f"Executed Task {task.task_id} with Flow id:{run.flow_id}" else: From 93aa8770680ca808f1845146c985d5016252da96 Mon Sep 17 00:00:00 2001 From: Omswastik-11 Date: Tue, 13 Jan 2026 12:30:48 +0530 Subject: [PATCH 2/4] refactor the helper functions for run_flow_on_task func Signed-off-by: Omswastik-11 --- openml/runs/functions.py | 68 +++++++++++++++------------------------- 1 file changed, 25 insertions(+), 43 deletions(-) diff --git a/openml/runs/functions.py b/openml/runs/functions.py index f301c4ba5..21cec1edc 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -7,7 +7,7 @@ from collections import OrderedDict from functools import partial from pathlib import Path -from typing import TYPE_CHECKING, Any, NamedTuple +from typing import TYPE_CHECKING, Any, cast import numpy as np import pandas as pd @@ -58,8 +58,7 @@ def _validate_flow_and_task_inputs( flow: OpenMLFlow | OpenMLTask, task: OpenMLTask | OpenMLFlow, flow_tags: list[str] | None, - avoid_duplicate_runs: bool | None, -) -> tuple[OpenMLFlow, OpenMLTask, bool]: +) -> tuple[OpenMLFlow, OpenMLTask]: """Validate and normalize inputs for flow and task execution. Parameters @@ -70,13 +69,11 @@ def _validate_flow_and_task_inputs( The task object (may be swapped with flow for backward compatibility). flow_tags : List[str] or None A list of tags that the flow should have at creation. - avoid_duplicate_runs : bool or None - Whether to check for duplicate runs on the server. Returns ------- - Tuple[OpenMLFlow, OpenMLTask, bool] - The validated flow, task, and avoid_duplicate_runs flag. + Tuple[OpenMLFlow, OpenMLTask] + The validated flow and task. 
Raises ------ @@ -86,9 +83,6 @@ def _validate_flow_and_task_inputs( if flow_tags is not None and not isinstance(flow_tags, list): raise ValueError("flow_tags should be a list") - if avoid_duplicate_runs is None: - avoid_duplicate_runs = openml.config.avoid_duplicate_runs - # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018). # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019). if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow): @@ -111,7 +105,7 @@ def _validate_flow_and_task_inputs( if task.task_id is None: raise ValueError("The task should be published at OpenML") - return flow, task, avoid_duplicate_runs + return flow, task def _sync_flow_with_server( @@ -121,7 +115,7 @@ def _sync_flow_with_server( upload_flow: bool, avoid_duplicate_runs: bool, ) -> int | None: - """Synchronize flow with server and check for duplicate runs. + """Synchronize flow with server and check if setup/task combination is already present. Parameters ---------- @@ -203,23 +197,18 @@ def _prepare_run_environment(flow: OpenMLFlow) -> tuple[list[str], list[str]]: return tags, run_environment -class _RunResults(NamedTuple): - data_content: list[list] - trace: OpenMLRunTrace | None - fold_evaluations: OrderedDict[str, OrderedDict] - sample_evaluations: OrderedDict[str, OrderedDict] - - def _create_run_from_results( # noqa: PLR0913 - *, task: OpenMLTask, flow: OpenMLFlow, flow_id: int | None, - results: _RunResults, + data_content: list[list], + trace: OpenMLRunTrace | None, + fold_evaluations: OrderedDict[str, OrderedDict], + sample_evaluations: OrderedDict[str, OrderedDict], tags: list[str], run_environment: list[str], - upload_flow: bool, - avoid_duplicate_runs: bool, + upload_flow: bool, # noqa: FBT001 + avoid_duplicate_runs: bool, # noqa: FBT001 ) -> OpenMLRun: """Create an OpenMLRun object from execution results. 
@@ -257,19 +246,15 @@ def _create_run_from_results( # noqa: PLR0913 fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"] generated_description = "\n".join(fields) - task_id = task.task_id - if task_id is None: - raise ValueError("The task should be published at OpenML") - run = OpenMLRun( - task_id=task_id, + task_id=cast(int, task.task_id), flow_id=flow_id, dataset_id=dataset.dataset_id, model=flow.model, flow_name=flow.name, tags=tags, - trace=results.trace, - data_content=results.data_content, + trace=trace, + data_content=data_content, flow=flow, setup_string=flow.extension.create_setup_string(flow.model), description_text=generated_description, @@ -283,9 +268,9 @@ def _create_run_from_results( # noqa: PLR0913 # now we need to attach the detailed evaluations if task.task_type_id == TaskType.LEARNING_CURVE: - run.sample_evaluations = results.sample_evaluations + run.sample_evaluations = sample_evaluations else: - run.fold_evaluations = results.fold_evaluations + run.fold_evaluations = fold_evaluations return run @@ -458,10 +443,11 @@ def run_flow_on_task( # noqa: PLR0913 run : OpenMLRun Result of the run. """ + if avoid_duplicate_runs is None: + avoid_duplicate_runs = openml.config.avoid_duplicate_runs + # 1. Validate inputs - flow, task, avoid_duplicate_runs = _validate_flow_and_task_inputs( - flow, task, flow_tags, avoid_duplicate_runs - ) + flow, task = _validate_flow_and_task_inputs(flow, task, flow_tags) # 2. Prepare the model if flow.model is None: @@ -496,19 +482,15 @@ def run_flow_on_task( # noqa: PLR0913 n_jobs=n_jobs, ) - results = _RunResults( - data_content=data_content, - trace=trace, - fold_evaluations=fold_evaluations, - sample_evaluations=sample_evaluations, - ) - # 7. 
Create run from results run = _create_run_from_results( task=task, flow=flow, flow_id=flow_id, - results=results, + data_content=data_content, + trace=trace, + fold_evaluations=fold_evaluations, + sample_evaluations=sample_evaluations, tags=tags, run_environment=run_environment, upload_flow=upload_flow, From 6771fb4d51ec8798f1b22b4409b6e0cf67f0654a Mon Sep 17 00:00:00 2001 From: Omswastik-11 Date: Tue, 13 Jan 2026 14:53:21 +0530 Subject: [PATCH 3/4] remove redundant checks --- openml/runs/functions.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 21cec1edc..7d225785d 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -163,9 +163,7 @@ def _sync_flow_with_server( flow_from_server.model = flow.model setup_id = setup_exists(flow_from_server) task_id = task.task_id - if task_id is None: - raise ValueError("The task should be published at OpenML") - ids = run_exists(task_id, setup_id) + ids = run_exists(cast(int, task_id), setup_id) if ids: error_message = ( "One or more runs of this setup were already performed on the task." 
From 04a6e0f35428c70446c22d9636fd003e57b6ea1c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 14 Jan 2026 11:15:28 +0000 Subject: [PATCH 4/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- openml/runs/functions.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 0c6867ec1..2c01f5a81 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -163,7 +163,7 @@ def _sync_flow_with_server( flow_from_server.model = flow.model setup_id = setup_exists(flow_from_server) task_id = task.task_id - ids = run_exists(cast(int, task_id), setup_id) + ids = run_exists(cast("int", task_id), setup_id) if ids: error_message = ( "One or more runs of this setup were already performed on the task." @@ -205,8 +205,8 @@ def _create_run_from_results( # noqa: PLR0913 sample_evaluations: OrderedDict[str, OrderedDict], tags: list[str], run_environment: list[str], - upload_flow: bool, # noqa: FBT001 - avoid_duplicate_runs: bool, # noqa: FBT001 + upload_flow: bool, + avoid_duplicate_runs: bool, ) -> OpenMLRun: """Create an OpenMLRun object from execution results. @@ -245,7 +245,7 @@ def _create_run_from_results( # noqa: PLR0913 generated_description = "\n".join(fields) run = OpenMLRun( - task_id=cast(int, task.task_id), + task_id=cast("int", task.task_id), flow_id=flow_id, dataset_id=dataset.dataset_id, model=flow.model,