Coverage for tdaad / topological_embedding.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-03 16:54 +0000

1"""Topological Embedding Transformers.""" 

2 

3# Author: Martin Royer 

4import numpy as np 

5import pandas as pd 

6 

7from sklearn.base import BaseEstimator, TransformerMixin 

8from sklearn.pipeline import Pipeline 

9from sklearn.preprocessing import FunctionTransformer, StandardScaler 

10from sklearn.compose import ColumnTransformer 

11from sklearn.cluster import KMeans 

12 

13from gudhi.representations.vector_methods import Atol 

14from gudhi.sklearn.rips_persistence import RipsPersistence 

15 

16 

def numpy_data_to_similarity(X, filter_nan=True):
    r"""Transforms numpy matrix X into similarity matrix :math:`1-\mathbf{Corr}(X)`.

    Variables (columns of X) that produce degenerate rows/columns in the
    resulting matrix are dropped, so the output may be smaller than
    ``X.shape[1] x X.shape[1]``.
    """
    dissimilarity = 1 - np.corrcoef(X, rowvar=False)
    if filter_nan:
        # A constant variable yields an all-NaN row/column in the
        # correlation matrix -> drop those rows/columns.
        drop = np.isnan(dissimilarity).all(axis=0)
    else:
        # NOTE(review): without NaN filtering, columns that are entirely
        # zero are dropped instead — presumably to remove degenerate
        # (perfectly correlated) variables; confirm intent with author.
        drop = ~dissimilarity.any(axis=0)
    keep = ~drop
    return dissimilarity[np.ix_(keep, keep)]

23 

24 

class SlidingWindowTransformer(BaseEstimator, TransformerMixin):
    """Slice a 2D numpy array into overlapping windows.

    ``transform`` returns a list of 2D numpy arrays, one per window.
    The window start positions are recorded in ``window_index_``.
    """

    def __init__(self, window_size=40, step=5):
        self.window_size = window_size
        self.step = step

    def fit(self, X, y=None):
        """Stateless transformer: fitting is a no-op."""
        return self

    def transform(self, X):
        """Cut X into windows of ``window_size`` rows, starting every ``step`` rows.

        Side effect: stores the list of window start offsets in
        ``self.window_index_`` (read downstream to index the embedding).
        """
        last_start = X.shape[0] - self.window_size
        self.window_index_ = [start for start in range(0, last_start + 1, self.step)]
        return [X[start:start + self.window_size] for start in self.window_index_]

43 

44 

class TopologicalEmbedding(BaseEstimator, TransformerMixin):
    """
    Topological embedding for multivariate time series using sliding windows,
    persistent homology (Rips), and ATOL vectorization.

    Pipeline:
        Sliding windows -> similarity -> RipsPersistence -> ColumnTransformer(Atol)

    Parameters
    ----------
    window_size : int
        Number of rows per sliding window.
    step : int
        Step size between windows.
    tda_max_dim : int
        Maximum homology dimension for RipsPersistence.
    n_centers_by_dim : int
        Number of centroids per homology dimension in ATOL.
    filter_nan : bool
        Whether to filter NaNs in similarity matrices.
    output : str, default="pandas"
        "pandas" returns a DataFrame with proper index and column names.
        "numpy" returns a numpy array.
    """

    def __init__(
        self,
        window_size: int = 40,
        step: int = 5,
        tda_max_dim: int = 2,
        n_centers_by_dim: int = 5,
        filter_nan: bool = True,
        output: str = "pandas",
    ):
        self.window_size = window_size
        self.step = step
        self.tda_max_dim = tda_max_dim
        self.n_centers_by_dim = n_centers_by_dim
        self.filter_nan = filter_nan
        self.output = output

    def _build_pipeline(self):
        """Assemble the sklearn pipeline: scale -> window -> similarity -> rips -> atol."""
        # FunctionTransformer to convert windows -> distance/similarity matrices
        similarity_fn = FunctionTransformer(
            func=lambda X_list: [
                numpy_data_to_similarity(x, filter_nan=self.filter_nan) for x in X_list
            ]
        )

        # Batched RipsPersistence; similarity matrices are consumed as
        # "lower distance matrix" (gudhi reads the lower-triangular part).
        rips_transformer = RipsPersistence(
            homology_dimensions=range(self.tda_max_dim + 1),
            input_type="lower distance matrix",
        )

        # ColumnTransformer: one Atol vectorizer per homology dimension.
        # Fixed random_state keeps the KMeans quantiser deterministic.
        archipelago_transformer = ColumnTransformer(
            transformers=[
                (
                    f"atol_dim_{i}",
                    Atol(
                        quantiser=KMeans(
                            n_clusters=self.n_centers_by_dim,
                            random_state=42,
                            n_init="auto",
                        )
                    ),
                    i,
                )
                for i in range(self.tda_max_dim + 1)
            ]
        )

        # Full sklearn pipeline
        return Pipeline(
            steps=[
                ("scaler", StandardScaler()),
                (
                    "windows",
                    SlidingWindowTransformer(
                        window_size=self.window_size, step=self.step
                    ),
                ),
                ("similarity", similarity_fn),
                ("rips", rips_transformer),
                ("archipelago", archipelago_transformer),
            ]
        )

    def fit(self, X, y=None):
        """
        Fit the full pipeline to the data.

        Parameters
        ----------
        X : np.ndarray, shape (n_samples, n_features)
            Input multivariate time series.

        y : Ignored
        """
        self.pipeline_ = self._build_pipeline()
        self.pipeline_.fit(X, y)
        return self

    def transform(self, X):
        """
        Transform the input data into topological embeddings.

        Returns
        -------
        pd.DataFrame, when ``output="pandas"`` (default): row index = window
        start position, columns named ``ph{i}_center{j}``.
        np.ndarray, when ``output="numpy"``.

        Raises
        ------
        ValueError
            If ``output`` is neither "pandas" nor "numpy".
        """
        X_transformed = self.pipeline_.transform(X)

        # BUGFIX: `output` was previously ignored and a DataFrame was always
        # returned; honor output="numpy" as documented in the class docstring.
        if self.output == "numpy":
            return X_transformed
        if self.output != "pandas":
            raise ValueError(
                f"output must be 'pandas' or 'numpy', got {self.output!r}"
            )

        # Build column names: ph{homology_dim}_center{centroid_index}
        columns = [
            f"ph{i}_center{j + 1}"
            for i in range(self.tda_max_dim + 1)
            for j in range(self.n_centers_by_dim)
        ]

        # Row index = window start positions recorded by SlidingWindowTransformer
        window_index = self.pipeline_.named_steps["windows"].window_index_
        return pd.DataFrame(X_transformed, index=window_index, columns=columns)

167 return pd.DataFrame(X_transformed, index=window_index, columns=columns)