-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathStringDistanceEncoder.py
More file actions
197 lines (151 loc) · 6.88 KB
/
StringDistanceEncoder.py
File metadata and controls
197 lines (151 loc) · 6.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 29 14:22:38 2024
@author: cego
"""
import numpy as np
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.utils.validation import check_is_fitted
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import pairwise_distances
from sklearn.decomposition import TruncatedSVD
import pandas as pd
class StringDistanceEncoder(TransformerMixin, BaseEstimator):
    """Encode dirty string categories via n-gram distances to known categories.

    Each unique category seen during ``fit`` is turned into a binary
    character n-gram presence vector. The pairwise distance matrix between
    those vectors is then reduced with ``TruncatedSVD``. At ``transform``
    time every input string is described by its distances to the fitted
    categories and projected with the fitted SVD.

    Parameters
    ----------
    n_components : int, default=2
        Number of components to be extracted by TruncatedSVD applied to the
        category distance matrix.
    metric : str, default='dice'
        The metric to use when calculating distance between the categories.
        It is forwarded unchanged to ``sklearn.metrics.pairwise_distances``,
        so it must be one of the options allowed by
        ``scipy.spatial.distance.pdist`` for its metric parameter, or a
        metric listed in ``sklearn.pairwise.PAIRWISE_DISTANCE_FUNCTIONS``.
    ngram_range : tuple (min_n, max_n), default=(1, 3)
        The lower and upper boundary of the range of n-values for different
        char n-grams to be extracted. All values of n such that
        min_n <= n <= max_n will be used.
    lowercase : bool, default=True
        Convert all characters to lowercase before tokenizing.

    Attributes
    ----------
    categories_ : list
        Sorted unique categories that were seen during ``fit``.
    count_vectorizer_ : CountVectorizer
        Character n-gram vectorizer fitted on ``categories_``.
    categories_vectorized_ : ndarray of bool, shape (n_categories, n_grams)
        N-gram presence matrix of ``categories_`` (counts thresholded at > 0).
    truncated_svd : TruncatedSVD
        TruncatedSVD object fitted on the category distance matrix.
    """

    def __init__(self,
                 n_components=2,
                 metric="dice",
                 ngram_range=(1, 3),
                 lowercase=True):
        self.n_components = n_components
        self.metric = metric
        self.ngram_range = ngram_range
        self.lowercase = lowercase

    @staticmethod
    def _as_string_column(X):
        """Validate ``X`` and return its single column as a 1-D object array.

        Missing values are not handled here; they are passed through as-is.

        Parameters
        ----------
        X : DataFrame, shape (n_samples, 1)
            Input with exactly one object/string column.

        Returns
        -------
        values : ndarray of object, shape (n_samples,)
            The column values.

        Raises
        ------
        TypeError
            If ``X`` is not a DataFrame or its column is not object/string.
        ValueError
            If ``X`` does not have exactly one column.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("X must be a DataFrame.")
        if X.shape[1] != 1:
            raise ValueError("X must have only one column.")
        # .iloc is used because plain integer indexing on the label-indexed
        # dtypes Series is deprecated positional access.
        column_dtype = X.dtypes.iloc[0]
        if not (pd.api.types.is_object_dtype(column_dtype)
                or pd.api.types.is_string_dtype(column_dtype)):
            raise TypeError("Column must have object or string dtype.")
        # Selecting the column keeps the result 1-D even for a one-row frame,
        # where squeezing the 2-D values would collapse to a 0-d array.
        return X.iloc[:, 0].to_numpy(dtype=object)

    def fit(self, X, y=None):
        """Learn the categories, their n-gram vectors and the SVD projection.

        Parameters
        ----------
        X : DataFrame, shape (n_samples, 1)
            The training input samples.
        y : None
            There is no need of a target in a transformer, yet the pipeline
            API requires this parameter.

        Returns
        -------
        self : object
            Returns self.

        Raises
        ------
        TypeError
            If ``X`` is not a DataFrame or its column is not object/string.
        ValueError
            If ``X`` does not have exactly one column.
        """
        values = self._as_string_column(X)

        # we care only about the unique categories (np.unique also sorts them)
        self.categories_ = np.unique(values).tolist()

        # fitting CountVectorizer object using ngram_range options
        self.count_vectorizer_ = CountVectorizer(ngram_range=self.ngram_range,
                                                 analyzer="char",
                                                 lowercase=self.lowercase)
        # only presence/absence of each n-gram is kept, not its count
        self.categories_vectorized_ = (
            self.count_vectorizer_.fit_transform(self.categories_).toarray() > 0
        )

        # generating distance matrix using the selected distance metric
        dist_array = pairwise_distances(self.categories_vectorized_,
                                        metric=self.metric)

        # fitting and storing the TruncatedSVD object
        truncated_svd = TruncatedSVD(n_components=self.n_components,
                                     algorithm="arpack")
        truncated_svd.fit(dist_array)
        self.truncated_svd = truncated_svd
        return self

    def transform(self, X):
        """Encode the strings in ``X`` using the fitted categories and SVD.

        Parameters
        ----------
        X : DataFrame, shape (n_samples, 1)
            The input samples.

        Returns
        -------
        X_transformed : array, shape (n_samples, n_components)
            The transformed array with the distances between the categories
            duly encoded.

        Raises
        ------
        TypeError
            If ``X`` is not a DataFrame or its column is not object/string.
        ValueError
            If ``X`` does not have exactly one column.
        """
        values = self._as_string_column(X)
        check_is_fitted(self, 'truncated_svd')

        # we care only about the unique categories; `inverse` maps every row
        # of X back to the position of its category in `unique_values`
        unique_values, inverse = np.unique(values, return_inverse=True)

        # vectorizing X using the fitted count_vectorizer_ object
        unique_vectorized = (
            self.count_vectorizer_.transform(unique_values.tolist()).toarray() > 0
        )

        # calculating the distances between the categories in X and the
        # categories that were already seen before
        dist_array = pairwise_distances(unique_vectorized,
                                        self.categories_vectorized_,
                                        metric=self.metric)

        # applying dimensionality reduction to the unique categories
        unique_transformed = self.truncated_svd.transform(dist_array)

        # putting the result back in the original row order of X
        return unique_transformed[inverse]

    def fit_transform(self, X, y=None):
        """Fit the encoder on ``X`` and return the encoding of ``X``.

        Parameters
        ----------
        X : DataFrame, shape (n_samples, 1)
            The input samples.
        y : None
            There is no need of a target in a transformer, yet the pipeline
            API requires this parameter.

        Returns
        -------
        X_transformed : array, shape (n_samples, n_components)
            The transformed array with the distances between the categories
            duly encoded.

        Raises
        ------
        TypeError
            If ``X`` is not a DataFrame or its column is not object/string.
        ValueError
            If ``X`` does not have exactly one column.
        """
        # fit() and transform() validate X themselves, with the same rules
        return self.fit(X, y).transform(X)