Coverage for tdaad/utils/window_functions.py: 94%

17 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 16:23 +0000

1"""Window Functions.""" 

2 

3# Author: Martin Royer 

4 

5import hashlib 

6import numpy as np 

7import pandas as pd 

8 

9from joblib import Parallel, delayed 

10 

11 

def hash_window(window: np.ndarray) -> str:
    """Return the SHA-1 hex digest of a window's raw bytes.

    The window is first made C-contiguous (strided views are copied if
    needed) and then reinterpreted as unsigned bytes, so the digest depends
    only on the byte content of the window.
    """
    contiguous = np.ascontiguousarray(window)
    raw_bytes = contiguous.view(np.uint8)

    digest = hashlib.sha1()
    digest.update(raw_bytes)
    return digest.hexdigest()

15 

16 

def sliding_window_3D_view(data, window_size, step):
    """
    Create a 3D sliding window view over a 2D array without copying data.

    This function returns overlapping sliding windows from a 2D input array
    using NumPy's `as_strided` for memory-efficient view creation. The resulting
    3D array has shape `(num_windows, window_size, num_features)`, where each
    window contains `window_size` consecutive rows from the original data and
    successive windows start `step` rows apart.

    Parameters
    ----------
    data : np.ndarray
        Input 2D array of shape (num_rows, num_features).
    window_size : int
        Number of consecutive rows to include in each window. Must be >= 1.
    step : int
        Step size (stride) between successive windows. Must be >= 1.

    Returns
    -------
    np.ndarray
        3D array of shape (num_windows, window_size, num_features), where each
        entry is a view into the original `data`. When `data` has fewer than
        `window_size` rows, an empty array of shape
        (0, window_size, num_features) is returned.

    Raises
    ------
    ValueError
        If `data` is not 2-dimensional, or if `window_size` or `step` is
        smaller than 1.

    Notes
    -----
    - This function uses `np.lib.stride_tricks.as_strided`, which does not copy
      the data. Windows overlap in memory, so writing to the output array
      modifies `data` and every other window sharing the same rows.
    - The number of windows returned is calculated as:
      max(floor((num_rows - window_size) / step) + 1, 0)
    """
    if data.ndim != 2:
        raise ValueError(
            f"data must be a 2D array, got {data.ndim} dimension(s)."
        )
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}.")
    if step < 1:
        raise ValueError(f"step must be >= 1, got {step}.")

    num_rows, num_features = data.shape
    row_stride, feature_stride = data.strides

    # Clamp to zero so that too-short inputs consistently yield "no windows"
    # instead of a negative dimension being handed to as_strided.
    num_windows = max((num_rows - window_size) // step + 1, 0)

    shape = (num_windows, window_size, num_features)
    # Moving to the next window skips `step` rows; moving inside a window
    # advances one row at a time.
    strides = (step * row_stride, row_stride, feature_stride)

    return np.lib.stride_tricks.as_strided(data, shape=shape, strides=strides)

55 

56 

def sliding_window_ppl_pp(data, func, window_size=120, step=5, n_jobs=-1):
    """
    Run a function over sliding windows of a time series, in parallel.

    The input DataFrame is converted to a NumPy array and cut into overlapping
    windows with `sliding_window_3D_view`. Each window is hashed with
    `hash_window` and passed to `func`; the per-window results are gathered
    into a DataFrame indexed by the window hashes.

    Parameters
    ----------
    data : pd.DataFrame
        Input 2D time series data with shape (num_rows, num_features). Must
        provide a `to_numpy()` method.
    func : callable
        Function applied to each window. It receives a NumPy array of shape
        (window_size, num_features) and returns a result (e.g., dict or Series).
    window_size : int, optional (default=120)
        Number of consecutive rows in each sliding window.
    step : int, optional (default=5)
        Step size (stride) between successive windows.
    n_jobs : int, optional (default=-1)
        Number of parallel jobs, passed to `joblib.Parallel`.
        Use -1 to utilize all available CPUs.

    Returns
    -------
    pd.DataFrame
        DataFrame whose index holds the hash of each window's content (via
        `hash_window`) and whose rows hold the corresponding `func(w)` result.

    Notes
    -----
    - Relies on `sliding_window_3D_view()` to build the window views and on
      `hash_window()` to derive the index value of each window.
    - `func(w)` must return something dictionary-like (e.g., dict, Series) so
      the results can be assembled with `pd.DataFrame.from_dict`.
    - Results are collected in a dict keyed by window hash, so windows with
      identical content end up as a single row.

    Example
    -------
    >>> def mean_window(w):
    ...     return {'mean': w.mean()}
    >>> result = sliding_window_ppl_pp(X, func=mean_window, window_size=10, step=2)
    >>> print(result.head())
    """

    def _hash_and_apply(wdw):
        # Pair each window's hash with its processed result.
        return hash_window(wdw), func(wdw)

    window_stack = sliding_window_3D_view(data.to_numpy(), window_size, step)

    tasks = (delayed(_hash_and_apply)(wdw) for wdw in window_stack)
    hashed_outputs = Parallel(n_jobs=n_jobs)(tasks)

    return pd.DataFrame.from_dict(dict(hashed_outputs), orient="index")

108 return post_result