Coverage for tdaad/utils/remapping_functions.py: 100%
16 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 16:23 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 16:23 +0000
1"""Remapping Functions."""
3# Author: Martin Royer
5import numpy as np
8def score_flat_fast_remapping(scores, window_size, stride, padding_length=0):
9 """
10 Remap window-level anomaly scores to a flat sequence of per-time-step scores.
12 Parameters
13 ----------
14 scores : array-like of shape (n_windows,)
15 Anomaly scores for each window. Can be a pandas Series or NumPy array.
17 window_size : int
18 Size of the sliding window.
20 stride : int
21 Step size between windows.
23 padding_length : int, optional (default=0)
24 Extra length to pad the output array (typically at the end of a signal).
26 Returns
27 -------
28 remapped_scores : np.ndarray of shape (n_timestamps + padding_length,)
29 Flattened anomaly scores with per-timestep resolution. NaN values (from
30 positions not covered by any window) are replaced with 0.
31 """
32 # Ensure scores is a NumPy array
33 if hasattr(scores, "values"):
34 scores = scores.values
36 n_windows = len(scores)
38 # Compute begin and end indices for each window
39 begins = np.arange(n_windows) * stride
40 ends = begins + window_size
42 # Output length based on last window + padding
43 total_length = ends[-1] + padding_length
44 remapped_scores = np.full(total_length, np.nan)
46 # Find all unique intersection points between windows
47 intersections = np.unique(np.concatenate((begins, ends)))
49 # For each interval between two intersections, find overlapping windows and sum their scores
50 for left, right in zip(intersections[:-1], intersections[1:]):
51 overlapping = (begins <= left) & (right <= ends)
52 if np.any(overlapping):
53 remapped_scores[left:right] = np.nansum(scores[overlapping])
55 # Replace NaNs (unscored positions) with 0
56 np.nan_to_num(remapped_scores, copy=False)
58 return remapped_scores