diff --git a/airbyte_cdk/test/utils/transforms/test_cleaning.py b/airbyte_cdk/test/utils/transforms/test_cleaning.py
new file mode 100644
index 000000000..bc05c12ae
--- /dev/null
+++ b/airbyte_cdk/test/utils/transforms/test_cleaning.py
@@ -0,0 +1,163 @@
+"""Unit tests for cleaning transforms."""
+import pytest
+
+from airbyte_cdk.utils.transforms.cleaning import (
+    to_lower,
+    strip_whitespace,
+    squash_whitespace,
+    normalize_unicode,
+    remove_punctuation,
+    map_values,
+    cast_numeric,
+)
+
+
+def test_to_lower():
+    """Test string lowercasing function."""
+    # Test normal cases
+    assert to_lower("Hello") == "hello"
+    assert to_lower("HELLO") == "hello"
+    assert to_lower("HeLLo") == "hello"
+
+    # Test with spaces and special characters
+    assert to_lower("Hello World!") == "hello world!"
+    assert to_lower("Hello123") == "hello123"
+
+    # Test empty and None
+    assert to_lower("") == ""
+    assert to_lower(None) is None
+
+
+def test_strip_whitespace():
+    """Test whitespace stripping function."""
+    # Test normal cases
+    assert strip_whitespace(" hello ") == "hello"
+    assert strip_whitespace("hello") == "hello"
+
+    # Test with tabs and newlines
+    assert strip_whitespace("\thello\n") == "hello"
+    assert strip_whitespace(" hello\n world ") == "hello\n world"
+
+    # Test empty and None
+    assert strip_whitespace(" ") == ""
+    assert strip_whitespace("") == ""
+    assert strip_whitespace(None) is None
+
+
+def test_squash_whitespace():
+    """Test whitespace squashing function."""
+    # Test normal cases
+    assert squash_whitespace("hello world") == "hello world"
+    assert squash_whitespace(" hello world ") == "hello world"
+
+    # Test runs of several spaces
+    assert squash_whitespace("hello   world") == "hello world"
+
+    # Test with tabs and newlines
+    assert squash_whitespace("hello\n\nworld") == "hello world"
+    assert squash_whitespace("hello\t\tworld") == "hello world"
+    assert squash_whitespace("\n hello \t world \n") == "hello world"
+
+    # Test empty and None
+    assert squash_whitespace(" ") == ""
+    assert squash_whitespace("") == ""
+    assert squash_whitespace(None) is None
+
+
+def test_normalize_unicode():
+    """Test unicode normalization function."""
+    # Test normal cases
+    assert normalize_unicode("hello") == "hello"
+
+    # Test composed characters
+    assert normalize_unicode("café") == "café"  # Composed 'é'
+
+    # Test decomposed characters
+    decomposed = "cafe\u0301"  # 'e' with combining acute accent
+    assert normalize_unicode(decomposed) == "café"  # Should normalize to composed form
+
+    # Test different normalization forms
+    assert normalize_unicode("café", form="NFD") != normalize_unicode("café", form="NFC")
+
+    # Test empty and None
+    assert normalize_unicode("") == ""
+    assert normalize_unicode(None) is None
+
+
+def test_remove_punctuation():
+    """Test punctuation removal function."""
+    # Test normal cases
+    assert remove_punctuation("hello, world!") == "hello world"
+    assert remove_punctuation("hello.world") == "helloworld"
+
+    # Test with multiple punctuation marks
+    assert remove_punctuation("hello!!! world???") == "hello world"
+    assert remove_punctuation("hello@#$%world") == "helloworld"
+
+    # Test with unicode punctuation
+    assert remove_punctuation("hello—world") == "helloworld"
+    assert remove_punctuation("«hello»") == "hello"
+
+    # Test underscore, which the regex word class would otherwise keep
+    assert remove_punctuation("hello_world") == "helloworld"
+
+    # Test empty and None
+    assert remove_punctuation("") == ""
+    assert remove_punctuation(None) is None
+
+
+def test_map_values():
+    """Test value mapping function."""
+    mapping = {"a": 1, "b": 2, "c": 3}
+
+    # Test normal cases
+    assert map_values("a", mapping) == 1
+    assert map_values("b", mapping) == 2
+
+    # Test with default value
+    assert map_values("x", mapping) is None
+    assert map_values("x", mapping, default=0) == 0
+
+    # Test with different value types
+    mixed_mapping = {1: "one", "two": 2, None: "null"}
+    assert map_values(1, mixed_mapping) == "one"
+    assert map_values(None, mixed_mapping) == "null"
+
+    # Test unhashable values fall back to the default
+    assert map_values(["a"], mapping, default="n/a") == "n/a"
+
+
+def test_cast_numeric():
+    """Test numeric casting function."""
+    # Test successful casts
+    assert cast_numeric("123") == 123
+    assert cast_numeric("123.45") == 123.45
+    assert cast_numeric(123) == 123
+    assert cast_numeric(123.45) == 123.45
+
+    # Test integers vs floats
+    assert isinstance(cast_numeric("123"), int)
+    assert isinstance(cast_numeric("123.45"), float)
+
+    # Test integers beyond float precision stay exact
+    assert cast_numeric(2**53 + 1) == 2**53 + 1
+    assert cast_numeric("9007199254740993") == 9007199254740993
+
+    # Test empty values
+    assert cast_numeric(None) is None
+    assert cast_numeric("", on_error="none") is None  # Need to specify on_error="none" to get None for empty string
+    assert cast_numeric(" ", on_error="none") is None  # Need to specify on_error="none" to get None for whitespace
+
+    # Test empty values with default behavior (on_error="ignore")
+    assert cast_numeric("") == ""
+    assert cast_numeric(" ") == " "
+
+    # Test error handling modes
+    non_numeric = "abc"
+    assert cast_numeric(non_numeric, on_error="ignore") == non_numeric
+    assert cast_numeric(non_numeric, on_error="none") is None
+    assert cast_numeric(non_numeric, on_error="default", default=0) == 0
+
+    # Test error raising
+    with pytest.raises(ValueError):
+        cast_numeric(non_numeric, on_error="raise")
diff --git a/airbyte_cdk/test/utils/transforms/test_date.py b/airbyte_cdk/test/utils/transforms/test_date.py
new file mode 100644
index 000000000..af8fb6f25
--- /dev/null
+++ b/airbyte_cdk/test/utils/transforms/test_date.py
@@ -0,0 +1,84 @@
+"""Unit tests for date transforms."""
+from datetime import date, datetime
+
+from airbyte_cdk.utils.transforms.date import (
+    try_parse_date,
+    extract_date_parts,
+    floor_to_month,
+    ceil_to_month,
+)
+
+
+def test_try_parse_date():
+    """Test date parsing function."""
+    # Test with datetime object
+    dt = datetime(2023, 1, 15)
+    assert try_parse_date(dt) == dt
+
+    # Test with non-date object
+    assert try_parse_date("2023-01-15") is None
+    assert try_parse_date(123) is None
+    assert try_parse_date(None) is None
+
+
+def test_extract_date_parts():
+    """Test date parts extraction function."""
+    # Test with valid datetime
+    dt = datetime(2023, 1, 15)  # Sunday
+    parts = extract_date_parts(dt)
+    assert parts["year"] == 2023
+    assert parts["month"] == 1
+    assert parts["day"] == 15
+    assert parts["dow"] == 6  # Sunday is 6
+
+    # Test with invalid input
+    parts = extract_date_parts(None)
+    assert all(v is None for v in parts.values())
+
+    parts = extract_date_parts("not a date")
+    assert all(v is None for v in parts.values())
+
+
+def test_floor_to_month():
+    """Test floor to month function."""
+    # Test normal cases
+    dt = datetime(2023, 1, 15)
+    assert floor_to_month(dt) == datetime(2023, 1, 1)
+
+    dt = datetime(2023, 12, 31)
+    assert floor_to_month(dt) == datetime(2023, 12, 1)
+
+    # Test first day of month
+    dt = datetime(2023, 1, 1)
+    assert floor_to_month(dt) == dt
+
+    # Test that the time of day is reset and plain dates are supported
+    assert floor_to_month(datetime(2023, 1, 15, 10, 30)) == datetime(2023, 1, 1)
+    assert floor_to_month(date(2023, 1, 15)) == date(2023, 1, 1)
+
+    # Test with invalid input
+    assert floor_to_month(None) is None
+    assert floor_to_month("not a date") is None
+
+
+def test_ceil_to_month():
+    """Test ceil to month function."""
+    # Test normal cases
+    dt = datetime(2023, 1, 15)
+    assert ceil_to_month(dt) == datetime(2023, 2, 1)
+
+    # Test end of year
+    dt = datetime(2023, 12, 15)
+    assert ceil_to_month(dt) == datetime(2024, 1, 1)
+
+    # Test first day of month
+    dt = datetime(2023, 1, 1)
+    assert ceil_to_month(dt) == datetime(2023, 2, 1)
+
+    # Test that the time of day is reset and plain dates are supported
+    assert ceil_to_month(datetime(2023, 1, 15, 10, 30)) == datetime(2023, 2, 1)
+    assert ceil_to_month(date(2023, 12, 15)) == date(2024, 1, 1)
+
+    # Test with invalid input
+    assert ceil_to_month(None) is None
+    assert ceil_to_month("not a date") is None
diff --git a/airbyte_cdk/test/utils/transforms/test_impute.py b/airbyte_cdk/test/utils/transforms/test_impute.py
new file mode 100644
index 000000000..44724cf88
--- /dev/null
+++ b/airbyte_cdk/test/utils/transforms/test_impute.py
@@ -0,0 +1,138 @@
+"""Unit tests for imputation transforms."""
+import math
+
+import pytest
+
+from airbyte_cdk.utils.transforms.impute import (
+    _numeric_skewness,
+    choose_imputation_strategy,
+    compute_imputation_value,
+    fill_nulls_column,
+    fill_nulls_record,
+    ImputationReport,
+)
+
+
+def test_numeric_skewness():
+    """Test skewness calculation function."""
+    # Test normal cases
+    assert _numeric_skewness([1, 2, 3]) == pytest.approx(0.0, abs=1e-10)  # Symmetric data
+    assert _numeric_skewness([1, 1, 2]) > 0  # Positive skew
+    assert _numeric_skewness([1, 2, 2]) < 0  # Negative skew
+
+    # Test a known value of the adjusted Fisher-Pearson coefficient
+    assert _numeric_skewness([1, 1, 10]) == pytest.approx(math.sqrt(3))
+
+    # Test edge cases
+    assert _numeric_skewness([1, 1]) == 0.0  # Less than 3 values
+    assert _numeric_skewness([1, 1, 1]) == 0.0  # No variance
+
+    # Test with floating point values
+    assert _numeric_skewness([1.0, 2.0, 3.0]) == pytest.approx(0.0, abs=1e-10)
+
+
+def test_choose_imputation_strategy():
+    """Test imputation strategy selection function."""
+    # Test numeric data
+    assert choose_imputation_strategy([1, 2, 3]) == "mean"  # Low skew
+    assert choose_imputation_strategy([1, 1, 10]) == "median"  # High skew
+
+    # Test categorical data
+    assert choose_imputation_strategy(["a", "b", "c"], numeric=False) == "mode"
+    assert choose_imputation_strategy(["a", "a", "b"]) == "mode"  # Autodetect non-numeric
+
+    # Test repeated values with custom threshold
+    assert choose_imputation_strategy([1, 1, 1, 2], unique_ratio_threshold=0.6) == "mode"  # Low unique ratio (0.5 < 0.6)
+
+    # Test empty and None values
+    assert choose_imputation_strategy([]) == "mode"
+    assert choose_imputation_strategy([None, None]) == "mode"
+
+    # Test with mixed types
+    assert choose_imputation_strategy([1, "2", 3]) == "mode"  # Non-numeric detected
+
+    # Test booleans are treated as categorical, not numeric
+    assert choose_imputation_strategy([True, False, True]) == "mode"
+
+
+def test_compute_imputation_value():
+    """Test imputation value computation function."""
+    # Test mean strategy
+    assert compute_imputation_value([1, 2, 3], "mean") == 2.0
+    assert compute_imputation_value([1.5, 2.5, 3.5], "mean") == 2.5
+
+    # Test median strategy
+    assert compute_imputation_value([1, 2, 3, 4], "median") == 2.5
+    assert compute_imputation_value([1, 2, 3], "median") == 2.0
+
+    # Test mode strategy
+    assert compute_imputation_value([1, 1, 2], "mode") == 1
+    assert compute_imputation_value(["a", "a", "b"], "mode") == "a"
+
+    # Test mode strategy with unhashable values
+    assert compute_imputation_value([{"k": 1}, {"k": 1}, {"k": 2}], "mode") == {"k": 1}
+
+    # Test with None values
+    assert compute_imputation_value([1, None, 3], "mean") == 2.0
+    assert compute_imputation_value([None, None], "mean") is None
+
+    # Test invalid strategy
+    with pytest.raises(ValueError):
+        compute_imputation_value([1, 2, 3], "invalid")
+
+
+def test_fill_nulls_column():
+    """Test column null filling function."""
+    # Test numeric data
+    values, report = fill_nulls_column([1, None, 3])
+    assert values == [1, 2.0, 3]
+    assert report.strategy == "mean"
+    assert report.value_used == 2.0
+
+    # Test categorical data
+    values, report = fill_nulls_column(["a", None, "a"])
+    assert values == ["a", "a", "a"]
+    assert report.strategy == "mode"
+    assert report.value_used == "a"
+
+    # Test explicit strategy
+    values, report = fill_nulls_column([1, None, 3], explicit_strategy="median")
+    assert values == [1, 2, 3]
+    assert report.strategy == "median"
+
+    # Test all None values
+    values, report = fill_nulls_column([None, None])
+    assert values == [None, None]
+    assert report.value_used is None
+
+    # Test the report can be labelled with a field name
+    _, report = fill_nulls_column([1, None, 3], field="age")
+    assert report.field == "age"
+
+
+def test_fill_nulls_record():
+    """Test record null filling function."""
+    # Test basic record filling
+    record = {"a": 1, "b": None, "c": "x"}
+    samples = {"a": [1, 2, 3], "b": [4, 5, 6], "c": ["x", "y", "x"]}
+    filled, reports = fill_nulls_record(record, ["a", "b", "c"], samples)
+
+    assert filled["a"] == 1
+    assert filled["b"] == 5.0  # Mean of samples
+    assert filled["c"] == "x"
+    assert len(reports) == 3
+    assert all(isinstance(r, ImputationReport) for r in reports)
+
+    # Test with explicit strategies
+    strategies = {"b": "median"}
+    filled, reports = fill_nulls_record(record, ["a", "b", "c"], samples, strategies=strategies)
+    assert filled["b"] == 5.0  # Median of samples
+
+    # Test with empty samples
+    filled, reports = fill_nulls_record(record, ["a", "b", "c"], {})
+    assert filled["b"] is None  # No samples to impute from
+
+    # Test with missing columns
+    filled, reports = fill_nulls_record(record, ["a", "d"], samples)
+    assert "d" in filled
+    assert len(reports) == 2
diff --git a/airbyte_cdk/test/utils/transforms/test_math.py b/airbyte_cdk/test/utils/transforms/test_math.py
new file mode 100644
index 000000000..a9de95aa7
--- /dev/null
+++ b/airbyte_cdk/test/utils/transforms/test_math.py
@@ -0,0 +1,139 @@
+"""Unit tests for math transforms."""
+import math
+
+import pytest
+
+from airbyte_cdk.utils.transforms.math import (
+    minmax_scale,
+    zscore,
+    clip,
+    winsorize,
+    log1p_safe,
+    bucketize,
+    robust_percentile_scale,
+)
+
+
+def test_minmax_scale():
+    """Test minmax scaling function."""
+    # Test normal scaling
+    assert minmax_scale(5, 0, 10) == 0.5
+    assert minmax_scale(5, 0, 10, (0, 100)) == 50.0
+
+    # Test edge cases
+    assert minmax_scale(0, 0, 10) == 0.0
+    assert minmax_scale(10, 0, 10) == 1.0
+
+    # Test custom range scaling
+    assert minmax_scale(5, 0, 10, (-1, 1)) == 0.0
+
+    # Test when data_max equals data_min (prevents division by zero)
+    assert minmax_scale(5, 5, 5) == 0.5  # Should return middle of output range
+
+    # Test with float inputs
+    assert minmax_scale(5.5, 0.0, 10.0) == pytest.approx(0.55)
+
+
+def test_zscore():
+    """Test z-score calculation function."""
+    # Test normal cases
+    assert zscore(10, 5, 2) == 2.5  # (10 - 5) / 2
+    assert zscore(0, 5, 2) == -2.5  # (0 - 5) / 2
+
+    # Test with zero sigma
+    assert zscore(10, 5, 0) == 0.0  # Should handle division by zero gracefully
+
+    # Test with float inputs
+    assert zscore(10.5, 5.0, 2.0) == 2.75
+
+
+def test_clip():
+    """Test value clipping function."""
+    # Test normal clipping
+    assert clip(5, 0, 10) == 5
+    assert clip(-1, 0, 10) == 0
+    assert clip(11, 0, 10) == 10
+
+    # Test with float values
+    assert clip(5.5, 0.0, 10.0) == 5.5
+    assert clip(-1.5, 0.0, 10.0) == 0.0
+
+    # Test when low == high
+    assert clip(5, 3, 3) == 3
+
+
+def test_winsorize():
+    """Test winsorization function."""
+    # Test normal cases
+    assert winsorize(5, 0, 10) == 5
+    assert winsorize(-1, 0, 10) == 0
+    assert winsorize(11, 0, 10) == 10
+
+    # Test with float values
+    assert winsorize(5.5, 0.0, 10.0) == 5.5
+
+    # Test when low == high
+    assert winsorize(5, 3, 3) == 3
+
+
+def test_log1p_safe():
+    """Test safe log1p calculation function."""
+    # Test normal cases
+    assert log1p_safe(0) == 0.0
+    assert log1p_safe(math.e - 1) == pytest.approx(1.0)
+
+    # Test negative values > -1
+    assert abs(log1p_safe(-0.5) - math.log1p(-0.5)) < 1e-10
+
+    # Test negative values <= -1
+    assert log1p_safe(-2) == -2.0  # Should return input value
+    assert log1p_safe(-1) == -1.0  # Boundary is passed through as well
+
+    # Test infinity
+    assert log1p_safe(float('inf')) == float('inf')
+
+
+def test_bucketize():
+    """Test bucketization function."""
+    edges = [0, 10, 20, 30]
+
+    # Test normal cases
+    assert bucketize(-5, edges) == 0
+    assert bucketize(5, edges) == 1
+    assert bucketize(15, edges) == 2
+    assert bucketize(25, edges) == 3
+    assert bucketize(35, edges) == 4
+
+    # Test edge values
+    assert bucketize(0, edges) == 0
+    assert bucketize(10, edges) == 1
+    assert bucketize(20, edges) == 2
+    assert bucketize(30, edges) == 3
+
+    # Test empty edges
+    assert bucketize(5, []) == 0
+
+    # Test single edge
+    assert bucketize(5, [10]) == 0  # 5 ≤ 10, so bucket 0
+    assert bucketize(15, [10]) == 1  # 15 > 10, so bucket 1
+
+
+def test_robust_percentile_scale():
+    """Test robust percentile scaling function."""
+    # Test normal scaling
+    assert robust_percentile_scale(5, 0, 10) == 0.5
+    assert robust_percentile_scale(5, 0, 10, (0, 100)) == 50.0
+
+    # Test edge cases
+    assert robust_percentile_scale(0, 0, 10) == 0.0
+    assert robust_percentile_scale(10, 0, 10) == 1.0
+
+    # Test custom range
+    assert robust_percentile_scale(5, 0, 10, (-1, 1)) == 0.0
+
+    # Test clipping
+    assert robust_percentile_scale(-1, 0, 10) == 0.0  # With clipping
+    assert robust_percentile_scale(-1, 0, 10, clip_outliers=False) < 0.0  # Without clipping
+
+    # Test when high equals low
+    assert robust_percentile_scale(5, 5, 5) == 0.5  # Should return middle of output range
diff --git a/airbyte_cdk/utils/transforms/__init__.py b/airbyte_cdk/utils/transforms/__init__.py
new file mode 100644
index 000000000..6c5e3d214
--- /dev/null
+++ b/airbyte_cdk/utils/transforms/__init__.py
@@ -0,0 +1,31 @@
+"""Record-level transform helpers: math, cleaning, date and imputation utilities."""
+
+from .math import (
+    minmax_scale, zscore, clip, winsorize, log1p_safe,
+    bucketize, robust_percentile_scale
+)
+from .cleaning import (
+    to_lower, strip_whitespace, squash_whitespace,
+    normalize_unicode, remove_punctuation, map_values, cast_numeric
+)
+from .date import (
+    try_parse_date, extract_date_parts, floor_to_month, ceil_to_month
+)
+from .impute import (
+    ImputationReport, choose_imputation_strategy,
+    compute_imputation_value, fill_nulls_column, fill_nulls_record
+)
+
+__all__ = [
+    # math
+    "minmax_scale", "zscore", "clip", "winsorize", "log1p_safe",
+    "bucketize", "robust_percentile_scale",
+    # cleaning
+    "to_lower", "strip_whitespace", "squash_whitespace",
+    "normalize_unicode", "remove_punctuation", "map_values", "cast_numeric",
+    # date
+    "try_parse_date", "extract_date_parts", "floor_to_month", "ceil_to_month",
+    # impute
+    "ImputationReport", "choose_imputation_strategy",
+    "compute_imputation_value", "fill_nulls_column", "fill_nulls_record",
+]
diff --git a/airbyte_cdk/utils/transforms/cleaning.py b/airbyte_cdk/utils/transforms/cleaning.py
new file mode 100644
index 000000000..12219d02a
--- /dev/null
+++ b/airbyte_cdk/utils/transforms/cleaning.py
@@ -0,0 +1,91 @@
+"""String and scalar cleaning helpers; the string helpers pass ``None`` through."""
+
+from __future__ import annotations
+
+import re
+import unicodedata
+from typing import Any, Mapping, Optional, Union
+
+Number = Union[int, float]
+
+# Anything that is neither a word character nor whitespace, plus "_", which
+# ``\w`` would otherwise keep even though it is punctuation.
+_PUNCT_RE = re.compile(r"[^\w\s]|_", re.UNICODE)
+
+
+def to_lower(s: Optional[str]) -> Optional[str]:
+    """Return ``s`` lower-cased, or ``None`` when ``s`` is ``None``."""
+    return None if s is None else s.lower()
+
+
+def strip_whitespace(s: Optional[str]) -> Optional[str]:
+    """Return ``s`` without leading/trailing whitespace, or ``None`` for ``None``."""
+    return None if s is None else s.strip()
+
+
+def squash_whitespace(s: Optional[str]) -> Optional[str]:
+    """Collapse every whitespace run in ``s`` to one space and strip the ends."""
+    if s is None:
+        return None
+    return re.sub(r"\s+", " ", s).strip()
+
+
+def normalize_unicode(s: Optional[str], form: str = "NFKC") -> Optional[str]:
+    """Return ``s`` in Unicode normalization ``form``, or ``None`` for ``None``."""
+    return None if s is None else unicodedata.normalize(form, s)
+
+
+def remove_punctuation(s: Optional[str]) -> Optional[str]:
+    """Drop every character of ``s`` that is not a letter, digit or whitespace."""
+    if s is None:
+        return None
+    return _PUNCT_RE.sub("", s)
+
+
+def map_values(value: Any, mapping: Mapping[Any, Any], default: Any = None) -> Any:
+    """Look ``value`` up in ``mapping``; return ``default`` when absent or unhashable."""
+    try:
+        return mapping.get(value, default)
+    except TypeError:
+        # Unhashable values (lists, dicts) can never be mapping keys.
+        return default
+
+
+def _to_number(value: Any) -> Number:
+    """Convert ``value`` to ``int`` when integral, else ``float``; raise on failure."""
+    if value is None or (isinstance(value, str) and not value.strip()):
+        raise ValueError("empty")
+    if isinstance(value, int):
+        # Keep ints exact (a float round-trip corrupts values above 2**53); bools become 0/1.
+        return int(value)
+    if isinstance(value, str):
+        try:
+            return int(value)
+        except ValueError:
+            pass  # not a plain integer literal; fall back to float parsing
+    f = float(value)
+    i = int(f)  # raises OverflowError for inf and ValueError for nan
+    return i if i == f else f
+
+
+def cast_numeric(
+    value: Any, on_error: str = "ignore", default: Optional[Number] = None
+) -> Any:
+    """Cast ``value`` to a number, handling failures according to ``on_error``.
+
+    ``None``, blank strings, non-numeric input and non-finite values count as
+    failures. ``on_error`` selects the outcome: ``"default"`` returns
+    ``default``, ``"none"`` returns ``None``, ``"raise"`` re-raises the
+    conversion error, and anything else (including the default ``"ignore"``)
+    returns ``value`` unchanged.
+    """
+    try:
+        return _to_number(value)
+    except (TypeError, ValueError, OverflowError):
+        if on_error == "default":
+            return default
+        if on_error == "none":
+            return None
+        if on_error == "raise":
+            raise
+        return value
diff --git a/airbyte_cdk/utils/transforms/date.py b/airbyte_cdk/utils/transforms/date.py
new file mode 100644
index 000000000..0f2bf8abd
--- /dev/null
+++ b/airbyte_cdk/utils/transforms/date.py
@@ -0,0 +1,60 @@
+"""Date helpers that work on any object exposing a datetime-like interface."""
+
+from __future__ import annotations
+
+from typing import Any, Dict, Optional
+
+# Failures expected from non-date inputs or out-of-range results.
+_DATE_ERRORS = (AttributeError, TypeError, ValueError, OverflowError)
+_MIDNIGHT = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
+
+
+def try_parse_date(value: Any) -> Any:
+    """Return ``value`` if it has ``year``/``month``/``day`` attributes, else ``None``.
+
+    No string parsing is attempted: ``"2023-01-15"`` yields ``None``.
+    """
+    if hasattr(value, "year") and hasattr(value, "month") and hasattr(value, "day"):
+        return value
+    return None
+
+
+def extract_date_parts(dt: Any) -> Dict[str, Optional[int]]:
+    """Return ``year``, ``month``, ``day`` and ``dow`` (``dt.weekday()``) of ``dt``.
+
+    Every value is ``None`` when ``dt`` is not date-like.
+    """
+    try:
+        return {"year": dt.year, "month": dt.month, "day": dt.day, "dow": int(dt.weekday())}
+    except _DATE_ERRORS:
+        return {"year": None, "month": None, "day": None, "dow": None}
+
+
+def _first_of_month(dt: Any, year: int, month: int) -> Any:
+    """Return a copy of ``dt`` moved to the first instant of ``year``-``month``."""
+    fields = {"year": year, "month": month, "day": 1}
+    if hasattr(dt, "hour"):
+        # Datetime-like: also reset the time of day. Plain dates have no such fields.
+        fields.update(_MIDNIGHT)
+    return dt.replace(**fields)
+
+
+def floor_to_month(dt: Any) -> Any:
+    """Return the start of ``dt``'s month, or ``None`` when ``dt`` is not date-like."""
+    try:
+        return _first_of_month(dt, dt.year, dt.month)
+    except _DATE_ERRORS:
+        return None
+
+
+def ceil_to_month(dt: Any) -> Any:
+    """Return the start of the month after ``dt``'s, or ``None`` for non-dates.
+
+    A value already on a month boundary still advances to the next month.
+    """
+    try:
+        if dt.month == 12:
+            return _first_of_month(dt, dt.year + 1, 1)
+        return _first_of_month(dt, dt.year, dt.month + 1)
+    except _DATE_ERRORS:
+        return None
diff --git a/airbyte_cdk/utils/transforms/impute.py b/airbyte_cdk/utils/transforms/impute.py
new file mode 100644
index 000000000..f64bedde2
--- /dev/null
+++ b/airbyte_cdk/utils/transforms/impute.py
@@ -0,0 +1,159 @@
+"""Null-imputation helpers: choose a strategy, compute a fill value, apply it."""
+
+from __future__ import annotations
+
+import math
+from collections import Counter
+from dataclasses import dataclass
+from statistics import mean, median
+from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union
+
+Number = Union[int, float]
+
+
+@dataclass
+class ImputationReport:
+    """Describes how one field was imputed."""
+
+    field: str  # column name ("" when unknown)
+    strategy: str  # "mean", "median" or "mode"
+    value_used: Any  # fill value, or None when nothing could be computed
+    notes: str = ""
+
+
+def _is_number(x: Any) -> bool:
+    """Return True for ints and floats; bools are treated as categorical."""
+    return isinstance(x, (int, float)) and not isinstance(x, bool)
+
+
+def _numeric_skewness(values: List[Number]) -> float:
+    """Return the adjusted Fisher-Pearson sample skewness of ``values``.
+
+    Returns 0.0 for fewer than three values or zero variance.
+    """
+    n = len(values)
+    if n < 3:
+        return 0.0
+    mu = mean(values)
+    # Both moments are population (divide-by-n) moments, as the adjusted
+    # Fisher-Pearson estimator requires; the factor below corrects the bias.
+    m2 = sum((x - mu) ** 2 for x in values) / n
+    if m2 == 0:
+        return 0.0
+    m3 = sum((x - mu) ** 3 for x in values) / n
+    g1 = m3 / m2**1.5
+    return float(math.sqrt(n * (n - 1)) / (n - 2) * g1)
+
+
+def choose_imputation_strategy(
+    series: Sequence[Any],
+    numeric: bool | None = None,
+    skew_threshold: float = 0.75,
+    unique_ratio_threshold: float = 0.05,
+) -> str:
+    """Pick ``"mean"``, ``"median"`` or ``"mode"`` for the non-null values of ``series``.
+
+    ``"mode"`` is returned for empty, non-numeric or low-cardinality data
+    (unique ratio <= ``unique_ratio_threshold``); otherwise ``"median"`` when
+    the absolute skewness exceeds ``skew_threshold`` and ``"mean"`` when not.
+    ``numeric=None`` auto-detects numeric data.
+    """
+    data = [x for x in series if x is not None]
+    if not data:
+        return "mode"
+    if numeric is None:
+        numeric = all(_is_number(x) for x in data)
+    if not numeric:
+        return "mode"
+    if len(set(data)) / len(data) <= unique_ratio_threshold:
+        return "mode"
+    skew = abs(_numeric_skewness([float(x) for x in data]))
+    return "median" if skew > skew_threshold else "mean"
+
+
+def _mode(values: List[Any]) -> Any:
+    """Return the most frequent value; ties go to the smallest ``repr``."""
+    try:
+        counts = Counter(values)
+    except TypeError:
+        # Unhashable values (e.g. nested dicts/lists): group them by repr instead.
+        by_repr = {repr(v): v for v in values}
+        repr_counts = Counter(repr(v) for v in values)
+        top = max(repr_counts.values())
+        return by_repr[min(k for k, c in repr_counts.items() if c == top)]
+    top = max(counts.values())
+    return min((k for k, c in counts.items() if c == top), key=repr)
+
+
+def compute_imputation_value(series: Sequence[Any], strategy: str) -> Any:
+    """Compute the fill value for ``series`` under ``strategy``.
+
+    Nulls are ignored. ``"mean"``/``"median"`` use only the numeric values and
+    return ``None`` when there are none; ``"mode"`` returns the most frequent
+    value. Returns ``None`` when ``series`` has no non-null values.
+
+    Raises:
+        ValueError: If ``strategy`` is unknown and ``series`` has non-null values.
+    """
+    clean = [x for x in series if x is not None]
+    if not clean:
+        return None
+    if strategy in ("mean", "median"):
+        nums = [float(x) for x in clean if _is_number(x)]
+        if not nums:
+            return None
+        return mean(nums) if strategy == "mean" else median(nums)
+    if strategy == "mode":
+        return _mode(clean)
+    raise ValueError(f"Unknown strategy: {strategy}")
+
+
+def fill_nulls_column(
+    series: Sequence[Any],
+    explicit_strategy: Optional[str] = None,
+    numeric: Optional[bool] = None,
+    field: str = "",
+    **choose_kwargs: Any,
+) -> Tuple[List[Any], ImputationReport]:
+    """Replace ``None`` entries of ``series`` with an imputed value.
+
+    The strategy is ``explicit_strategy`` when given, otherwise it is chosen
+    by ``choose_imputation_strategy`` (which receives ``numeric`` and
+    ``choose_kwargs``). ``field`` only labels the returned report.
+    """
+    strategy = explicit_strategy or choose_imputation_strategy(
+        series, numeric=numeric, **choose_kwargs
+    )
+    fill_value = compute_imputation_value(series, strategy)
+    filled = [fill_value if x is None else x for x in series]
+    return filled, ImputationReport(field, strategy, fill_value)
+
+
+def fill_nulls_record(
+    record: Dict[str, Any],
+    columns: Sequence[str],
+    samples: Mapping[str, Sequence[Any]],
+    strategies: Optional[Mapping[str, str]] = None,
+    choose_kwargs: Optional[Dict[str, Any]] = None,
+) -> Tuple[Dict[str, Any], List[ImputationReport]]:
+    """Fill the ``None``/missing ``columns`` of ``record`` using per-column ``samples``.
+
+    Returns a copy of ``record`` plus one report per column; a report is
+    produced even when the record already had a value for that column.
+    ``strategies`` overrides the automatic choice per column.
+    """
+    choose_kwargs = choose_kwargs or {}
+    out = dict(record)
+    reports: List[ImputationReport] = []
+    for col in columns:
+        series = samples.get(col, [])
+        strategy = strategies.get(col) if strategies else None
+        if not strategy:
+            # Infer numeric-ness from the samples; leave it unset when there are none.
+            numeric = all(_is_number(x) for x in series if x is not None) if series else None
+            strategy = choose_imputation_strategy(series, numeric=numeric, **choose_kwargs)
+        fill_value = compute_imputation_value(series, strategy=strategy)
+        current = record.get(col)
+        out[col] = fill_value if current is None else current
+        reports.append(ImputationReport(col, strategy, fill_value))
+    return out, reports
diff --git a/airbyte_cdk/utils/transforms/math.py b/airbyte_cdk/utils/transforms/math.py
new file mode 100644
index 000000000..563ee9109
--- /dev/null
+++ b/airbyte_cdk/utils/transforms/math.py
@@ -0,0 +1,80 @@
+"""Scalar scaling, clipping and bucketing helpers."""
+
+from __future__ import annotations
+
+import math
+from typing import Sequence, Tuple, Union
+
+Number = Union[int, float]
+
+
+def minmax_scale(
+    x: Number,
+    data_min: Number,
+    data_max: Number,
+    out_range: Tuple[Number, Number] = (0.0, 1.0),
+) -> float:
+    """Linearly map ``x`` from ``[data_min, data_max]`` onto ``out_range``.
+
+    Values outside the input range are extrapolated, not clipped. When
+    ``data_min == data_max`` the midpoint of ``out_range`` is returned.
+    """
+    a, b = out_range
+    if data_max == data_min:
+        return float(a + (b - a) / 2.0)
+    return ((float(x) - data_min) / (data_max - data_min)) * (b - a) + a
+
+
+def zscore(x: Number, mu: float, sigma: float) -> float:
+    """Return ``(x - mu) / sigma``, or 0.0 when ``sigma`` is 0."""
+    return 0.0 if sigma == 0 else (float(x) - mu) / sigma
+
+
+def clip(x: Number, low: Number, high: Number) -> Number:
+    """Clamp ``x`` to ``[low, high]``; ``low`` wins if ``low > high``."""
+    return max(low, min(high, x))
+
+
+def winsorize(x: Number, low_value: Number, high_value: Number) -> Number:
+    """Clamp ``x`` to precomputed winsorization bounds (same as ``clip``)."""
+    return clip(x, low_value, high_value)
+
+
+def log1p_safe(x: Number) -> float:
+    """Return ``log(1 + x)``, or ``float(x)`` unchanged when ``x <= -1``.
+
+    ``log1p`` is undefined at and below -1, so such inputs are passed through.
+    """
+    if x <= -1:
+        return float(x)
+    return math.log1p(float(x))
+
+
+def bucketize(x: Number, edges: Sequence[Number]) -> int:
+    """Return the index of the first edge ``e`` with ``x <= e``, else ``len(edges)``.
+
+    ``edges`` is expected in ascending order; bucket ``i`` then covers
+    ``(edges[i - 1], edges[i]]``.
+    """
+    for i, e in enumerate(edges):
+        if x <= e:
+            return i
+    return len(edges)
+
+
+def robust_percentile_scale(
+    x: Number,
+    p_low_value: Number,
+    p_high_value: Number,
+    out_range: Tuple[Number, Number] = (0.0, 1.0),
+    clip_outliers: bool = True,
+) -> float:
+    """Min-max scale ``x`` between two precomputed percentile values.
+
+    With ``clip_outliers`` (the default) ``x`` is first clamped to
+    ``[p_low_value, p_high_value]`` so the result stays inside ``out_range``.
+    """
+    lo, hi = float(p_low_value), float(p_high_value)
+    if clip_outliers:
+        x = clip(float(x), lo, hi)
+    return minmax_scale(x, lo, hi, out_range)