Coverage for tdaad/utils/remapping_functions.py: 100%

16 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 16:23 +0000

1"""Remapping Functions.""" 

2 

3# Author: Martin Royer 

4 

5import numpy as np 

6 

7 

8def score_flat_fast_remapping(scores, window_size, stride, padding_length=0): 

9 """ 

10 Remap window-level anomaly scores to a flat sequence of per-time-step scores. 

11 

12 Parameters 

13 ---------- 

14 scores : array-like of shape (n_windows,) 

15 Anomaly scores for each window. Can be a pandas Series or NumPy array. 

16 

17 window_size : int 

18 Size of the sliding window. 

19 

20 stride : int 

21 Step size between windows. 

22 

23 padding_length : int, optional (default=0) 

24 Extra length to pad the output array (typically at the end of a signal). 

25 

26 Returns 

27 ------- 

28 remapped_scores : np.ndarray of shape (n_timestamps + padding_length,) 

29 Flattened anomaly scores with per-timestep resolution. NaN values (from 

30 positions not covered by any window) are replaced with 0. 

31 """ 

32 # Ensure scores is a NumPy array 

33 if hasattr(scores, "values"): 

34 scores = scores.values 

35 

36 n_windows = len(scores) 

37 

38 # Compute begin and end indices for each window 

39 begins = np.arange(n_windows) * stride 

40 ends = begins + window_size 

41 

42 # Output length based on last window + padding 

43 total_length = ends[-1] + padding_length 

44 remapped_scores = np.full(total_length, np.nan) 

45 

46 # Find all unique intersection points between windows 

47 intersections = np.unique(np.concatenate((begins, ends))) 

48 

49 # For each interval between two intersections, find overlapping windows and sum their scores 

50 for left, right in zip(intersections[:-1], intersections[1:]): 

51 overlapping = (begins <= left) & (right <= ends) 

52 if np.any(overlapping): 

53 remapped_scores[left:right] = np.nansum(scores[overlapping]) 

54 

55 # Replace NaNs (unscored positions) with 0 

56 np.nan_to_num(remapped_scores, copy=False) 

57 

58 return remapped_scores