Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 248 additions & 84 deletions openml/runs/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections import OrderedDict
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -54,6 +54,225 @@
ERROR_CODE = 512


def _validate_flow_and_task_inputs(
    flow: OpenMLFlow | OpenMLTask,
    task: OpenMLTask | OpenMLFlow,
    flow_tags: list[str] | None,
) -> tuple[OpenMLFlow, OpenMLTask]:
    """Validate and normalize inputs for flow and task execution.

    Accepts ``flow`` and ``task`` in either order for backward compatibility
    and returns them normalized as ``(flow, task)``.

    Parameters
    ----------
    flow : OpenMLFlow or OpenMLTask
        The flow object (may be swapped with task for backward compatibility).
    task : OpenMLTask or OpenMLFlow
        The task object (may be swapped with flow for backward compatibility).
    flow_tags : List[str] or None
        A list of tags that the flow should have at creation.

    Returns
    -------
    Tuple[OpenMLFlow, OpenMLTask]
        The validated flow and task.

    Raises
    ------
    ValueError
        If flow_tags is not a list or task is not published.
    TypeError
        If flow/task are not of the expected types after normalization.
    """
    if not (flow_tags is None or isinstance(flow_tags, list)):
        raise ValueError("flow_tags should be a list")

    # TODO: At some point in the future do not allow for arguments in old order (changed 6-2018).
    # Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019).
    arguments_swapped = isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow)
    if arguments_swapped:
        # We want to allow either order of argument (to avoid confusion).
        warnings.warn(
            "The old argument order (Flow, model) is deprecated and "
            "will not be supported in the future. Please use the "
            "order (model, Flow).",
            DeprecationWarning,
            stacklevel=3,
        )
        flow, task = task, flow

    if not isinstance(flow, OpenMLFlow):
        raise TypeError("Flow must be OpenMLFlow after validation")
    if not isinstance(task, OpenMLTask):
        raise TypeError("Task must be OpenMLTask after validation")
    if task.task_id is None:
        raise ValueError("The task should be published at OpenML")

    return flow, task


def _sync_flow_with_server(
    flow: OpenMLFlow,
    task: OpenMLTask,
    *,
    upload_flow: bool,
    avoid_duplicate_runs: bool,
) -> int | None:
    """Synchronize flow with server and check if setup/task combination is already present.

    Parameters
    ----------
    flow : OpenMLFlow
        The flow to synchronize.
    task : OpenMLTask
        The task to check for duplicate runs.
    upload_flow : bool
        Whether to upload the flow if it doesn't exist.
    avoid_duplicate_runs : bool
        Whether to check for duplicate runs.

    Returns
    -------
    int or None
        The flow_id if synced with server, None otherwise.

    Raises
    ------
    PyOpenMLError
        If flow_id mismatch or flow doesn't exist when expected.
    OpenMLRunsExistError
        If duplicate runs exist and avoid_duplicate_runs is True.
    """
    # We only need to sync with the server right now if we want to upload the flow,
    # or ensure no duplicate runs exist. Otherwise it can be synced at upload time.
    flow_id = None
    if upload_flow or avoid_duplicate_runs:
        # NOTE: flow_exists returns the server-side flow id when a flow with
        # this name/external_version is registered, and False otherwise.
        flow_id = flow_exists(flow.name, flow.external_version)
        if isinstance(flow.flow_id, int) and flow_id != flow.flow_id:
            # The local flow carries a server id that disagrees with what the
            # server reports — either a genuine mismatch or a vanished flow.
            if flow_id is not False:
                raise PyOpenMLError(
                    f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'",
                )
            raise PyOpenMLError(
                "Flow does not exist on the server, but 'flow.flow_id' is not None."
            )
        if upload_flow and flow_id is False:
            # Flow is new to the server: publish it and adopt the assigned id.
            flow.publish()
            flow_id = flow.flow_id
        elif flow_id:
            # Flow already exists server-side: copy the server's fields onto
            # the local flow so both copies agree.
            flow_from_server = get_flow(flow_id)
            _copy_server_fields(flow_from_server, flow)
            if avoid_duplicate_runs:
                # Attach the local model so the existing-setup lookup uses it.
                flow_from_server.model = flow.model
                setup_id = setup_exists(flow_from_server)
                task_id = task.task_id
                ids = run_exists(cast(int, task_id), setup_id)
                if ids:
                    error_message = (
                        "One or more runs of this setup were already performed on the task."
                    )
                    raise OpenMLRunsExistError(ids, error_message)
        else:
            # Flow does not exist on server and we do not want to upload it.
            # No sync with the server happens.
            flow_id = None

    return flow_id


def _prepare_run_environment(flow: OpenMLFlow) -> tuple[list[str], list[str]]:
"""Prepare run environment information and tags.

Parameters
----------
flow : OpenMLFlow
The flow to get version information from.

Returns
-------
Tuple[List[str], List[str]]
A tuple of (tags, run_environment).
"""
run_environment = flow.extension.get_version_information()
tags = ["openml-python", run_environment[1]]
return tags, run_environment


def _create_run_from_results(  # noqa: PLR0913
    task: OpenMLTask,
    flow: OpenMLFlow,
    flow_id: int | None,
    data_content: list[list],
    trace: OpenMLRunTrace | None,
    fold_evaluations: OrderedDict[str, OrderedDict],
    sample_evaluations: OrderedDict[str, OrderedDict],
    tags: list[str],
    run_environment: list[str],
    upload_flow: bool,  # noqa: FBT001
    avoid_duplicate_runs: bool,  # noqa: FBT001
) -> OpenMLRun:
    """Create an OpenMLRun object from execution results.

    Parameters
    ----------
    task : OpenMLTask
        The task that was executed.
    flow : OpenMLFlow
        The flow that was executed.
    flow_id : int or None
        The flow ID if synced with server.
    data_content : List[List]
        The prediction data content.
    trace : OpenMLRunTrace or None
        The execution trace if available.
    fold_evaluations : OrderedDict
        The fold-based evaluation measures.
    sample_evaluations : OrderedDict
        The sample-based evaluation measures.
    tags : List[str]
        Tags to attach to the run.
    run_environment : List[str]
        Environment information.
    upload_flow : bool
        Whether the flow was uploaded.
    avoid_duplicate_runs : bool
        Whether duplicate runs were checked.

    Returns
    -------
    OpenMLRun
        The created run object.
    """
    dataset = task.get_dataset()

    # Human-readable provenance: environment info plus a creation timestamp.
    description_lines = [
        *run_environment,
        time.strftime("%c"),
        "Created by run_flow_on_task",
    ]

    run = OpenMLRun(
        task_id=cast(int, task.task_id),
        flow_id=flow_id,
        dataset_id=dataset.dataset_id,
        model=flow.model,
        flow_name=flow.name,
        tags=tags,
        trace=trace,
        data_content=data_content,
        flow=flow,
        setup_string=flow.extension.create_setup_string(flow.model),
        description_text="\n".join(description_lines),
    )

    synced_with_server = upload_flow or avoid_duplicate_runs
    if synced_with_server and flow.flow_id is not None:
        # We only extract the parameter settings if a sync happened with the server.
        # I.e. when the flow was uploaded or we found it in the avoid_duplicate check.
        # Otherwise, we will do this at upload time.
        run.parameter_settings = flow.extension.obtain_parameter_values(flow)

    # Attach the detailed evaluations: learning-curve tasks carry per-sample
    # measures, every other task type carries per-fold measures.
    if task.task_type_id == TaskType.LEARNING_CURVE:
        run.sample_evaluations = sample_evaluations
    else:
        run.fold_evaluations = fold_evaluations

    return run


# TODO(eddiebergman): Could potentially overload this but
# it seems very big to do so
def run_model_on_task( # noqa: PLR0913
Expand Down Expand Up @@ -175,7 +394,7 @@ def get_task_and_type_conversion(_task: int | str | OpenMLTask) -> OpenMLTask:
return run


def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913
def run_flow_on_task( # noqa: PLR0913
flow: OpenMLFlow,
task: OpenMLTask,
avoid_duplicate_runs: bool | None = None,
Expand Down Expand Up @@ -222,116 +441,61 @@ def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913
run : OpenMLRun
Result of the run.
"""
if flow_tags is not None and not isinstance(flow_tags, list):
raise ValueError("flow_tags should be a list")

if avoid_duplicate_runs is None:
avoid_duplicate_runs = openml.config.avoid_duplicate_runs

# TODO: At some point in the future do not allow for arguments in old order (changed 6-2018).
# Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019).
if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow):
# We want to allow either order of argument (to avoid confusion).
warnings.warn(
"The old argument order (Flow, model) is deprecated and "
"will not be supported in the future. Please use the "
"order (model, Flow).",
DeprecationWarning,
stacklevel=2,
)
task, flow = flow, task

if task.task_id is None:
raise ValueError("The task should be published at OpenML")
# 1. Validate inputs
flow, task = _validate_flow_and_task_inputs(flow, task, flow_tags)

# 2. Prepare the model
if flow.model is None:
flow.model = flow.extension.flow_to_model(flow)

flow.model = flow.extension.seed_model(flow.model, seed=seed)

# We only need to sync with the server right now if we want to upload the flow,
# or ensure no duplicate runs exist. Otherwise it can be synced at upload time.
flow_id = None
if upload_flow or avoid_duplicate_runs:
flow_id = flow_exists(flow.name, flow.external_version)
if isinstance(flow.flow_id, int) and flow_id != flow.flow_id:
if flow_id is not False:
raise PyOpenMLError(
f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'",
)
raise PyOpenMLError(
"Flow does not exist on the server, but 'flow.flow_id' is not None."
)
if upload_flow and flow_id is False:
flow.publish()
flow_id = flow.flow_id
elif flow_id:
flow_from_server = get_flow(flow_id)
_copy_server_fields(flow_from_server, flow)
if avoid_duplicate_runs:
flow_from_server.model = flow.model
setup_id = setup_exists(flow_from_server)
ids = run_exists(task.task_id, setup_id)
if ids:
error_message = (
"One or more runs of this setup were already performed on the task."
)
raise OpenMLRunsExistError(ids, error_message)
else:
# Flow does not exist on server and we do not want to upload it.
# No sync with the server happens.
flow_id = None

dataset = task.get_dataset()
# 3. Sync with server and check for duplicates
flow_id = _sync_flow_with_server(
flow,
task,
upload_flow=upload_flow,
avoid_duplicate_runs=avoid_duplicate_runs,
)

run_environment = flow.extension.get_version_information()
tags = ["openml-python", run_environment[1]]
# 4. Prepare run environment
tags, run_environment = _prepare_run_environment(flow)

# 5. Check if model is already fitted
if flow.extension.check_if_model_fitted(flow.model):
warnings.warn(
"The model is already fitted! This might cause inconsistency in comparison of results.",
RuntimeWarning,
stacklevel=2,
)

# execute the run
res = _run_task_get_arffcontent(
# 6. Execute the run (parallel processing happens here)
data_content, trace, fold_evaluations, sample_evaluations = _run_task_get_arffcontent(
model=flow.model,
task=task,
extension=flow.extension,
add_local_measures=add_local_measures,
n_jobs=n_jobs,
)

data_content, trace, fold_evaluations, sample_evaluations = res
fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"]
generated_description = "\n".join(fields)
run = OpenMLRun(
task_id=task.task_id,
# 7. Create run from results
run = _create_run_from_results(
task=task,
flow=flow,
flow_id=flow_id,
dataset_id=dataset.dataset_id,
model=flow.model,
flow_name=flow.name,
tags=tags,
trace=trace,
data_content=data_content,
flow=flow,
setup_string=flow.extension.create_setup_string(flow.model),
description_text=generated_description,
trace=trace,
fold_evaluations=fold_evaluations,
sample_evaluations=sample_evaluations,
tags=tags,
run_environment=run_environment,
upload_flow=upload_flow,
avoid_duplicate_runs=avoid_duplicate_runs,
)

if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None:
# We only extract the parameter settings if a sync happened with the server.
# I.e. when the flow was uploaded or we found it in the avoid_duplicate check.
# Otherwise, we will do this at upload time.
run.parameter_settings = flow.extension.obtain_parameter_values(flow)

# now we need to attach the detailed evaluations
if task.task_type_id == TaskType.LEARNING_CURVE:
run.sample_evaluations = sample_evaluations
else:
run.fold_evaluations = fold_evaluations

# 8. Log completion message
if flow_id:
message = f"Executed Task {task.task_id} with Flow id:{run.flow_id}"
else:
Expand Down