Coverage for uqmodels/preprocessing/structure.py: 23%
176 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
1"""
2Specification of structure objects representing operational knowledge about a specific data structure.
3"""
5import datetime as dt
7import jsonpickle
8import numpy as np
9import pandas as pd
11date_init_default = "1970-01-01 00:00:00.000000"
def get_unit(dtype):
    """Return the time-unit code carried by a datetime dtype.

    Args:
        dtype (str or np.dtype): datetime64 dtype specification,
            e.g. "datetime64[s]".

    Returns:
        str: first element of ``np.datetime_data`` for a datetime cast to
            ``dtype`` (e.g. "s" for "datetime64[s]").
    """
    # Cast a throw-away datetime to the requested dtype, then read its metadata.
    probe = np.datetime64("0").astype(dtype=dtype)
    metadata = np.datetime_data(probe)
    return metadata[0]
def check_delta(delta, dtype="datetime64[s]"):
    """Normalise a time delta to a ``np.timedelta64`` in the unit of ``dtype``.

    Args:
        delta: one of
            - None (returned as is),
            - an object exposing ``to_timedelta64`` (e.g. ``pd.Timedelta``),
            - a ``np.timedelta64``,
            - a ``datetime.timedelta``,
            - any value accepted by ``np.timedelta64(delta, unit)`` or by
              ``pd.Timedelta(delta)``.
        dtype (str, optional): datetime dtype whose unit is used for the
            result. Defaults to "datetime64[s]".

    Returns:
        np.timedelta64 or None: the converted delta. An unconvertible object
            that exposes ``astype`` (array-like) is returned unchanged so that
            callers can keep casting it themselves.

    Raises:
        ValueError: if ``delta`` cannot be converted and is not array-like.
    """
    unit = get_unit(dtype)
    target = "timedelta64[" + unit + "]"

    if delta is None:
        return None

    if hasattr(delta, "to_timedelta64"):  # e.g. pd.Timedelta
        return delta.to_timedelta64().astype(target)

    if isinstance(delta, np.timedelta64):
        return delta.astype(target)

    if isinstance(delta, dt.timedelta):
        # Convert the whole timedelta: ``delta.seconds`` alone would silently
        # drop the ``days`` and ``microseconds`` components.
        return np.timedelta64(delta).astype(target)

    try:
        return np.timedelta64(delta, unit)
    except Exception:
        # Fall back on the pandas parser (handles strings such as "1h").
        try:
            return np.timedelta64(pd.Timedelta(delta), unit)
        except Exception as err:
            if hasattr(delta, "astype"):
                # Array-like input: keep the historical pass-through.
                return delta
            raise ValueError(f"delta : {delta!r} not recognized") from err
def check_date(date, dtype="datetime64[s]"):
    """Normalise a date to a ``np.datetime64`` in the unit of ``dtype``.

    Args:
        date: one of
            - None (returned as is),
            - an object exposing ``to_datetime64`` (e.g. ``pd.Timestamp``),
            - a ``np.datetime64``,
            - any value accepted by ``np.datetime64(date)``.
        dtype (str, optional): datetime dtype whose unit is used for the
            result. Defaults to "datetime64[s]".

    Returns:
        np.datetime64 or None: the converted date. An unconvertible object
            that exposes ``astype`` (array-like) is returned unchanged so that
            callers can keep casting it themselves.

    Raises:
        ValueError: if ``date`` cannot be converted and is not array-like.
    """
    unit = get_unit(dtype)
    target = "datetime64[" + unit + "]"

    if date is None:
        return None

    if hasattr(date, "to_datetime64"):  # e.g. pd.Timestamp
        # The attribute tested above is the one to call: the previous
        # ``to_timedelta64`` call is not the method that was checked for.
        return date.to_datetime64().astype(target)

    if isinstance(date, np.datetime64):
        return date.astype(target)

    try:
        return np.datetime64(date).astype(target)
    except Exception as err:
        if hasattr(date, "astype"):
            # Array-like input: keep the historical pass-through.
            return date
        raise ValueError(f"date : {date!r} not recognized") from err
class Structure:
    """Generic attribute container holding knowledge about a data structure.

    Example 1: regular time structure specifying temporal interaction.
    Example 2: space interaction structure.
    """

    def __init__(self, name, **kwargs):
        """Store ``name`` and every keyword argument as an instance attribute."""
        self.name = name
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

    def set(self, key, obj):
        """Attach ``obj`` to the instance under the attribute name ``key``.

        Args:
            key (str): attribute name
            obj (obj): value to store
        """
        setattr(self, key, obj)

    def toJSON(self):
        """Return the ``jsonpickle`` encoding of this instance."""
        encoded = jsonpickle.encode(self)
        return encoded

    def get_structure(self, str_key, **kargs):
        """Return this structure itself; ``str_key`` and ``kargs`` are ignored."""
        return self

    def get(self, keys, default_value=None, **kwarg):
        """Look up one attribute, or a list of attributes, by name.

        Args:
            keys (str or list of str): attribute name or list of names
            default_value (optional): value used for every name that is not an
                instance attribute. Defaults to None.

        Returns:
            A list of values when ``keys`` is a list, otherwise a single value.
        """
        known_names = self.__dict__.keys()

        # A non-list request is wrapped so both cases share the same lookup.
        single = not isinstance(keys, list)
        wanted = [keys] if single else keys

        found = [
            getattr(self, wanted_name) if wanted_name in known_names else default_value
            for wanted_name in wanted
        ]
        return found[0] if single else found
121# Regular temporal representation ############################
def regular_date_scale(start, end=None, periods=None, delta=1, dtype="datetime64[s]"):
    """Build a regular date scale of ``dtype`` with ``pd.date_range``.

    ``start``, ``end``, ``periods`` and the frequency derived from ``delta``
    are all forwarded to ``pd.date_range``, which decides how they combine.

    Args:
        start (str or date): start date
        end (str or date or None, optional): end date. Defaults to None.
        periods (int, optional): number of periods. Defaults to None.
        delta (int or timedelta, optional): spacing of the scale, normalised
            with ``check_delta``. Defaults to 1.
        dtype (str, optional): dtype. Defaults to "datetime64[s]".

    Returns:
        The ``pd.date_range`` result cast to ``dtype``.
    """
    freq = pd.Timedelta(check_delta(delta, dtype))
    first_date = check_date(start, dtype)
    last_date = check_date(end, dtype)

    scale = pd.date_range(start=first_date, end=last_date, periods=periods, freq=freq)
    return scale.astype(dtype=dtype)
def str_to_date(str, dtype="datetime64[s]"):
    """Convert a date string to a date of ``dtype`` by delegating to ``check_date``.

    Args:
        str (str): date to convert (the name shadows the builtin but is part
            of the public signature).
        dtype (str, optional): target dtype. Defaults to "datetime64[s]".

    Returns:
        Whatever ``check_date`` returns for this input.
    """
    converted = check_date(date=str, dtype=dtype)
    return converted
def step_to_date(step, delta=1, dtype="datetime64[s]", date_init=None):
    """Transform a float step (or array of steps) into a date of ``dtype``.

    date = (step * delta + date_init).astype(dtype), where ``delta`` and
    ``date_init`` are first expressed as floats in the unit of ``dtype``.

    Args:
        step (float or np.array(float)): float representing step
        delta (int, optional): delta between two steps. Defaults to 1.
        dtype (str, optional): dtype of date. Defaults to 'datetime64[s]'.
        date_init (str, optional): date of the first step. Defaults to None,
            in which case no offset is added.

    Returns:
        np.ndarray: date(s) of dtype ``dtype``.
    """
    delta = check_delta(delta, dtype).astype(float)

    # np.asarray leaves ndarrays untouched and converts scalars/sequences.
    step = np.asarray(step)

    # Only convert date_init when provided: ``check_date(None)`` returns None
    # and has no ``astype``.
    step_init = 0
    if date_init is not None:
        step_init = check_date(date_init, dtype).astype(float)

    return (step * delta + step_init).astype(dtype)
def date_to_step(date, delta=1, dtype="datetime64[s]", date_init=None):
    """Transform a date (or array of dates) into a float step.

    step = date / delta - date_init / delta, where every term is first cast
    to float in the unit of ``dtype``.

    Args:
        date (date or np.array(date)): datetime64 or str date
        delta (int, optional): delta between two steps. Defaults to 1.
        dtype (str, optional): dtype of date. Defaults to 'datetime64[s]'.
        date_init (str, optional): date of the first step. Defaults to None,
            in which case no offset is subtracted.

    Returns:
        step or np.array(step): step in float representation
    """
    date_value = check_date(date, dtype).astype(float)
    delta_value = check_delta(delta, dtype).astype(float)
    raw_step = date_value / delta_value

    if date_init is None:
        origin = 0
    else:
        origin = check_date(date_init, dtype).astype(float) / delta_value

    return raw_step - origin
def get_regular_step_scale(delta, range_temp, time_offset=0, **kwarg):
    """Generate a regular numeric step scale.

    Args:
        delta (int): stride between two steps, normalised with ``check_delta``
            and cast to int.
        range_temp (int): temporal range covered from ``time_offset``.
        time_offset (int, optional): first value of the scale. Defaults to 0.
        **kwarg: accepted and ignored.

    Returns:
        step_scale: ``np.arange(time_offset, time_offset + range_temp, delta)``
            rounded to 2 decimals.
    """
    stride = check_delta(delta).astype(int)
    stop = time_offset + range_temp
    return np.round(np.arange(time_offset, stop, stride), 2)
def get_step_mask(step, step_min, step_max, out_of_mask=True):
    """Compute the boolean mask of the steps lying in [step_min, step_max].

    Args:
        step (array): step scale
        step_min (float): minimal considered step
        step_max (float): maximal considered step
        out_of_mask (bool, optional): if true, also flag neighbouring
            positions just outside the selected range (two on each side when
            there is room, otherwise one on each side when possible).

    Returns:
        np.ndarray: boolean mask over ``step``.
    """
    mask_step = np.array((step >= step_min) & (step <= step_max))

    # Nothing selected, or no margin requested: the plain mask is the answer.
    if not ((mask_step.sum() > 0) & out_of_mask):
        return mask_step

    n_steps = len(mask_step)
    selected = np.arange(n_steps)[mask_step]
    first, last = selected.min(), selected.max()

    lower = max(first, 2)
    upper = min(last, len(step) - 2)
    if (lower > 2) & (upper < (n_steps - 2)):
        # Room for a two-position margin on both sides.
        mask_step[lower - 2 : lower] = True
        mask_step[upper + 1 : upper + 3] = True
    else:
        # Fall back on a one-position margin.
        lower = max(first, 1)
        upper = min(last, len(step) - 1)
        if upper < (n_steps - 2):
            mask_step[lower - 1] = True
            mask_step[upper + 1] = True
    return mask_step
def get_date_mask(
    date,
    date_min,
    date_max,
    out_of_mask=True,
    delta=1,
    dtype="datetime64[s]",
    date_init=None,
):
    """Compute the mask of the dates lying between ``date_min`` and ``date_max``.

    Dates and bounds are converted to steps with ``date_to_step`` and the
    mask is then delegated to ``get_step_mask``.

    Args:
        date (array of dates): dates to mask
        date_min (date): lower bound
        date_max (date): upper bound
        out_of_mask (bool, optional): forwarded to ``get_step_mask``.
        delta (int, optional): accepted but not used by this function.
        dtype (str, optional): dtype of dates. Defaults to "datetime64[s]".
        date_init (str, optional): date of the first step. Defaults to None.

    Returns:
        The mask returned by ``get_step_mask``.
    """
    date = check_date(date, dtype)

    def as_step(value):
        # Same conversion for the dates and for both bounds.
        return date_to_step(value, dtype=dtype, date_init=date_init)

    step = as_step(date)
    lower_step = as_step(date_min)
    upper_step = as_step(date_max)
    return get_step_mask(step, lower_step, upper_step, out_of_mask=out_of_mask)
def window_expansion(step, n_expend=5, delta=1):
    """Repeat each step ``n_expend`` times and spread the copies around it.

    The i-th copy of every step is shifted by
    ``(n_expend * delta / 2) - delta * i``.

    Args:
        step (array): steps to expand
        n_expend (int, optional): number of copies per step. Defaults to 5.
        delta (int or float, optional): spacing between two copies.
            Defaults to 1.

    Returns:
        np.ndarray: float array of length ``len(step) * n_expend``.
    """
    new_step = np.repeat(step, n_expend).astype(float)
    for i in range(n_expend):
        # Address the i-th copy of every step in the *expanded* array; a mask
        # sized on ``step`` does not match ``new_step`` when n_expend > 1.
        new_step[i::n_expend] += (n_expend * delta / 2) - delta * i
    return new_step
def time_selection(x, y, x_min, x_max, out_of_mask, mode="step"):
    """Select the entries of ``x`` and ``y`` whose ``x`` lies in [x_min, x_max].

    Args:
        x (array): time axis, numeric steps (mode "step") or dates
            (mode "date")
        y (array): values aligned with ``x``
        x_min: lower bound, same nature as ``x``
        x_max: upper bound, same nature as ``x``
        out_of_mask (bool): forwarded to the mask function.
        mode (str, optional): "step" uses ``get_step_mask``, "date" uses
            ``get_date_mask``. Defaults to "step".

    Returns:
        tuple: (x[mask], y[mask])

    Raises:
        ValueError: if ``mode`` is neither "step" nor "date".
    """
    if mode == "step":
        # Numeric steps are masked directly, without any date conversion.
        mask = get_step_mask(x, x_min, x_max, out_of_mask)
    elif mode == "date":
        mask = get_date_mask(x, x_min, x_max, out_of_mask)
    else:
        # Fail explicitly instead of hitting an unbound ``mask`` below.
        raise ValueError(f"mode : {mode!r} not recognized, expected 'step' or 'date'")

    return (x[mask], y[mask])
def regular_representation(list_output, list_delta, delta_target, dim_t=0):
    """Resample each source output with ``np.repeat`` along the time axis.

    Args:
        list_output (list of ndarray): model output of each source
        list_delta (list): number of repetitions applied to each source
        delta_target: accepted but not used by this function.
        dim_t (int, optional): axis along which values are repeated.
            Defaults to 0.

    Returns:
        list: one repeated array per (output, delta) pair.
    """
    return [
        np.repeat(source_output, n_repeat, axis=dim_t)
        for source_output, n_repeat in zip(list_output, list_delta)
    ]
class Irregular_time(Structure):
    """Specification of an irregular time series structure."""

    def __init__(
        self,
        name,
        start_date,
        date_init=date_init_default,
        dtype="datetime64[s]",
        **kwargs
    ):
        """Store ``dtype``, ``start_date``, ``date_init`` and any extra keyword
        argument as attributes of the structure."""
        attributes = dict(
            dtype=dtype, start_date=start_date, date_init=date_init, **kwargs
        )
        super().__init__(name=name, **attributes)

    def get_date(self, step):
        """Convert ``step`` to a date with ``step_to_date``."""
        # NOTE(review): ``start_date`` is forwarded as ``delta`` — confirm
        # this is intended.
        conversion = dict(
            dtype=self.dtype, delta=self.start_date, date_init=self.date_init
        )
        return step_to_date(step, **conversion)

    def get_step(self, date):
        """Convert ``date`` to a step with ``date_to_step``."""
        # NOTE(review): ``start_date`` is forwarded as ``delta`` — confirm
        # this is intended.
        conversion = dict(
            dtype=self.dtype, delta=self.start_date, date_init=self.date_init
        )
        return date_to_step(date, **conversion)
class Regular_time(Structure):
    """Specification of a regularly sampled time structure.

    Attributes stored on the instance: ``name``, ``dtype``, ``delta``,
    ``window_size``, ``start_date``, ``date_init`` and any extra keyword
    argument.
    """

    def __init__(
        self,
        name,
        start_date,
        delta=np.timedelta64(1, "s"),
        window_size=None,
        date_init=date_init_default,
        dtype="datetime64[s]",
        **kargs
    ):
        """Build the structure.

        Args:
            name (str): name of the structure
            start_date: start date of the series
            delta (optional): delta between two steps. Defaults to 1 second.
            window_size (optional): defaults to ``delta`` when None.
            date_init (optional): date of the first step. Defaults to
                ``date_init_default``.
            dtype (str, optional): dtype of dates. Defaults to
                "datetime64[s]".
        """
        if window_size is None:
            window_size = delta

        super().__init__(
            name=name,
            dtype=dtype,
            delta=delta,
            window_size=window_size,
            start_date=start_date,
            # Must be stored: get_date / get_step read ``self.date_init``.
            date_init=date_init,
            **kargs
        )

    def get_date(self, step):
        """Convert ``step`` to a date with ``step_to_date``."""
        return step_to_date(
            step, delta=self.delta, dtype=self.dtype, date_init=self.date_init
        )

    def get_step(self, date):
        """Convert ``date`` to a step with ``date_to_step``."""
        return date_to_step(
            date, delta=self.delta, dtype=self.dtype, date_init=self.date_init
        )

    def get_step_scale(self, start_date, end_date):
        """Generate the step scale between two dates using the specification.

        Args:
            start_date: first date of the scale
            end_date: end date of the scale

        Returns:
            step_scale : numeric regular time array
        """
        step_begin = self.get_step(start_date)
        step_end = self.get_step(end_date)

        delta = self.get("delta")

        # The range runs from begin to end; ``step_begin - step_end`` is
        # negative for end_date > start_date and produces an empty scale.
        # NOTE(review): steps are already expressed in units of ``delta`` by
        # get_step, and ``delta`` is used again as the stride here — confirm.
        step_scale = get_regular_step_scale(
            delta, range_temp=step_end - step_begin, time_offset=step_begin
        )
        return np.round(step_scale, 2)
382# Multi_source_structure ############################
class Multi_source(Structure):
    """Structure grouping one time sub-structure per source.

    Keyword arguments must contain ``sources``. Every list/ndarray keyword
    argument with as many entries as ``sources`` is split so that each
    sub-structure receives its own entry; other values are shared as is.
    """

    def __init__(self, regular_sub_structure=True, name="Multi_sources", **kwargs):
        """Build one sub-structure per source and store the shared meta data.

        Args:
            regular_sub_structure (bool, optional): build ``Regular_time``
                sub-structures when true, ``Irregular_time`` otherwise.
                Defaults to True.
            name (str, optional): name of the structure.
                Defaults to "Multi_sources".
            **kwargs: meta data; must contain ``sources``.
        """
        sources = kwargs["sources"]
        n_sources = len(sources)
        sub_structure_cls = Regular_time if regular_sub_structure else Irregular_time

        list_structure = []
        for ind, source in enumerate(sources):
            dict_time_structure = {"name": source}
            for key, values in kwargs.items():
                # Per-source values: pick the entry of the current source.
                if isinstance(values, (list, np.ndarray)) and len(values) == n_sources:
                    values = values[ind]
                dict_time_structure[key] = values

            list_structure.append(sub_structure_cls(**dict_time_structure))

        super().__init__(
            name,
            list_key_source=np.arange(n_sources),
            list_structure_source=list_structure,
            **kwargs
        )

    def get_structure(self, str_key, **kargs):
        """Return the sub-structure of source ``str_key``.

        Falls back on this structure itself when the lookup fails.
        ``kargs`` is accepted for parity with ``Structure.get_structure`` and
        ignored.
        """
        try:
            ind_key = list(self.get("sources")).index(str_key)
            return self.get("list_structure_source")[ind_key]
        except Exception:
            # Best effort: any failed lookup resolves to the global structure.
            return self

    def get(self, keys, default_value=None, query=None, **kwarg):
        """Get attribute(s), optionally from the sub-structure of one source.

        Args:
            keys (str or list of str): attribute name or list of names
            default_value (optional): value used for missing attributes.
                Defaults to None.
            query (dict, optional): when it contains "source", the lookup is
                done on the structure returned by ``get_structure`` for that
                source. Defaults to None.
            **kwarg: accepted for parity with ``Structure.get`` and ignored.

        Returns:
            A list of values when ``keys`` is a list, otherwise a single value.
        """
        if query is not None and "source" in query:
            return self.get_structure(query["source"]).get(
                keys, default_value=default_value
            )
        # Forward the caller's default instead of always using None.
        return super().get(keys, default_value=default_value)