tdaad.utils package

Submodules

tdaad.utils.local_elliptic_envelope module

Pandas Elliptic Envelope.

tdaad.utils.local_elliptic_envelope.pandas_mahalanobis(self, X)[source]

Compute the negative Mahalanobis distances of embedding matrix X.

Parameters:

X (array-like of shape (n_samples, n_features)) – The embedding matrix.

Returns:

negative_mahal_distances – Opposite of the Mahalanobis distances.

Return type:

pandas.DataFrame of shape (n_samples,)

tdaad.utils.local_elliptic_envelope.pandas_score_samples(self, X)[source]

Compute the negative Mahalanobis distances.

Parameters:

X (array-like of shape (n_samples, n_features)) – The data matrix.

Returns:

negative_mahal_distances – Opposite of the Mahalanobis distances.

Return type:

array-like of shape (n_samples,)
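The sign convention above (scores are the opposite of the Mahalanobis distances, so higher means more normal) can be illustrated with a NumPy-only sketch. `negative_mahalanobis` below is a hypothetical helper written for illustration, not part of tdaad, and it uses the empirical (non-robust) mean and covariance rather than whatever robust estimate the library fits:

```python
import numpy as np

def negative_mahalanobis(X):
    # Sketch of the documented convention: squared Mahalanobis distance of
    # each row to the empirical mean/covariance, negated so that values
    # near 0 are "normal" and very negative values are anomalous.
    mean = X.mean(axis=0)
    cov = np.cov(X, rowvar=False)
    prec = np.linalg.inv(cov)
    centered = X - mean
    # Row-wise quadratic form: d2[i] = centered[i] @ prec @ centered[i]
    d2 = np.einsum('ij,jk,ik->i', centered, prec, centered)
    return -d2

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))
scores = negative_mahalanobis(X)  # shape (200,), all values <= 0
```

The pandas variants documented here presumably wrap the same quantity in a DataFrame/Series indexed like the input.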

tdaad.utils.remapping_functions module

Remapping Functions.

tdaad.utils.remapping_functions.score_flat_fast_remapping(scores, window_size, stride, padding_length=0)[source]

Remap window-level anomaly scores to a flat sequence of per-time-step scores.

Parameters:
  • scores (array-like of shape (n_windows,)) – Anomaly scores for each window. Can be a pandas Series or NumPy array.

  • window_size (int) – Size of the sliding window.

  • stride (int) – Step size between windows.

  • padding_length (int, optional (default=0)) – Extra length to pad the output array (typically at the end of a signal).

Returns:

remapped_scores – Flattened anomaly scores with per-timestep resolution. NaN values (from positions not covered by any window) are replaced with 0.

Return type:

np.ndarray of shape (n_timestamps + padding_length,)
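A minimal sketch of the remapping idea, assuming each timestep receives the average of the scores of the windows that cover it (the actual aggregation used by `score_flat_fast_remapping` may differ); `score_flat_fast_remapping_sketch` is a hypothetical re-implementation:

```python
import numpy as np

def score_flat_fast_remapping_sketch(scores, window_size, stride, padding_length=0):
    # Spread each window score over the timesteps the window covers,
    # average where windows overlap, and zero-fill positions not covered
    # by any window (including the optional trailing padding).
    scores = np.asarray(scores, dtype=float)
    n_windows = len(scores)
    n_timestamps = (n_windows - 1) * stride + window_size
    total = np.zeros(n_timestamps + padding_length)
    count = np.zeros(n_timestamps + padding_length)
    for i, s in enumerate(scores):
        start = i * stride
        total[start:start + window_size] += s
        count[start:start + window_size] += 1
    # Divide only where count > 0; uncovered positions stay 0 (no NaNs).
    return np.divide(total, count, out=np.zeros_like(total), where=count > 0)
```

With non-overlapping windows (`stride == window_size`), each timestep simply inherits the score of its window, and the padded tail is 0.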

tdaad.utils.tda_functions module

Persistence Diagram Transformers.

tdaad.utils.tda_functions.transform_to_persistence_diagram(X, tda_max_dim=0)[source]

Persistence Diagram Transformer for point cloud.

For a given point cloud, form a similarity matrix and apply a RipsPersistence procedure to produce topological descriptors in the form of persistence diagrams.

Read more in the :ref:`User Guide <persistence_diagrams>`.

Parameters:
  • X (array-like of shape (n_samples, n_features)) – The input point cloud.

  • tda_max_dim (int, default=0) – The maximum dimension of the topological feature extraction.

Example

>>> import numpy as np
>>> import pandas as pd
>>> n_timestamps = 100
>>> n_sensors = 5
>>> timestamps = pd.to_datetime('2024-01-01', utc=True) + pd.Timedelta(1, 'h') * np.arange(n_timestamps)
>>> X = pd.DataFrame(np.random.random(size=(n_timestamps, n_sensors)), index=timestamps)
>>> diagrams = transform_to_persistence_diagram(X.to_numpy())
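The "form a similarity matrix" step can be sketched as follows. Using absolute Pearson correlation between sensors, mapped to the distance `1 - |corr|`, is an illustrative assumption and not necessarily tdaad's exact choice; the resulting matrix is the kind of input a Rips persistence computation would consume:

```python
import numpy as np

def correlation_distance_matrix(X):
    # Hypothetical similarity-to-distance step: absolute Pearson
    # correlation between columns (sensors), mapped into [0, 1] so that
    # strongly (anti-)correlated sensors are close in the filtration.
    corr = np.corrcoef(X, rowvar=False)
    return 1.0 - np.abs(corr)

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 5))       # (n_timestamps, n_sensors)
D = correlation_distance_matrix(X)  # (5, 5), symmetric, zero diagonal
```

A distance matrix like `D` can then be fed to a Rips complex builder (e.g. in gudhi or giotto-tda) to extract persistence diagrams up to `tda_max_dim`.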

tdaad.utils.window_functions module

Window Functions.

tdaad.utils.window_functions.hash_window(window: ndarray) -> str[source]

Hash encoding of sliding window index.

tdaad.utils.window_functions.sliding_window_3D_view(data, window_size, step)[source]

Create a 3D sliding window view over a 2D array without copying data.

This function returns overlapping sliding windows from a 2D input array using NumPy's as_strided for memory-efficient view creation. The resulting 3D array has shape (num_windows, window_size, num_features), where each window contains window_size rows from the original data, spaced by step.

Parameters:
  • data (np.ndarray) – Input 2D array of shape (num_rows, num_features).

  • window_size (int) – Number of consecutive rows to include in each window.

  • step (int) – Step size (stride) between successive windows.

Returns:

3D array of shape (num_windows, window_size, num_features), where each entry is a view into the original data.

Return type:

np.ndarray

Notes

  • This function uses np.lib.stride_tricks.as_strided, which does not copy the data. Be cautious when modifying the output array.

  • The number of windows returned is calculated as: floor((num_rows - window_size) / step) + 1

tdaad.utils.window_functions.sliding_window_ppl_pp(data, func, window_size=120, step=5, n_jobs=-1)[source]

Apply a processing function to sliding windows over time series data in parallel.

This function slices a 2D time series (Pandas DataFrame) into overlapping windows, applies a user-defined function (func) to each window in parallel, and returns the aggregated results as a DataFrame indexed by a hash of each window.

Parameters:
  • data (pd.DataFrame) – Input 2D time series data with shape (num_rows, num_features). Must be indexable and convertible to a NumPy array.

  • func (callable) – Function to apply to each window. It should accept a NumPy array of shape (window_size, num_features) and return a result (e.g., scalar, dict, or Series).

  • window_size (int, optional (default=120)) – Number of consecutive rows to include in each sliding window.

  • step (int, optional (default=5)) – Step size (stride) between successive windows.

  • n_jobs (int, optional (default=-1)) – Number of parallel jobs to run. Passed to joblib.Parallel. Use -1 to utilize all available CPUs.

Returns:

DataFrame where each row corresponds to a window. The index is a unique hash of the window content (via hash_window), and each row contains the result of func(w).

Return type:

pd.DataFrame

Notes

  • Requires the helper function sliding_window_3D_view() to create window views.

  • Requires a hash_window() function that generates a unique, hashable ID for a window.

  • Function assumes that func(w) returns something convertible to a dictionary-like format (e.g., dict, Series) for use with pd.DataFrame.from_dict.

Example

>>> import numpy as np
>>> import pandas as pd
>>> X = pd.DataFrame(np.random.random(size=(100, 3)))
>>> def mean_window(w):
...     return {'mean': w.mean()}
>>> result = sliding_window_ppl_pp(X, func=mean_window, window_size=10, step=2)
>>> print(result.head())
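A dependency-light sketch of the same pipeline, substituting `concurrent.futures` for joblib and an MD5 digest for `hash_window()` (both substitutions are assumptions made for illustration; `sliding_window_apply_sketch` is hypothetical):

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

def sliding_window_apply_sketch(data, func, window_size=120, step=5, max_workers=4):
    # Slice the series into overlapping windows, apply func to each window
    # in parallel, and index the results by a per-window content hash.
    arr = data.to_numpy()
    windows = [arr[i:i + window_size]
               for i in range(0, len(arr) - window_size + 1, step)]
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        results = list(ex.map(func, windows))
    # Stand-in for hash_window(): hash of the raw window bytes.
    index = [hashlib.md5(w.tobytes()).hexdigest() for w in windows]
    return pd.DataFrame(results, index=index)

X = pd.DataFrame(np.random.random(size=(100, 3)))
result = sliding_window_apply_sketch(X, lambda w: {'mean': w.mean()},
                                     window_size=10, step=2)
```

Note that threads only help when `func` releases the GIL (e.g. heavy NumPy work); joblib's process-based backends, as used by the real function, avoid that limitation.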

Module contents