Coverage for tdaad/anomaly_detectors.py: 84%
44 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-13 13:45 +0000
1"""Topological Anomaly Detectors."""
3# Author: Martin Royer
5from typing import Sequence
6from numbers import Integral
8import pandas as pd
10from sklearn.utils._param_validation import Interval
11from sklearn.base import _fit_context, TransformerMixin
12from tdaad.utils.remapping_functions import score_flat_fast_remapping
13from tdaad.topological_embedding import TopologicalEmbedding
14from tdaad.utils.local_elliptic_envelope import EllipticEnvelope
class TopologicalAnomalyDetector(EllipticEnvelope, TransformerMixin):
    """Object for detecting anomalies based on Topological Embedding and sklearn.covariance.EllipticEnvelope.

    This object analyzes multiple time series data through the following operations:
    - run a sliding window algorithm and represent each time series window with topological features,
      see :ref:`Topological Embedding <topological_embedding>`,
    - use a MinCovDet algorithm to robustly estimate the data mean and covariance in the embedding space,
      and use these to derive an embedding mahalanobis distance and associated outlier detection procedure,
      see :ref:`Elliptic Envelope <elliptic_envelope>`.

    After fitting, it is able to produce an anomaly score from a time series describing normal / abnormal time segments.
    (the lower, the more abnormal)
    The predict method (inherited from EllipticEnvelope) allows to transform that score into
    binary normal / anomaly labels.

    Read more in the :ref:`User Guide <topological_anomaly_detection>`.

    Parameters
    ----------
    window_size : int, default=100
        Size of the sliding window algorithm to extract subsequences as input to named_pipeline.
    step : int, default=5
        Size of the sliding window steps between each window.
    tda_max_dim : int, default=2
        The maximum dimension of the topological feature extraction.
    n_centers_by_dim : int, default=5
        The number of centroids to generate by dimension for vectorizing topological features.
        The resulting embedding will have total dimension =< tda_max_dim * n_centers_by_dim.
        The resulting embedding dimension might be smaller because of the KMeans algorithm in the Archipelago step.
    support_fraction : float, default=None
        The proportion of points to be included in the support of the raw
        MCD estimate. If None, the minimum value of support_fraction will
        be used within the algorithm: `[n_sample + n_features + 1] / 2`.
        Range is (0, 1).
    contamination : float, default=0.1
        The amount of contamination of the data set, i.e. the proportion
        of outliers in the data set. Range is (0, 0.5]. Only matters for computing the decision function.
    random_state : int, RandomState instance or None, default=42
        Determines the pseudo random number generator for shuffling
        the data. Pass an int for reproducible results across multiple function
        calls.

    Attributes
    ----------
    topological_embedding_ : object
        TopologicalEmbedding transformer object that is fitted at `fit`.

    Examples
    --------
    >>> import numpy as np
    >>> n_timestamps = 1000
    >>> n_sensors = 20
    >>> timestamps = pd.to_datetime('2024-01-01', utc=True) + pd.Timedelta(1, 'h') * np.arange(n_timestamps)
    >>> X = pd.DataFrame(np.random.random(size=(n_timestamps, n_sensors)), index=timestamps)
    >>> X.iloc[n_timestamps//2:,:10] = -X.iloc[n_timestamps//2:,10:20]
    >>> detector = TopologicalAnomalyDetector(n_centers_by_dim=2, tda_max_dim=1).fit(X)
    >>> anomaly_scores = detector.score_samples(X)
    >>> decision = detector.decision_function(X)
    >>> anomalies = detector.predict(X)
    """

    required_properties: Sequence[str] = ["multiple_time_series"]

    _parameter_constraints: dict = {
        **EllipticEnvelope._parameter_constraints,
        "window_size": [Interval(Integral, left=2, right=None, closed="left")],
        "step": [Interval(Integral, left=1, right=None, closed="left")],
        "tda_max_dim": [Interval(Integral, left=0, right=3, closed="left")],
        "n_centers_by_dim": [Interval(Integral, left=1, right=None, closed="left")],
    }

    def __init__(
        self,
        window_size: int = 100,
        step: int = 5,
        tda_max_dim: int = 2,
        n_centers_by_dim: int = 5,
        support_fraction: float = None,
        contamination: float = 0.1,
        random_state: int = 42,
    ):
        super().__init__(
            support_fraction=support_fraction,
            contamination=contamination,
            random_state=random_state
        )
        self.window_size = window_size
        self.step = step
        self.tda_max_dim = tda_max_dim
        self.n_centers_by_dim = n_centers_by_dim

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X, y=None):
        """Fit the TopologicalAnomalyDetector model.

        Args
        ----
        X : {array-like, sparse matrix} of shape (n_timestamps, n_sensors)
            Multiple time series to transform, where `n_timestamps` is the number of timestamps
            in the series X, and `n_sensors` is the number of sensors.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        self.topological_embedding_ = TopologicalEmbedding(
            window_size=self.window_size,
            step=self.step,
            n_centers_by_dim=self.n_centers_by_dim,
            tda_max_dim=self.tda_max_dim,
        )
        # EllipticEnvelope expects an indexed container; wrap raw arrays in a DataFrame.
        if not hasattr(X, "index"):
            X = pd.DataFrame(data=X, index=range(X.shape[0]))
        embedding = self.topological_embedding_.fit_transform(X)
        try:
            super().fit(embedding)
        except ValueError as e:
            # MinCovDet can fail on degenerate embeddings; retry with full support.
            print(f"Catching {e=}, will increase support fraction.")
            original_support_fraction = self.support_fraction
            self.support_fraction = 1
            try:
                super().fit(embedding)
            finally:
                # Restore the user-provided value (the previous code reset it to
                # None unconditionally, silently discarding a constructor argument,
                # and left it at 1 if the retry also raised).
                self.support_fraction = original_support_fraction
        return self

    def _warped_score_samples(self, X, y=None):  # this exists to retrieve scores before remapping
        """Compute the negative Mahalanobis distances associated with the TopologicalEmbedding representation of X.

        Args
        ----
        X : {array-like, sparse matrix} of shape (n_timestamps, n_sensors)
            Multiple time series to transform, where `n_timestamps` is the number of timestamps
            in the series X, and `n_sensors` is the number of sensors.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        negative_mahal_distances : pandas.DataFrame of shape (n_samples,)
            Opposite of the Mahalanobis distances.
        """
        if not hasattr(X, "index"):
            X = pd.DataFrame(data=X, index=range(X.shape[0]))

        # Number of full windows that fit; the remainder becomes right-padding
        # so the remapped scores cover every original timestamp.
        imax = (X.shape[0] - self.window_size) // self.step
        self.padding_length_ = X.shape[0] - (self.step * imax + self.window_size)
        print(f"{X.shape[0]=}, {self.window_size=}, {self.step=}, so running {self.padding_length_=}...")

        embedding = self.topological_embedding_.transform(X=X)
        return super().score_samples(embedding)

    def score_samples(self, X, y=None):
        """Compute the negative Mahalanobis distances associated with the TopologicalEmbedding representation of X.

        Args
        ----
        X : {array-like, sparse matrix} of shape (n_timestamps, n_sensors)
            Multiple time series to transform, where `n_timestamps` is the number of timestamps
            in the series X, and `n_sensors` is the number of sensors.
        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        negative_mahal_distances : ndarray of shape (n_samples,)
            Opposite of the Mahalanobis distances.
        """
        warped_score_samples = self._warped_score_samples(X)
        # Remap per-window scores back onto the original timestamp axis.
        unwarped_scores = score_flat_fast_remapping(warped_score_samples, window_size=self.window_size,
                                                    stride=self.step, padding_length=self.padding_length_)
        return unwarped_scores
195TopologicalAnomalyDetector.transform = TopologicalAnomalyDetector.score_samples