From 2ef61ffb01a30894d99ed863be2376d3e45bbd78 Mon Sep 17 00:00:00 2001 From: Neil Vaytet Date: Mon, 8 Sep 2025 13:08:28 +0200 Subject: [PATCH 01/11] add groupby method to DataGraph --- src/sciline/data_graph.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/sciline/data_graph.py b/src/sciline/data_graph.py index 401a07ce..352f65e8 100644 --- a/src/sciline/data_graph.py +++ b/src/sciline/data_graph.py @@ -207,6 +207,23 @@ def map(self: T, node_values: dict[Key, Any]) -> T: """ return self._from_cyclebane(self._cbgraph.map(node_values)) + def groupby(self: T, node: Key) -> T: + """Group the graph by a specific node. + + Parameters + ---------- + node: + Node key to group by. + + Returns + ------- + : + A new graph that groups mapped nodes by the given key. This graph is not + meant to be executed directly, but to be further processed via + :meth:`reduce`. + """ + return self._from_cyclebane(self._cbgraph.groupby(node)) + def reduce(self: T, *, func: Callable[..., Any], **kwargs: Any) -> T: """Reduce the outputs of a mapped graph into a single value and provider. From 15e262e15f72f4f6091ef650c86364b413967223 Mon Sep 17 00:00:00 2001 From: Neil Vaytet Date: Mon, 8 Sep 2025 14:49:49 +0200 Subject: [PATCH 02/11] add groupby test --- tests/groupby_test.py | 105 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 tests/groupby_test.py diff --git a/tests/groupby_test.py b/tests/groupby_test.py new file mode 100644 index 00000000..4714a885 --- /dev/null +++ b/tests/groupby_test.py @@ -0,0 +1,105 @@ +from typing import NewType + +import pandas as pd + +import sciline as sl + +_fake_filesytem = { + 'file102.txt': [1, 2, float('nan'), 3], + 'file103.txt': [4, 5, 6, 7], + 'file104.txt': [8, 9, 10, 11, 12], + 'file105.txt': [13, 14, 15], +} + +# 1. Define domain types + +Filename = NewType('Filename', str) +RawData = NewType('RawData', dict) +CleanedData = NewType('CleanedData', list) +ScaleFactor = NewType('ScaleFactor', float) +Result = NewType('Result', float) +Material = NewType('Material', str) + + +# 2. 
Define providers + + +def load(filename: Filename) -> RawData: + """Load the data from the filename.""" + + data = _fake_filesytem[filename] + return RawData({'data': data, 'meta': {'filename': filename}}) + + +def clean(raw_data: RawData) -> CleanedData: + """Clean the data, removing NaNs.""" + import math + + return CleanedData([x for x in raw_data['data'] if not math.isnan(x)]) + + +def process(data: CleanedData, param: ScaleFactor) -> Result: + """Process the data, multiplying the sum by the scale factor.""" + return Result(sum(data) * param) + + +def test_groupby_material(): + # Create pipeline + providers = [load, clean, process] + params = {ScaleFactor: 2.0} + base = sl.Pipeline(providers, params=params) + + # Make parameter table + run_ids = [102, 103, 104, 105] + sample = ['diamond', 'graphite', 'graphite', 'graphite'] + filenames = [f'file{i}.txt' for i in run_ids] + param_table = pd.DataFrame( + {Filename: filenames, 'Material': sample}, index=run_ids + ).rename_axis(index='run_id') + + # Define function to merge RawData + def merge(*das): + out = {"data": [], "meta": {}} + for da in das: + out["data"].extend(da["data"]) + for k, v in da["meta"].items(): + if k not in out["meta"]: + out["meta"][k] = [] + out["meta"][k].append(v) + return out + + # Group by Material and merge RawData + MergedData = NewType('MergedData', int) + grouped = ( + base[RawData] + .map(param_table) + .groupby('Material') + .reduce(key=RawData, func=merge, name=MergedData) + ) + + # Join back to base pipeline + new = base.copy() + new[RawData] = None + + mapped = new.map( + # Need dummy RawData column to allow re-attaching + pd.DataFrame({RawData: [1, 2], 'Material': ['diamond', 'graphite']}).set_index( + 'Material' + ) + ) + + # Attach the grouped MergedData to the lower part of the pipeline + mapped[RawData] = grouped[MergedData] + + clean_data = sl.compute_mapped(mapped, CleanedData) + assert clean_data['diamond'] == [1, 2, 3] + assert clean_data['graphite'] == [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] + + result = sl.compute_mapped(mapped, Result) + assert result['diamond'] == 12.0 + assert result['graphite'] == 228.0 + + # TODO: currently cannot compute RawData: ValueError: Multiple mapped nodes with + # name '__main__.RawData' found: + # - MappedNode(name=__main__.RawData, indices=('Material',)) + # - MappedNode(name=__main__.RawData, indices=('run_id',)) From 30bd20bd2984b8e8b58342326d1332cd9778c80e Mon Sep 17 00:00:00 2001 From: Neil Vaytet Date: Mon, 8 Sep 2025 15:23:07 +0200 Subject: [PATCH 03/11] add simple groupby test grouping at Result --- tests/groupby_test.py | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/tests/groupby_test.py b/tests/groupby_test.py index 4714a885..c748a60e 100644 --- a/tests/groupby_test.py +++ b/tests/groupby_test.py @@ -43,7 +43,37 @@ def process(data: CleanedData, param: ScaleFactor) -> Result: return Result(sum(data) * param) -def test_groupby_material(): +def merge(*data): + return sum(data) + + +def test_groupby_material_at_result(): + # Create pipeline + providers = [load, clean, process] + params = {ScaleFactor: 2.0} + base = sl.Pipeline(providers, params=params) + + # Make parameter table + run_ids = [102, 103, 104, 105] + sample = ['diamond', 'graphite', 'graphite', 'graphite'] + filenames = [f'file{i}.txt' for i in run_ids] + param_table = pd.DataFrame( + {Filename: filenames, Material: sample}, index=run_ids + ).rename_axis(index='run_id') + + # Group by Material and merge Result + grouped = ( + 
base.map(param_table) + .groupby(Material) + .reduce(key=Result, func=merge, name="merged") + ) + + result = sl.compute_mapped(grouped, "merged") + assert result['diamond'] == 12.0 + assert result['graphite'] == 228.0 + + +def test_groupby_material_at_rawdata(): # Create pipeline providers = [load, clean, process] params = {ScaleFactor: 2.0} @@ -58,7 +88,7 @@ def test_groupby_material(): ).rename_axis(index='run_id') # Define function to merge RawData - def merge(*das): + def merge_raw(*das): out = {"data": [], "meta": {}} for da in das: out["data"].extend(da["data"]) @@ -74,7 +104,7 @@ def merge(*das): base[RawData] .map(param_table) .groupby('Material') - .reduce(key=RawData, func=merge, name=MergedData) + .reduce(key=RawData, func=merge_raw, name=MergedData) ) # Join back to base pipeline From 1b6923e8b4278467ea4b537f945e328d7d7a1cfb Mon Sep 17 00:00:00 2001 From: Neil Vaytet Date: Mon, 8 Sep 2025 15:27:19 +0200 Subject: [PATCH 04/11] update notebook --- docs/user-guide/parameter-tables.ipynb | 163 ++++++++++++++++++++++++- 1 file changed, 161 insertions(+), 2 deletions(-) diff --git a/docs/user-guide/parameter-tables.ipynb b/docs/user-guide/parameter-tables.ipynb index 08ca020a..2d314d09 100644 --- a/docs/user-guide/parameter-tables.ipynb +++ b/docs/user-guide/parameter-tables.ipynb @@ -229,7 +229,10 @@ "metadata": {}, "outputs": [], "source": [ - "graph = pipeline.reduce(func=lambda *result: sum(result), name='merged').get('merged')\n", + "def merge(*data):\n", + " return sum(data)\n", + "\n", + "graph = pipeline.reduce(func=merge, name='merged').get('merged')\n", "graph.visualize()" ] }, @@ -318,6 +321,162 @@ "param_table" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "grouped = base.map(param_table).groupby('Material').reduce(\n", + " key=Result,\n", + " func=merge,\n", + " name=\"merged\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "graph = grouped.get(sciline.get_mapped_node_names(grouped, \"merged\"))\n", + "graph.visualize()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sciline.compute_mapped(grouped, \"merged\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Grouping early in the graph" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "run_ids = [102, 103, 104, 105]\n", + "sample = ['diamond', 'graphite', 'graphite', 'graphite']\n", + "filenames = [f'file{i}.txt' for i in run_ids]\n", + "param_table = pd.DataFrame(\n", + " {Filename: filenames, 'Material': sample}, index=run_ids\n", + ").rename_axis(index='run_id')\n", + "param_table" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define function to merge RawData\n", + "def merge_raw(*das):\n", + " out = {\"data\": [], \"meta\": {}}\n", + " for da in das:\n", + " out[\"data\"].extend(da[\"data\"])\n", + " for k, v in da[\"meta\"].items():\n", + " if k not in out[\"meta\"]:\n", + " out[\"meta\"][k] = []\n", + " 
out[\"meta\"][k].append(v)\n", + " return out" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "grouped = base[RawData].map(param_table).groupby('Material').reduce(key=RawData, func=merge_raw, name=\"merged\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "grouped.visualize(sciline.get_mapped_node_names(grouped, \"merged\"))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "new = base.copy()\n", + "new[RawData] = None\n", + "\n", + "mapped = new.map(\n", + " pd.DataFrame(\n", + " {RawData: [1, 2], 'Material': ['diamond', 'graphite']}\n", + " ).set_index('Material')\n", + ")\n", + "\n", + "mapped.visualize(sl.get_mapped_node_names(mapped, Result))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "mapped[RawData] = grouped[\"merged\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "mapped.visualize(sciline.get_mapped_node_names(mapped, Result))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sl.compute_mapped(mapped, Result)" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -519,7 +678,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.12.7" } }, "nbformat": 4, From 48edf09ad94423f299dd6fd1a7f86118a0abf034 Mon Sep 17 00:00:00 2001 From: Neil Vaytet Date: Mon, 8 Sep 2025 15:28:35 +0200 Subject: [PATCH 05/11] fix notebook --- docs/user-guide/parameter-tables.ipynb | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/docs/user-guide/parameter-tables.ipynb b/docs/user-guide/parameter-tables.ipynb index 2d314d09..210571f9 100644 --- a/docs/user-guide/parameter-tables.ipynb +++ b/docs/user-guide/parameter-tables.ipynb @@ -321,13 +321,6 @@ "param_table" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - }, { "cell_type": "code", "execution_count": null, @@ -360,20 +353,6 @@ "sciline.compute_mapped(grouped, \"merged\")" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - }, { "cell_type": "markdown", "metadata": {}, @@ -447,7 +426,7 @@ " ).set_index('Material')\n", ")\n", "\n", - "mapped.visualize(sl.get_mapped_node_names(mapped, Result))" + "mapped.visualize(sciline.get_mapped_node_names(mapped, Result))" ] }, { @@ -474,7 +453,7 @@ "metadata": {}, "outputs": [], "source": [ - "sl.compute_mapped(mapped, Result)" + "sciline.compute_mapped(mapped, Result)" ] }, { From d541421b383aba101399c3f3148a4e37983cd973 Mon Sep 17 00:00:00 2001 From: Neil Vaytet Date: Wed, 10 Sep 2025 14:46:41 +0200 Subject: [PATCH 06/11] compute RawData in test using index_names --- tests/groupby_test.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/tests/groupby_test.py b/tests/groupby_test.py index c748a60e..fa86f017 100644 --- a/tests/groupby_test.py +++ b/tests/groupby_test.py @@ -1,5 +1,6 @@ from typing import NewType +import numpy as np import pandas as pd import sciline as sl @@ -99,12 +100,11 @@ def merge_raw(*das): return out # 
Group by Material and merge RawData - MergedData = NewType('MergedData', int) grouped = ( base[RawData] .map(param_table) .groupby('Material') - .reduce(key=RawData, func=merge_raw, name=MergedData) + .reduce(key=RawData, func=merge_raw, name='merged') ) # Join back to base pipeline @@ -118,18 +118,29 @@ def merge_raw(*das): ) ) - # Attach the grouped MergedData to the lower part of the pipeline - mapped[RawData] = grouped[MergedData] + # Attach the grouped merged data to the lower part of the pipeline + mapped[RawData] = grouped['merged'] clean_data = sl.compute_mapped(mapped, CleanedData) - assert clean_data['diamond'] == [1, 2, 3] - assert clean_data['graphite'] == [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] + assert np.array_equal(clean_data['diamond'], [1, 2, 3]) + assert np.array_equal( + clean_data['graphite'], [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] + ) result = sl.compute_mapped(mapped, Result) assert result['diamond'] == 12.0 assert result['graphite'] == 228.0 - # TODO: currently cannot compute RawData: ValueError: Multiple mapped nodes with - # name '__main__.RawData' found: - # - MappedNode(name=__main__.RawData, indices=('Material',)) - # - MappedNode(name=__main__.RawData, indices=('run_id',)) + raw_data = sl.compute_mapped(mapped, RawData, index_names=['Material']) + assert np.array_equal( + raw_data['diamond']['data'], [1, 2, float('nan'), 3], equal_nan=True + ) + assert np.array_equal( + raw_data['graphite']['data'], + [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], + equal_nan=True, + ) + assert raw_data['diamond']['meta'] == {'filename': ['file102.txt']} + assert raw_data['graphite']['meta'] == { + 'filename': ['file103.txt', 'file104.txt', 'file105.txt'] + } From f26e3535a08af603d23318cb0426c90e108aa228 Mon Sep 17 00:00:00 2001 From: Neil Vaytet Date: Wed, 10 Sep 2025 15:08:08 +0200 Subject: [PATCH 07/11] update notebook in docs --- docs/user-guide/parameter-tables.ipynb | 153 ++++++++++++++----------- 1 file changed, 86 insertions(+), 67 deletions(-) diff --git a/docs/user-guide/parameter-tables.ipynb b/docs/user-guide/parameter-tables.ipynb index 210571f9..11df4e5a 100644 --- a/docs/user-guide/parameter-tables.ipynb +++ b/docs/user-guide/parameter-tables.ipynb @@ -37,19 +37,19 @@ "import sciline\n", "\n", "_fake_filesytem = {\n", - " 'file102.txt': [1, 2, float('nan'), 3],\n", - " 'file103.txt': [1, 2, 3, 4],\n", - " 'file104.txt': [1, 2, 3, 4, 5],\n", - " 'file105.txt': [1, 2, 3],\n", + " \"file102.txt\": [1, 2, float(\"nan\"), 3],\n", + " \"file103.txt\": [1, 2, 3, 4],\n", + " \"file104.txt\": [1, 2, 3, 4, 5],\n", + " \"file105.txt\": [1, 2, 3],\n", "}\n", "\n", "# 1. Define domain types\n", "\n", - "Filename = NewType('Filename', str)\n", - "RawData = NewType('RawData', dict)\n", - "CleanedData = NewType('CleanedData', list)\n", - "ScaleFactor = NewType('ScaleFactor', float)\n", - "Result = NewType('Result', float)\n", + "Filename = NewType(\"Filename\", str)\n", + "RawData = NewType(\"RawData\", dict)\n", + "CleanedData = NewType(\"CleanedData\", list)\n", + "ScaleFactor = NewType(\"ScaleFactor\", float)\n", + "Result = NewType(\"Result\", float)\n", "\n", "\n", "# 2. 
Define providers\n", @@ -59,14 +59,14 @@ " \"\"\"Load the data from the filename.\"\"\"\n", "\n", " data = _fake_filesytem[filename]\n", - " return RawData({'data': data, 'meta': {'filename': filename}})\n", + " return RawData({\"data\": data, \"meta\": {\"filename\": filename}})\n", "\n", "\n", "def clean(raw_data: RawData) -> CleanedData:\n", " \"\"\"Clean the data, removing NaNs.\"\"\"\n", " import math\n", "\n", - " return CleanedData([x for x in raw_data['data'] if not math.isnan(x)])\n", + " return CleanedData([x for x in raw_data[\"data\"] if not math.isnan(x)])\n", "\n", "\n", "def process(data: CleanedData, param: ScaleFactor) -> Result:\n", @@ -95,7 +95,7 @@ "metadata": {}, "outputs": [], "source": [ - "base.visualize(Result, graph_attr={'rankdir': 'LR'})" + "base.visualize(Result, graph_attr={\"rankdir\": \"LR\"})" ] }, { @@ -114,9 +114,9 @@ "import pandas as pd\n", "\n", "run_ids = [102, 103, 104, 105]\n", - "filenames = [f'file{i}.txt' for i in run_ids]\n", + "filenames = [f\"file{i}.txt\" for i in run_ids]\n", "param_table = pd.DataFrame({Filename: filenames}, index=run_ids).rename_axis(\n", - " index='run_id'\n", + " index=\"run_id\"\n", ")\n", "param_table" ] @@ -232,7 +232,8 @@ "def merge(*data):\n", " return sum(data)\n", "\n", - "graph = pipeline.reduce(func=merge, name='merged').get('merged')\n", + "\n", + "graph = pipeline.reduce(func=merge, name=\"merged\").get(\"merged\")\n", "graph.visualize()" ] }, @@ -297,8 +298,6 @@ "source": [ "## Grouping intermediate results based on secondary parameters\n", "\n", - "**Cyclebane and Sciline do not support `groupby` yet, this is work in progress so this example is not functional yet.**\n", - "\n", "This chapter illustrates how to implement *groupby* operations with Sciline.\n", "\n", "Continuing from the examples for *map* and *reduce*, we can introduce a secondary parameter in the table, such as the material of the sample:" @@ -310,27 +309,35 @@ "metadata": {}, "outputs": [], "source": [ - "Material = NewType('Material', str)\n", - "\n", "run_ids = [102, 103, 104, 105]\n", - "sample = ['diamond', 'graphite', 'graphite', 'graphite']\n", - "filenames = [f'file{i}.txt' for i in run_ids]\n", + "sample = [\"diamond\", \"graphite\", \"graphite\", \"graphite\"]\n", + "filenames = [f\"file{i}.txt\" for i in run_ids]\n", "param_table = pd.DataFrame(\n", - " {Filename: filenames, Material: sample}, index=run_ids\n", - ").rename_axis(index='run_id')\n", + " {Filename: filenames, \"Material\": sample}, index=run_ids\n", + ").rename_axis(index=\"run_id\")\n", "param_table" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We then group the results by sample material,\n", + "and reduce the data using the same merge function as before.\n", + "\n", + "The end goal being to obtain two end results; one for each material." 
+ ] + }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "grouped = base.map(param_table).groupby('Material').reduce(\n", - " key=Result,\n", - " func=merge,\n", - " name=\"merged\"\n", + "grouped = (\n", + " base.map(param_table)\n", + " .groupby(\"Material\")\n", + " .reduce(key=Result, func=merge, name=\"merged\")\n", ")" ] }, @@ -357,22 +364,12 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Grouping early in the graph" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "run_ids = [102, 103, 104, 105]\n", - "sample = ['diamond', 'graphite', 'graphite', 'graphite']\n", - "filenames = [f'file{i}.txt' for i in run_ids]\n", - "param_table = pd.DataFrame(\n", - " {Filename: filenames, 'Material': sample}, index=run_ids\n", - ").rename_axis(index='run_id')\n", - "param_table" + "### Grouping early in the graph\n", + "\n", + "Sometimes, it is also desirable to apply grouping earlier in the pipeline graph.\n", + "In this example, we wish to combine the raw data before cleaning and computing the result.\n", + "\n", + "The function that merges the raw data needs to merge both data and metadata:" ] }, { @@ -393,13 +390,32 @@ " return out" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The grouped graph is now just taking the part that leads to `RawData` (via `base[RawData]`) and dropping everything downstream." + ] + }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "grouped = base[RawData].map(param_table).groupby('Material').reduce(key=RawData, func=merge_raw, name=\"merged\")" + "grouped = (\n", + " base[RawData]\n", + " .map(param_table)\n", + " .groupby(\"Material\")\n", + " .reduce(key=RawData, func=merge_raw, name=\"merged\")\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Visualizing the graph shows two `merged` nodes; one for each material:" ] }, { @@ -411,6 +427,14 @@ "grouped.visualize(sciline.get_mapped_node_names(grouped, \"merged\"))" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The task is now to reattach this grouped grap to the lower part of our pipeline.\n", + "Since the base graph has a single `RawData`, we first need to map it to the two possible materials that are left after grouping." 
+ ] + }, { "cell_type": "code", "execution_count": null, @@ -420,15 +444,25 @@ "new = base.copy()\n", "new[RawData] = None\n", "\n", + "# Get the list of materials left after grouping\n", + "unique_materials = sciline.get_mapped_node_names(grouped, \"merged\").index\n", + "\n", "mapped = new.map(\n", " pd.DataFrame(\n", - " {RawData: [1, 2], 'Material': ['diamond', 'graphite']}\n", - " ).set_index('Material')\n", + " {RawData: [None] * len(unique_materials), \"Material\": unique_materials}\n", + " ).set_index(\"Material\")\n", ")\n", "\n", "mapped.visualize(sciline.get_mapped_node_names(mapped, Result))" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can now attach the top part of the graph to the bottom one:" + ] + }, { "cell_type": "code", "execution_count": null, @@ -456,21 +490,6 @@ "sciline.compute_mapped(mapped, Result)" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Future releases of Sciline will support a `groupby` operation, roughly as follows:\n", - "\n", - "```python\n", - "pipeline = base.map(param_table).groupby(Material).reduce(func=merge)\n", - "```\n", - "\n", - "We can then compute the merged result, grouped by the value of `Material`.\n", - "Note how the initial steps of the computation depend on the `run_id` index name, while later steps depend on `Material`, a new index name defined by the `groupby` operation.\n", - "The files for each run ID have been grouped by their material and then merged." - ] - }, { "cell_type": "markdown", "metadata": {}, @@ -595,11 +614,11 @@ "pl = (\n", " base.map({Param1: [1, 4, 9]})\n", " .map({Param2: [1, 2]})\n", - " .reduce(func=gather, name='reduce_1', index='dim_1')\n", - " .reduce(func=gather, name='reduce_0')\n", + " .reduce(func=gather, name=\"reduce_1\", index=\"dim_1\")\n", + " .reduce(func=gather, name=\"reduce_0\")\n", ")\n", "\n", - "pl.visualize('reduce_0')" + "pl.visualize(\"reduce_0\")" ] }, { @@ -615,7 +634,7 @@ "metadata": {}, "outputs": [], "source": [ - "pl.compute('reduce_0')" + "pl.compute(\"reduce_0\")" ] }, { @@ -635,9 +654,9 @@ "pl = (\n", " base.map({Param1: [1, 4, 9]})\n", " .map({Param2: [1, 2]})\n", - " .reduce(func=gather, name='reduce_both')\n", + " .reduce(func=gather, name=\"reduce_both\")\n", ")\n", - "pl.visualize('reduce_both')" + "pl.visualize(\"reduce_both\")" ] } ], From 24c5f2dc58f0f1c3a63e44d8885c09b1830471bb Mon Sep 17 00:00:00 2001 From: Neil Vaytet Date: Wed, 10 Sep 2025 18:12:40 +0200 Subject: [PATCH 08/11] add GroupbyGraph --- src/sciline/data_graph.py | 41 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/src/sciline/data_graph.py b/src/sciline/data_graph.py index 352f65e8..6dbbeb7a 100644 --- a/src/sciline/data_graph.py +++ b/src/sciline/data_graph.py @@ -10,6 +10,7 @@ import cyclebane as cb import networkx as nx +from cyclebane.graph import GroupbyGraph as CbGroupbyGraph from cyclebane.node_values import IndexName, IndexValue from ._provider import ArgSpec, Provider, ToProvider, _bind_free_typevars @@ -220,9 +221,11 @@ def groupby(self: T, node: Key) -> T: : A new graph that groups mapped nodes by the given key. This graph is not meant to be executed directly, but to be further processed via - :meth:`reduce`. + :meth:`GroupbyGraph.reduce`. 
""" - return self._from_cyclebane(self._cbgraph.groupby(node)) + return GroupbyGraph( + graph=self._cbgraph.groupby(node), graph_maker=self._from_cyclebane + ) def reduce(self: T, *, func: Callable[..., Any], **kwargs: Any) -> T: """Reduce the outputs of a mapped graph into a single value and provider. @@ -267,6 +270,40 @@ def visualize_data_graph(self, **kwargs: Any) -> graphviz.Digraph: return dot +class GroupbyGraph: + """ + A graph that has been grouped by a specific index. + This is a specialized graph that is used to represent the result of a groupby + operation. It allows for operations on the grouped data, + such as aggregation or summarization. + """ + + def __init__( + self, graph: CbGroupbyGraph, graph_maker: Callable[..., DataGraph] + ) -> None: + self._cbgraph = graph + # We forward the constructor so this can be used by both DataGraph and Pipeline + self._graph_maker = graph_maker + + def reduce(self, *, func: Callable[..., Any], **kwargs: Any) -> DataGraph: + """Reduce the grouped node in the graph group by group, so that it results in a + single value and provider per group. + + Parameters + ---------- + func: + Function that takes the values to reduce and returns a single value. + kwargs: + Forwarded to :meth:`cyclebane.GroupbyGraph.reduce`. + + Returns + ------- + : + A new :class:`DataGraph` with reduced grouped nodes. + """ + return self._graph_maker(self._cbgraph.reduce(attrs={'reduce': func}, **kwargs)) + + _no_value = object() From 33ad7d4fdaf32c4687f910c987d663ebc3451401 Mon Sep 17 00:00:00 2001 From: Neil Vaytet Date: Wed, 10 Sep 2025 18:14:36 +0200 Subject: [PATCH 09/11] formatting --- docs/user-guide/parameter-tables.ipynb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/user-guide/parameter-tables.ipynb b/docs/user-guide/parameter-tables.ipynb index 11df4e5a..741f6ce1 100644 --- a/docs/user-guide/parameter-tables.ipynb +++ b/docs/user-guide/parameter-tables.ipynb @@ -431,7 +431,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "The task is now to reattach this grouped grap to the lower part of our pipeline.\n", + "The task is now to reattach this grouped graph to the lower part of our pipeline.\n", "Since the base graph has a single `RawData`, we first need to map it to the two possible materials that are left after grouping." 
    ]
   },
@@ -675,8 +675,7 @@
    "mimetype": "text/x-python",
    "name": "python",
    "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.12.7"
+   "pygments_lexer": "ipython3"
   }
  },
  "nbformat": 4,

From ebe9fa12e2593acbca2027b09315be74cffabde5 Mon Sep 17 00:00:00 2001
From: Neil Vaytet
Date: Wed, 17 Sep 2025 23:12:57 +0200
Subject: [PATCH 10/11] copy args from cyclebane groupbygraph

---
 src/sciline/data_graph.py | 35 ++++++++++++++++++++++++++++++-----
 1 file changed, 30 insertions(+), 5 deletions(-)

diff --git a/src/sciline/data_graph.py b/src/sciline/data_graph.py
index 6dbbeb7a..cf2cb78c 100644
--- a/src/sciline/data_graph.py
+++ b/src/sciline/data_graph.py
@@ -4,7 +4,7 @@
 from __future__ import annotations
 
 import itertools
-from collections.abc import Callable, Generator, Iterable, Mapping
+from collections.abc import Callable, Generator, Hashable, Iterable, Mapping
 from types import NoneType
 from typing import TYPE_CHECKING, Any, TypeVar, get_args
 
@@ -285,7 +285,14 @@ def __init__(
         # We forward the constructor so this can be used by both DataGraph and Pipeline
         self._graph_maker = graph_maker
 
-    def reduce(self, *, func: Callable[..., Any], **kwargs: Any) -> DataGraph:
+    def reduce(
+        self,
+        *,
+        func: Callable[..., Any],
+        key: None | Hashable = None,
+        name: None | Hashable = None,
+        attrs: None | dict[str, Any] = None,
+    ) -> DataGraph:
         """Reduce the grouped node in the graph group by group, so that it results in a
         single value and provider per group.
 
@@ -293,15 +300,33 @@ def reduce(
         ----------
         func:
             Function that takes the values to reduce and returns a single value.
-        kwargs:
-            Forwarded to :meth:`cyclebane.GroupbyGraph.reduce`.
+        key:
+            The name of the source node to reduce. This is the original name prior to
+            mapping. If not given, tries to find a unique sink node.
+            See :meth:`cyclebane.Graph.reduce`.
+        name:
+            The name of the new node. If not given, a unique name is generated.
+            See :meth:`cyclebane.Graph.reduce`.
+        attrs:
+            Attributes to set on the new node(s). See :meth:`cyclebane.Graph.reduce`.
 
         Returns
         -------
         :
             A new :class:`DataGraph` with reduced grouped nodes.
         """
-        return self._graph_maker(self._cbgraph.reduce(attrs={'reduce': func}, **kwargs))
+        cbattrs = {'reduce': func}
+        if attrs is not None:
+            if "reduce" in attrs:
+                raise ValueError(
+                    "The 'reduce' attribute is reserved for the reduction function "
+                    "and cannot be set via 'attrs'. Use the 'func' argument instead."
+                )
+            cbattrs.update(attrs)
+
+        return self._graph_maker(
+            self._cbgraph.reduce(attrs=cbattrs, key=key, name=name)
+        )
 
 
 _no_value = object()

From 378958bc964453865049bf2a8edbbac65db85239 Mon Sep 17 00:00:00 2001
From: Neil Vaytet
Date: Thu, 18 Sep 2025 08:46:26 +0200
Subject: [PATCH 11/11] revert quotes formatting

---
 docs/user-guide/parameter-tables.ipynb | 89 +++++++++++++-------------
 1 file changed, 45 insertions(+), 44 deletions(-)

diff --git a/docs/user-guide/parameter-tables.ipynb b/docs/user-guide/parameter-tables.ipynb
index 741f6ce1..dde7e028 100644
--- a/docs/user-guide/parameter-tables.ipynb
+++ b/docs/user-guide/parameter-tables.ipynb
@@ -37,19 +37,19 @@
     "import sciline\n",
     "\n",
     "_fake_filesytem = {\n",
-    "    \"file102.txt\": [1, 2, float(\"nan\"), 3],\n",
-    "    \"file103.txt\": [1, 2, 3, 4],\n",
-    "    \"file104.txt\": [1, 2, 3, 4, 5],\n",
-    "    \"file105.txt\": [1, 2, 3],\n",
+    "    'file102.txt': [1, 2, float('nan'), 3],\n",
+    "    'file103.txt': [1, 2, 3, 4],\n",
+    "    'file104.txt': [1, 2, 3, 4, 5],\n",
+    "    'file105.txt': [1, 2, 3],\n",
     "}\n",
     "\n",
     "# 1. 
Define domain types\n", "\n", - "Filename = NewType(\"Filename\", str)\n", - "RawData = NewType(\"RawData\", dict)\n", - "CleanedData = NewType(\"CleanedData\", list)\n", - "ScaleFactor = NewType(\"ScaleFactor\", float)\n", - "Result = NewType(\"Result\", float)\n", + "Filename = NewType('Filename', str)\n", + "RawData = NewType('RawData', dict)\n", + "CleanedData = NewType('CleanedData', list)\n", + "ScaleFactor = NewType('ScaleFactor', float)\n", + "Result = NewType('Result', float)\n", "\n", "\n", "# 2. Define providers\n", @@ -59,14 +59,14 @@ " \"\"\"Load the data from the filename.\"\"\"\n", "\n", " data = _fake_filesytem[filename]\n", - " return RawData({\"data\": data, \"meta\": {\"filename\": filename}})\n", + " return RawData({'data': data, 'meta': {'filename': filename}})\n", "\n", "\n", "def clean(raw_data: RawData) -> CleanedData:\n", " \"\"\"Clean the data, removing NaNs.\"\"\"\n", " import math\n", "\n", - " return CleanedData([x for x in raw_data[\"data\"] if not math.isnan(x)])\n", + " return CleanedData([x for x in raw_data['data'] if not math.isnan(x)])\n", "\n", "\n", "def process(data: CleanedData, param: ScaleFactor) -> Result:\n", @@ -95,7 +95,7 @@ "metadata": {}, "outputs": [], "source": [ - "base.visualize(Result, graph_attr={\"rankdir\": \"LR\"})" + "base.visualize(Result, graph_attr={'rankdir': 'LR'})" ] }, { @@ -114,9 +114,9 @@ "import pandas as pd\n", "\n", "run_ids = [102, 103, 104, 105]\n", - "filenames = [f\"file{i}.txt\" for i in run_ids]\n", + "filenames = [f'file{i}.txt' for i in run_ids]\n", "param_table = pd.DataFrame({Filename: filenames}, index=run_ids).rename_axis(\n", - " index=\"run_id\"\n", + " index='run_id'\n", ")\n", "param_table" ] @@ -233,7 +233,7 @@ " return sum(data)\n", "\n", "\n", - "graph = pipeline.reduce(func=merge, name=\"merged\").get(\"merged\")\n", + "graph = pipeline.reduce(func=merge, name='merged').get('merged')\n", "graph.visualize()" ] }, @@ -310,11 +310,11 @@ "outputs": [], "source": [ "run_ids = [102, 103, 104, 105]\n", - "sample = [\"diamond\", \"graphite\", \"graphite\", \"graphite\"]\n", - "filenames = [f\"file{i}.txt\" for i in run_ids]\n", + "sample = ['diamond', 'graphite', 'graphite', 'graphite']\n", + "filenames = [f'file{i}.txt' for i in run_ids]\n", "param_table = pd.DataFrame(\n", - " {Filename: filenames, \"Material\": sample}, index=run_ids\n", - ").rename_axis(index=\"run_id\")\n", + " {Filename: filenames, 'Material': sample}, index=run_ids\n", + ").rename_axis(index='run_id')\n", "param_table" ] }, @@ -336,8 +336,8 @@ "source": [ "grouped = (\n", " base.map(param_table)\n", - " .groupby(\"Material\")\n", - " .reduce(key=Result, func=merge, name=\"merged\")\n", + " .groupby('Material')\n", + " .reduce(key=Result, func=merge, name='merged')\n", ")" ] }, @@ -347,7 +347,7 @@ "metadata": {}, "outputs": [], "source": [ - "graph = grouped.get(sciline.get_mapped_node_names(grouped, \"merged\"))\n", + "graph = grouped.get(sciline.get_mapped_node_names(grouped, 'merged'))\n", "graph.visualize()" ] }, @@ -357,7 +357,7 @@ "metadata": {}, "outputs": [], "source": [ - "sciline.compute_mapped(grouped, \"merged\")" + "sciline.compute_mapped(grouped, 'merged')" ] }, { @@ -369,7 +369,7 @@ "Sometimes, it is also desirable to apply grouping earlier in the pipeline graph.\n", "In this example, we wish to combine the raw data before cleaning and computing the result.\n", "\n", - "The function that merges the raw data needs to merge both data and metadata:" + "The function that merges the raw data needs to merge both data and 
metadata: " ] }, { @@ -380,13 +380,13 @@ "source": [ "# Define function to merge RawData\n", "def merge_raw(*das):\n", - " out = {\"data\": [], \"meta\": {}}\n", + " out = {'data': [], 'meta': {}}\n", " for da in das:\n", - " out[\"data\"].extend(da[\"data\"])\n", - " for k, v in da[\"meta\"].items():\n", - " if k not in out[\"meta\"]:\n", - " out[\"meta\"][k] = []\n", - " out[\"meta\"][k].append(v)\n", + " out['data'].extend(da['data'])\n", + " for k, v in da['meta'].items():\n", + " if k not in out['meta']:\n", + " out['meta'][k] = []\n", + " out['meta'][k].append(v)\n", " return out" ] }, @@ -406,8 +406,8 @@ "grouped = (\n", " base[RawData]\n", " .map(param_table)\n", - " .groupby(\"Material\")\n", - " .reduce(key=RawData, func=merge_raw, name=\"merged\")\n", + " .groupby('Material')\n", + " .reduce(key=RawData, func=merge_raw, name='merged')\n", ")" ] }, @@ -424,7 +424,7 @@ "metadata": {}, "outputs": [], "source": [ - "grouped.visualize(sciline.get_mapped_node_names(grouped, \"merged\"))" + "grouped.visualize(sciline.get_mapped_node_names(grouped, 'merged'))" ] }, { @@ -445,12 +445,12 @@ "new[RawData] = None\n", "\n", "# Get the list of materials left after grouping\n", - "unique_materials = sciline.get_mapped_node_names(grouped, \"merged\").index\n", + "unique_materials = sciline.get_mapped_node_names(grouped, 'merged').index\n", "\n", "mapped = new.map(\n", " pd.DataFrame(\n", - " {RawData: [None] * len(unique_materials), \"Material\": unique_materials}\n", - " ).set_index(\"Material\")\n", + " {RawData: [None] * len(unique_materials), 'Material': unique_materials}\n", + " ).set_index('Material')\n", ")\n", "\n", "mapped.visualize(sciline.get_mapped_node_names(mapped, Result))" @@ -469,7 +469,7 @@ "metadata": {}, "outputs": [], "source": [ - "mapped[RawData] = grouped[\"merged\"]" + "mapped[RawData] = grouped['merged']" ] }, { @@ -614,11 +614,11 @@ "pl = (\n", " base.map({Param1: [1, 4, 9]})\n", " .map({Param2: [1, 2]})\n", - " .reduce(func=gather, name=\"reduce_1\", index=\"dim_1\")\n", - " .reduce(func=gather, name=\"reduce_0\")\n", + " .reduce(func=gather, name='reduce_1', index='dim_1')\n", + " .reduce(func=gather, name='reduce_0')\n", ")\n", "\n", - "pl.visualize(\"reduce_0\")" + "pl.visualize('reduce_0')" ] }, { @@ -634,7 +634,7 @@ "metadata": {}, "outputs": [], "source": [ - "pl.compute(\"reduce_0\")" + "pl.compute('reduce_0')" ] }, { @@ -654,9 +654,9 @@ "pl = (\n", " base.map({Param1: [1, 4, 9]})\n", " .map({Param2: [1, 2]})\n", - " .reduce(func=gather, name=\"reduce_both\")\n", + " .reduce(func=gather, name='reduce_both')\n", ")\n", - "pl.visualize(\"reduce_both\")" + "pl.visualize('reduce_both')" ] } ], @@ -675,7 +675,8 @@ "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", - "pygments_lexer": "ipython3" + "pygments_lexer": "ipython3", + "version": "3.12.7" } }, "nbformat": 4,
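
Taken together, the series adds `DataGraph.groupby`, a `GroupbyGraph` helper
with its own `reduce`, tests, and notebook documentation. As a minimal
end-to-end sketch of the resulting API, assembled from tests/groupby_test.py
above (it assumes a sciline build with patches 01, 08 and 10 applied):

from typing import NewType

import pandas as pd

import sciline as sl

# Same fake data and providers as tests/groupby_test.py.
_fake_filesystem = {
    'file102.txt': [1, 2, float('nan'), 3],
    'file103.txt': [4, 5, 6, 7],
    'file104.txt': [8, 9, 10, 11, 12],
    'file105.txt': [13, 14, 15],
}

Filename = NewType('Filename', str)
RawData = NewType('RawData', dict)
CleanedData = NewType('CleanedData', list)
ScaleFactor = NewType('ScaleFactor', float)
Result = NewType('Result', float)


def load(filename: Filename) -> RawData:
    """Load the data from the filename."""
    return RawData({'data': _fake_filesystem[filename], 'meta': {'filename': filename}})


def clean(raw_data: RawData) -> CleanedData:
    """Clean the data, removing NaNs."""
    import math

    return CleanedData([x for x in raw_data['data'] if not math.isnan(x)])


def process(data: CleanedData, param: ScaleFactor) -> Result:
    """Process the data, multiplying the sum by the scale factor."""
    return Result(sum(data) * param)


def merge(*data):
    return sum(data)


base = sl.Pipeline([load, clean, process], params={ScaleFactor: 2.0})

run_ids = [102, 103, 104, 105]
param_table = pd.DataFrame(
    {
        Filename: [f'file{i}.txt' for i in run_ids],
        'Material': ['diamond', 'graphite', 'graphite', 'graphite'],
    },
    index=run_ids,
).rename_axis(index='run_id')

# Map over the table, group the mapped branches by material, then collapse
# each group's Result nodes into a single 'merged' node per group.
grouped = (
    base.map(param_table)
    .groupby('Material')
    .reduce(key=Result, func=merge, name='merged')
)

result = sl.compute_mapped(grouped, 'merged')
assert result['diamond'] == 12.0
assert result['graphite'] == 228.0

As documented in patch 10, `GroupbyGraph.reduce` also forwards `key`, `name`
and `attrs` to cyclebane; the 'reduce' attribute key is reserved for the
reduction function itself.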
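
The early-grouping variant exercised by patches 02, 06 and 07 instead merges
RawData per material and reattaches the merged nodes to a copy of the base
pipeline. Continuing from the sketch above (same pipeline, table and domain
types), roughly:

def merge_raw(*das):
    """Concatenate data and collect per-file metadata."""
    out = {'data': [], 'meta': {}}
    for da in das:
        out['data'].extend(da['data'])
        for k, v in da['meta'].items():
            out['meta'].setdefault(k, []).append(v)
    return out


# Keep only the part of the pipeline leading to RawData, map it, and group.
grouped = (
    base[RawData]
    .map(param_table)
    .groupby('Material')
    .reduce(key=RawData, func=merge_raw, name='merged')
)

# Re-map the rest of the pipeline over the materials that survive grouping,
# using a dummy RawData column so the node can be re-attached afterwards.
new = base.copy()
new[RawData] = None
unique_materials = sl.get_mapped_node_names(grouped, 'merged').index
mapped = new.map(
    pd.DataFrame(
        {RawData: [None] * len(unique_materials), 'Material': unique_materials}
    ).set_index('Material')
)

# Plug the merged per-material data into the lower part of the pipeline.
mapped[RawData] = grouped['merged']

result = sl.compute_mapped(mapped, Result)
assert result['diamond'] == 12.0
assert result['graphite'] == 228.0

As patch 06 shows, the merged RawData itself can then be inspected with
sl.compute_mapped(mapped, RawData, index_names=['Material']), which
disambiguates the two mapped nodes sharing the RawData name.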