# uqmodels/utils.py

import math
import sys
from copy import deepcopy
from typing import Iterable

import numpy as np
import scipy.stats
import tensorflow as tf
from joblib import Parallel, delayed
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.model_selection import RandomizedSearchCV, TimeSeriesSplit
from sklearn.preprocessing import StandardScaler

EPSILON = sys.float_info.min  # small value to avoid underflow


def add_random_state(random_state, values):
    """Addition that handles a possible None value.

    Args:
        random_state (int or None): random state
        values (int): value to add

    Returns:
        int or None: random_state + values, or None if random_state is None
    """
    if random_state is None:
        return None
    else:
        return random_state + values


def generate_random_state(random_state=None):
    """Draw a random number if random_state is None, else return random_state.

    Args:
        random_state (int or None, optional): random state. Defaults to None.

    Returns:
        int: random_state
    """
    if random_state is None:
        random_state = np.random.randint(0, 10000000)
    return random_state


def norm_signal(y):
    """Standardize a signal to zero mean and unit variance."""
    return (y - y.mean()) / y.std()


def cum_sum_consecutive_zero(array):
    """Count consecutive zeros at each position.

    1 0 1 0 0 0 -> 0 1 0 1 2 3

    Args:
        array (np.array): array of zero and non-zero values

    Returns:
        np.array: count of consecutive zeros ending at each position
    """
    len_ = len(array)
    inds = np.nonzero(array)[0]
    consecutive_count = np.zeros(len_)

    if len(inds) == 0:  # no non-zero value: the count just keeps growing
        return np.arange(1, len_ + 1)

    if not inds[0] == 0:
        consecutive_count[: inds[0]] += 1
        inds = np.concatenate([[0], inds])

    flag = np.ones(len_)
    cpt = 1
    flag[inds] = 0
    while flag.sum() > 0:
        inds = inds + 1
        if inds[-1] > len_ - 1:
            inds = inds[:-1]
        inds = inds[flag[inds] != 0]
        consecutive_count[inds] += cpt
        flag[inds] = 0
        cpt += 1
    return consecutive_count
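

# Example (illustrative, matching the docstring):
#   >>> cum_sum_consecutive_zero(np.array([1, 0, 1, 0, 0, 0]))
#   array([0., 1., 0., 1., 2., 3.])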


def threshold(array, min_val=None, max_val=None):
    """Threshold an array with min_val as lower bound and max_val as upper bound.

    Can handle multidimensional thresholds on the last dimension.

    Args:
        array (np.array): nd array to threshold
        min_val (float or np.array, optional): lower bound. Defaults to None.
        max_val (float or np.array, optional): upper bound. Defaults to None.

    Returns:
        np.array: thresholded array
    """
    if min_val is not None:
        array = np.maximum(array, min_val)
    if max_val is not None:
        array = np.minimum(array, max_val)
    return array
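

# Example (illustrative):
#   >>> threshold(np.array([-2.0, 0.5, 3.0]), min_val=0.0, max_val=1.0)
#   array([0. , 0.5, 1. ])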


def corr_matrix_array(m, a):
    """Correlate a vector against each column of a matrix.

    Parameters
    ----------
    m: numpy array
        matrix of shape (n, d)
    a: numpy array
        vector of length n

    Returns
    -------
    c: numpy array
        correlation coefficients of a against each column of m
    """
    a = a.reshape(-1, 1)

    mean_t = np.mean(m, axis=0)
    std_t = np.std(m, axis=0)

    mean_i = np.mean(a, axis=0)
    std_i = np.std(a, axis=0)

    mean_xy = np.mean(m * a, axis=0)

    c = (mean_xy - mean_i * mean_t) / (std_i * std_t)
    return c
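

# Example (illustrative): one perfectly correlated and one anti-correlated column.
#   >>> m = np.array([[1.0, -1.0], [2.0, -2.0], [3.0, -3.0]])
#   >>> corr_matrix_array(m, np.array([1.0, 2.0, 3.0]))
#   array([ 1., -1.])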


def coefficients_spreaded(X):
    """Return a log-spread grid of coefficients strictly smaller than X."""
    coefficients = np.array(
        [1, 2, 3, 5, 8, 10, 15, 20, 25, 33, 50, 100, 250, 500,
         1000, 2500, 5000, 10000, 25000, 50000, 100000]
    )
    mask = coefficients < X
    return coefficients[mask]


def Extract_dict(dictionaire, str_keys):
    """Extract the values of a dict for the given keys; unwrap single results."""
    list_extract = []
    for str_name in str_keys:
        list_extract.append(dictionaire[str_name])

    if len(list_extract) == 1:
        list_extract = list_extract[0]
    return list_extract


def sizeof_fmt(num, suffix="B"):
    """by Fred Cirera, https://stackoverflow.com/a/1094933/1870254, modified"""
    for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f} {unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f} Yi{suffix}"


def track_memory(string):
    """Print the 10 largest objects of the module's global namespace (to track memory leakage)."""
    print("Tracking_memory : " + string)
    for name, size in sorted(
        ((name, sys.getsizeof(value)) for name, value in list(globals().items())),
        key=lambda x: -x[1],
    )[:10]:
        print(f"{name:>30}: {sizeof_fmt(size):>8}")


def propagate(bool_array, n_prop=1, inv=False, sym=False):
    """Propagate the True values of a boolean flag to the following indices
    (default), the previous indices (inv=True), or both (sym=True)."""
    for _ in range(n_prop):
        if inv or sym:
            bool_array = bool_array | np.roll(bool_array, -1)
        if not inv or sym:
            bool_array = bool_array | np.roll(bool_array, 1)
    return bool_array
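

# Example (illustrative):
#   >>> propagate(np.array([False, False, True, False]), sym=True)
#   array([False,  True,  True,  True])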


def expand_flag(flag):
    """Propagate the True values of a boolean flag to the previous and following index."""
    return (np.roll(flag, 1) + np.roll(flag, 0) + np.roll(flag, -1)) > 0


def get_coeff(alpha):
    """Return the standard normal quantile of order alpha."""
    return scipy.stats.norm.ppf(alpha, 0, 1)


def flatten(y):
    """Squeeze a (n, 1) array into a (n,) array; pass anything else through."""
    if y is not None:
        if (len(y.shape) == 2) & (y.shape[1] == 1):
            y = y[:, 0]
    return y


# Folding functions


def get_fold_nstep(size_window, size_subseq, padding):
    """Compute the number of steps from the window size, the subsequence size
    and the padding (stride)."""
    return int(np.floor((size_window - size_subseq) / padding)) + 1
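

# Example (illustrative): a window of 10 cut into subsequences of 4 with stride 2.
#   >>> get_fold_nstep(size_window=10, size_subseq=4, padding=2)
#   4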


# TODO: handle padding to limit train redundancy
def stack_and_roll(array, horizon, lag=0, seq_idx=None, step=1):
    """Stack an array "horizon" times and roll it into temporal stairs:
    an (n+1)-dimensional array whose kth slice is rolled k times.

    Args:
        array (np.array): 2D array to stack and roll
        horizon (int): stacking depth
        lag (int, optional): temporal lag applied to the rolls. Defaults to 0.
        seq_idx (np.array, optional): sequence identifiers; rolling is applied
            independently within each sequence block. Defaults to None.
        step (int, optional): subsampling step on the first axis. Defaults to 1.

    Returns:
        np.array: stacked and rolled array
    """
    shape = (1, horizon, 1)
    new_array = np.tile(array[:, None], shape)

    if seq_idx is None:
        for i in np.arange(horizon):
            lag_current = horizon - lag - i - 1
            new_array[:, i] = np.roll(new_array[:, i], lag_current, axis=0)

        # Erase values wrapped around by the roll
        for i in np.arange(horizon):
            lag_current = horizon - lag - i - 1
            if lag_current > 0:
                new_array[i, :lag_current] = 0

    # By blocks:
    else:
        _, idx = np.unique(seq_idx, return_index=True)
        for j in seq_idx[np.sort(idx)]:
            flag = seq_idx == j
            list_idx = np.nonzero(flag)[0]
            for i in np.arange(horizon):
                lag_current = horizon - lag - i - 1
                new_array[flag, i] = np.roll(new_array[flag, i], lag_current, axis=0)
            # To optimize: get first non-zero values.
            for i in np.arange(horizon):
                lag_current = horizon - lag - i - 1
                if lag_current > 0:
                    new_array[list_idx[i], :lag_current] = 0
    if step > 1:
        mask = (np.arange(len(array)) % step) == step - 1
        new_array = new_array[mask]
    return new_array
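

# Example (illustrative): with horizon=2, row t gathers [y_(t-1), y_t],
# with wrapped values zeroed out.
#   >>> stack_and_roll(np.arange(4).reshape(-1, 1), horizon=2)[:, :, 0]
#   array([[0, 0],
#          [0, 1],
#          [1, 2],
#          [2, 3]])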


def stack_and_roll_layer(inputs, size_window, size_subseq, padding, name=""):
    """Tensorflow equivalent of the folding: stack the subsequences of length
    size_subseq extracted every `padding` steps (assumes float32 inputs)."""
    n_step = get_fold_nstep(size_window, size_subseq, padding)
    x = tf.map_fn(
        lambda i: inputs[:, (i * padding): (i * padding) + size_subseq, :],
        tf.range(n_step),
        fn_output_signature=tf.float32,
    )
    x = tf.transpose(x, [1, 0, 2, 3])
    return x


def apply_middledim_reduction(ndarray, reduc_filter=None, roll=0):
    """Reduce the middle dimension by a weighted mean with reduc_filter
    weights, or do nothing if reduc_filter is None.

    Args:
        ndarray (np.array): 3D object to reduce
        reduc_filter (np.array): weights of the weighted mean
        roll (int, optional): if non-zero, roll each weighted slice by
            (j * roll) before summing. Defaults to 0.

    Returns:
        np.array: reduced object
    """
    ndarray = deepcopy(ndarray)
    if reduc_filter is None:
        return ndarray

    reduc_filter = np.array(reduc_filter)

    dim_g = ndarray.shape[-1]
    list_reduced_ndarray_by_dim = []
    # For each dimension of the score
    for i in range(dim_g):
        # Incompatibility check:
        if len(reduc_filter) != ndarray.shape[1]:
            print(
                "error, expected size of reduc_filter:",
                str(ndarray.shape[1]),
                "current size:",
                str(len(reduc_filter)),
            )
            break

        filt = reduc_filter / reduc_filter.sum()
        ndarray[:, :, i] = ndarray[:, :, i] * filt
        if roll:
            reduced_ndarray = ndarray[:, 0, i].copy()
            for j in range(ndarray.shape[1] - 1):
                reduced_ndarray += np.roll(ndarray[:, j + 1, i], (j + 1) * roll, axis=0)
        else:
            reduced_ndarray = ndarray[:, :, i].sum(axis=1)

        list_reduced_ndarray_by_dim.append(reduced_ndarray[:, None])
    reduced_ndarray = np.concatenate(list_reduced_ndarray_by_dim, axis=1)
    return reduced_ndarray


def apply_mask(list_or_array_or_none, mask, axis=0, mode="bool_array"):
    """Select a subpart of an array (or list/tuple of arrays) using a mask and np.take.

    If list_or_array_or_none is an array, the selection is applied to it directly;
    if it is a list or tuple of arrays, the selection is applied to each array inside.

    Args:
        list_or_array_or_none: ND array, list/tuple of ND arrays, or None
        mask: mask as a boolean array or an indices array
        axis (int, optional): axis of the sub-arrays on which to apply the mask.
            Defaults to 0.
        mode (str, optional): if "bool_array", turn the boolean mask into an
            indices array. Defaults to "bool_array".

    Returns:
        (list or tuple or np.array or None): sub-selected list of arrays or ND array
    """
    if mode == "bool_array":
        indices = np.arange(len(mask))[mask]
    else:
        indices = np.asarray(mask)

    if isinstance(list_or_array_or_none, (list, tuple)):
        return [np.take(array, indices, axis=axis) for array in list_or_array_or_none]
    if list_or_array_or_none is None:
        return None
    else:
        return np.take(list_or_array_or_none, indices, axis=axis)
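

# Example (illustrative): keep rows 0 and 2 of both arrays.
#   >>> X, y = np.arange(6).reshape(3, 2), np.array([10, 20, 30])
#   >>> apply_mask([X, y], np.array([True, False, True]))
#   [array([[0, 1], [4, 5]]), array([10, 30])]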


def base_cos_freq(array, freq=[2]):
    """Transform a 1D modulo feature into cyclic (cos, sin) features for each given frequency."""
    features = []
    for i in freq:
        features.append(np.cos(i * math.pi * array))
        features.append(np.sin(i * math.pi * array))
    return np.concatenate(features).reshape(len(freq) * 2, -1).T
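

# Example (illustrative): each frequency yields two columns (cos, sin).
#   >>> base_cos_freq(np.linspace(0, 1, 5), freq=[2]).shape
#   (5, 2)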


# Preprocessing functions


def apply_conv(score, filt=None, reduc_filter=None):
    """Naively apply a 1D convolution to each dimension of score, using filt as filter.

    Args:
        score (np.array): score to convolve (2D or 3D)
        filt (np.array): filter to apply

    Returns:
        np.array: convolved score
    """
    if filt is None:
        return score

    if len(score.shape) == 3:
        # For each step and each dimension of score
        for t in range(score.shape[1]):
            for i in range(score.shape[2]):
                score[:, t, i] = np.convolve(score[:, t, i], filt, mode="same")
    else:
        # For each dimension of score
        for i in range(score.shape[-1]):
            score[:, i] = np.convolve(score[:, i], filt, mode="same")
    return score


def cut(values, cut_min, cut_max):
    """Clip values between the cut_min and cut_max percentiles.

    Args:
        values (np.array): values to cut
        cut_min (float): percentile of the minimal cut threshold
        cut_max (float): percentile of the maximal cut threshold

    Returns:
        np.array: clipped values, or None if values is None
    """
    if values is None:
        return None

    vpmin = np.quantile(values, cut_min, axis=0)
    vpmax = np.quantile(values, cut_max, axis=0)
    values = np.minimum(values, vpmax)
    values = np.maximum(values, vpmin)
    return values
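

# Example (illustrative): clip to the 10th-90th percentile range.
#   >>> cut(np.arange(11.0), 0.1, 0.9)
#   array([1., 1., 2., 3., 4., 5., 6., 7., 8., 9., 9.])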


def format_data(self, X, y, fit=False, mode=None, flag_inverse=False):
    """Feature and target formatting (rescaling and inverse rescaling)."""
    if self.rescale:
        flag_reshape = False
        if y is not None:
            if len(y.shape) == 1:
                flag_reshape = True
                y = y[:, None]

        if fit:
            scalerX = StandardScaler(with_mean=True, with_std=True)
            X = scalerX.fit_transform(X)
            scalerY = StandardScaler(with_mean=True, with_std=True)
            y = scalerY.fit_transform(y)
            self.scaler = [scalerX, scalerY]

        elif not flag_inverse:
            if X is not None:
                X = self.scaler[0].transform(X)
            if y is not None:
                y = self.scaler[1].transform(y)
        else:
            X_transformer = self.scaler[0].inverse_transform
            Y_transformer = self.scaler[1].inverse_transform
            if X is not None and not len(X) == 0:
                X = X_transformer(X)

            if y is not None and not len(y) == 0:
                sigma = np.sqrt(self.scaler[1].var_)
                if mode in ("sigma", "var", "var_A&E"):
                    y = y * sigma

                elif mode == "2sigma":
                    y_reshape = np.moveaxis(y, -1, 0)
                    if len(y.shape) == 3:
                        y = np.concatenate(
                            [
                                np.expand_dims(i, 0)
                                for i in [y_reshape[0] * sigma, y_reshape[1] * sigma]
                            ],
                            axis=0,
                        )
                    else:
                        y = np.concatenate(
                            [
                                y_reshape[0] * sigma,
                                y_reshape[1] * sigma,
                            ],
                            axis=-1,
                        )
                elif mode == "quantile":
                    y_bot = Y_transformer(y[:, 0])
                    y_top = Y_transformer(y[:, 1])
                    y = np.concatenate([y_bot, y_top], axis=1)

                else:
                    y = Y_transformer(y)

        if flag_reshape:
            y = y[:, 0]
    return (X, y)


# Basic PI computation functionalities


def compute_born(y_pred, sigma, alpha, mode="sigma"):
    """Compute the y_lower and y_upper boundaries under a gaussian hypothesis (sigma or 2sigma).

    Args:
        y_pred (array): mean prediction
        sigma (array): standard-deviation estimation
        alpha (float): miscoverage ratio
        mode (str): distribution hypothesis
            (sigma: gaussian residual hypothesis; 2sigma: separate gaussian
            hypotheses for positive and negative residuals)

    Returns:
        (y_lower, y_upper): lower and upper boundaries of the predictive interval
    """
    # Case sigma
    if mode == "sigma":
        y_lower = y_pred + scipy.stats.norm.ppf((alpha / 2), 0, sigma)
        y_upper = y_pred + scipy.stats.norm.ppf((1 - (alpha / 2)), 0, sigma)
    # Case 2sigma
    elif mode == "2sigma":
        sigma = np.moveaxis(sigma, -1, 0)
        y_lower = y_pred + scipy.stats.norm.ppf((alpha / 2), 0, sigma[0])
        y_upper = y_pred + scipy.stats.norm.ppf((1 - (alpha / 2)), 0, sigma[1])
    else:
        raise NotImplementedError("This mode is not yet implemented.")
    return y_lower, y_upper
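

# Example (illustrative): 90% gaussian interval around a zero prediction with sigma=1.
#   >>> compute_born(np.zeros(1), np.ones(1), alpha=0.1)
#   (array([-1.64485363]), array([1.64485363]))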


# Gaussian mixture quantile estimation:
def mixture_quantile(pred, var_A, quantiles, n_jobs=5):
    """Estimate the quantiles of a gaussian mixture given per-member means and variances.

    pred and var_A are expected to have shape (n_mixture, n_data, 1); returns
    an array of shape (n_data, len(quantiles)).
    """

    def aux_mixture_quantile(pred, var_A, quantiles):
        list_q = []
        n_data = pred.shape[1]
        n_mixture = pred.shape[0]
        for n in range(n_data):
            mean_law = scipy.stats.norm(
                pred[:, n, 0].mean(), np.sqrt(var_A[:, n, 0].mean())
            )
            xmin = mean_law.ppf(0.0000001)
            xmax = mean_law.ppf(0.9999999)
            scale = np.arange(xmin, xmax, (xmax - xmin) / 300)
            Mixture_cdf = np.zeros(len(scale))
            for i in range(n_mixture):
                cur_law = scipy.stats.norm(pred[i, n, 0], np.sqrt(var_A[i, n, 0]))
                Mixture_cdf += cur_law.cdf(scale) / n_mixture
            q_val = []
            for q in quantiles:
                q_val.append(scale[np.abs(Mixture_cdf - q).argmin()])
            list_q.append(q_val)
        return np.array(list_q)

    list_q = []
    n_data = pred.shape[1]
    parallel_partition = np.array_split(np.arange(n_data), 3)
    # Split the inputs of the auxiliary parallel quantile estimation
    parallel_input = []
    for partition in parallel_partition:
        parallel_input.append((pred[:, partition], var_A[:, partition], quantiles))
    list_q = Parallel(n_jobs=n_jobs)(
        delayed(aux_mixture_quantile)(*inputs) for inputs in parallel_input
    )
    return np.concatenate(list_q, axis=0)


# Scikit tuning function


def aux_tuning(
    model,
    X,
    Y,
    params=None,
    score="neg_mean_squared_error",
    n_esti=100,
    folds=4,
    random_state=None,
):
    """Random search with sequential k-splits (time-series cross-validation).

    Args:
        model (scikit model): estimator
        X: features
        Y: target
        params (dict, optional): parameter grid. Defaults to None.
        score (str, optional): score. Defaults to "neg_mean_squared_error".
        n_esti (int, optional): number of grid draws. Defaults to 100.
        folds (int, optional): number of sequential folds. Defaults to 4.
        random_state (int, optional): handle experimental randomness using seeds.
    """
    if params is None:
        return model
    tscv = TimeSeriesSplit(n_splits=folds)
    random_search = RandomizedSearchCV(
        model,
        param_distributions=params,
        n_iter=n_esti,
        scoring=score,
        n_jobs=8,
        cv=tscv.split(X),
        verbose=1,
        random_state=random_state,
    )
    Y = np.squeeze(Y)
    random_search.fit(X, Y)
    return random_search.best_estimator_
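

# Illustrative usage sketch (the estimator and grid are assumptions; X and Y
# are training arrays):
#   >>> from sklearn.ensemble import RandomForestRegressor
#   >>> grid = {"n_estimators": [50, 100], "max_depth": [3, 5]}
#   >>> best = aux_tuning(RandomForestRegressor(), X, Y, params=grid, n_esti=4)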


def agg_list(list_: Iterable):
    """Concatenate a list of arrays along axis 0; return None if concatenation fails."""
    try:
        return np.concatenate(list_, axis=0)
    except ValueError:
        return None


def agg_func(list_: Iterable):
    """Average a list of arrays along axis 0; return None if averaging fails."""
    try:
        return np.mean(list_, axis=0)
    except TypeError:
        return None


class GenericCalibrator:
    def __init__(self, type_res="res", mode="symetric", name=None, alpha=0.1):
        """Generic calibrator implementing several calibration schemes.

        Args:
            type_res (str, optional): type of score
                "no_calib": no calibration
                "res": calibration based on mean residuals
                "w_res": calibration based on weighted mean residuals
                "cqr": calibration based on quantile residuals

            mode (str, optional):
                if "symetric", perform symmetric calibration;
                else perform calibration independently on positive and negative residuals.
            name (str, optional): calibrator name. Defaults to None.
            alpha (float, optional): miscoverage ratio. Defaults to 0.1.
        """
        super().__init__()
        self._residuals = None
        self.name = name
        if name is None:
            self.name = type_res + "_" + mode
        self.mode = mode
        self.alpha = alpha
        self.type_res = type_res

        if mode == "symetric":
            self.fcorr = 1
        else:
            self.fcorr_lower = 1
            self.fcorr_upper = 1

    def estimate(
        self, y_true, y_pred, y_pred_lower, y_pred_upper, sigma_pred, **kwargs
    ):
        flag_res_lower = np.zeros(y_true.shape)
        flag_res_upper = np.zeros(y_true.shape)

        if self.type_res in ("res", "w_res"):
            if self.type_res == "res":
                residuals = y_true - y_pred
                sigma_pred = y_true * 0 + 1

            if self.type_res == "w_res":
                # sigma_pred = np.maximum(y_pred_upper - y_pred_lower, EPSILON) / 2
                residuals = y_true - y_pred

            if len(residuals.shape) == 1:
                flag_res_lower = residuals <= 0
                flag_res_upper = residuals >= 0

            else:
                flag_res_lower = np.concatenate(
                    [
                        np.expand_dims(residuals[:, i] <= 0, -1)
                        for i in range(0, y_true.shape[1])
                    ],
                    axis=-1,
                )
                flag_res_upper = np.concatenate(
                    [
                        np.expand_dims(residuals[:, i] >= 0, -1)
                        for i in range(0, y_true.shape[1])
                    ],
                    axis=-1,
                )

            if y_pred.shape != sigma_pred.shape:
                sigma_pred = np.moveaxis(sigma_pred, -1, 0)
                residuals[flag_res_lower] = (
                    np.abs(residuals)[flag_res_lower] / (sigma_pred[0])[flag_res_lower]
                )
                residuals[flag_res_upper] = (
                    np.abs(residuals)[flag_res_upper] / (sigma_pred[1])[flag_res_upper]
                )
            else:
                residuals = np.abs(residuals) / (sigma_pred)

        elif self.type_res == "cqr":
            residuals = np.maximum(y_pred_lower - y_true, y_true - y_pred_upper)
            flag_res_lower = (y_pred_lower - y_true) >= (y_true - y_pred_upper)
            flag_res_upper = (y_pred_lower - y_true) <= (y_true - y_pred_upper)

        elif self.type_res == "no_calib":
            return

        else:
            print("Unknown type_res")
            return

        if self.mode == "symetric":
            self.fcorr = np.quantile(
                residuals, np.minimum((1 - self.alpha) * (1 + 1 / len(residuals)), 1)
            )

        else:
            self.fcorr_lower = np.quantile(
                residuals[flag_res_lower],
                np.minimum((1 - self.alpha) * (1 + 1 / flag_res_lower.sum()), 1),
            )

            self.fcorr_upper = np.quantile(
                residuals[flag_res_upper],
                np.minimum((1 - self.alpha) * (1 + 1 / flag_res_upper.sum()), 1),
            )
        return

    def calibrate(self, y_pred, y_pred_lower, y_pred_upper, sigma_pred, **kwargs):
        if self.type_res == "res":
            sigma_pred = y_pred * 0 + 1

        if self.mode == "symetric":
            fcorr_lower = self.fcorr
            fcorr_upper = self.fcorr
        else:
            fcorr_lower = self.fcorr_lower
            fcorr_upper = self.fcorr_upper

        if self.type_res in ["res", "w_res", "no_calib"]:
            if y_pred.shape != sigma_pred.shape:
                sigma_pred = np.moveaxis(sigma_pred, -1, 0)
                y_pred_lower = y_pred - sigma_pred[0] * fcorr_lower
                y_pred_upper = y_pred + sigma_pred[1] * fcorr_upper
            else:
                y_pred_lower = y_pred - sigma_pred * fcorr_lower
                y_pred_upper = y_pred + sigma_pred * fcorr_upper

        elif self.type_res in ["cqr"]:
            y_pred_upper = y_pred_upper + fcorr_upper
            y_pred_lower = y_pred_lower - fcorr_lower
        else:
            print("Unknown type_res")
            return

        return y_pred_lower, y_pred_upper
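

# Illustrative usage sketch (the *_cal arrays are assumed hold-out calibration
# data; all variable names here are hypothetical):
#   >>> calib = GenericCalibrator(type_res="w_res", alpha=0.1)
#   >>> calib.estimate(y_cal, y_pred_cal, y_low_cal, y_up_cal, sigma_cal)
#   >>> y_low, y_up = calib.calibrate(y_pred, y_low, y_up, sigma)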


def dimensional_reduction(data, mode="umap", reducer=None, fit=True, **kwargs):
    """Project data with a dimensional-reduction model (tsne or pca);
    return (embedding, reducer)."""
    if "n_components" not in kwargs.keys():
        kwargs["n_components"] = 3

    if reducer is not None:
        pass
    else:
        if mode == "umap":
            if "n_neighbors" not in kwargs:
                kwargs["n_neighbors"] = 50
            if "min_dist" not in kwargs:
                kwargs["min_dist"] = 0.01
            # reducer = umap.UMAP(**kwargs)
            raise NotImplementedError("umap support is disabled: the umap import is commented out")

        elif mode == "tsne":
            reducer = TSNE(**kwargs)

        elif mode == "pca":
            reducer = PCA(**kwargs)

    embedding = reducer.fit_transform(data)
    return (embedding, reducer)


# Lag feature extraction (moving-average style features)


def fit_compute_lag(Y, lag=[1, 2, 3], delay=0):
    """Create lag features from a numerical array.

    Args:
        Y (float array): 2D target from which to extract lag features
        lag (list of int, optional): lags to extract. Defaults to [1, 2, 3].
        delay (int, optional): delay added before the first lag feature. Defaults to 0.

    Returns:
        (np.array, np.array): lag features of shape (n, len(lag) * dim) and their names
    """
    dim = Y.shape
    new_features_list = []
    new_features_name = []
    for i in np.array(lag) + delay:
        Y_cur = np.roll(Y, i, axis=0)
        if i > 0:
            Y_cur[0:i] = 0
        for g in range(dim[1]):
            new_features_list.append(Y_cur[:, g])
            new_features_name.append("lag_" + str(i) + "_dim:" + str(g))
    new_features_name = np.array(new_features_name)
    return (np.array(new_features_list).T, new_features_name)
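

# Example (illustrative): a single lag-1 feature on a 1-column target.
#   >>> feats, names = fit_compute_lag(np.arange(5.0).reshape(-1, 1), lag=[1])
#   >>> feats[:, 0]
#   array([0., 0., 1., 2., 3.])
#   >>> names
#   array(['lag_1_dim:0'], dtype='<U11')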


def autocorr(x):
    """Compute the autocorrelation of x."""
    result = np.correlate(x, x, mode="full")
    return result[int(result.size / 2):]


def convolute_1D(array, filter=None):
    """Dimension-wise convolution of a 1D or 2D array using np.convolve.

    Args:
        array (np.array): array to convolve
        filter (np.array, optional): convolution filter. Defaults to None.

    Raises:
        ValueError: dimension error

    Returns:
        np.array: convolved array
    """
    if filter is None:
        return array
    else:
        array_convoluted = []
        if len(array.shape) == 1:
            array_convoluted = np.convolve(array, filter, "same")
        elif len(array.shape) == 2:
            for i in range(array.shape[1]):
                array_convoluted.append(
                    np.convolve(array[:, i], filter, "same")[:, None]
                )
            array_convoluted = np.concatenate(array_convoluted, axis=1)
        else:
            print("Inappropriate array size: only 1D or 2D arrays are handled")
            raise ValueError
        return array_convoluted
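

# Example (illustrative): a 3-point moving average.
#   >>> convolute_1D(np.array([0.0, 3.0, 6.0, 9.0]), filter=np.ones(3) / 3)
#   array([1., 3., 6., 5.])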