1"""Topological Embedding Transformers."""
3# Author: Martin Royer
4import numpy as np
5import pandas as pd
7from sklearn.base import BaseEstimator, TransformerMixin
8from sklearn.pipeline import Pipeline
9from sklearn.preprocessing import FunctionTransformer, StandardScaler
10from sklearn.compose import ColumnTransformer
11from sklearn.cluster import KMeans
13from gudhi.representations.vector_methods import Atol
14from gudhi.sklearn.rips_persistence import RipsPersistence


def numpy_data_to_similarity(X, filter_nan=True):
    r"""Transform a numpy matrix X into the similarity matrix :math:`1-\mathbf{Corr}(X)`."""
    target = 1 - np.corrcoef(X, rowvar=False)
    # a constant variable yields NaN on its whole row/column; filter those out
    nanrowcols = np.isnan(target).all(axis=0) if filter_nan else ~target.any(axis=0)
    return target[~nanrowcols][:, ~nanrowcols]
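

# Illustrative sketch (not part of the original module): a constant column makes
# np.corrcoef return NaN on its row/column, which ``filter_nan`` then drops.
# The helper name and synthetic data below are made up for demonstration only.
def _similarity_example():
    rng = np.random.default_rng(0)
    X = np.column_stack([rng.normal(size=50), rng.normal(size=50), np.ones(50)])
    sim = numpy_data_to_similarity(X)      # constant third column is filtered out
    assert sim.shape == (2, 2)
    assert np.allclose(np.diag(sim), 0.0)  # 1 - Corr(x, x) == 0
    return sim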


class SlidingWindowTransformer(BaseEstimator, TransformerMixin):
    """
    Slice a 2D numpy array into overlapping windows.

    Output: list of 2D numpy arrays, one per window.
    """

    def __init__(self, window_size=40, step=5):
        self.window_size = window_size
        self.step = step

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        n_rows = X.shape[0]
        self.window_index_ = list(range(0, n_rows - self.window_size + 1, self.step))
        return [X[i: i + self.window_size] for i in self.window_index_]
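

# Illustrative sketch (not part of the original module): the number of windows is
# floor((n_rows - window_size) / step) + 1, and ``window_index_`` records each
# window's starting row. The helper name and synthetic data are made up.
def _sliding_window_example():
    X = np.arange(100 * 3).reshape(100, 3)                  # 100 rows, 3 variables
    windower = SlidingWindowTransformer(window_size=40, step=5)
    windows = windower.fit_transform(X)
    assert len(windows) == (100 - 40) // 5 + 1              # 13 windows
    assert windower.window_index_[:3] == [0, 5, 10]
    assert windows[0].shape == (40, 3)
    return windows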


class TopologicalEmbedding(BaseEstimator, TransformerMixin):
    """
    Topological embedding for multivariate time series using sliding windows,
    persistent homology (Rips), and ATOL vectorization.

    Pipeline:
        Sliding windows -> similarity -> RipsPersistence -> ColumnTransformer(Atol)

    Parameters
    ----------
    window_size : int
        Number of rows per sliding window.
    step : int
        Step size between windows.
    tda_max_dim : int
        Maximum homology dimension for RipsPersistence.
    n_centers_by_dim : int
        Number of ATOL centroids per homology dimension.
    filter_nan : bool
        Whether to filter NaN rows/columns out of the similarity matrices.
    output : str, default="pandas"
        "pandas" returns a DataFrame indexed by window start with named columns;
        "numpy" returns a plain numpy array.
    """

    def __init__(
        self,
        window_size: int = 40,
        step: int = 5,
        tda_max_dim: int = 2,
        n_centers_by_dim: int = 5,
        filter_nan: bool = True,
        output: str = "pandas",
    ):
        self.window_size = window_size
        self.step = step
        self.tda_max_dim = tda_max_dim
        self.n_centers_by_dim = n_centers_by_dim
        self.filter_nan = filter_nan
        self.output = output

    def _build_pipeline(self):
        # FunctionTransformer to convert windows -> distance/similarity matrices
        similarity_fn = FunctionTransformer(
            func=lambda X_list: [
                numpy_data_to_similarity(x, filter_nan=self.filter_nan) for x in X_list
            ]
        )

        # Batched RipsPersistence
        rips_transformer = RipsPersistence(
            homology_dimensions=range(self.tda_max_dim + 1),
            input_type="lower distance matrix",
        )

        # ColumnTransformer: one Atol per homology dimension
        archipelago_transformer = ColumnTransformer(
            transformers=[
                (
                    f"atol_dim_{i}",
                    Atol(
                        quantiser=KMeans(
                            n_clusters=self.n_centers_by_dim,
                            random_state=42,
                            n_init="auto",
                        )
                    ),
                    i,
                )
                for i in range(self.tda_max_dim + 1)
            ]
        )

        # Full sklearn pipeline
        pipeline = Pipeline(
            steps=[
                ("scaler", StandardScaler()),
                (
                    "windows",
                    SlidingWindowTransformer(
                        window_size=self.window_size, step=self.step
                    ),
                ),
                ("similarity", similarity_fn),
                ("rips", rips_transformer),
                ("archipelago", archipelago_transformer),
            ]
        )

        return pipeline

    def fit(self, X, y=None):
        """
        Fit the full pipeline to the data.

        Parameters
        ----------
        X : np.ndarray, shape (n_samples, n_features)
            Input multivariate time series.

        y : Ignored
        """
        self.pipeline_ = self._build_pipeline()
        self.pipeline_.fit(X, y)
        return self

    def transform(self, X):
        """
        Transform the input data into a topological embedding.

        Returns a pandas DataFrame whose row index is the window start
        position and whose columns are named ``ph{dim}_center{k}``,
        or a plain numpy array when ``output="numpy"``.
        """
        X_transformed = self.pipeline_.transform(X)

        if self.output == "numpy":
            return np.asarray(X_transformed)

        # Build column names: ph{i}_center{j}
        columns = [
            f"ph{i}_center{j + 1}"
            for i in range(self.tda_max_dim + 1)
            for j in range(self.n_centers_by_dim)
        ]

        # Build DataFrame with window index from SlidingWindowTransformer
        window_index = self.pipeline_.named_steps["windows"].window_index_
        return pd.DataFrame(X_transformed, index=window_index, columns=columns)
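

# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal end-to-end run on synthetic data, assuming gudhi and scikit-learn
# are installed. The signal, seed, and parameter choices below are made up;
# phases spread around a full cycle give each window's correlation structure a
# loop, so dimension-1 persistence is non-trivial, and small values of
# tda_max_dim / n_centers_by_dim keep the demo quick.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    # 300 time steps of a 10-variable series: phase-shifted sinusoids + noise
    t = np.linspace(0, 8 * np.pi, 300)
    phases = np.linspace(0, 2 * np.pi, 10, endpoint=False)
    signal = np.column_stack(
        [np.sin(t + phase) + 0.3 * rng.normal(size=t.size) for phase in phases]
    )

    embedding = TopologicalEmbedding(
        window_size=60, step=10, tda_max_dim=1, n_centers_by_dim=2
    )
    features = embedding.fit_transform(signal)

    # One row per sliding window, (tda_max_dim + 1) * n_centers_by_dim columns
    print(features.shape)              # expected: (25, 4)
    print(features.columns.tolist())   # ph0_center1, ph0_center2, ph1_center1, ph1_center2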