232 changes: 194 additions & 38 deletions docs/user-guide/parameter-tables.ipynb
@@ -37,19 +37,19 @@
"import sciline\n",
"\n",
"_fake_filesytem = {\n",
" 'file102.txt': [1, 2, float('nan'), 3],\n",
" 'file103.txt': [1, 2, 3, 4],\n",
" 'file104.txt': [1, 2, 3, 4, 5],\n",
" 'file105.txt': [1, 2, 3],\n",
" \"file102.txt\": [1, 2, float(\"nan\"), 3],\n",
" \"file103.txt\": [1, 2, 3, 4],\n",
" \"file104.txt\": [1, 2, 3, 4, 5],\n",
" \"file105.txt\": [1, 2, 3],\n",
"}\n",
"\n",
"# 1. Define domain types\n",
"\n",
"Filename = NewType('Filename', str)\n",
"RawData = NewType('RawData', dict)\n",
"CleanedData = NewType('CleanedData', list)\n",
"ScaleFactor = NewType('ScaleFactor', float)\n",
"Result = NewType('Result', float)\n",
"Filename = NewType(\"Filename\", str)\n",
"RawData = NewType(\"RawData\", dict)\n",
"CleanedData = NewType(\"CleanedData\", list)\n",
"ScaleFactor = NewType(\"ScaleFactor\", float)\n",
"Result = NewType(\"Result\", float)\n",
"\n",
"\n",
"# 2. Define providers\n",
@@ -59,14 +59,14 @@
" \"\"\"Load the data from the filename.\"\"\"\n",
"\n",
" data = _fake_filesytem[filename]\n",
" return RawData({'data': data, 'meta': {'filename': filename}})\n",
" return RawData({\"data\": data, \"meta\": {\"filename\": filename}})\n",
"\n",
"\n",
"def clean(raw_data: RawData) -> CleanedData:\n",
" \"\"\"Clean the data, removing NaNs.\"\"\"\n",
" import math\n",
"\n",
" return CleanedData([x for x in raw_data['data'] if not math.isnan(x)])\n",
" return CleanedData([x for x in raw_data[\"data\"] if not math.isnan(x)])\n",
"\n",
"\n",
"def process(data: CleanedData, param: ScaleFactor) -> Result:\n",
@@ -95,7 +95,7 @@
"metadata": {},
"outputs": [],
"source": [
"base.visualize(Result, graph_attr={'rankdir': 'LR'})"
"base.visualize(Result, graph_attr={\"rankdir\": \"LR\"})"
]
},
{
@@ -114,9 +114,9 @@
"import pandas as pd\n",
"\n",
"run_ids = [102, 103, 104, 105]\n",
"filenames = [f'file{i}.txt' for i in run_ids]\n",
"filenames = [f\"file{i}.txt\" for i in run_ids]\n",
"param_table = pd.DataFrame({Filename: filenames}, index=run_ids).rename_axis(\n",
" index='run_id'\n",
" index=\"run_id\"\n",
")\n",
"param_table"
]
@@ -229,7 +229,11 @@
"metadata": {},
"outputs": [],
"source": [
"graph = pipeline.reduce(func=lambda *result: sum(result), name='merged').get('merged')\n",
"def merge(*data):\n",
" return sum(data)\n",
"\n",
"\n",
"graph = pipeline.reduce(func=merge, name=\"merged\").get(\"merged\")\n",
"graph.visualize()"
]
},
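For readers skimming the diff: this hunk replaces an inline `lambda` with the named `merge` function. The mapping step itself sits in unchanged cells outside the hunk; the full map/reduce pattern is roughly the following sketch (assuming the `base` and `param_table` defined above; the construction of `pipeline` is inferred, not shown in this diff):

```python
# Map the base pipeline over the parameter table (one branch per row), then
# reduce all mapped Result nodes into a single "merged" node.
pipeline = base.map(param_table)


def merge(*data):
    return sum(data)


graph = pipeline.reduce(func=merge, name="merged").get("merged")
graph.compute()  # sums the per-file results
```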
@@ -294,8 +298,6 @@
"source": [
"## Grouping intermediate results based on secondary parameters\n",
"\n",
"**Cyclebane and Sciline do not support `groupby` yet, this is work in progress so this example is not functional yet.**\n",
"\n",
"This chapter illustrates how to implement *groupby* operations with Sciline.\n",
"\n",
"Continuing from the examples for *map* and *reduce*, we can introduce a secondary parameter in the table, such as the material of the sample:"
@@ -307,30 +309,185 @@
"metadata": {},
"outputs": [],
"source": [
"Material = NewType('Material', str)\n",
"\n",
"run_ids = [102, 103, 104, 105]\n",
"sample = ['diamond', 'graphite', 'graphite', 'graphite']\n",
"filenames = [f'file{i}.txt' for i in run_ids]\n",
"sample = [\"diamond\", \"graphite\", \"graphite\", \"graphite\"]\n",
"filenames = [f\"file{i}.txt\" for i in run_ids]\n",
"param_table = pd.DataFrame(\n",
" {Filename: filenames, Material: sample}, index=run_ids\n",
").rename_axis(index='run_id')\n",
" {Filename: filenames, \"Material\": sample}, index=run_ids\n",
").rename_axis(index=\"run_id\")\n",
"param_table"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Future releases of Sciline will support a `groupby` operation, roughly as follows:\n",
"We then group the results by sample material,\n",
"and reduce the data using the same merge function as before.\n",
"\n",
"```python\n",
"pipeline = base.map(param_table).groupby(Material).reduce(func=merge)\n",
"```\n",
"The end goal being to obtain two end results; one for each material."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"grouped = (\n",
" base.map(param_table)\n",
" .groupby(\"Material\")\n",
" .reduce(key=Result, func=merge, name=\"merged\")\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"graph = grouped.get(sciline.get_mapped_node_names(grouped, \"merged\"))\n",
"graph.visualize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sciline.compute_mapped(grouped, \"merged\")"
]
},
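A usage note on the cell above: `compute_mapped` is expected to return one entry per group, keyed by the groupby column (here `"Material"`), matching its use elsewhere in this notebook. A hedged sketch:

```python
# Iterate over the per-material results produced by the groupby/reduce.
results = sciline.compute_mapped(grouped, "merged")
for material, value in results.items():
    print(material, value)
```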
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Grouping early in the graph\n",
"\n",
"Sometimes, it is also desirable to apply grouping earlier in the pipeline graph.\n",
"In this example, we wish to combine the raw data before cleaning and computing the result.\n",
"\n",
"We can then compute the merged result, grouped by the value of `Material`.\n",
"Note how the initial steps of the computation depend on the `run_id` index name, while later steps depend on `Material`, a new index name defined by the `groupby` operation.\n",
"The files for each run ID have been grouped by their material and then merged."
"The function that merges the raw data needs to merge both data and metadata:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define function to merge RawData\n",
"def merge_raw(*das):\n",
" out = {\"data\": [], \"meta\": {}}\n",
" for da in das:\n",
" out[\"data\"].extend(da[\"data\"])\n",
" for k, v in da[\"meta\"].items():\n",
" if k not in out[\"meta\"]:\n",
" out[\"meta\"][k] = []\n",
" out[\"meta\"][k].append(v)\n",
" return out"
]
},
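To make the behaviour of `merge_raw` concrete, here is a hypothetical input/output pair (not from the notebook; the dict layout follows what `load` returns above):

```python
# Using merge_raw as defined in the cell above:
a = {"data": [1.0, 2.0], "meta": {"filename": "file102.txt"}}
b = {"data": [3.0], "meta": {"filename": "file103.txt"}}
merge_raw(a, b)
# -> {'data': [1.0, 2.0, 3.0],
#     'meta': {'filename': ['file102.txt', 'file103.txt']}}
```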
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The grouped graph is now just taking the part that leads to `RawData` (via `base[RawData]`) and dropping everything downstream."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"grouped = (\n",
" base[RawData]\n",
" .map(param_table)\n",
" .groupby(\"Material\")\n",
" .reduce(key=RawData, func=merge_raw, name=\"merged\")\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Visualizing the graph shows two `merged` nodes; one for each material:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"grouped.visualize(sciline.get_mapped_node_names(grouped, \"merged\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The task is now to reattach this grouped graph to the lower part of our pipeline.\n",
"Since the base graph has a single `RawData`, we first need to map it to the two possible materials that are left after grouping."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"new = base.copy()\n",
"new[RawData] = None\n",
"\n",
"# Get the list of materials left after grouping\n",
"unique_materials = sciline.get_mapped_node_names(grouped, \"merged\").index\n",
"\n",
"mapped = new.map(\n",
" pd.DataFrame(\n",
" {RawData: [None] * len(unique_materials), \"Material\": unique_materials}\n",
" ).set_index(\"Material\")\n",
")\n",
"\n",
"mapped.visualize(sciline.get_mapped_node_names(mapped, Result))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can now attach the top part of the graph to the bottom one:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mapped[RawData] = grouped[\"merged\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mapped.visualize(sciline.get_mapped_node_names(mapped, Result))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sciline.compute_mapped(mapped, Result)"
]
},
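Putting the early-grouping workflow together in one place, a recap sketch that mirrors the cells above (it assumes `base`, `param_table`, and `merge_raw` as defined earlier):

```python
# 1. Keep only the part of the graph leading to RawData, map it over the
#    parameter table, group by material, and merge raw data per group.
grouped = (
    base[RawData]
    .map(param_table)
    .groupby("Material")
    .reduce(key=RawData, func=merge_raw, name="merged")
)

# 2. Map the downstream part of the graph over the remaining materials,
#    leaving RawData as a placeholder (None).
new = base.copy()
new[RawData] = None
materials = sciline.get_mapped_node_names(grouped, "merged").index
mapped = new.map(
    pd.DataFrame(
        {RawData: [None] * len(materials), "Material": materials}
    ).set_index("Material")
)

# 3. Reattach: feed the merged per-material RawData into the mapped graph.
mapped[RawData] = grouped["merged"]
results = sciline.compute_mapped(mapped, Result)
```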
{
Expand Down Expand Up @@ -457,11 +614,11 @@
"pl = (\n",
" base.map({Param1: [1, 4, 9]})\n",
" .map({Param2: [1, 2]})\n",
" .reduce(func=gather, name='reduce_1', index='dim_1')\n",
" .reduce(func=gather, name='reduce_0')\n",
" .reduce(func=gather, name=\"reduce_1\", index=\"dim_1\")\n",
" .reduce(func=gather, name=\"reduce_0\")\n",
")\n",
"\n",
"pl.visualize('reduce_0')"
"pl.visualize(\"reduce_0\")"
]
},
{
@@ -477,7 +634,7 @@
"metadata": {},
"outputs": [],
"source": [
"pl.compute('reduce_0')"
"pl.compute(\"reduce_0\")"
]
},
{
@@ -497,9 +654,9 @@
"pl = (\n",
" base.map({Param1: [1, 4, 9]})\n",
" .map({Param2: [1, 2]})\n",
" .reduce(func=gather, name='reduce_both')\n",
" .reduce(func=gather, name=\"reduce_both\")\n",
")\n",
"pl.visualize('reduce_both')"
"pl.visualize(\"reduce_both\")"
]
}
],
@@ -518,8 +675,7 @@
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"pygments_lexer": "ipython3"
}
},
"nbformat": 4,
3 changes: 2 additions & 1 deletion src/sciline/__init__.py
@@ -17,7 +17,7 @@
HandleAsComputeTimeException,
UnsatisfiedRequirement,
)
from .pipeline import Pipeline, compute_mapped, get_mapped_node_names
from .pipeline import Pipeline, compute_mapped, get_mapped_node_names, is_mapped_node
from .task_graph import TaskGraph

__all__ = [
Expand All @@ -31,6 +31,7 @@
"UnsatisfiedRequirement",
"compute_mapped",
"get_mapped_node_names",
"is_mapped_node",
"scheduler",
]
