Coverage for uqmodels/preprocessing/preprocessing.py: 10%
449 statements
coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
1"""
2Data preprocessing module.
3"""
5import copy
7import numpy as np
8import pandas as pd
9import sklearn
10from scipy.interpolate import interp1d
11from scipy.sparse.csgraph import connected_components
12from sklearn.model_selection import TimeSeriesSplit
14import uqmodels.preprocessing.structure as pre_struc
15import uqmodels.utils as ut
# Target selection based on corrcoef


def compute_corr_and_filter(data):
    mat_corr = np.corrcoef(data.T)
    signal_to_drop = np.isnan(mat_corr[0])
    mat_corr[np.isnan(mat_corr)] = 0
    return (mat_corr, signal_to_drop)


def get_k_composants(mat_corr, n_cible):
    d = mat_corr.shape[0]
    corr_flow = []
    for i in range(100):
        corr_flow.append(
            connected_components(
                np.maximum(0, np.abs(mat_corr) - i / 100) != 0, connection="strong"
            )[1]
        )
    corr_flow = np.concatenate(corr_flow).reshape(100, d)
    step = np.argmax(np.max(corr_flow, axis=1) == n_cible)
    list_components = []
    for component in range(n_cible):
        mask = corr_flow[step] == component
        list_components.append(np.arange(d)[mask])
    return list_components


def select_best_representant(mat_corr, list_components):
    best_signals = []
    for i, ind_components in enumerate(list_components):
        ind_best = np.argmax(mat_corr[ind_components][:, ind_components].sum(axis=1))
        best_signals.append(ind_components[ind_best])
    return best_signals

def select_signal(data, n_cible=None):
    if n_cible is None:
        n_cible = data.shape[1]

    mat_corr, signal_to_drop = compute_corr_and_filter(data)
    raw_ind = np.arange(data.shape[1])
    restricted_mat_corr = mat_corr[~signal_to_drop][:, ~signal_to_drop]
    list_restricted_components = get_k_composants(restricted_mat_corr, n_cible)
    best_restricted_signals = select_best_representant(
        restricted_mat_corr, list_restricted_components
    )
    best_signals = raw_ind[~signal_to_drop][best_restricted_signals]

    print(np.max(mat_corr[best_signals], axis=0).sum() / data.shape[1])
    return (best_signals, signal_to_drop)

#


def check_is_pd_date(date):
    if isinstance(date, str):
        try:
            date = pd.to_datetime(date)
        except BaseException:
            pass
    return date


# Base replacement function


def locate_near_index_values(index_scale, index_val):
    if (index_val > index_scale[-1]) or (index_val < index_scale[0]):
        return None
    else:
        ind = np.abs(index_scale - index_val).argmin()
        if (index_scale[ind] - index_val) > 0:
            return ind - 1
        else:
            return ind

def get_event_into_series(
    list_events, index_scale, n_type_event, dtype="datetime64[s]"
):
    """Locate flagged sensor errors on a regular time referential."""
    nan_series = np.zeros((len(index_scale), n_type_event + 1))
    index_scale = index_scale.astype(dtype).astype(float)
    for n, dict_events in enumerate(list_events):
        for index_event in list(dict_events.keys()):
            type_anom = dict_events[index_event]
            try:
                # Range event: (start, end) pair located on the index scale
                start_event, end_event = index_event
                start_index = locate_near_index_values(
                    index_scale,
                    pd.to_datetime(start_event)
                    .to_datetime64()
                    .astype(dtype)
                    .astype(float),
                )

                end_index = locate_near_index_values(
                    index_scale,
                    pd.to_datetime(end_event)
                    .to_datetime64()
                    .astype(dtype)
                    .astype(float),
                )

                if (start_index is None) & (end_index is None):
                    pass
                else:
                    if start_index is None:
                        start_index = 0

                    if end_index is None:
                        end_index = -1

                    nan_series[start_index:end_index, type_anom] += 1

            except BaseException:
                # Single event: one timestamp located on the index scale
                index_event = (
                    pd.to_datetime(index_event)
                    .to_datetime64()
                    .astype(dtype)
                    .astype(float)
                )
                ind = locate_near_index_values(index_scale, index_event)
                nan_series[ind, type_anom] += 1

    return nan_series

def extract_sensors_errors(series, type_sensor_error=None):
    """Extract the list of non-floating values.

    Args:
        series (_type_): series of sensor values
        type_sensor_error (list, optional): list of other known errors.
    """
    if type_sensor_error is None:
        type_sensor_error = []

    for i in list(set(series)):
        try:
            float(i)
        except ValueError as e:
            e = str(e).split("'")[1]
            if e not in type_sensor_error:
                type_sensor_error.append(e)
    return type_sensor_error


def handle_nan(y):
    """Replace NaN values by the previous (possibly just filled) value."""
    nan_flag = np.isnan(y)
    last_non_nan = np.copy(y)
    for j in np.arange(len(y))[nan_flag]:
        if j < 1:
            last_non_nan[j] = np.nanmean(y, axis=0)
        else:
            last_non_nan[j] = y[j - 1]
        v = last_non_nan[j]
        y[j] = v
    return y

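# Illustrative sketch (not part of the original module): minimal usage of
# handle_nan on a small array. The helper name _example_handle_nan is
# hypothetical and only serves as documentation.
def _example_handle_nan():
    y = np.array([1.0, np.nan, 3.0, np.nan, np.nan])
    # NaN values are replaced by the previous (possibly just filled) value.
    return handle_nan(y)  # -> array([1., 1., 3., 3., 3.])
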
def interpolate(
    x,
    y,
    xnew=None,
    time_structure=None,
    type_interpolation="linear",
    fill_values=None,
    moving_average=False,
):
    """Drop NaN values & perform 'type_interpolation' interpolation from [x, y] to [xnew, ynew].

    If xnew is None, compute xnew from time_structure.
    If moving_average is True, perform an "interpolated moving average" using M = int(len(x) / len(xnew))
    in order to average M interpolated points evenly distributed within each step.

    Args:
        x (array): X axis
        y (array): Y axis (values)
        xnew (array): new X axis
        moving_average (bool, optional): perform moving-average 'interpolation'.

    Returns:
        (xnew, ynew): new X axis and new interpolated Y axis
    """
    if x is None:
        x = np.arange(len(y))

    if fill_values is None:
        fill_values = np.nanmean(y, axis=0)

    only_fill_nan = False
    if (time_structure is None) and (xnew is None):
        only_fill_nan = True
        xnew = x

    precision = 0
    if xnew is None:
        # Interpolate the series taking into account the theoretical sampling given by the time structure
        if time_structure is None:
            raise ValueError("time_structure is None")

        delta = time_structure.get("delta", default_value=1)
        # If precision and frequence are provided, else default_value
        precision = time_structure.get("precision", default_value=0)
        frequence = time_structure.get("frequence", default_value=1)

        new_time_step = delta * frequence

        xnew = pre_struc.get_regular_step_scale(
            delta=new_time_step, range_=(x[-1] - x[0]), time_offset=x[0]
        )

    # Ignore NaN values
    nan_flag = np.isnan(y)

    ynew = np.zeros(len(xnew))
    f = interp1d(
        x[~nan_flag], y[~nan_flag], type_interpolation, fill_value="extrapolate"
    )

    if only_fill_nan:
        ynew = y
        ynew[nan_flag] = f(xnew[nan_flag])

    else:
        ynew = np.zeros(len(xnew))

        # Ratio of the moving average
        ratio = 1
        if moving_average:
            ratio = int(len(x) / len(xnew))
            if ratio < 1:
                ratio = 1

        for i in range(ratio):
            ynew += f(xnew - (i * xnew[0]) / ratio) / ratio

    if precision > 0:
        ynew = np.round(ynew, -int(np.floor(np.log10(precision))))

    return xnew, ynew

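# Illustrative sketch (not part of the original module): two usage modes of
# interpolate, assuming plain numpy inputs. _example_interpolate is a
# hypothetical helper used only for documentation.
def _example_interpolate():
    x = np.arange(10, dtype=float)
    y = np.array([0.0, 1.0, np.nan, 3.0, 4.0, np.nan, 6.0, 7.0, 8.0, 9.0])

    # Mode 1: no xnew and no time_structure -> only fill NaN values in place.
    _, y_filled = interpolate(x, y.copy())

    # Mode 2: resample on a coarser axis with the moving-average option.
    xnew = np.arange(0, 10, 2, dtype=float)
    _, y_coarse = interpolate(x, y.copy(), xnew=xnew, moving_average=True)
    return y_filled, y_coarse
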
def add_row(df, date_pivot, mode="first"):
    """Add a first or last np.NaN row to df with date_pivot as index value.

    Args:
        df (_type_): dataframe
        date_pivot (_type_): index
        mode (str, optional): 'first' or 'last'. Defaults to 'first'.

    Returns:
        df: dataframe augmented with one row
    """
    new_row = copy.deepcopy(df.iloc[0:1])
    for columns in new_row.columns:
        new_row.loc[new_row.index[0], columns] = np.NaN
    new_row = new_row.rename({new_row.index[0]: date_pivot})

    if mode == "first":
        df = pd.concat([new_row, df])

    if mode == "last":
        df = pd.concat([df, new_row])

    return df


def remove_rows(df, date_pivot, mode="first"):
    """Remove rows smaller/greater than date_pivot, then apply add_row.

    Args:
        df (_type_): dataframe
        date_pivot (_type_): index pivot
        mode (str, optional): 'first' or 'last'. Defaults to 'first'.

    Returns:
        df: dataframe with removed values and a new boundary row
    """
    if mode == "first":
        if df.index[0] > date_pivot:
            pass
        else:
            df = df[df.index > date_pivot]
    if mode == "last":
        if df.index[-1] < date_pivot:
            pass
        else:
            df = df[df.index < date_pivot]
    df = add_row(df, date_pivot, mode=mode)
    return df

def df_selection(df, start_date=None, end_date=None):
    """Format a dataframe so that it starts at start_date and ends at end_date.

    Args:
        df (_type_): dataframe
        start_date (_type_, optional): start date or None. Defaults to None: do nothing.
        end_date (_type_, optional): end date or None. Defaults to None: do nothing.

    Returns:
        dataframe: time-formatted dataframe
    """
    if start_date is not None:
        start_date = check_is_pd_date(start_date)
        df = remove_rows(df, start_date, mode="first")
    if end_date is not None:
        end_date = check_is_pd_date(end_date)
        df = remove_rows(df, end_date, mode="last")
    return df

def upscale_series(
    dataframe,
    delta,
    offset=None,
    start_date=None,
    end_date=None,
    mode="time",
    max_time_jump=10,
    replace_val=None,
    **kwargs
):
    """Upsample series using pandas interpolation functions.

    Args:
        dataframe (_type_): data to resample
        delta (_type_): time delta of the new sampling
        offset (str, optional): resampling offset passed to pandas. Defaults to None.
        start_date (_type_, optional): start date. Defaults to None.
        end_date (_type_, optional): end date. Defaults to None.
        mode (str, optional): pandas interpolation method, or 'previous' for forward fill. Defaults to 'time'.
        max_time_jump (int, optional): maximum number of consecutive steps filled. Defaults to 10.
        replace_val (_type_, optional): value(s) used to replace remaining NaN. Defaults to None.

    Returns:
        pd.DataFrame: upsampled dataframe
    """
    # Generate upsampling with NaN values
    start_date = pre_struc.check_date(start_date)
    end_date = pre_struc.check_date(end_date)
    delta = pre_struc.check_delta(delta)

    origin_date = start_date
    if delta is not None:
        origin_date = origin_date - delta

    dataframe = df_selection(dataframe, start_date=origin_date, end_date=end_date)

    # Fill NaN using interpolation
    if mode == "previous":
        upscale_dataframe = dataframe.resample(
            pd.Timedelta(delta), origin=pd.to_datetime(start_date), offset=offset
        ).ffill(limit=max_time_jump)
    else:
        upscale_dataframe = (
            dataframe.resample(
                pd.Timedelta(delta), origin=pd.to_datetime(start_date), offset=offset
            )
            .mean()
            .interpolate(mode, limit=max_time_jump)
        )

    if max_time_jump is not None:
        mask = np.isnan(upscale_dataframe.values)
        # Replace using minimum values
        if replace_val is None:
            replace_val = dataframe.min(axis=0).values - 0.1 * np.abs(
                dataframe.min(axis=0).values
            )

        # Replace using default values
        if isinstance(replace_val, np.ndarray):
            for i, val in enumerate(replace_val):
                upscale_dataframe.iloc[mask[:, i], i] = val

        else:
            upscale_dataframe[mask] = replace_val

    return upscale_dataframe[1:]

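# Illustrative sketch (not part of the original module): the resample/interpolate
# pattern that upscale_series wraps, written with plain pandas to keep the example
# independent of the pre_struc date/delta normalisation. _example_upscale_pattern
# is a hypothetical helper.
def _example_upscale_pattern():
    index = pd.date_range("2021-01-01", periods=6, freq="10min")
    df = pd.DataFrame({"sensor": [1.0, 2.0, np.nan, 4.0, 5.0, 6.0]}, index=index)
    # Upsample to 5 minutes, then fill the introduced NaN by time interpolation,
    # limited to a maximum number of consecutive steps (as in the 'time' mode above).
    upscaled = df.resample(pd.Timedelta("5min")).mean().interpolate("time", limit=10)
    return upscaled
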
def downscale_series(
    dataframe,
    delta,
    offset="-1ms",
    start_date=None,
    end_date=None,
    mode="mean",
    dtype="datetime64[s]",
    **kwargs
):
    """Downsample series by resampling with the aggregation given by mode."""
    end_date = pre_struc.check_date(end_date, dtype)
    start_date = pre_struc.check_date(start_date, dtype)
    delta = pre_struc.check_delta(delta, dtype)

    dataframe = df_selection(
        dataframe, start_date=pd.to_datetime(start_date - delta), end_date=end_date
    )

    downscale = dataframe.resample(
        pd.Timedelta(delta), origin=pd.to_datetime(start_date), offset=offset
    )

    if hasattr(downscale, mode):
        func_ = getattr(downscale, mode)
        downscale_dataframe = func_().interpolate("time")
    else:
        raise ValueError("mode: " + str(mode) + " not handled")
    downscale_dataframe.index -= pd.Timedelta(offset)
    return downscale_dataframe

def rolling_statistics(
    data, delta, step=None, reduc_functions=["mean"], reduc_names=["mean"], **kwargs
):
    """Compute rolling statistics from a dataframe.

    Args:
        data (pd.DataFrame): dataframe (times, sources)
        delta (int or timedelta64): size of the rolling window
        step (int): evaluate the window at every ``step`` result
        reduc_functions (_type_): name of a pandas window function (fast) or custom set->stat function (slow)
        reduc_names (_type_): names stored in the statistics dataframe
        **kwargs: other parameters provided to DataFrame.rolling

    Returns:
        pd.DataFrame: dataframe of rolling statistics (one column per statistic and source)
    """
    new_data = []
    colums_transformation = []
    reduc_functions = reduc_functions.copy()
    reduc_names = reduc_names.copy()

    columns = data.columns
    if not isinstance(delta, int):
        delta = pd.Timedelta(pre_struc.check_delta(delta, "datetime64[ns]"))
        data.index = data.index.astype("datetime64[ns]")

    flag_extremum = False
    if "extremum" in reduc_functions:
        flag_extremum = True
        ind_extremum = reduc_functions.index("extremum")
        reduc_functions[ind_extremum] = "max"
        reduc_functions.append("min")
        reduc_names.append("min")

    for n, reduc_func in enumerate(reduc_functions):
        colums_transformation.append(reduc_names[n])
        if isinstance(reduc_func, str):
            if reduc_func == "cov_to_median":
                # step is not handled for this statistic
                df_mean = data.rolling(delta, step=step, min_periods=1, **kwargs).mean()
                tmp_df = df_mean.rolling(20, min_periods=1).cov(df_mean.median(axis=1))
                tmp_df = tmp_df.fillna(0)
            elif reduc_func == "corr_to_median":
                # step is not handled for this statistic
                df_mean = data.rolling(delta, step=step, min_periods=1, **kwargs).mean()
                tmp_df = df_mean.rolling(20).corr(df_mean.median(axis=1))
                tmp_df = tmp_df.fillna(0)
            else:
                rolling = data.rolling(delta, step=step, min_periods=1, **kwargs)
                func_ = getattr(rolling, reduc_func)
                tmp_df = func_()

        else:
            rolling = data.rolling(delta, step=step, min_periods=1, **kwargs)
            tmp_df = rolling.apply(reduc_func)
        tmp_df.columns = [reduc_names[n] + "_" + str(i) for i in tmp_df.columns]
        new_data.append(tmp_df)

    data = pd.concat(new_data, axis=1)
    if flag_extremum:
        for column in columns:
            column = str(column)
            # Max is already named extremum
            data["extremum_" + column] = np.maximum(
                np.abs(data["min_" + column].values - data["mean_" + column].values),
                np.abs(data["extremum_" + column].values - data["mean_" + column].values),
            )
            data.drop(["min_" + column], axis=1, inplace=True)

    return data

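# Illustrative sketch (not part of the original module): rolling_statistics on a
# toy dataframe with an integer window, including the derived 'extremum'
# statistic (which relies on 'mean' being requested as well). The helper name
# _example_rolling_statistics is hypothetical.
def _example_rolling_statistics():
    rng = np.random.RandomState(0)
    df = pd.DataFrame(rng.randn(50, 2), columns=["a", "b"])
    stats = rolling_statistics(
        df,
        delta=5,
        reduc_functions=["mean", "std", "extremum"],
        reduc_names=["mean", "std", "extremum"],
    )
    # Resulting columns: mean_a, mean_b, std_a, std_b, extremum_a, extremum_b
    return stats
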
# Base measure


def entropy(y, set_val, v_bins=100):
    """Compute a naive entropy score of y by tokenizing values into at most v_bins bins."""
    flag_nan = np.isnan(y)
    y[flag_nan] = -0.1
    if len(y) == 0:
        return 0
    else:
        labels = np.digitize(
            y,
            bins=np.sort(
                [
                    np.quantile(y, i / set_val)
                    for i in np.arange(0, min(set_val, v_bins))
                ]
            ),
        )
        labels = labels.astype(int)

        value, counts = np.unique(labels, return_counts=True)
        count = np.zeros(set_val + 1) + 1
        for n, i in enumerate(value):
            count[i] = counts[n]
        norm_counts = count / count.sum()
        return -(norm_counts * np.log(norm_counts)).sum()

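# Illustrative sketch (not part of the original module): the naive entropy score
# on a continuous signal tokenized into 20 quantile bins. _example_entropy is a
# hypothetical helper.
def _example_entropy():
    rng = np.random.RandomState(0)
    y = rng.rand(500)
    # Entropy of a uniform sample tokenized into 20 quantile bins.
    return entropy(y, set_val=20)
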
# Preprocessing function:


def df_interpolation_and_fusion(list_df, target_index_scale, dtype="datetime64[s]"):
    """Interpolate all sources on a common temporal referential.

    Args:
        list_df (list of 2D array): list of dataframes
        target_index_scale (_type_): target index scale (common temporal referential)
        dtype: datetime dtype used for the index

    Returns:
        interpolated_data: dataframe of interpolated and fused sources
    """
    interpolated_data = []
    list_columns = []
    if type(target_index_scale):
        target_index_scale = target_index_scale.astype(dtype).astype(float)

    for df in list_df:
        df_index = df.index.values.astype(dtype).astype(float)
        list_columns.append(list(df.columns))
        if (len(df_index) == len(target_index_scale)) and (
            (df_index - target_index_scale).sum() == 0
        ):
            interpolated_data.append(df.values)
        else:
            new_channels = np.stack(
                [
                    interpolate(
                        x=df_index,
                        y=channels,
                        xnew=target_index_scale,
                        moving_average=True,
                    )[1]
                    for channels in df.values.T
                ]
            ).T

            interpolated_data.append(new_channels)

    interpolated_data = np.swapaxes(interpolated_data, 0, 1).reshape(
        len(target_index_scale), -1
    )

    interpolated_data = pd.DataFrame(
        interpolated_data,
        columns=np.concatenate(list_columns),
        index=target_index_scale.astype(dtype),
    )

    return interpolated_data

# Map reduce statistics (may be computationally suboptimal)


def Past_Moving_window_mapping(array, deta, window_size=None):
    # Past moving-window mapping using yield: inefficient for large data!
    if window_size is None:
        window_size = deta

    for i in range(int(np.floor(len(array) / deta))):
        yield array[max(0, i * deta - window_size) : i * deta + 1]


def identity(x, **kwargs):
    return x


def map_reduce(data, map_=identity, map_paramaters={}, reduce=identity):
    mapping = Regular_Moving_window_mapping(data, **map_paramaters)
    reduced = np.array(list(map(reduce, mapping)))
    return reduced

def auto_corr_reduce(set_):
    mean = np.mean(set_)
    var = np.var(set_)
    set_ = set_ - mean
    acorr = np.correlate(set_, set_, "full")[len(set_) - 1 :]
    acorr = acorr / var / len(set_)
    return acorr


def mean_reduce(set_):
    mu = np.nanmean(set_, axis=0)
    std = np.nanstd(set_, axis=0)
    carac = np.concatenate([mu[:, None], std[:, None]], axis=1)
    return carac


def last_reduce(set_):
    return set_[-1]


def corrcoef_reduce(set_):
    return np.corrcoef(set_.T)


def fft_reduce(set_):
    fft = np.fft.fft(set_.T)
    energy = np.abs(fft)[:, 0 : int(len(set_) / 2)] / len(set_)
    phase = np.angle(fft)[:, 0 : int(len(set_) / 2)]
    carac = np.concatenate([energy, phase], axis=1)
    return carac

def Regular_Moving_window_mapping(array, deta, window_size, mode="left", **kwargs):
    if window_size is None:
        window_size = deta
    for i in range(int(np.floor(len(array) / deta))):
        if mode == "left":
            yield array[i * deta : i * deta + window_size]
        elif mode == "center":
            size_bot = int(np.ceil(window_size / 2))
            size_top = int(np.floor(window_size / 2))
            yield array[i * deta - size_bot : i * deta + size_top]

        elif mode == "right":
            yield array[i * deta - (window_size - 1) : i * deta + 1]

        else:
            raise ValueError("error mode")

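# Illustrative sketch (not part of the original module): map_reduce with the
# regular moving-window mapping and the mean/std reducer defined above.
# _example_map_reduce is a hypothetical helper.
def _example_map_reduce():
    rng = np.random.RandomState(0)
    data = rng.randn(100, 3)
    # 10 non-overlapping windows of 10 steps, each reduced to (mean, std) per channel.
    reduced = map_reduce(
        data, map_paramaters={"deta": 10, "window_size": 10}, reduce=mean_reduce
    )
    return reduced  # shape (10, 3, 2)
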
# Splitter


def identity_split(X_fit, y_fit, X_calib, y_calib):
    """Identity splitter that wraps an already existing data assignment."""

    def iterable_split(X, y):
        """X and y are placeholders."""
        iterable = [(X_fit, y_fit, X_calib, y_calib)]
        return iterable

    return iterable_split


def random_split(ratio):
    """Random splitter that assigns samples given a ratio."""

    def iterable_split(X, y):
        rng = np.random.RandomState(0)
        fit_sample = rng.rand(len(X)) > ratio
        cal_sample = np.invert(fit_sample)
        return [(X[fit_sample], y[fit_sample], X[cal_sample], y[cal_sample])]

    return iterable_split


def kfold_random_split(K, random_state=None):
    """Splitter that randomly assigns data into K folds."""
    kfold = sklearn.model_selection.KFold(K, shuffle=True, random_state=random_state)

    def iterable_split(X, y):
        iterable = []
        for fit, calib in kfold.split(X):
            iterable.append((X[fit], y[fit], X[calib], y[calib]))
        return iterable

    return iterable_split


# Encapsulated data from stored dict file:


class splitter:
    """Generic data-set provider (iterable)."""

    def __init__(self, X_split):
        self.X_split = X_split

    def split(self, X):
        def cv_split(X_split, i):
            train = np.arange(len(X))[X_split < i]
            test = np.arange(len(X))[X_split == i]
            return (train, test)

        return [
            cv_split(self.X_split, i) for i in range(1, 1 + int(self.X_split.max()))
        ]

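# Illustrative sketch (not part of the original module): the function-based
# splitters and the splitter class on toy data. _example_splitters is a
# hypothetical helper.
def _example_splitters():
    rng = np.random.RandomState(1)
    X, y = rng.randn(40, 3), rng.randn(40)

    # Random fit/calibration assignment with a 50% ratio (single split).
    (X_fit, y_fit, X_cal, y_cal), = random_split(0.5)(X, y)

    # K-fold assignment (list of K fit/calibration tuples).
    folds = kfold_random_split(K=4, random_state=0)(X, y)

    # splitter class: folds encoded in an integer assignment vector.
    X_split = np.repeat([0, 1, 2], [20, 10, 10])
    train_test_pairs = splitter(X_split).split(X)
    return len(folds), train_test_pairs
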
# Encapsulated data from array:


def dataset_generator_from_array(
    X,
    y,
    context=None,
    objective=None,
    sk_split=TimeSeriesSplit(5),
    repetition=1,
    remove_from_train=None,
    attack_name="",
    cv_list_name=None,
):
    """Produce a data generator (iterable of [X, y, X_split, context, objective, name]) from arrays.

    Args:
        X (array): inputs.
        y (array or None): targets.
        context (array or None): additional information.
        objective (array or None): ground truth (unsupervised task).
        sk_split (split strategy): sklearn split strategy.
    """

    def select_or_none(array, sample):
        if array is None:
            return None
        elif isinstance(array, str):
            return array
        else:
            return array[sample]

    if remove_from_train is None:
        remove_from_train = np.zeros(len(X))

    dataset_generator = []
    for n_repet in np.arange(repetition):
        cpt = 0
        if n_repet == 0:
            str_repet = ""
        else:
            str_repet = "_bis" + str(n_repet)

        for train_index, test_index in sk_split.split(X):
            X_split = np.zeros(len(X))
            X_split[train_index] = 1
            X_split[(remove_from_train == 1) & (X_split == 1)] = -1

            sample_cv = sorted(np.concatenate([train_index, test_index]))
            cv_name = "cv_" + str(cpt) + attack_name + str_repet
            if cv_list_name:
                cv_name = cv_list_name[cpt] + attack_name + str_repet
            dataset_generator.append(
                [
                    select_or_none(e, sample_cv)
                    for e in [X, y, X_split, context, objective, cv_name]
                ]
            )
            cpt += 1
    return dataset_generator

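# Illustrative sketch (not part of the original module): building a dataset
# generator from arrays with a 3-fold time-series split. _example_dataset_generator
# is a hypothetical helper.
def _example_dataset_generator():
    rng = np.random.RandomState(2)
    X, y = rng.randn(60, 4), rng.randn(60, 1)
    generator = dataset_generator_from_array(X, y, sk_split=TimeSeriesSplit(3))
    for X_cv, y_cv, X_split, context, objective, name in generator:
        # X_split flags training samples with 1 and test samples with 0.
        print(name, X_cv.shape, int((X_split == 1).sum()))
    return generator
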
####################################################################
# Raw data structure regularisation & statistics synthesis.


def raw_analysis(raw_series, time_structure):
    """Compute basic statistics of a raw series (n_obs, completeness ratio, number of
    distinct values, entropy) and store them in time_structure."""
    source = raw_series.columns[0]
    x = raw_series.index
    y = raw_series[source].values
    frequence = time_structure.get("frequence", 1)
    dtype = time_structure.get("dtype", "datetime64[s]")
    n_nan = np.isnan(y).sum()
    range_ = (
        (x.values[-1] - x.values[0])
        .astype(dtype.replace("datetime", "timedelta"))
        .astype(float)
    )

    if len(y) > 0:
        n_obs = len(y)
        ratio_comp = np.round(n_obs / (frequence * range_), 4)

    else:
        n_val, v_entropy, n_obs, n_nan, ratio_comp = 0, 0, 0, 0, 0

    n_val = len(set(y))
    v_entropy = entropy(y, n_val)
    time_structure.set("n_obs", n_obs)
    time_structure.set("ratio_comp", ratio_comp)
    time_structure.set("n_val", n_val)
    time_structure.set("v_entropy", v_entropy)
    print(n_val, v_entropy, n_obs, n_nan, ratio_comp)

####################################################################
# Auxiliary preprocessor functions


def process_raw_source(self, data, query, structure):
    """Store sensor-error metadata in the time structure and run raw_analysis on the raw series."""
    time_structure = structure

    raw_series, dict_errors = data

    type_sensor_error = time_structure.get("type_sensor_error")
    time_structure.set("list_error", dict_errors)
    time_structure.set("type_sensor_error", type_sensor_error)
    time_structure.set("dict_errors", dict_errors)
    raw_analysis(raw_series, time_structure)
    return (raw_series, time_structure)

def process_irregular_data(self, data, query, structure):
    """Apply interpolation & statistics extraction on data using query parameters.

    ['start_date', 'end_date', 'delta'] stored in the structure specify the start, the end
    and the statistics synthesis step. ['window_size', 'begin_by_interpolation'] specify
    the final step (delta * window_size) and whether a pre-interpolation step is applied.

    Args:
        data (_type_): raw series, or (raw series, raw time structure) tuple
        query (_type_): query parameters
        structure (_type_): time structure holding the preprocessing metadata

    Returns:
        tuple: (regular_stat_series, time_structure)
    """
    time_structure = structure
    begin_by_interpolation = False
    begin_by_interpolation = time_structure.get("begin_by_interpolation", False)
    window_size = time_structure.get("window_size", 1)

    if len(data) == 2:
        raw_series, raw_time_structure = data
        dict_errors = raw_time_structure.get("dict_errors")
    else:
        raw_series = data
        dict_errors = {}

    time_structure.set("dict_errors", dict_errors)

    # Specify rolling statistics
    delta = time_structure.get("delta", 1)
    dtype = time_structure.get("dtype", "datetime64[s]")

    stats = time_structure.get("stats", ["mean", "std", "extremum", "count"])

    # Statistics processing based on expert knowledge:
    frequence = time_structure.get("frequence", 1)
    precision = time_structure.get("precision", 1)

    # Interpolation specification
    interpolation = time_structure.get("interpolation", "previous")
    max_time_jump = time_structure.get("max_time_jump", 10)
    replace_val = time_structure.get("replace_val", None)

    # Regular interpolation on rolling statistics:
    start_date = time_structure.get("start_date", None)
    end_date = time_structure.get("end_date", None)

    if begin_by_interpolation:
        raw_series = regular_stat_series = upscale_series(
            raw_series,
            delta * frequence,
            start_date=start_date,
            end_date=end_date,
            max_time_jump=max_time_jump,
            replace_val=replace_val,
            mode=interpolation,
        )

    new_delta = pre_struc.check_delta(delta * window_size, dtype=dtype)

    # Irregular rolling statistics
    regular_stat_series = rolling_statistics(
        raw_series, delta=new_delta, step=None, reduc_functions=stats, reduc_names=stats
    )

    columns = regular_stat_series.columns
    flag_extremum = ["extremum" in column for column in columns]
    flag_count = ["count" in column for column in columns]
    flag_std = ["std" in column for column in columns]

    # Normalisation of std statistics
    for col_name in columns[flag_std]:
        std_value = regular_stat_series[col_name].values
        std_value[np.isnan(std_value)] = 0.000001
        regular_stat_series[col_name] = np.maximum(precision, std_value)

    # Normalisation of extremum statistics
    for col_name in columns[flag_extremum]:
        regular_stat_series[col_name] = np.log(
            0.0001 + regular_stat_series[col_name].values
        )

    # Include consecutive non-observation in count statistics
    for col_name in columns[flag_count]:
        consecutive_no_obs = (
            -np.log10(
                0.5
                + np.maximum(
                    ut.cum_sum_consecutive_zero(
                        regular_stat_series[col_name].values >= 1
                    )
                    - 1,
                    0,
                )
            )
            / 3
        )

        regular_stat_series[col_name] = (
            (regular_stat_series[col_name].values - consecutive_no_obs)
            / frequence
            * new_delta.astype(float)
        )

    regular_stat_series = upscale_series(
        regular_stat_series,
        new_delta,
        start_date=start_date,
        end_date=end_date,
        max_time_jump=max_time_jump,
        replace_val=replace_val,
        mode=interpolation,
    )

    return (regular_stat_series, time_structure)

####################################################################
# Auxiliary preprocessor functions


def process_label(
    label_df, sources_selection, start_date, end_date, delta=1, dtype="datetime64[s]"
):
    """Process an anomaly label dataframe with columns (start: datetime64[s], end: datetime64[s], source)
    into a ground-truth matrix on a regular step scale of delta that starts at start_date and ends at end_date.
    """
    source_anom = [i for i in label_df["source"]]
    flag_selection = [i in sources_selection for i in source_anom]

    step_begin_anoms = (
        label_df[flag_selection]["start"].astype(dtype).astype(int).values
    )
    step_end_anoms = label_df[flag_selection]["end"].astype(dtype).astype(int).values

    step_begin = pre_struc.date_to_step(start_date)
    step_end = pre_struc.date_to_step(end_date)

    step_scale = pre_struc.get_regular_step_scale(
        delta, step_end - step_begin, step_begin
    )

    label_anom = np.zeros((len(step_scale), len(sources_selection)))
    for i, name in enumerate(sources_selection):
        mask = (step_scale > step_begin_anoms[i]) & (step_scale < step_end_anoms[i])
        label_anom[mask, i] += 1
    date_index = pre_struc.step_to_date(
        step_scale, delta=delta, date_init=start_date, dtype=dtype
    )
    label_df = pd.DataFrame(data=label_anom, index=date_index)

    return label_df