Coverage for tdaad/anomaly_detectors.py: 94% (62 statements)


"""Topological Anomaly Detectors."""

# Author: Martin Royer
from typing import Optional, Union

import numbers
import warnings
import numpy as np

from sklearn.base import _fit_context, TransformerMixin
from sklearn.utils._param_validation import Interval
from sklearn.utils.validation import check_is_fitted
from sklearn.covariance import EllipticEnvelope

from tdaad.topological_embedding import TopologicalEmbedding

def score_flat_fast_remapping(scores, window_size, stride, padding_length=0):
    """
    Remap window-level anomaly scores to a flat sequence of per-time-step scores.

    Parameters
    ----------
    scores : array-like of shape (n_windows,)
        Anomaly scores for each window. Can be a pandas Series or NumPy array.

    window_size : int
        Size of the sliding window.

    stride : int
        Step size between windows.

    padding_length : int, optional (default=0)
        Extra length to pad the output array (typically at the end of a signal).

    Returns
    -------
    remapped_scores : np.ndarray of shape (n_timestamps + padding_length,)
        Flattened anomaly scores with per-timestep resolution. NaN values (from
        positions not covered by any window) are replaced with 0.
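
    Examples
    --------
    A small illustrative case (input values chosen arbitrarily): with
    ``window_size=4`` and ``stride=2``, interior timestamps covered by two
    windows receive the sum of both window scores.

    >>> import numpy as np
    >>> from tdaad.anomaly_detectors import score_flat_fast_remapping
    >>> scores = np.array([1.0, 2.0, 4.0])
    >>> score_flat_fast_remapping(scores, window_size=4, stride=2)
    array([1., 1., 3., 3., 6., 6., 4., 4.])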

    """
    # Ensure scores is a NumPy array
    if hasattr(scores, "values"):
        scores = scores.values

    n_windows = len(scores)

    # Compute begin and end indices for each window
    begins = np.arange(n_windows) * stride
    ends = begins + window_size

    # Output length based on last window + padding
    total_length = ends[-1] + padding_length
    remapped_scores = np.full(total_length, np.nan)

    # Find all unique intersection points between windows
    intersections = np.unique(np.concatenate((begins, ends)))

    # For each interval between two intersections, find overlapping windows and sum their scores
    for left, right in zip(intersections[:-1], intersections[1:]):
        overlapping = (begins <= left) & (right <= ends)
        if np.any(overlapping):
            remapped_scores[left:right] = np.nansum(scores[overlapping])

    # Replace NaNs (unscored positions) with 0
    np.nan_to_num(remapped_scores, copy=False)

    return remapped_scores

class TopologicalAnomalyDetector(EllipticEnvelope, TransformerMixin):
    """
    Anomaly detection for multivariate time series using topological embeddings and robust covariance estimation.

    This detector extracts topological features from sliding windows of time series data and
    uses a robust Mahalanobis distance (via :class:`sklearn.covariance.EllipticEnvelope`) to
    score anomalies.

    Read more in the :ref:`User Guide <topological_anomaly_detection>`.

    Parameters
    ----------
    window_size : int, default=100
        Sliding window size for extracting time series subsequences.

    step : int, default=5
        Step size between windows.

    tda_max_dim : int, default=1
        Maximum homology dimension used for topological feature extraction.

    n_centers_by_dim : int, default=5
        Number of k-means centers per topological dimension (for vectorization).

    support_fraction : float or None, default=None
        Proportion of data to use for robust covariance estimation. If None, it is computed automatically.

    contamination : float, default=0.1
        Proportion of anomalies in the data, used to compute the decision threshold.

    random_state : int, RandomState instance, or None, default=42
        Controls the randomness of the topological embedding and the robust estimator.

    Attributes
    ----------
    topological_embedding_ : object
        TopologicalEmbedding transformer object fitted during `fit`.

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> n_timestamps = 1000
    >>> n_sensors = 20
    >>> timestamps = pd.to_datetime('2024-01-01', utc=True) + pd.Timedelta(1, 'h') * np.arange(n_timestamps)
    >>> X = pd.DataFrame(np.random.random(size=(n_timestamps, n_sensors)), index=timestamps)
    >>> X.iloc[n_timestamps//2:, :10] = -X.iloc[n_timestamps//2:, 10:20]
    >>> detector = TopologicalAnomalyDetector(n_centers_by_dim=2, tda_max_dim=1).fit(X)
    >>> anomaly_scores = detector.score_samples(X)
    >>> decision = detector.decision_function(X)
    >>> anomalies = detector.predict(X)
    """
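    # Parameter ranges below are enforced by scikit-learn's parameter validation
    # machinery, triggered through the `_fit_context` decorator on `fit`.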

    _parameter_constraints: dict = {
        "window_size": [Interval(numbers.Integral, 1, None, closed="left")],
        "step": [Interval(numbers.Integral, 1, None, closed="left")],
        "tda_max_dim": [Interval(numbers.Integral, 0, 2, closed="both")],
        "n_centers_by_dim": [Interval(numbers.Integral, 2, None, closed="left")],
        "support_fraction": [Interval(numbers.Real, 0, 1, closed="right"), None],
        "contamination": [Interval(numbers.Real, 0, 0.5, closed="both")],
        "random_state": ["random_state"],
    }

    def __init__(
        self,
        window_size: int = 100,
        step: int = 5,
        tda_max_dim: int = 1,
        n_centers_by_dim: int = 5,
        support_fraction: Optional[float] = None,
        contamination: float = 0.1,
        random_state: Optional[Union[int, np.random.RandomState]] = 42,
    ):
        super().__init__(
            support_fraction=support_fraction,
            contamination=contamination,
            random_state=random_state,
        )
        self.window_size = window_size
        self.step = step
        self.tda_max_dim = tda_max_dim
        self.n_centers_by_dim = n_centers_by_dim

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X, y=None):
        """Fit the TopologicalAnomalyDetector model.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_timestamps, n_sensors)
            Multiple time series to transform, where `n_timestamps` is the number of timestamps
            in the series X, and `n_sensors` is the number of sensors.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        self.topological_embedding_ = TopologicalEmbedding(
            window_size=self.window_size,
            step=self.step,
            n_centers_by_dim=self.n_centers_by_dim,
            tda_max_dim=self.tda_max_dim,
        )
        embedding = self.topological_embedding_.fit_transform(X)
        try:
            super().fit(embedding)
        except ValueError:
            warnings.warn(f"EllipticEnvelope fit failed with support_fraction={self.support_fraction}.")

        self._update_padding(X)
        raw_scores = super().score_samples(embedding)
        self.dist_ = score_flat_fast_remapping(
            raw_scores,
            window_size=self.window_size,
            stride=self.step,
            padding_length=self.padding_length_,
        )
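        # The decision threshold below is the `contamination` quantile of the
        # remapped training scores: with the default contamination=0.1, offset_
        # is the 10th percentile, so roughly 10% of training timestamps fall
        # below it and get flagged by `predict`.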

        self.offset_ = np.percentile(self.dist_, 100.0 * self.contamination)
        return self

    def _update_padding(self, X):
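        # Illustrative arithmetic (example numbers, not taken from the source): with
        # X.shape[0] == 1003, window_size == 100 and step == 5, imax == 180, the last
        # window ends at 5 * 180 + 100 == 1000, and padding_length_ == 3 accounts for
        # the trailing timestamps that no window covers.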

        imax = (X.shape[0] - self.window_size) // self.step
        self.padding_length_ = X.shape[0] - (self.step * imax + self.window_size)

    def _raw_score_samples(self, X):
        """Compute raw Mahalanobis scores (pre-remapping)."""
        check_is_fitted(self, "topological_embedding_")
        self._update_padding(X)
        embedding = self.topological_embedding_.transform(X)
        return super().score_samples(embedding)

    def score_samples(self, X, y=None):
        """Compute per-timestep anomaly scores from topological features.

        Lower scores indicate stronger anomalies.
        """
        raw_scores = self._raw_score_samples(X)
        return score_flat_fast_remapping(
            raw_scores,
            window_size=self.window_size,
            stride=self.step,
            padding_length=self.padding_length_,
        )

    def decision_function(self, X):
        """Return the signed distance to the decision boundary (negative for outliers)."""
        return self.score_samples(X) - self.offset_

    def predict(self, X):
        """Predict inliers (1) and outliers (-1) using the learned threshold."""
        return np.where(self.decision_function(X) < 0, -1, 1)

    def transform(self, X):
        """Alias for score_samples; returns anomaly scores."""
        return self.score_samples(X)
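

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the library API: it mirrors the usage
    # shown in the class docstring (random data of the same shape, same constructor
    # arguments) to exercise the fit / score / predict round trip end to end.
    import pandas as pd

    n_timestamps, n_sensors = 1000, 20
    timestamps = pd.to_datetime("2024-01-01", utc=True) + pd.Timedelta(1, "h") * np.arange(n_timestamps)
    X_demo = pd.DataFrame(np.random.random(size=(n_timestamps, n_sensors)), index=timestamps)

    demo_detector = TopologicalAnomalyDetector(n_centers_by_dim=2, tda_max_dim=1).fit(X_demo)
    print("score_samples shape:", demo_detector.score_samples(X_demo).shape)
    print("timestamps flagged as anomalous:", int((demo_detector.predict(X_demo) == -1).sum()))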