Coverage for tdaad/anomaly_detectors.py: 90% (50 statements)

"""Topological Anomaly Detectors."""

# Author: Martin Royer

from typing import Sequence, Optional, Union

import numbers
import warnings
import numpy as np

from sklearn.base import _fit_context, TransformerMixin
from sklearn.utils._param_validation import Interval
from sklearn.utils.validation import check_is_fitted

from tdaad.utils.remapping_functions import score_flat_fast_remapping
from tdaad.topological_embedding import TopologicalEmbedding
from tdaad.utils.local_elliptic_envelope import EllipticEnvelope


class TopologicalAnomalyDetector(EllipticEnvelope, TransformerMixin):
    """
    Anomaly detection for multivariate time series using topological embeddings and robust covariance estimation.

    This detector extracts topological features from sliding windows of the time series and
    uses a robust Mahalanobis distance (via EllipticEnvelope) to score anomalies.

    Read more in the :ref:`User Guide <topological_anomaly_detection>`.

    Parameters
    ----------
    window_size : int, default=100
        Sliding window size for extracting time series subsequences.

    step : int, default=5
        Step size between consecutive windows.

    tda_max_dim : int, default=1
        Maximum homology dimension used for topological feature extraction.

    n_centers_by_dim : int, default=5
        Number of k-means centers per homology dimension used for vectorization.

    support_fraction : float or None, default=None
        Proportion of data to use for robust covariance estimation. If None, it is computed automatically.

    contamination : float, default=0.1
        Expected proportion of anomalies in the data, used to set the decision threshold.

    random_state : int, RandomState instance or None, default=42
        Controls the randomness of the topological embedding and of the robust estimator.

    Attributes
    ----------
    topological_embedding_ : object
        TopologicalEmbedding transformer object that is fitted at `fit`.

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> n_timestamps = 1000
    >>> n_sensors = 20
    >>> timestamps = pd.to_datetime('2024-01-01', utc=True) + pd.Timedelta(1, 'h') * np.arange(n_timestamps)
    >>> X = pd.DataFrame(np.random.random(size=(n_timestamps, n_sensors)), index=timestamps)
    >>> X.iloc[n_timestamps//2:,:10] = -X.iloc[n_timestamps//2:,10:20]
    >>> detector = TopologicalAnomalyDetector(n_centers_by_dim=2, tda_max_dim=1).fit(X)
    >>> anomaly_scores = detector.score_samples(X)
    >>> decision = detector.decision_function(X)
    >>> anomalies = detector.predict(X)
    """

    required_properties: Sequence[str] = ["multiple_time_series"]

    _parameter_constraints: dict = {
        "window_size": [Interval(numbers.Integral, 1, None, closed="left")],
        "step": [Interval(numbers.Integral, 1, None, closed="left")],
        "tda_max_dim": [Interval(numbers.Integral, 0, 2, closed="both")],
        "n_centers_by_dim": [Interval(numbers.Integral, 2, None, closed="left")],
        "support_fraction": [Interval(numbers.Real, 0, 1, closed="both"), None],
        "contamination": [Interval(numbers.Real, 0, 0.5, closed="both")],
        "random_state": ["random_state"],
    }

    def __init__(
        self,
        window_size: int = 100,
        step: int = 5,
        tda_max_dim: int = 1,
        n_centers_by_dim: int = 5,
        support_fraction: Optional[float] = None,
        contamination: float = 0.1,
        random_state: Optional[Union[int, np.random.RandomState]] = 42,
    ):
        super().__init__(
            support_fraction=support_fraction,
            contamination=contamination,
            random_state=random_state,
        )
        self.window_size = window_size
        self.step = step
        self.tda_max_dim = tda_max_dim
        self.n_centers_by_dim = n_centers_by_dim

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X, y=None):
        """Fit the TopologicalAnomalyDetector model.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_timestamps, n_sensors)
            Multiple time series to transform, where `n_timestamps` is the number of timestamps
            in the series X, and `n_sensors` is the number of sensors.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
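        # Learn the topological embedding on sliding windows of X, then fit the
        # robust covariance estimator (EllipticEnvelope) on the embedded windows.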

        self.topological_embedding_ = TopologicalEmbedding(
            window_size=self.window_size,
            step=self.step,
            n_centers_by_dim=self.n_centers_by_dim,
            tda_max_dim=self.tda_max_dim,
        )
        embedding = self.topological_embedding_.fit_transform(X)
        try:
            super().fit(embedding)
        except ValueError:
            warnings.warn(
                f"Robust covariance estimation failed with support_fraction={self.support_fraction}."
            )
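        # Remap the per-window scores back onto the original timestamps and set the
        # decision threshold at the contamination percentile of the training scores.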

        self._update_padding(X)
        raw_scores = super().score_samples(embedding)
        self.dist_ = score_flat_fast_remapping(
            raw_scores,
            window_size=self.window_size,
            stride=self.step,
            padding_length=self.padding_length_,
        )
        self.offset_ = np.percentile(self.dist_, 100.0 * self.contamination)
        return self

    def _update_padding(self, X):
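        # imax is the index of the last full sliding window; padding_length_ counts the
        # trailing timestamps not covered by any window. For example, with
        # window_size=100, step=5 and 1003 timestamps: imax = (1003 - 100) // 5 = 180,
        # so padding_length_ = 1003 - (5 * 180 + 100) = 3.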

        imax = (X.shape[0] - self.window_size) // self.step
        self.padding_length_ = X.shape[0] - (self.step * imax + self.window_size)

    def _raw_score_samples(self, X):
        """Compute raw Mahalanobis scores (pre-remapping)."""
        check_is_fitted(self, "topological_embedding_")
        self._update_padding(X)
        embedding = self.topological_embedding_.transform(X)
        return super().score_samples(embedding)

    def score_samples(self, X, y=None):
        """Compute anomaly scores for X by remapping window-level scores of its topological embedding back onto the time axis."""
        raw_scores = self._raw_score_samples(X)
        return score_flat_fast_remapping(
            raw_scores,
            window_size=self.window_size,
            stride=self.step,
            padding_length=self.padding_length_,
        )

    def decision_function(self, X):
        """Return the signed distance of the anomaly score to the learned threshold (positive for predicted outliers)."""
        return self.offset_ - self.score_samples(X)

    def predict(self, X):
        """Predict inliers (1) and outliers (-1) using the learned threshold."""
        scores = self.score_samples(X)
        return np.where(scores < self.offset_, -1, 1)

    def transform(self, X):
        """Alias for score_samples. Returns anomaly scores."""
        return self.score_samples(X)