Coverage for tdaad/anomaly_detectors.py: 90%
50 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 16:23 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 16:23 +0000
1"""Topological Anomaly Detectors."""
3# Author: Martin Royer
4from typing import Sequence, Optional, Union
6import numbers
7import warnings
8import numpy as np
10from sklearn.base import _fit_context, TransformerMixin
11from sklearn.utils._param_validation import Interval
12from sklearn.utils.validation import check_is_fitted
14from tdaad.utils.remapping_functions import score_flat_fast_remapping
15from tdaad.topological_embedding import TopologicalEmbedding
16from tdaad.utils.local_elliptic_envelope import EllipticEnvelope
class TopologicalAnomalyDetector(EllipticEnvelope, TransformerMixin):
    """
    Anomaly detection for multivariate time series using topological embeddings and robust covariance estimation.

    This detector extracts topological features from sliding windows of time series data and
    uses a robust Mahalanobis distance (via EllipticEnvelope) to score anomalies.

    Read more in the :ref:`User Guide <topological_anomaly_detection>`.

    Parameters
    ----------
    window_size : int, default=100
        Sliding window size for extracting time series subsequences.

    step : int, default=5
        Step size between windows.

    tda_max_dim : int, default=1
        Maximum homology dimension used for topological feature extraction.

    n_centers_by_dim : int, default=5
        Number of k-means centers per topological dimension (for vectorization).

    support_fraction : float or None, default=None
        Proportion of data to use for robust covariance estimation. If None, computed automatically.

    contamination : float, default=0.1
        Proportion of anomalies in the data, used to compute decision threshold.

    random_state : int, RandomState instance, or None, default=42
        Forwarded to the parent ``EllipticEnvelope``.
        NOTE(review): `fit` does not pass it to ``TopologicalEmbedding``; confirm
        whether the embedding is meant to be seeded as well.

    Attributes
    ----------
    topological_embedding_ : object
        TopologicalEmbedding transformer object that is fitted at `fit`.

    dist_ : array-like
        Training scores after remapping by ``score_flat_fast_remapping``, set at `fit`.

    offset_ : float
        Percentile of ``dist_`` at ``100 * contamination``; scores strictly below it
        are labelled as outliers by `predict`.

    padding_length_ : int
        Value computed by ``_update_padding`` for the last ``X`` seen. It is
        overwritten by `fit` and by every call that goes through `score_samples`.

    Examples
    --------
    >>> n_timestamps = 1000
    >>> n_sensors = 20
    >>> import numpy as np
    >>> import pandas as pd
    >>> timestamps = pd.to_datetime('2024-01-01', utc=True) + pd.Timedelta(1, 'h') * np.arange(n_timestamps)
    >>> X = pd.DataFrame(np.random.random(size=(n_timestamps, n_sensors)), index=timestamps)
    >>> X.iloc[n_timestamps//2:,:10] = -X.iloc[n_timestamps//2:,10:20]
    >>> detector = TopologicalAnomalyDetector(n_centers_by_dim=2, tda_max_dim=1).fit(X)
    >>> anomaly_scores = detector.score_samples(X)
    >>> decision = detector.decision_function(X)
    >>> anomalies = detector.predict(X)
    """

    required_properties: Sequence[str] = ["multiple_time_series"]

    _parameter_constraints: dict = {
        "window_size": [Interval(numbers.Integral, 1, None, closed="left")],
        "step": [Interval(numbers.Integral, 1, None, closed="left")],
        "tda_max_dim": [Interval(numbers.Integral, 0, 2, closed="both")],
        "n_centers_by_dim": [Interval(numbers.Integral, 2, None, closed="left")],
        "support_fraction": [Interval(numbers.Real, 0, 1, closed="both"), None],
        "contamination": [Interval(numbers.Real, 0, 0.5, closed="both")],
        "random_state": ["random_state"],
    }

    def __init__(
        self,
        window_size: int = 100,
        step: int = 5,
        tda_max_dim: int = 1,
        n_centers_by_dim: int = 5,
        support_fraction: Optional[float] = None,
        contamination: float = 0.1,
        random_state: Optional[Union[int, np.random.RandomState]] = 42,
    ):
        super().__init__(
            support_fraction=support_fraction,
            contamination=contamination,
            random_state=random_state,
        )
        self.window_size = window_size
        self.step = step
        self.tda_max_dim = tda_max_dim
        self.n_centers_by_dim = n_centers_by_dim

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X, y=None):
        """Fit the TopologicalAnomalyDetector model.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_timestamps, n_sensors)
            Multiple time series to transform, where `n_timestamps` is the number of timestamps
            in the series X, and `n_sensors` is the number of sensors.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Returns the instance itself.

        Raises
        ------
        ValueError
            If the parent estimator cannot be fitted on the topological embedding.
            A warning naming the ``support_fraction`` in use is emitted first.
        """
        self.topological_embedding_ = TopologicalEmbedding(
            window_size=self.window_size,
            step=self.step,
            n_centers_by_dim=self.n_centers_by_dim,
            tda_max_dim=self.tda_max_dim,
        )
        embedding = self.topological_embedding_.fit_transform(X)
        try:
            super().fit(embedding)
        except ValueError:
            warnings.warn(f"Failed with support_fraction={self.support_fraction}. ")
            # Everything below scores `embedding` with the parent estimator. After
            # a failed fit the parent holds no valid state for this embedding, so
            # carrying on would either fail with an unrelated error or derive
            # `dist_` / `offset_` from state that does not belong to this data.
            raise

        self._update_padding(X)
        raw_scores = super().score_samples(embedding)
        self.dist_ = score_flat_fast_remapping(
            raw_scores,
            window_size=self.window_size,
            stride=self.step,
            padding_length=self.padding_length_,
        )
        # Threshold such that a `contamination` share of the training scores lies below it.
        self.offset_ = np.percentile(self.dist_, 100.0 * self.contamination)
        return self

    def _update_padding(self, X):
        """Store in ``padding_length_`` the rows of X left after the last full window.

        ``imax`` is the index of the last window that starts at a multiple of
        ``step`` and still fits in ``X``; the padding is whatever remains after
        that window ends. ``X`` must expose ``shape``.
        NOTE(review): when ``X`` has fewer rows than ``window_size`` the floor
        division yields a negative ``imax``; confirm this case is rejected upstream.
        """
        imax = (X.shape[0] - self.window_size) // self.step
        self.padding_length_ = X.shape[0] - (self.step * imax + self.window_size)

    def _raw_score_samples(self, X):
        """Compute raw Mahalanobis scores (pre-remapping).

        Also refreshes ``padding_length_`` for ``X`` so that the caller can remap
        the returned scores with the matching padding.
        """
        check_is_fitted(self, "topological_embedding_")
        self._update_padding(X)
        embedding = self.topological_embedding_.transform(X)
        return super().score_samples(embedding)

    def score_samples(self, X, y=None):
        """Compute anomaly scores from topological features.

        The raw scores of the embedded windows are passed through
        ``score_flat_fast_remapping`` together with the window size, the step
        and the padding computed for ``X``. ``y`` is ignored.
        """
        raw_scores = self._raw_score_samples(X)
        return score_flat_fast_remapping(
            raw_scores,
            window_size=self.window_size,
            stride=self.step,
            padding_length=self.padding_length_,
        )

    def decision_function(self, X):
        """Return the distance to the decision boundary.

        Computed as ``offset_ - score_samples(X)``: values are positive exactly
        for the samples that `predict` labels ``-1``.
        NOTE(review): scikit-learn outlier detectors use the opposite sign
        (negative means outlier); confirm this orientation is intended.
        """
        return self.offset_ - self.score_samples(X)

    def predict(self, X):
        """Predict inliers (1) and outliers (-1) using learned threshold.

        A sample is an outlier when its score is strictly below ``offset_``.
        """
        scores = self.score_samples(X)
        return np.where(scores < self.offset_, -1, 1)

    def transform(self, X):
        """Alias for score_samples. Returns anomaly scores."""
        return self.score_samples(X)