diff --git a/mack/__init__.py b/mack/__init__.py index 7c6fa49..12c3659 100644 --- a/mack/__init__.py +++ b/mack/__init__.py @@ -394,3 +394,33 @@ def with_md5_cols( if type(df) == DeltaTable: df = df.toDF() return df.withColumn(output_col_name, md5(concat_ws("||", *cols))) + + +def check_data( + df: Union[DeltaTable, DataFrame], + check_invariants: Union[str, List[str]] +): + condition = None + if type(df) == DeltaTable: + df = df.toDF() + if type(check_invariants) == str: + condition = check_invariants + elif type(check_invariants) == list: + check_invariants = ["(" + i + ")" for i in check_invariants] + condition = " AND ".join(check_invariants) + print(f"For the specified check invariant(s) '{condition}' {df.filter(condition).count()} rows are being affected.") + return df.filter(condition) + + +def drop_by_invariants( + dt: DeltaTable, + delete_invariants: Union[str, List[str]] +): + condition = None + if type(delete_invariants) == str: + condition = delete_invariants + elif type(delete_invariants) == list: + delete_invariants = ["(" + i + ")" for i in delete_invariants] + condition = " AND ".join(delete_invariants) + print(f"{dt.toDF().filter(condition).count()} rows are being deleted from the delta table.") + dt.delete(condition)