Source code for tadkit.catalog.rawtowideformatter
from typing import Sequence, Optional, Union
import numpy as np
import pandas as pd
from tadkit.base.formatter import Formatter
from tadkit.base.dataframe_type import DataFrameType
def _resample_array(array, timestamps, freq_s):
"""Resample a NumPy array using linear interpolation (seconds-based)."""
old_ts = timestamps.astype("datetime64[ns]").astype("int64") / 1e9 # seconds
new_ts = np.arange(old_ts[0], old_ts[-1] + freq_s, freq_s)
new_array = np.zeros((len(new_ts), array.shape[1]))
for i in range(array.shape[1]):
new_array[:, i] = np.interp(new_ts, old_ts, array[:, i])
new_timestamps = new_ts.astype("datetime64[s]")
return new_array, new_timestamps
def _asynchronous_to_synchronous(df):
"""Convert a long asynchronous DataFrame to wide synchronous format."""
if not {"sensor", "data"}.issubset(df.columns):
raise ValueError("Asynchronous data must have 'sensor', and 'data' columns.")
target = df["timestamp"] if "timestamp" in df.columns else df.index
df["timestamp"] = pd.to_datetime(target)
df_pivot = df.pivot_table(index="timestamp", columns="sensor", values="data")
df_pivot.sort_index(inplace=True)
df_pivot.interpolate(inplace=True)
return df_pivot
[docs]
class RawToWideFormatter(Formatter):
    """
    A Formatter that supports both pandas DataFrame and NumPy array outputs.

    The input is normalised to a wide layout (one row per timestamp, one
    column per sensor/variable). Rows are kept in chronological order, and
    timestamps are stored as naive UTC ``datetime64[ns]`` values.

    Parameters
    ----------
    data : pd.DataFrame or np.ndarray
        Input data. A DataFrame that ``DataFrameType.infer_from_df`` reports
        as asynchronous is pivoted to wide format first. A DataFrame with a
        'timestamp' column uses that column as its time index. A 1-D NumPy
        array is treated as a single column.
    timestamps : Sequence, optional
        Required if data is a NumPy array: one timestamp per row of ``data``.
    columns : Sequence[str], optional
        Column names for NumPy arrays; defaults to ``["X0", "X1", ...]``.
    backend : str
        'pandas' or 'numpy': the type returned by :meth:`format`.

    Attributes
    ----------
    timestamps : np.ndarray
        Sorted ``datetime64[ns]`` row timestamps.
    data_array : np.ndarray
        Values, one row per entry of ``timestamps``.
    columns : list
        Column names, one per column of ``data_array``.
    data_df_ : pd.DataFrame
        Wide DataFrame. Only set for DataFrame input or the pandas backend.

    Raises
    ------
    ValueError
        If ``backend`` is unknown, NumPy input lacks ``timestamps``, NumPy
        input sizes are inconsistent, or the data has no rows.
    TypeError
        If ``data`` is neither a pandas DataFrame nor a NumPy ndarray.
    """

    def __init__(
        self,
        data: Union[pd.DataFrame, np.ndarray],
        timestamps: Optional[Sequence] = None,
        columns: Optional[Sequence[str]] = None,
        backend: str = "numpy",
    ):
        super().__init__()
        # format() only knows these two output types; anything else used to be
        # silently treated as pandas.
        if backend not in ("numpy", "pandas"):
            raise ValueError(
                f"Unknown backend {backend!r}; expected 'numpy' or 'pandas'."
            )
        self.backend = backend

        if isinstance(data, pd.DataFrame):
            self._init_from_dataframe(data)
        elif isinstance(data, np.ndarray):
            self._init_from_array(data, timestamps, columns)
        else:
            raise TypeError("Data must be a pandas DataFrame or a NumPy ndarray.")

        # The query descriptions below index the first and last timestamps.
        if len(self.timestamps) == 0:
            raise ValueError("Input data must contain at least one timestamp.")

        self._register_properties()
        self._register_query_descriptions()

    def _init_from_dataframe(self, data: pd.DataFrame) -> None:
        """Set data_df_, timestamps, data_array and columns from a DataFrame."""
        df = data.copy()
        if DataFrameType.infer_from_df(data) == DataFrameType.ASYNCHRONOUS:
            df = _asynchronous_to_synchronous(df)
            self.add_property("converted_to_synchronous")
        if "timestamp" in df.columns:
            df = df.set_index("timestamp")
        # utc=True converts aware timestamps to UTC and treats naive ones as
        # UTC; dropping the tz then leaves naive UTC values.
        df.index = pd.to_datetime(df.index, utc=True).tz_convert(None)
        # The default period spans timestamps[0]..timestamps[-1], so rows must
        # be chronological.
        if not df.index.is_monotonic_increasing:
            df = df.sort_index(kind="stable")
        self.data_df_ = df
        self.timestamps = df.index.to_numpy(dtype="datetime64[ns]")
        self.data_array = df.values
        self.columns = df.columns.tolist()

    def _init_from_array(self, data: np.ndarray, timestamps, columns) -> None:
        """Set timestamps, data_array, columns (and data_df_ for the pandas
        backend) from a NumPy array plus its timestamps."""
        if timestamps is None:
            raise ValueError("NumPy input requires a 'timestamps' array.")
        if data.ndim == 1:
            data = data.reshape(-1, 1)  # a 1-D array is a single series
        elif data.ndim != 2:
            raise ValueError(f"NumPy input must be 1-D or 2-D, got {data.ndim}-D.")
        # Wrapping in DatetimeIndex also handles a pandas Series of timestamps,
        # where Series.tz_convert would act on the Series' index instead.
        stamps = (
            pd.DatetimeIndex(pd.to_datetime(timestamps, utc=True))
            .tz_convert(None)
            .to_numpy(dtype="datetime64[ns]")
        )
        if len(stamps) != data.shape[0]:
            raise ValueError(
                f"Got {len(stamps)} timestamps for {data.shape[0]} rows of data."
            )
        names = (
            list(columns)
            if columns is not None
            else [f"X{i}" for i in range(data.shape[1])]
        )
        if len(names) != data.shape[1]:
            raise ValueError(
                f"Got {len(names)} column names for {data.shape[1]} data columns."
            )
        # Reorder only when needed, so already-sorted input is not copied.
        if np.any(np.diff(stamps) < np.timedelta64(0, "ns")):
            order = np.argsort(stamps, kind="stable")
            stamps, data = stamps[order], data[order]
        self.timestamps = stamps
        self.data_array = data
        self.columns = names
        # if pandas backend, create DataFrame from array
        if self.backend == "pandas":
            self.data_df_ = pd.DataFrame(
                data, columns=self.columns, index=self.timestamps
            )

    def _register_properties(self) -> None:
        """Add the layout, backend and (when applicable) fixed-step properties."""
        self.add_property("synchronous")
        self.add_property(f"{self.backend}_backend")
        # Compare the native nanosecond steps: truncating to whole seconds first
        # misreported sub-second grids and hid sub-second jitter.
        steps = np.diff(self.timestamps)
        if steps.size > 0 and bool(np.all(steps == steps[0])):
            self.add_property("fixed_time_step")

    def _register_query_descriptions(self) -> None:
        """Describe the arguments accepted by :meth:`format`."""
        self.add_query_description(
            "target_period",
            {
                "description": "Time period to select",
                "family": "time_interval",
                "start": self.timestamps[0],
                "stop": self.timestamps[-1],
                "default": (self.timestamps[0], self.timestamps[-1]),
            },
        )
        self.add_query_description(
            "target_space",
            {
                "description": "Columns or sensors to select",
                "family": "space",
                "set": self.columns,
                "default": self.columns,
            },
        )
        self.add_query_description(
            "resample",
            {"description": "Resample data?", "family": "bool", "default": False},
        )
        self.add_query_description(
            "resample_freq",
            {
                "description": "Resampling frequency in seconds.",
                "family": "time",
                "start": 60,
                "default": 120,
                "stop": 3600,
            },
        )

    @staticmethod
    def _to_naive_utc(value):
        """Convert a timestamp-like value to naive UTC, as done for stored data."""
        return pd.to_datetime(value, utc=True).tz_convert(None).to_datetime64()

    def format(
        self,
        target_period=None,
        target_space=None,
        resample=False,
        resample_freq: float = 1.0,
    ):
        """
        Slice and optionally resample the data, using backend-specific resampling.

        Parameters
        ----------
        target_period : tuple, optional
            Inclusive ``(start, stop)`` bounds. Either bound may be None to
            leave that side open. Bounds are normalised to naive UTC like the
            stored timestamps. Defaults to the full time range.
        target_space : str or sequence, optional
            Column name(s) to select. Defaults to all columns.
        resample : bool
            Whether to resample onto a regular grid.
        resample_freq : float
            Grid step in seconds. Must be positive when ``resample`` is True.

        Returns
        -------
        np.ndarray or pd.DataFrame
            An array for the numpy backend; a timestamp-indexed DataFrame for
            the pandas backend.

        Raises
        ------
        ValueError
            If ``target_space`` names unknown columns, or ``resample`` is set
            with a non-positive ``resample_freq``.
        """
        if resample and not resample_freq > 0:
            raise ValueError(
                "resample_freq must be a positive number of seconds, "
                f"got {resample_freq!r}."
            )

        # reset time-series-type properties
        self.remove_property("univariate_time_series")
        self.remove_property("multiple_time_series")

        # --- target period ---
        start, stop = (None, None) if target_period is None else target_period
        start = self.timestamps[0] if start is None else self._to_naive_utc(start)
        stop = self.timestamps[-1] if stop is None else self._to_naive_utc(stop)
        mask = (self.timestamps >= start) & (self.timestamps <= stop)

        # --- target space ---
        if target_space is None:
            selected_space = list(self.columns)
        elif isinstance(target_space, str):
            selected_space = [target_space]  # a lone name, not its characters
        else:
            selected_space = list(target_space)
        unknown = [c for c in selected_space if c not in self.columns]
        if unknown:
            raise ValueError(f"Unknown columns in target_space: {unknown}")
        idx = [self.columns.index(c) for c in selected_space]

        # --- slice data ---
        array_slice = self.data_array[mask][:, idx]

        # --- univariate/multivariate properties ---
        if len(selected_space) > 1:
            self.add_property("multiple_time_series")
        elif len(selected_space) == 1:
            self.add_property("univariate_time_series")

        if self.backend == "numpy":
            if resample:
                array_slice, _ = _resample_array(
                    array_slice, self.timestamps[mask], resample_freq
                )
            return array_slice

        # pandas backend
        df_slice = pd.DataFrame(
            array_slice, columns=selected_space, index=self.timestamps[mask]
        )
        if resample:
            # A Timedelta keeps fractional seconds that f"{int(freq)}s" dropped.
            rule = pd.to_timedelta(resample_freq, unit="s")
            df_slice = df_slice.resample(rule).interpolate()
        return df_slice