Coverage for uqmodels/preprocessing/preprocessing.py: 10%

449 statements  


1""" 

2Data preprocessing module. 

3""" 

4 

5import copy 

6 

7import numpy as np 

8import pandas as pd 

9import sklearn 

10from scipy.interpolate import interp1d 

11from scipy.sparse.csgraph import connected_components 

12from sklearn.model_selection import TimeSeriesSplit 

13 

14import uqmodels.preprocessing.structure as pre_struc 

15import uqmodels.utils as ut 

16 

17# Target selection based on corrcoef 

18 

19 

def compute_corr_and_filter(data):
    mat_corr = np.corrcoef(data.T)
    signal_to_drop = np.isnan(mat_corr[0])
    mat_corr[np.isnan(mat_corr)] = 0
    return (mat_corr, signal_to_drop)
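# Illustrative usage sketch (added for documentation, not part of the original
# module): a constant channel yields NaN correlations and is flagged for
# dropping. The synthetic data and the helper name
# `_example_compute_corr_and_filter` are assumptions made for this example.
def _example_compute_corr_and_filter():
    rng = np.random.RandomState(0)
    data = np.stack([rng.randn(50), rng.randn(50), np.ones(50)], axis=1)
    mat_corr, signal_to_drop = compute_corr_and_filter(data)
    return signal_to_drop  # array([False, False, True])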

def get_k_composants(mat_corr, n_cible):
    d = mat_corr.shape[0]
    corr_flow = []
    for i in range(100):
        corr_flow.append(
            connected_components(
                np.maximum(0, np.abs(mat_corr) - i / 100) != 0, connection="strong"
            )[1]
        )
    corr_flow = np.concatenate(corr_flow).reshape(100, d)
    step = np.argmax(np.max(corr_flow, axis=1) == n_cible)
    list_components = []
    for component in range(n_cible):
        mask = corr_flow[step] == component
        list_components.append(np.arange(d)[mask])
    return list_components

def select_best_representant(mat_corr, list_components):
    best_signals = []
    for i, ind_components in enumerate(list_components):
        ind_best = np.argmax(mat_corr[ind_components][:, ind_components].sum(axis=1))
        best_signals.append(ind_components[ind_best])
    return best_signals

def select_signal(data, n_cible=None):
    if n_cible is None:
        n_cible = data.shape[1]

    mat_corr, signal_to_drop = compute_corr_and_filter(data)
    raw_ind = np.arange(data.shape[1])
    restricted_mat_corr = mat_corr[~signal_to_drop][:, ~signal_to_drop]
    list_restricted_components = get_k_composants(restricted_mat_corr, n_cible)
    best_restricted_signals = select_best_representant(
        restricted_mat_corr, list_restricted_components
    )
    best_signals = raw_ind[~signal_to_drop][best_restricted_signals]

    print(np.max(mat_corr[best_signals], axis=0).sum() / data.shape[1])
    return (best_signals, signal_to_drop)


#

def check_is_pd_date(date):
    if isinstance(date, str):
        try:
            date = pd.to_datetime(date)
        except BaseException:
            pass
    return date


# Base replacement function

def locate_near_index_values(index_scale, index_val):
    if (index_val > index_scale[-1]) or (index_val < index_scale[0]):
        return None
    else:
        ind = np.abs(index_scale - index_val).argmin()
        if (index_scale[ind] - index_val) > 0:
            return ind - 1
        else:
            return ind
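# Illustrative usage sketch (added for documentation): locate a value on a
# regular scale; values outside the scale map to None. The sample scale and
# the helper name `_example_locate_near_index_values` are assumptions.
def _example_locate_near_index_values():
    index_scale = np.array([0.0, 10.0, 20.0, 30.0])
    inside = locate_near_index_values(index_scale, 12.0)   # 1 (between 10 and 20)
    outside = locate_near_index_values(index_scale, 45.0)  # None (beyond the scale)
    return inside, outside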

def get_event_into_series(
    list_events, index_scale, n_type_event, dtype="datetime64[s]"
):
    """Locate flagged sensor errors on a regular time referential."""
    nan_series = np.zeros((len(index_scale), n_type_event + 1))
    index_scale = index_scale.astype(dtype).astype(float)
    for n, dict_events in enumerate(list_events):
        for index_event in list(dict_events.keys()):
            type_anom = dict_events[index_event]
            try:
                # Event given as a (start, end) range: map both dates to
                # positions on the regular index scale.
                start_event, end_event = index_event
                start_index = locate_near_index_values(
                    index_scale,
                    pd.to_datetime(start_event)
                    .to_datetime64()
                    .astype(dtype)
                    .astype(float),
                )

                end_index = locate_near_index_values(
                    index_scale,
                    pd.to_datetime(end_event)
                    .to_datetime64()
                    .astype(dtype)
                    .astype(float),
                )

                if (start_index is None) and (end_index is None):
                    pass
                else:
                    if start_index is None:
                        start_index = 0

                    if end_index is None:
                        end_index = -1

                    nan_series[start_index:end_index, type_anom] += 1

            except BaseException:
                # Event given as a single date.
                index_event = (
                    pd.to_datetime(index_event)
                    .to_datetime64()
                    .astype(dtype)
                    .astype(float)
                )
                ind = locate_near_index_values(index_scale, index_event)
                nan_series[ind, type_anom] += 1

    return nan_series
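# Illustrative usage sketch (added for documentation) for the range-event path
# above; it assumes an hourly index and a single event dictionary. The helper
# name `_example_get_event_into_series` is an assumption.
def _example_get_event_into_series():
    index_scale = np.arange("2021-01-01", "2021-01-02", dtype="datetime64[h]")
    list_events = [{("2021-01-01 03:00", "2021-01-01 06:00"): 1}]
    series = get_event_into_series(list_events, index_scale, n_type_event=1)
    return series[:, 1]  # hours 3, 4 and 5 are flagged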

def extract_sensors_errors(series, type_sensor_error=None):
    """Extract the list of non-floating values.

    Args:
        series (_type_): series of sensor values
        type_sensor_error (list, optional): list of already known error codes.
    """
    if type_sensor_error is None:
        type_sensor_error = []

    for i in list(set(series)):
        try:
            float(i)
        except ValueError as e:
            e = str(e).split("'")[1]
            if e not in type_sensor_error:
                type_sensor_error.append(e)
    return type_sensor_error
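# Illustrative usage sketch (added for documentation): non-numeric sensor
# readings are collected as error codes. The sample series and the helper
# name `_example_extract_sensors_errors` are assumptions.
def _example_extract_sensors_errors():
    series = pd.Series([1.0, "ERR", 2.5, "OFF", "ERR"])
    return extract_sensors_errors(series)  # ['ERR', 'OFF'] (order may vary)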

def handle_nan(y):
    """Replace NaN values with the last observed value."""
    nan_flag = np.isnan(y)
    last_non_nan = np.copy(y)
    for j in np.arange(len(y))[nan_flag]:
        if j < 1:
            last_non_nan[j] = np.nanmean(y, axis=0)
        else:
            last_non_nan[j] = y[j - 1]
        v = last_non_nan[j]
        y[j] = v
    return y
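# Illustrative usage sketch (added for documentation): NaN gaps are filled in
# place with the previous value. The helper name `_example_handle_nan` is an
# assumption.
def _example_handle_nan():
    y = np.array([1.0, np.nan, 3.0, np.nan])
    return handle_nan(y)  # array([1., 1., 3., 3.])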

def interpolate(
    x,
    y,
    xnew=None,
    time_structure=None,
    type_interpolation="linear",
    fill_values=None,
    moving_average=False,
):
    """Drop NaN values & perform 'type_interpolation' interpolation from [x, y] to [xnew, ynew].
    If xnew is None, compute xnew from time_structure.

    If moving_average=True, perform an "interpolated moving average" using
    M = int(len(x) / len(xnew)): each new step is the mean of M interpolated
    points evenly distributed within the step.

    Args:
        x (array): X_axis
        y (array): Y_axis (values)
        xnew (array): new X_axis
        moving_average (bool, optional): Perform moving average 'interpolation'.

    Returns:
        ynew: new interpolated Y_axis
    """
    if x is None:
        x = np.arange(len(y))

    if fill_values is None:
        fill_values = np.nanmean(y, axis=0)

    only_fill_nan = False
    if (time_structure is None) and (xnew is None):
        only_fill_nan = True
        xnew = x

    precision = 0
    if xnew is None:
        # Interpolate the series taking into account the theoretical sampling
        # described by the time structure.
        if time_structure is None:
            print("time_structure is None")
            raise ValueError

        delta = time_structure.get("delta", default_value=1)
        # If precision and frequence are provided, else default_value
        precision = time_structure.get("precision", default_value=0)
        frequence = time_structure.get("frequence", default_value=1)

        new_time_step = delta * frequence

        xnew = pre_struc.get_regular_step_scale(
            delta=new_time_step, range_=(x[-1] - x[0]), time_offset=x[0]
        )

    # Ignore NaN values
    nan_flag = np.isnan(y)

    ynew = np.zeros(len(xnew))
    f = interp1d(
        x[~nan_flag], y[~nan_flag], type_interpolation, fill_value="extrapolate"
    )

    if only_fill_nan:
        ynew = y
        ynew[nan_flag] = f(xnew[nan_flag])

    else:
        ynew = np.zeros(len(xnew))

        # ratio of moving average
        ratio = 1
        if moving_average:
            ratio = int(len(x) / len(xnew))
            if ratio < 1:
                ratio = 1

        for i in range(ratio):
            ynew += f(xnew - (i * xnew[0]) / ratio) / ratio

    if precision > 0:
        ynew = np.round(ynew, -int(np.floor(np.log10(precision))))

    return xnew, ynew
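# Illustrative usage sketch (added for documentation): when neither xnew nor
# time_structure is given, only the NaN entries are (re)interpolated on the
# original axis. The helper name `_example_interpolate` is an assumption.
def _example_interpolate():
    x = np.arange(6, dtype=float)
    y = np.array([0.0, 1.0, np.nan, 3.0, 4.0, 5.0])
    xnew, ynew = interpolate(x, y, type_interpolation="linear")
    return ynew  # the NaN at index 2 is replaced by 2.0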

def add_row(df, date_pivot, mode="first"):
    """Add a first or last NaN row to df with date_pivot as index value.

    Args:
        df (_type_): dataframe
        date_pivot (_type_): index
        mode (str, optional): 'first' or 'last'. Defaults to 'first'.

    Returns:
        df: dataframe augmented with one row
    """
    new_row = copy.deepcopy(df.iloc[0:1])
    for columns in new_row.columns:
        new_row.loc[new_row.index[0], columns] = np.nan
    new_row = new_row.rename({new_row.index[0]: date_pivot})

    if mode == "first":
        df = pd.concat([new_row, df])

    if mode == "last":
        df = pd.concat([df, new_row])

    return df

def remove_rows(df, date_pivot, mode="first"):
    """Remove rows earlier/later than date_pivot, then apply add_row.

    Args:
        df (_type_): dataframe
        date_pivot (_type_): index_pivot
        mode (str, optional): 'first' or 'last'. Defaults to 'first'.

    Returns:
        df: dataframe with out-of-range rows removed and a new boundary row
    """
    if mode == "first":
        if df.index[0] > date_pivot:
            pass
        else:
            df = df[df.index > date_pivot]
    if mode == "last":
        if df.index[-1] < date_pivot:
            pass
        else:
            df = df[df.index < date_pivot]
    df = add_row(df, date_pivot, mode=mode)
    return df

def df_selection(df, start_date=None, end_date=None):
    """Format the dataframe so that it starts at start_date and ends at end_date.

    Args:
        df (_type_): dataframe
        start_date (_type_, optional): start_date or None. Defaults to None: do nothing.
        end_date (_type_, optional): end_date or None. Defaults to None: do nothing.

    Returns:
        dataframe: time-formatted dataframe
    """

    if start_date is not None:
        start_date = check_is_pd_date(start_date)
        df = remove_rows(df, start_date, mode="first")
    if end_date is not None:
        end_date = check_is_pd_date(end_date)
        df = remove_rows(df, end_date, mode="last")
    return df
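# Illustrative usage sketch (added for documentation): restrict a daily frame
# to a sub-period; NaN boundary rows are added at the pivot dates. The sample
# frame and the helper name `_example_df_selection` are assumptions.
def _example_df_selection():
    index = pd.date_range("2021-01-01", periods=10, freq="D")
    df = pd.DataFrame({"value": np.arange(10.0)}, index=index)
    return df_selection(df, start_date="2021-01-03", end_date="2021-01-07")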

def upscale_series(
    dataframe,
    delta,
    offset=None,
    start_date=None,
    end_date=None,
    mode="time",
    max_time_jump=10,
    replace_val=None,
    **kwargs
):
    """Upsample series using the pandas interpolation functions.

    Args:
        dataframe (_type_): data to resample
        delta (_type_): Timedelta
        offset (str, optional): resampling offset. Defaults to None.
        start_date (_type_, optional): start date. Defaults to None.
        end_date (_type_, optional): end date. Defaults to None.
        mode (str, optional): interpolation mode. Defaults to 'time'.
        max_time_jump (int, optional): interpolation limit. Defaults to 10.
        replace_val (_type_, optional): value used for remaining NaN. Defaults to None.

    Returns:
        _type_: upsampled dataframe
    """
    # Generate upsampling with NaN values

    start_date = pre_struc.check_date(start_date)
    end_date = pre_struc.check_date(end_date)
    delta = pre_struc.check_delta(delta)

    origin_date = start_date
    if delta is not None:
        origin_date = origin_date - delta

    dataframe = df_selection(dataframe, start_date=origin_date, end_date=end_date)

    # Fill NaN using interpolation
    if mode == "previous":
        upscale_dataframe = dataframe.resample(
            pd.Timedelta(delta), origin=pd.to_datetime(start_date), offset=offset
        ).ffill(limit=max_time_jump)
    else:
        upscale_dataframe = (
            dataframe.resample(
                pd.Timedelta(delta), origin=pd.to_datetime(start_date), offset=offset
            )
            .mean()
            .interpolate(mode, limit=max_time_jump)
        )

    if max_time_jump is not None:
        mask = np.isnan(upscale_dataframe.values)
        # Replace using slightly-below-minimum values
        if replace_val is None:
            replace_val = dataframe.min(axis=0).values - 0.1 * np.abs(
                dataframe.min(axis=0).values
            )

        # Replace using default values
        if isinstance(replace_val, np.ndarray):
            for i, val in enumerate(replace_val):
                upscale_dataframe.iloc[mask[:, i], i] = val

        else:
            upscale_dataframe[mask] = replace_val

    return upscale_dataframe[1:]

def downscale_series(
    dataframe,
    delta,
    offset="-1ms",
    start_date=None,
    end_date=None,
    mode="mean",
    dtype="datetime64[s]",
    **kwargs
):
    end_date = pre_struc.check_date(end_date, dtype)
    start_date = pre_struc.check_date(start_date, dtype)
    delta = pre_struc.check_delta(delta, dtype)

    dataframe = df_selection(
        dataframe, start_date=pd.to_datetime(start_date - delta), end_date=end_date
    )

    downscale = dataframe.resample(
        pd.Timedelta(delta), origin=pd.to_datetime(start_date), offset=offset
    )

    if hasattr(downscale, mode):
        func_ = getattr(downscale, mode)
        downscale_dataframe = func_().interpolate("time")
    else:
        raise ValueError("mode: " + str(mode) + " not handled")
    downscale_dataframe.index -= pd.Timedelta(offset)
    return downscale_dataframe

def rolling_statistics(
    data, delta, step=None, reduc_functions=["mean"], reduc_names=["mean"], **kwargs
):
    """Compute rolling statistics from a dataframe.

    Args:
        data (pd.DataFrame): dataframe (times, sources)
        delta (int or timedelta64): size of the rolling window
        step (int): evaluate the window at every ``step`` result
        reduc_functions (_type_): str of a pandas window function (fast) or a custom set->stat function (slow)
        reduc_names (_type_): names stored in stat_dataframe
        **kwargs: other parameters provided to DataFrame.rolling

    Returns:
        pd.DataFrame: dataframe of rolling statistics
    """

    new_data = []
    colums_transformation = []
    reduc_functions = reduc_functions.copy()
    reduc_names = reduc_names.copy()

    columns = data.columns
    if not isinstance(delta, int):
        delta = pd.Timedelta(pre_struc.check_delta(delta, "datetime64[ns]"))
        data.index = data.index.astype("datetime64[ns]")

    flag_extremum = False
    if "extremum" in reduc_functions:
        flag_extremum = True
        ind_extremum = reduc_functions.index("extremum")
        reduc_functions[ind_extremum] = "max"
        reduc_functions.append("min")
        reduc_names.append("min")

    for n, reduc_func in enumerate(reduc_functions):
        colums_transformation.append(reduc_names[n])
        if isinstance(reduc_func, str):
            if reduc_func == "cov_to_median":
                # Workaround: `step` is not applied in this branch
                df_mean = data.rolling(delta, step=step, min_periods=1, **kwargs).mean()
                tmp_df = df_mean.rolling(20, min_periods=1).cov(df_mean.median(axis=1))
                tmp_df = tmp_df.fillna(0)
            elif reduc_func == "corr_to_median":
                # Workaround: `step` is not applied in this branch
                df_mean = data.rolling(delta, step=step, min_periods=1, **kwargs).mean()
                tmp_df = df_mean.rolling(20).corr(df_mean.median(axis=1))
                tmp_df = tmp_df.fillna(0)
            else:
                rolling = data.rolling(delta, step=step, min_periods=1, **kwargs)
                func_ = getattr(rolling, reduc_func)
                tmp_df = func_()

        else:
            rolling = data.rolling(delta, step=step, min_periods=1, **kwargs)
            tmp_df = rolling.apply(reduc_func)
        tmp_df.columns = [reduc_names[n] + "_" + str(i) for i in tmp_df.columns]
        new_data.append(tmp_df)

    data = pd.concat(new_data, axis=1)
    if flag_extremum:
        for column in columns:
            column = str(column)
            # Max is already renamed to extremum
            data["extremum_" + column] = np.maximum(
                np.abs(data["min_" + column].values - data["mean_" + column].values),
                np.abs(data["extremum_" + column].values - data["mean_" + column].values),
            )
            data.drop(["min_" + column], axis=1, inplace=True)

    return data
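# Illustrative usage sketch (added for documentation): 3-sample rolling mean
# and std on a small frame (assumes a pandas version supporting the `step`
# argument of DataFrame.rolling). The helper name `_example_rolling_statistics`
# is an assumption.
def _example_rolling_statistics():
    df = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0, 5.0]})
    return rolling_statistics(
        df, delta=3, reduc_functions=["mean", "std"], reduc_names=["mean", "std"]
    )  # columns 'mean_a' and 'std_a'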

# Base measure


def entropy(y, set_val, v_bins=100):
    """Compute a naive entropy score of y by tokenizing values into at most v_bins bins."""
    flag_nan = np.isnan(y)
    y[flag_nan] = -0.1
    if len(y) == 0:
        return 0
    else:
        labels = np.digitize(
            y,
            bins=np.sort(
                [
                    np.quantile(y, i / set_val)
                    for i in np.arange(0, min(set_val, v_bins))
                ]
            ),
        )
        labels = labels.astype(int)

        value, counts = np.unique(labels, return_counts=True)
        count = np.zeros(set_val + 1) + 1
        for n, i in enumerate(value):
            count[i] = counts[n]
        norm_counts = count / count.sum()
        return -(norm_counts * np.log(norm_counts)).sum()
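# Illustrative usage sketch (added for documentation): for a uniform sample
# split into 10 quantile bins, the score is close to log(10). The helper name
# `_example_entropy` is an assumption.
def _example_entropy():
    rng = np.random.RandomState(0)
    y = rng.rand(1000)
    return entropy(y, set_val=10)  # roughly log(10) ~ 2.3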

# Preprocessing function:

def df_interpolation_and_fusion(list_df, target_index_scale, dtype="datetime64[s]"):
    """Interpolate all sources onto a common temporal referential.

    Args:
        list_df (list of 2D array): list of dataframes
        target_index_scale (_type_): target (common) time index
        dtype: datetime dtype used for the conversion

    Returns:
        interpolated_data: dataframe of interpolated values
    """
    interpolated_data = []
    list_columns = []
    # Convert the target index to float (epoch) values for interpolation.
    target_index_scale = target_index_scale.astype(dtype).astype(float)

    for df in list_df:
        df_index = df.index.values.astype(dtype).astype(float)
        list_columns.append(list(df.columns))
        if (len(df_index) == len(target_index_scale)) and (
            (df_index - target_index_scale).sum() == 0
        ):
            interpolated_data.append(df.values)
        else:
            new_channels = np.stack(
                [
                    interpolate(
                        x=df_index,
                        y=channels,
                        xnew=target_index_scale,
                        moving_average=True,
                    )[1]
                    for channels in df.values.T
                ]
            ).T

            interpolated_data.append(new_channels)

    interpolated_data = np.swapaxes(interpolated_data, 0, 1).reshape(
        len(target_index_scale), -1
    )

    interpolated_data = pd.DataFrame(
        interpolated_data,
        columns=np.concatenate(list_columns),
        index=target_index_scale.astype(dtype),
    )

    return interpolated_data
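# Illustrative usage sketch (added for documentation): two frames that already
# share the target index are simply fused column-wise. The sample frames and
# the helper name `_example_df_interpolation_and_fusion` are assumptions.
def _example_df_interpolation_and_fusion():
    index = pd.date_range("2021-01-01", periods=4, freq="D")
    df_a = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0]}, index=index)
    df_b = pd.DataFrame({"b": [10.0, 20.0, 30.0, 40.0]}, index=index)
    return df_interpolation_and_fusion([df_a, df_b], index.values)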

# Map reduce statistics (may be computationally suboptimal)


def Past_Moving_window_mapping(array, deta, window_size=None):
    # Past moving window mapping using yield: inefficient for large data!
    if window_size is None:
        window_size = deta

    for i in range(int(np.floor(len(array) / deta))):
        yield array[max(0, i * deta - window_size) : i * deta + 1]


def identity(x, **kwargs):
    return x


def map_reduce(data, map_=identity, map_paramaters={}, reduce=identity):
    mapping = Regular_Moving_window_mapping(data, **map_paramaters)
    reduced = np.array(list(map(reduce, mapping)))
    return reduced


def auto_corr_reduce(set_):
    mean = np.mean(set_)
    var = np.var(set_)
    set_ = set_ - mean
    acorr = np.correlate(set_, set_, "full")[len(set_) - 1 :]
    acorr = acorr / var / len(set_)
    return acorr


def mean_reduce(set_):
    mu = np.nanmean(set_, axis=0)
    std = np.nanstd(set_, axis=0)
    carac = np.concatenate([mu[:, None], std[:, None]], axis=1)
    return carac


def last_reduce(set_):
    return set_[-1]


def corrcoef_reduce(set_):
    return np.corrcoef(set_.T)


def fft_reduce(set_):
    fft = np.fft.fft(set_.T)
    energy = np.abs(fft)[:, 0 : int(len(set_) / 2)] / len(set_)
    phase = np.angle(fft)[:, 0 : int(len(set_) / 2)]
    carac = np.concatenate([energy, phase], axis=1)
    return carac


def Regular_Moving_window_mapping(array, deta, window_size, mode="left", **kwargs):
    if window_size is None:
        window_size = deta
    for i in range(int(np.floor(len(array) / deta))):
        if mode == "left":
            yield array[i * deta : i * deta + window_size]
        elif mode == "center":
            size_bot = int(np.ceil(window_size / 2))
            size_top = int(np.floor(window_size / 2))
            yield array[i * deta - size_bot : i * deta + size_top]

        elif mode == "right":
            yield array[i * deta - (window_size - 1) : i * deta + 1]

        else:
            raise ValueError("unknown mode: " + str(mode))
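# Illustrative usage sketch (added for documentation): non-overlapping windows
# of 3 samples, each reduced to per-channel (mean, std). The synthetic data and
# the helper name `_example_map_reduce` are assumptions.
def _example_map_reduce():
    data = np.arange(12, dtype=float).reshape(6, 2)
    return map_reduce(
        data, map_paramaters={"deta": 3, "window_size": 3}, reduce=mean_reduce
    )  # shape (2, 2, 2): two windows, two channels, (mean, std)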

# Splitter


def identity_split(X_fit, y_fit, X_calib, y_calib):
    """Identity splitter that wraps an already existing data assignment."""

    def iterable_split(X, y):
        """X and y are placeholders."""
        iterable = [(X_fit, y_fit, X_calib, y_calib)]
        return iterable

    return iterable_split


def random_split(ratio):
    """Random splitter that assigns samples given a ratio."""

    def iterable_split(X, y):
        rng = np.random.RandomState(0)
        fit_sample = rng.rand(len(X)) > ratio
        cal_sample = np.invert(fit_sample)
        return [(X[fit_sample], y[fit_sample], X[cal_sample], y[cal_sample])]

    return iterable_split


def kfold_random_split(K, random_state=None):
    """Splitter that randomly assigns data into K folds."""
    kfold = sklearn.model_selection.KFold(K, shuffle=True, random_state=random_state)

    def iterable_split(X, y):
        iterable = []
        for fit, calib in kfold.split(X):
            iterable.append((X[fit], y[fit], X[calib], y[calib]))
        return iterable

    return iterable_split
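# Illustrative usage sketch (added for documentation): build 5 random
# fit/calibration folds. The helper name `_example_kfold_random_split` is an
# assumption.
def _example_kfold_random_split():
    X = np.arange(20).reshape(10, 2)
    y = np.arange(10)
    folds = kfold_random_split(K=5, random_state=0)(X, y)
    return len(folds)  # 5 tuples of (X_fit, y_fit, X_calib, y_calib)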

# Encapsulated data from stored dict file:


class splitter:
    """Generic data-set provider (Iterable)."""

    def __init__(self, X_split):
        self.X_split = X_split

    def split(self, X):
        def cv_split(X_split, i):
            train = np.arange(len(X))[X_split < i]
            test = np.arange(len(X))[X_split == i]
            return (train, test)

        return [
            cv_split(self.X_split, i) for i in range(1, 1 + int(self.X_split.max()))
        ]
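# Illustrative usage sketch (added for documentation): X_split encodes fold
# membership; fold i tests on X_split == i and trains on X_split < i. The
# helper name `_example_splitter` is an assumption.
def _example_splitter():
    X = np.arange(6)
    X_split = np.array([0, 0, 1, 1, 2, 2])
    return splitter(X_split).split(X)  # [(train, test)] index arrays for folds 1 and 2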

# Encapsulated data from array:


def dataset_generator_from_array(
    X,
    y,
    context=None,
    objective=None,
    sk_split=TimeSeriesSplit(5),
    repetition=1,
    remove_from_train=None,
    attack_name="",
    cv_list_name=None,
):
    """Produce a data generator (iterable of [X, y, X_split, context, objective, name]) from arrays.

    Args:
        X (array): Inputs.
        y (array or None): Targets.
        context (array or None): Additional information.
        objective (array or None): Ground truth (unsupervised task).
        sk_split (split strategy): Sklearn split strategy.
        repetition (int): Number of repetitions of the split scheme.
        remove_from_train (array or None): Mask of samples to exclude from train.
        attack_name (str): Suffix appended to the fold name.
        cv_list_name (list or None): Optional explicit fold names.
    """

    def select_or_none(array, sample):
        if array is None:
            return None
        elif isinstance(array, str):
            return array
        else:
            return array[sample]

    if remove_from_train is None:
        remove_from_train = np.zeros(len(X))

    dataset_generator = []
    for n_repet in np.arange(repetition):
        cpt = 0
        if n_repet == 0:
            str_repet = ""
        else:
            str_repet = "_bis" + str(n_repet)

        for train_index, test_index in sk_split.split(X):
            X_split = np.zeros(len(X))
            X_split[train_index] = 1
            X_split[(remove_from_train == 1) & (X_split == 1)] = -1

            sample_cv = sorted(np.concatenate([train_index, test_index]))
            cv_name = "cv_" + str(cpt) + attack_name + str_repet
            if cv_list_name:
                cv_name = cv_list_name[cpt] + attack_name + str_repet
            dataset_generator.append(
                [
                    select_or_none(e, sample_cv)
                    for e in [X, y, X_split, context, objective, cv_name]
                ]
            )
            cpt += 1
    return dataset_generator
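# Illustrative usage sketch (added for documentation): three time-series folds
# from a small array; each item carries the sliced data and a fold name. The
# helper name `_example_dataset_generator_from_array` is an assumption.
def _example_dataset_generator_from_array():
    X = np.arange(40, dtype=float).reshape(20, 2)
    y = np.arange(20, dtype=float)
    generator = dataset_generator_from_array(X, y, sk_split=TimeSeriesSplit(3))
    return [item[-1] for item in generator]  # ['cv_0', 'cv_1', 'cv_2']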

####################################################################
# Raw data structure regularisation & statistics synthesis.


def raw_analysis(raw_series, time_structure):
    source = raw_series.columns[0]
    x = raw_series.index
    y = raw_series[source].values
    frequence = time_structure.get("frequence", 1)
    dtype = time_structure.get("dtype", "datetime64[s]")
    n_nan = np.isnan(y).sum()
    range_ = (
        (x.values[-1] - x.values[0])
        .astype(dtype.replace("datetime", "timedelta"))
        .astype(float)
    )

    if len(y) > 0:
        n_obs = len(y)
        ratio_comp = np.round(n_obs / (frequence * range_), 4)

    else:
        n_val, v_entropy, n_obs, n_nan, ratio_comp = 0, 0, 0, 0, 0

    n_val = len(set(y))
    v_entropy = entropy(y, n_val)
    time_structure.set("n_obs", n_obs)
    time_structure.set("ratio_comp", ratio_comp)
    time_structure.set("n_val", n_val)
    time_structure.set("v_entropy", v_entropy)
    print(n_val, v_entropy, n_obs, n_nan, ratio_comp)

####################################################################
# Auxiliary preprocessor functions


def process_raw_source(self, data, query, structure):
    time_structure = structure

    raw_series, dict_errors = data

    type_sensor_error = time_structure.get("type_sensor_error")
    time_structure.set("list_error", dict_errors)
    time_structure.set("type_sensor_error", type_sensor_error)
    time_structure.set("dict_errors", dict_errors)
    raw_analysis(raw_series, time_structure)
    return (raw_series, time_structure)

def process_irregular_data(self, data, query, structure):
    """Apply interpolation & statistics extraction on data using query parameters,
    with metadata stored in structure. ['start_date', 'end_date', 'delta'] of structure
    are used to specify the start, the end and the statistics synthesis step.
    ['window_size', 'begin_by_interpolation'] of query are used to specify
    the final step (delta * window_size) and whether there is a pre-interpolation step.


    Args:
        data (_type_): _description_
        query (_type_): _description_
        structure (_type_): _description_

    Returns:
        _type_: _description_
    """

    time_structure = structure
    begin_by_interpolation = False
    begin_by_interpolation = time_structure.get("begin_by_interpolation", False)
    window_size = time_structure.get("window_size", 1)

    if len(data) == 2:
        raw_series, raw_time_structure = data
        dict_errors = raw_time_structure.get("dict_errors")
    else:
        raw_series = data
        dict_errors = {}

    time_structure.set("dict_errors", dict_errors)

    # Specify rolling statistics
    delta = time_structure.get("delta", 1)
    dtype = time_structure.get("dtype", "datetime64[s]")

    stats = time_structure.get("stats", ["mean", "std", "extremum", "count"])

    # Statistics processing based on expert knowledge:
    frequence = time_structure.get("frequence", 1)
    precision = time_structure.get("precision", 1)

    # Interpolation specification
    interpolation = time_structure.get("interpolation", "previous")
    max_time_jump = time_structure.get("max_time_jump", 10)
    replace_val = time_structure.get("replace_val", None)

    # Regular interpolation on rolling statistics:
    start_date = time_structure.get("start_date", None)
    end_date = time_structure.get("end_date", None)

    if begin_by_interpolation:
        raw_series = regular_stat_series = upscale_series(
            raw_series,
            delta * frequence,
            start_date=start_date,
            end_date=end_date,
            max_time_jump=max_time_jump,
            replace_val=replace_val,
            mode=interpolation,
        )

    new_delta = pre_struc.check_delta(delta * window_size, dtype=dtype)

    # Irregular rolling statistics
    regular_stat_series = rolling_statistics(
        raw_series, delta=new_delta, step=None, reduc_functions=stats, reduc_names=stats
    )

    columns = regular_stat_series.columns
    flag_extremum = ["extremum" in column for column in columns]
    flag_count = ["count" in column for column in columns]
    flag_std = ["std" in column for column in columns]

    # Normalisation of std statistics
    for col_name in columns[flag_std]:
        std_value = regular_stat_series[col_name].values
        std_value[np.isnan(std_value)] = 0.000001
        regular_stat_series[col_name] = np.maximum(precision, std_value)
    # Normalisation of extremum statistics
    for col_name in columns[flag_extremum]:
        regular_stat_series[col_name] = np.log(
            0.0001 + regular_stat_series[col_name].values
        )

    # Include consecutive non-observation in count statistics
    for col_name in columns[flag_count]:
        consecutive_no_obs = (
            -np.log10(
                0.5
                + np.maximum(
                    ut.cum_sum_consecutive_zero(
                        regular_stat_series[col_name].values >= 1
                    )
                    - 1,
                    0,
                )
            )
            / 3
        )

        regular_stat_series[col_name] = (
            (regular_stat_series[col_name].values - consecutive_no_obs)
            / frequence
            * new_delta.astype(float)
        )

    regular_stat_series = upscale_series(
        regular_stat_series,
        new_delta,
        start_date=start_date,
        end_date=end_date,
        max_time_jump=max_time_jump,
        replace_val=replace_val,
        mode=interpolation,
    )

    return (regular_stat_series, time_structure)

####################################################################
# Auxiliary preprocessor functions


def process_label(
    label_df, sources_selection, start_date, end_date, delta=1, dtype="datetime64[s]"
):
    """Process an anomaly label dataframe with (start: datetime64[s], end: datetime64[s], source)
    into a ground-truth matrix on a regular step scale of delta that starts at start_date & ends at end_date.
    """
    source_anom = [i for i in label_df["source"]]
    flag_selection = [i in sources_selection for i in source_anom]

    step_begin_anoms = (
        label_df[flag_selection]["start"].astype(dtype).astype(int).values
    )
    step_end_anoms = label_df[flag_selection]["end"].astype(dtype).astype(int).values

    step_begin = pre_struc.date_to_step(start_date)
    step_end = pre_struc.date_to_step(end_date)

    step_scale = pre_struc.get_regular_step_scale(
        delta, step_end - step_begin, step_begin
    )

    label_anom = np.zeros((len(step_scale), len(sources_selection)))
    for i, name in enumerate(sources_selection):
        mask = (step_scale > step_begin_anoms[i]) & (step_scale < step_end_anoms[i])
        label_anom[mask, i] += 1
    date_index = pre_struc.step_to_date(
        step_scale, delta=delta, date_init=start_date, dtype=dtype
    )
    label_df = pd.DataFrame(data=label_anom, index=date_index)

    return label_df