Coverage for tdaad / anomaly_detectors.py: 94%
62 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-03 16:54 +0000
1"""Topological Anomaly Detectors."""
3# Author: Martin Royer
4from typing import Optional, Union
6import numbers
7import warnings
8import numpy as np
10from sklearn.base import _fit_context, TransformerMixin
11from sklearn.utils._param_validation import Interval
12from sklearn.utils.validation import check_is_fitted
13from sklearn.covariance import EllipticEnvelope
15from tdaad.topological_embedding import TopologicalEmbedding
def score_flat_fast_remapping(scores, window_size, stride, padding_length=0):
    """
    Remap window-level anomaly scores to a flat sequence of per-time-step scores.

    Each window ``i`` covers timestamps ``[i * stride, i * stride + window_size)``.
    For every timestamp, the remapped score is the sum of the scores of all
    windows covering it (NaN window scores are ignored by the sum).

    Parameters
    ----------
    scores : array-like of shape (n_windows,)
        Anomaly scores for each window. Can be a pandas Series or NumPy array.

    window_size : int
        Size of the sliding window.

    stride : int
        Step size between windows.

    padding_length : int, optional (default=0)
        Extra length to pad the output array (typically at the end of a signal).

    Returns
    -------
    remapped_scores : np.ndarray of shape (n_timestamps + padding_length,)
        Flattened anomaly scores with per-timestep resolution. NaN values (from
        positions not covered by any window) are replaced with 0. If `scores`
        is empty, an all-zero array of length `padding_length` is returned.
    """
    # Ensure scores is a NumPy array (unwrap pandas Series first).
    if hasattr(scores, "values"):
        scores = scores.values
    scores = np.asarray(scores)

    n_windows = len(scores)
    if n_windows == 0:
        # No windows at all: nothing to remap; only the padded tail exists,
        # and uncovered positions are defined to score 0.
        return np.zeros(padding_length)

    # Compute begin and end indices for each window.
    begins = np.arange(n_windows) * stride
    ends = begins + window_size

    # Output length based on last window's end + padding.
    total_length = ends[-1] + padding_length
    remapped_scores = np.full(total_length, np.nan)

    # All unique window boundaries; consecutive pairs delimit maximal
    # intervals over which the set of covering windows is constant.
    intersections = np.unique(np.concatenate((begins, ends)))

    # For each interval, sum the scores of every window that fully covers it.
    for left, right in zip(intersections[:-1], intersections[1:]):
        overlapping = (begins <= left) & (right <= ends)
        if np.any(overlapping):
            remapped_scores[left:right] = np.nansum(scores[overlapping])

    # Replace NaNs (positions not covered by any window) with 0, in place.
    np.nan_to_num(remapped_scores, copy=False)

    return remapped_scores
class TopologicalAnomalyDetector(EllipticEnvelope, TransformerMixin):
    """
    Anomaly detection for multivariate time series using topological embeddings and robust covariance estimation.

    This detector extracts topological features from sliding windows of time series data and
    uses a robust Mahalanobis distance (via EllipticEnvelope) to score anomalies.

    Read more in the :ref:`User Guide <topological_anomaly_detection>`.

    Parameters
    ----------
    window_size : int, default=100
        Sliding window size for extracting time series subsequences.

    step : int, default=5
        Step size between windows.

    tda_max_dim : int, default=1
        Maximum homology dimension used for topological feature extraction.

    n_centers_by_dim : int, default=5
        Number of k-means centers per topological dimension (for vectorization).

    support_fraction : float or None, default=None
        Proportion of data to use for robust covariance estimation. If None, computed automatically.

    contamination : float, default=0.1
        Proportion of anomalies in the data, used to compute decision threshold.

    random_state : int, RandomState instance, or None, default=42
        Controls randomness of the topological embedding and robust estimator.

    Attributes
    ----------
    topological_embedding_ : object
        TopologicalEmbedding transformer object that is fitted at `fit`.

    padding_length_ : int
        Number of trailing timestamps not covered by any full window, set at `fit`.

    dist_ : np.ndarray
        Per-timestep anomaly scores of the training data, set at `fit`.

    offset_ : float
        Decision threshold: the `contamination`-percentile of `dist_`, set at `fit`.

    Examples
    --------
    >>> n_timestamps = 1000
    >>> n_sensors = 20
    >>> import numpy as np
    >>> import pandas as pd
    >>> timestamps = pd.to_datetime('2024-01-01', utc=True) + pd.Timedelta(1, 'h') * np.arange(n_timestamps)
    >>> X = pd.DataFrame(np.random.random(size=(n_timestamps, n_sensors)), index=timestamps)
    >>> X.iloc[n_timestamps//2:,:10] = -X.iloc[n_timestamps//2:,10:20]
    >>> detector = TopologicalAnomalyDetector(n_centers_by_dim=2, tda_max_dim=1).fit(X)
    >>> anomaly_scores = detector.score_samples(X)
    >>> decision = detector.decision_function(X)
    >>> anomalies = detector.predict(X)
    """

    # Declarative parameter validation consumed by sklearn's @_fit_context.
    _parameter_constraints: dict = {
        "window_size": [Interval(numbers.Integral, 1, None, closed="left")],
        "step": [Interval(numbers.Integral, 1, None, closed="left")],
        "tda_max_dim": [Interval(numbers.Integral, 0, 2, closed="both")],
        "n_centers_by_dim": [Interval(numbers.Integral, 2, None, closed="left")],
        "support_fraction": [Interval(numbers.Real, 0, 1, closed="right"), None],
        "contamination": [Interval(numbers.Real, 0, 0.5, closed="both")],
        "random_state": ["random_state"],
    }

    def __init__(
        self,
        window_size: int = 100,
        step: int = 5,
        tda_max_dim: int = 1,
        n_centers_by_dim: int = 5,
        support_fraction: Optional[float] = None,
        contamination: float = 0.1,
        random_state: Optional[Union[int, np.random.RandomState]] = 42,
    ):
        # Robust-covariance hyperparameters are handled by the EllipticEnvelope base.
        super().__init__(
            support_fraction=support_fraction,
            contamination=contamination,
            random_state=random_state,
        )
        # Embedding hyperparameters are stored as-is (sklearn convention:
        # no validation or transformation in __init__).
        self.window_size = window_size
        self.step = step
        self.tda_max_dim = tda_max_dim
        self.n_centers_by_dim = n_centers_by_dim

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X, y=None):
        """Fit the TopologicalAnomalyDetector model.

        Fits a TopologicalEmbedding on sliding windows of `X`, fits the robust
        covariance estimator on the resulting embedding, then remaps the
        window-level training scores to per-timestep scores (`dist_`) and
        derives the decision threshold (`offset_`) from `contamination`.

        Args
        ----
        X : {array-like, sparse matrix} of shape (n_timestamps, n_sensors)
            Multiple time series to transform, where `n_timestamps` is the number of timestamps
            in the series X, and `n_sensors` is the number of sensors.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        self.topological_embedding_ = TopologicalEmbedding(
            window_size=self.window_size,
            step=self.step,
            n_centers_by_dim=self.n_centers_by_dim,
            tda_max_dim=self.tda_max_dim,
        )
        embedding = self.topological_embedding_.fit_transform(X)
        try:
            super().fit(embedding)
        except ValueError:
            # NOTE(review): on a failed robust-covariance fit this only warns and
            # continues; the estimator may be left (partially) unfitted, in which
            # case the score_samples call below can raise — confirm this
            # best-effort fallback is intended.
            warnings.warn(f"Failed with support_fraction={self.support_fraction}. ")
        self._update_padding(X)
        raw_scores = super().score_samples(embedding)
        # Spread window-level scores back onto individual timestamps.
        self.dist_ = score_flat_fast_remapping(
            raw_scores,
            window_size=self.window_size,
            stride=self.step,
            padding_length=self.padding_length_,
        )
        # Threshold such that a `contamination` fraction of training
        # timestamps falls below it (higher score = more normal).
        self.offset_ = np.percentile(self.dist_, 100.0 * self.contamination)
        return self

    def _update_padding(self, X):
        # Number of trailing timestamps after the last full window:
        # imax is the index of the last window that fits entirely in X.
        imax = (X.shape[0] - self.window_size) // self.step
        self.padding_length_ = X.shape[0] - (self.step * imax + self.window_size)

    def _raw_score_samples(self, X):
        """Compute raw Mahalanobis scores (pre-remapping), one per window."""
        check_is_fitted(self, "topological_embedding_")
        # Padding depends on the length of this particular X.
        self._update_padding(X)
        embedding = self.topological_embedding_.transform(X)
        return super().score_samples(embedding)

    def score_samples(self, X, y=None):
        """Compute per-timestep anomaly scores from topological features.

        Returns an array of shape (n_timestamps,); uncovered trailing
        positions score 0.
        """
        raw_scores = self._raw_score_samples(X)
        return score_flat_fast_remapping(
            raw_scores,
            window_size=self.window_size,
            stride=self.step,
            padding_length=self.padding_length_,
        )

    def decision_function(self, X):
        """Return the signed distance to the decision boundary (negative = anomalous)."""
        return self.score_samples(X) - self.offset_

    def predict(self, X):
        """Predict inliers (1) and outliers (-1) using learned threshold."""
        return np.where(self.decision_function(X) < 0, -1, 1)

    def transform(self, X):
        """Alias for score_samples. Returns anomaly scores."""
        return self.score_samples(X)