From a7500951bb4ecf4827534f4311b25df1836ca9be Mon Sep 17 00:00:00 2001 From: Souvik Pratiher Date: Sat, 31 Dec 2022 22:22:36 +0530 Subject: [PATCH] get all composite key candidates function added --- README.md | 20 ++++++++++++++++++++ mack/__init__.py | 29 ++++++++++++++++++++++++++++- tests/test_public_interface.py | 26 ++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e1dd0fd..5a713ca 100644 --- a/README.md +++ b/README.md @@ -409,6 +409,26 @@ Suppose you have the following Delta table: Running `mack.find_composite_key_candidates(delta_table)` on that table will return `["col1", "col3"]`. +## Find All possible valid Composite Key Candidates in the Delta table + +The `find_all_composite_key_combos` function helps you find all of the possible valid composite key candidates that +uniquely identifies the rows your Delta table. It returns a list of column combinations that can be used as a +composite key. + +Suppose you have the following Delta table: + +``` ++----+----+----+ +|x |y |z | ++----+----+----+ +| 1| 1| 1| +| 2| 1| 1| +| 3| 2| 1| ++----+----+----+ +``` + +Running `mack.find_all_composite_key_combos(delta_table)` on that table will return `["x", "x,y", "x,z", "x,y,z"]`. + ## Append md5 column The `with_md5_cols` function appends a `md5` hash of specified columns to the DataFrame. This can be used as a unique key if the selected columns form a composite key. diff --git a/mack/__init__.py b/mack/__init__.py index 7c6fa49..2453bbb 100644 --- a/mack/__init__.py +++ b/mack/__init__.py @@ -3,7 +3,7 @@ from delta import DeltaTable import pyspark -from pyspark.sql.functions import count, col, row_number, md5, concat_ws +from pyspark.sql.functions import count, col, row_number, md5, concat_ws, collect_set, size, lit from pyspark.sql.window import Window from pyspark.sql.dataframe import DataFrame @@ -384,6 +384,33 @@ def find_composite_key_candidates( return list(df_col_excluded.select(*c).columns) +def find_all_composite_key_combos( + df: Union[DeltaTable, DataFrame], exclude_cols: List[str] = None +): + if type(df) == DeltaTable: + df = df.toDF() + if exclude_cols is None: + exclude_cols = [] + df_col_excluded = df.drop(*exclude_cols) + col_select_condition = df_col_excluded.distinct().count() + initcols = df_col_excluded.columns + for i in range(len(initcols)+1): + for c in list(combinations(initcols, i+2)): + df_col_excluded = df_col_excluded.withColumn(','.join(c), concat_ws(',', *c)) + finalcols = df_col_excluded.columns + exprs = [size(collect_set(x)).alias(x) for x in finalcols] + df_col_excluded = df_col_excluded \ + .withColumn("column_combos ->", lit("distinct_row_counts ->")) \ + .groupBy("column_combos ->") \ + .agg(*exprs) + columns = [ + column for column in df_col_excluded.columns if + df_col_excluded.select(column).collect()[0][0] == col_select_condition + ] + df_col_excluded.select("column_combos ->", *columns).show(truncate=False) + return columns + + def with_md5_cols( df: Union[DeltaTable, DataFrame], cols: List[str], diff --git a/tests/test_public_interface.py b/tests/test_public_interface.py index 9a4cbfc..578b145 100644 --- a/tests/test_public_interface.py +++ b/tests/test_public_interface.py @@ -669,6 +669,32 @@ def test_find_composite_key(tmp_path): assert composite_keys == expected_keys +def test_find_all_composite_key_combos(tmp_path): + path = f"{tmp_path}/find_all_composite_key_combos" + data = [ + (1, 1, 1), + (2, 1, 1), + (3, 2, 1), + ] + df = spark.createDataFrame( + data, + [ + "x", + "y", + "z", + ], + ) + df.write.format("delta").save(path) + + delta_table = DeltaTable.forPath(spark, path) + + composite_keys = mack.find_all_composite_key_combos(delta_table) + + expected_keys = ['x', 'x,y', 'x,z', 'x,y,z'] + + assert composite_keys == expected_keys + + def test_find_composite_key_with_value_error(tmp_path): path = f"{tmp_path}/find_composite_key" data = [