Coverage for uqmodels/preprocessing/structure.py: 23%

176 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 14:29 +0000

1""" 

2Specification of structure object representing operation knowledge about specific data structure. 

3""" 

4 

5import datetime as dt 

6 

7import jsonpickle 

8import numpy as np 

9import pandas as pd 

10 

# Default origin date (the Unix epoch) used as the default ``date_init``
# argument of the time structures defined in this module.
date_init_default = "1970-01-01 00:00:00.000000"

12 

13 

def get_unit(dtype):
    """Return the time-unit code (e.g. "s", "ms") carried by a datetime64 dtype.

    Args:
        dtype (str or np.dtype): datetime64 dtype specification such as "datetime64[s]".

    Returns:
        str: unit code, i.e. the first item reported by ``np.datetime_data``.
    """
    # Cast a throw-away datetime to the requested dtype, then read its unit back.
    probe = np.datetime64("0").astype(dtype=dtype)
    unit_and_count = np.datetime_data(probe)
    return unit_and_count[0]

16 

17 

def check_delta(delta, dtype="datetime64[s]"):
    """Normalise a time delta to a ``np.timedelta64`` expressed in the unit of ``dtype``.

    Args:
        delta: one of
            - None (passed through),
            - an object exposing ``to_timedelta64`` (e.g. pd.Timedelta),
            - np.timedelta64,
            - datetime.timedelta,
            - any value accepted by ``np.timedelta64(delta, unit)`` or
              ``pd.Timedelta(delta)`` (e.g. an int or a string such as "1h").
        dtype (str, optional): datetime64 dtype whose unit is used for the result.
            Defaults to "datetime64[s]".

    Returns:
        np.timedelta64 or None: the converted delta, or None if ``delta`` is None.

    Raises:
        ValueError: if ``delta`` cannot be interpreted as a time delta.
    """
    unit = get_unit(dtype)
    target = "timedelta64[" + unit + "]"

    if delta is None:
        return None

    if hasattr(delta, "to_timedelta64"):  # pd.Timedelta
        return delta.to_timedelta64().astype(target)

    if isinstance(delta, np.timedelta64):
        return delta.astype(target)

    if isinstance(delta, dt.timedelta):
        # Convert the whole timedelta: ``delta.seconds`` alone would silently
        # drop the ``days`` (and microseconds) components.
        return np.timedelta64(delta).astype(target)

    # Generic values: first let numpy interpret the value in the target unit,
    # then fall back on the pandas parser (handles strings such as "1h").
    try:
        return np.timedelta64(delta, unit)
    except Exception:
        pass

    try:
        return np.timedelta64(pd.Timedelta(delta), unit)
    except Exception as err:
        raise ValueError("delta : {!r} not recognized".format(delta)) from err

41 

42 

def check_date(date, dtype="datetime64[s]"):
    """Normalise a date (or an array of dates) to datetime64 in the unit of ``dtype``.

    Args:
        date: one of
            - None (passed through),
            - an object exposing ``to_datetime64`` (e.g. pd.Timestamp),
            - np.datetime64,
            - any scalar accepted by ``np.datetime64(date)`` (e.g. an ISO date string),
            - an array-like of dates convertible with ``np.asarray(date).astype(...)``.
        dtype (str, optional): datetime64 dtype whose unit is used for the result.
            Defaults to "datetime64[s]".

    Returns:
        np.datetime64, np.ndarray or None: the converted date(s), or None if
        ``date`` is None.

    Raises:
        ValueError: if ``date`` cannot be interpreted as a date or array of dates.
    """
    unit = get_unit(dtype)
    target = "datetime64[" + unit + "]"

    if date is None:
        return None

    if hasattr(date, "to_datetime64"):  # pd.Timestamp
        return date.to_datetime64().astype(target)

    if isinstance(date, np.datetime64):
        return date.astype(target)

    # Scalar values (strings, datetime objects, ...).
    try:
        return np.datetime64(date).astype(target)
    except Exception:
        pass

    # ``np.datetime64`` rejects sequences; convert array-likes element-wise so
    # that arrays of dates are also brought to the requested unit.
    try:
        return np.asarray(date).astype(target)
    except Exception as err:
        raise ValueError("date : {!r} not recognized".format(date)) from err

61 

62 

class Structure:
    """Generic container of named attributes describing a data structure.

    Examples of use given by the original author:
    1. a regular time structure specifying temporal interaction;
    2. a space interaction structure.
    """

    def __init__(self, name, **kwargs):
        """Store ``name`` and every keyword argument as an instance attribute."""
        self.name = name
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

    def set(self, key, obj):
        """Store ``obj`` under the attribute name ``key`` (via ``setattr``).

        Args:
            key (str): attribute name
            obj (obj): value to store
        """
        setattr(self, key, obj)

    def toJSON(self):
        """Return the ``jsonpickle`` encoding of this object."""
        return jsonpickle.encode(self)

    def get_structure(self, str_key, **kargs):
        """Return the structure associated with ``str_key``; the base class returns itself."""
        return self

    def get(self, keys, default_value=None, **kwarg):
        """Fetch one attribute, or a list of attributes, by name.

        Only attributes stored on the instance itself are looked up; a name
        that is not found yields ``default_value``.

        Args:
            keys (str or list of str): attribute name, or list of attribute names
            default_value (optional): value used for missing names. Defaults to None.

        Returns:
            a list of values when ``keys`` is a list, otherwise a single value
        """
        stored = self.__dict__
        single = not isinstance(keys, list)
        wanted = [keys] if single else keys

        found = [
            getattr(self, key) if key in stored else default_value for key in wanted
        ]
        return found[0] if single else found

119 

120 

121# Regular temporal representation ############################ 

122 

123 

def regular_date_scale(start, end=None, periods=None, delta=1, dtype="datetime64[s]"):
    """Build a regular date scale with ``pd.date_range`` and cast it to ``dtype``.

    ``start``, ``end``, ``periods`` and ``delta`` are forwarded to
    ``pd.date_range`` as ``start``, ``end``, ``periods`` and ``freq``.

    Args:
        start (str or date): start date
        end (str or date or None, optional): end date. Defaults to None.
        periods (int or None, optional): number of periods. Defaults to None.
        delta (int or timedelta, optional): spacing of the scale, normalised with
            ``check_delta``. Defaults to 1.
        dtype (str, optional): dtype of the result. Defaults to "datetime64[s]".

    Returns:
        the ``pd.date_range`` result cast to ``dtype``
    """
    freq = pd.Timedelta(check_delta(delta, dtype))
    first = check_date(start, dtype)
    last = check_date(end, dtype)

    scale = pd.date_range(start=first, end=last, periods=periods, freq=freq)
    return scale.astype(dtype=dtype)

148 

149 

def str_to_date(str, dtype="datetime64[s]"):
    """Convert a date string to a date of ``dtype``; thin wrapper around ``check_date``."""
    # The parameter name shadows the builtin ``str``; it is kept so that
    # keyword callers are not broken.
    date_text = str
    return check_date(date_text, dtype)

152 

153 

def step_to_date(step, delta=1, dtype="datetime64[s]", date_init=None):
    """Transform a step (or array of steps) into date(s) of ``dtype``.

    The conversion is ``date = (step * delta + date_init).astype(dtype)``, where
    ``delta`` and ``date_init`` are first expressed as floats in the unit of
    ``dtype``. When ``date_init`` is None the origin offset is 0.

    Args:
        step (float or np.array(float)): step value(s)
        delta (int or timedelta, optional): duration of one step. Defaults to 1.
        dtype (str, optional): dtype of the resulting date. Defaults to "datetime64[s]".
        date_init (str or date, optional): date of step 0. Defaults to None.

    Returns:
        np.ndarray: date(s) of dtype ``dtype`` (0-d array for a scalar step)
    """
    delta = check_delta(delta, dtype).astype(float)

    # Only convert the origin when one is given: check_date(None) returns None,
    # which has no ``astype``.
    step_init = 0
    if date_init is not None:
        step_init = check_date(date_init, dtype).astype(float)

    # Accept scalars and sequences as well as arrays.
    step = np.asarray(step)

    return (step * delta + step_init).astype(dtype)

178 

179 

def date_to_step(date, delta=1, dtype="datetime64[s]", date_init=None):
    """Transform a date (or array of dates) into step(s).

    The conversion is ``step = date / delta - date_init / delta``, where
    ``date``, ``delta`` and ``date_init`` are first expressed as floats in the
    unit of ``dtype``. When ``date_init`` is None the origin offset is 0.

    Args:
        date (date or np.array(date)): datetime64 or date string,
            format "%Y-%m-%d %H:%M:%S.%f"
        delta (int or timedelta, optional): duration of one step. Defaults to 1.
        dtype (str, optional): dtype used for the conversion. Defaults to "datetime64[s]".
        date_init (str or date, optional): date of step 0. Defaults to None.

    Returns:
        step or np.array(step): step(s) in float representation
    """
    date_as_float = check_date(date, dtype).astype(float)
    delta_as_float = check_delta(delta, dtype).astype(float)
    raw_step = date_as_float / delta_as_float

    if date_init is None:
        origin_step = 0
    else:
        origin_step = check_date(date_init, dtype).astype(float) / delta_as_float

    return raw_step - origin_step

204 

205 

def get_regular_step_scale(delta, range_temp, time_offset=0, **kwarg):
    """Generate a regular numeric step scale.

    The scale is ``np.arange(time_offset, time_offset + range_temp, delta)``
    rounded to 2 decimals, with ``delta`` normalised by ``check_delta`` and
    cast to int.

    Args:
        delta (int or timedelta): spacing between two consecutive values
        range_temp (int): extent covered by the scale
        time_offset (int, optional): first value of the scale. Defaults to 0.
        **kwarg: accepted and ignored.

    Returns:
        step_scale: numeric regular scale
    """
    stride = check_delta(delta).astype(int)
    stop = time_offset + range_temp
    return np.round(np.arange(time_offset, stop, stride), 2)

221 

222 

def get_step_mask(step, step_min, step_max, out_of_mask=True):
    """Compute the boolean mask of the steps lying in ``[step_min, step_max]``.

    When ``out_of_mask`` is set and at least one step is selected, the mask is
    widened around the selection:

    - by two positions on each side when the first selected index is > 2 and
      the last selected index is < len - 2;
    - otherwise by one position on each side, provided the last selected index
      is < len - 2.

    Args:
        step (array): step scale
        step_min (float): minimal considered step
        step_max (float): maximal considered step
        out_of_mask (bool, optional): if true, also include neighbouring steps
            just outside the boundaries. Defaults to True.

    Returns:
        np.ndarray: boolean mask with one entry per step
    """
    selected = np.array((step >= step_min) & (step <= step_max))

    # Nothing to widen: empty selection or widening disabled.
    if not ((selected.sum() > 0) & out_of_mask):
        return selected

    positions = np.arange(len(selected))[selected]
    first_hit = positions.min()
    last_hit = positions.max()

    low = max(first_hit, 2)
    high = min(last_hit, len(step) - 2)
    if (low > 2) & (high < (len(selected) - 2)):
        # Enough room on both sides: add two neighbours on each side.
        selected[[low - 2, low - 1, high + 1, high + 2]] = True
        return selected

    # Fallback: add a single neighbour on each side.
    low = max(first_hit, 1)
    high = min(last_hit, len(step) - 1)
    if high < (len(selected) - 2):
        selected[[low - 1, high + 1]] = True
    return selected

253 

254 

def get_date_mask(
    date,
    date_min,
    date_max,
    out_of_mask=True,
    delta=1,
    dtype="datetime64[s]",
    date_init=None,
):
    """Compute the mask of the dates lying between ``date_min`` and ``date_max``.

    Dates and boundaries are converted to steps with ``date_to_step`` and the
    mask is then computed by ``get_step_mask``.

    Args:
        date (array of date): dates to mask
        date_min (date): lower boundary
        date_max (date): upper boundary
        out_of_mask (bool, optional): forwarded to ``get_step_mask``. Defaults to True.
        delta (optional): accepted but not used by this function.
        dtype (str, optional): dtype used for the conversions. Defaults to "datetime64[s]".
        date_init (str or date, optional): origin date forwarded to ``date_to_step``.

    Returns:
        boolean mask returned by ``get_step_mask``
    """

    def _as_step(value):
        # Same conversion settings for the dates and for both boundaries.
        return date_to_step(value, dtype=dtype, date_init=date_init)

    step = _as_step(check_date(date, dtype))
    lower = _as_step(date_min)
    upper = _as_step(date_max)
    return get_step_mask(step, lower, upper, out_of_mask=out_of_mask)

269 

270 

def window_expansion(step, n_expend=5, delta=1):
    """Expand every step into ``n_expend`` shifted copies.

    Each step is repeated ``n_expend`` times and the i-th copy (i = 0 ..
    n_expend - 1) is shifted by ``n_expend * delta / 2 - delta * i``.

    Args:
        step (array): step values
        n_expend (int, optional): number of copies per step. Defaults to 5.
        delta (float, optional): spacing between two consecutive copies. Defaults to 1.

    Returns:
        np.ndarray: float array of length ``len(step) * n_expend``
    """
    base = np.asarray(step)
    new_step = np.repeat(base, n_expend).astype(float)

    # One offset per copy, tiled over every step. The offsets must span the
    # expanded array: a mask built on ``len(step)`` does not match its length.
    offsets = (n_expend * delta / 2) - delta * np.arange(n_expend)
    new_step += np.tile(offsets, base.size)
    return new_step

277 

278 

def time_selection(x, y, x_min, x_max, out_of_mask, mode="step"):
    """Select the entries of ``x`` and ``y`` whose ``x`` lies in ``[x_min, x_max]``.

    Args:
        x (array): step scale (mode "step") or dates (mode "date")
        y (array): values aligned with ``x``
        x_min: lower boundary, same nature as ``x``
        x_max: upper boundary, same nature as ``x``
        out_of_mask (bool): forwarded to the mask function
        mode (str, optional): "step" to mask numeric steps with ``get_step_mask``,
            "date" to mask dates with ``get_date_mask``. Defaults to "step".

    Returns:
        tuple: ``(x[mask], y[mask])``

    Raises:
        ValueError: if ``mode`` is neither "step" nor "date".
    """
    if mode == "step":
        # Numeric steps are masked directly, without any date conversion.
        mask = get_step_mask(x, x_min, x_max, out_of_mask)
    elif mode == "date":
        mask = get_date_mask(x, x_min, x_max, out_of_mask)
    else:
        raise ValueError(
            "mode : {!r} not recognized, expected 'step' or 'date'".format(mode)
        )

    return (x[mask], y[mask])

287 

288 

def regular_representation(list_output, list_delta, delta_target, dim_t=0):
    """Resample each output by repeating its entries along the time axis.

    Every array of ``list_output`` is expanded with ``np.repeat`` using the
    matching entry of ``list_delta`` as repetition count.

    Args:
        list_output (list of ndarray): model outputs, one per source
        list_delta (list): repetition count for each source
        delta_target: accepted but not used by this function
        dim_t (int, optional): axis along which entries are repeated. Defaults to 0.

    Returns:
        list of ndarray: the repeated outputs, one per (output, delta) pair
    """
    return [
        np.repeat(source_output, n_repeat, axis=dim_t)
        for source_output, n_repeat in zip(list_output, list_delta)
    ]

303 

304 

class Irregular_time(Structure):
    """Specification of an irregular time series structure."""

    def __init__(
        self,
        name,
        start_date,
        date_init=date_init_default,
        dtype="datetime64[s]",
        **kwargs
    ):
        """Store ``dtype``, ``start_date``, ``date_init`` and any extra keyword as attributes."""
        attributes = dict(dtype=dtype, start_date=start_date, date_init=date_init)
        attributes.update(kwargs)
        super().__init__(name=name, **attributes)

    # NOTE(review): both conversions below pass ``self.start_date`` as the
    # ``delta`` argument -- confirm that this is intended.

    def get_date(self, step):
        """Convert step(s) to date(s) with ``step_to_date``."""
        conversion = dict(
            dtype=self.dtype, delta=self.start_date, date_init=self.date_init
        )
        return step_to_date(step, **conversion)

    def get_step(self, date):
        """Convert date(s) to step(s) with ``date_to_step``."""
        conversion = dict(
            dtype=self.dtype, delta=self.start_date, date_init=self.date_init
        )
        return date_to_step(date, **conversion)

328 

329 

class Regular_time(Structure):
    """Specification of a regular time structure (fixed ``delta`` between steps)."""

    def __init__(
        self,
        name,
        start_date,
        delta=np.timedelta64(1, "s"),
        window_size=None,
        date_init=date_init_default,
        dtype="datetime64[s]",
        **kargs
    ):
        """Store the time specification as attributes.

        Args:
            name (str): name of the structure
            start_date (str or date): start date of the series
            delta (timedelta, optional): duration of one step. Defaults to 1 second.
            window_size (timedelta, optional): window size. Defaults to ``delta``.
            date_init (str or date, optional): date of step 0. Defaults to
                ``date_init_default``.
            dtype (str, optional): dtype of the dates. Defaults to "datetime64[s]".
            **kargs: extra attributes stored on the structure.
        """
        if window_size is None:
            window_size = delta

        # ``date_init`` must be stored too: get_date / get_step read it back.
        super().__init__(
            name=name,
            dtype=dtype,
            delta=delta,
            window_size=window_size,
            start_date=start_date,
            date_init=date_init,
            **kargs
        )

    def get_date(self, step):
        """Convert step(s) to date(s) with ``step_to_date``."""
        return step_to_date(
            step, delta=self.delta, dtype=self.dtype, date_init=self.date_init
        )

    def get_step(self, date):
        """Convert date(s) to step(s) with ``date_to_step``."""
        return date_to_step(
            date, delta=self.delta, dtype=self.dtype, date_init=self.date_init
        )

    def get_step_scale(self, start_date, end_date):
        """Generate the step scale going from ``start_date`` to ``end_date``.

        Args:
            start_date (str or date): first date of the scale
            end_date (str or date): end date of the scale

        Returns:
            step_scale : numeric regular array built by ``get_regular_step_scale``
        """
        step_begin = self.get_step(start_date)
        step_end = self.get_step(end_date)

        delta = self.get("delta")

        # The covered range is end - begin; the reverse is negative and would
        # always produce an empty scale.
        step_scale = get_regular_step_scale(
            delta, range_temp=step_end - step_begin, time_offset=step_begin
        )
        return np.round(step_scale, 2)

380 

381 

382# Multi_source_structure ############################ 

383 

384 

class Multi_source(Structure):
    """Structure grouping one time sub-structure per source."""

    def __init__(self, regular_sub_structure=True, name="Multi_sources", **kwargs):
        """Build one sub-structure per entry of ``kwargs["sources"]``.

        For every keyword argument, a list / ndarray value with one item per
        source is split so that each sub-structure receives its own item; any
        other value is shared by all sub-structures.

        Args:
            regular_sub_structure (bool, optional): build ``Regular_time``
                sub-structures if true, ``Irregular_time`` otherwise. Defaults to True.
            name (str, optional): name of the structure. Defaults to "Multi_sources".
            **kwargs: attributes of the structure; must contain "sources".
        """
        sources = kwargs["sources"]
        n_sources = len(sources)
        sub_structure_cls = Regular_time if regular_sub_structure else Irregular_time

        list_structure = []
        for ind, source in enumerate(sources):
            dict_time_structure = {"name": source}
            for key, values in kwargs.items():
                # Per-source values come as a sequence with one item per source.
                if isinstance(values, (list, np.ndarray)) and len(values) == n_sources:
                    values = values[ind]
                dict_time_structure[key] = values

            list_structure.append(sub_structure_cls(**dict_time_structure))

        super().__init__(
            name,
            list_key_source=np.arange(n_sources),
            list_structure_source=list_structure,
            **kwargs
        )

    def get_structure(self, str_key, **kargs):
        """Return the sub-structure of source ``str_key``, or self if it cannot be found."""
        try:
            ind_key = list(self.get("sources")).index(str_key)
            return self.get("list_structure_source")[ind_key]
        except Exception:
            # Unknown source (or missing source metadata): fall back on the
            # global structure.
            return self

    def get(self, keys, default_value=None, query=None):
        """Fetch attribute(s), optionally from the sub-structure of one source.

        Args:
            keys (str or list of str): attribute name, or list of attribute names
            default_value (optional): value used for missing names. Defaults to None.
            query (dict, optional): if it contains the key "source", the lookup is
                delegated to the structure returned by ``get_structure`` for that
                source. Defaults to None (lookup on this structure).

        Returns:
            a list of values when ``keys`` is a list, otherwise a single value
        """
        if query is not None and "source" in query:
            return self.get_structure(query["source"]).get(
                keys, default_value=default_value
            )
        # Forward the caller's default instead of always using None.
        return super().get(keys, default_value=default_value)