Coverage for uqmodels/modelization/DL_estimator/data_embedding.py: 56% (224 statements)
import math

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from keras.layers import LeakyReLU
from tensorflow.keras import layers

from uqmodels.modelization.DL_estimator.utils import (
    find_conv_kernel,
    set_global_determinism,
)

from ...utils import add_random_state

# tf.keras.utils.get_custom_objects().clear()

# Code restructuring: layer-preprocessing functionality for NN (Transformer?)


class Mouving_Windows_Embedding(layers.Layer):
    def __init__(
        self, size_window, n_windows=1, step=1, dim_d=1, dim_chan=1, seed=None, **kwargs
    ):
        """Sliding-window embedding: extract and flatten n_windows windows of a series.

        Args:
            size_window (int): length of each window.
            n_windows (int, optional): number of windows to extract. Defaults to 1.
            step (int, optional): stride between consecutive windows. Defaults to 1.
            dim_d (int, optional): feature dimension of the series. Defaults to 1.
            dim_chan (int, optional): channel dimension of the series. Defaults to 1.
            seed (int, optional): handle experimental random using seed.
        """
        super(Mouving_Windows_Embedding, self).__init__()
        self.size_window = size_window
        self.n_windows = n_windows
        self.step = step
        self.dim_d = dim_d
        self.dim_chan = dim_chan
        self.seed = seed
        self.last_shape = self.size_window * self.dim_d * self.dim_chan

    def call(self, inputs, mode="encoder"):
        """Extract the sliding windows and flatten each of them.

        Args:
            inputs (tf.Tensor): (batch, time, dim_d[, dim_chan]) tensor.
            mode (str, optional): unused. Defaults to "encoder".

        Returns:
            tf.Tensor: (batch, n_windows, size_window * dim_d * dim_chan) tensor.
        """
        output = inputs
        if len(output.shape) == 3:
            output = output[:, :, :, None]

        slide_tensor = []
        for i in range(self.n_windows):
            slide_tensor.append(
                inputs[:, (i * self.step): (i * self.step) + self.size_window]
            )
        MWE_raw = K.stack(slide_tensor, axis=1)
        MWE = K.reshape(
            MWE_raw, (-1, self.n_windows, self.size_window * self.dim_d * self.dim_chan)
        )

        return MWE
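

# Illustrative usage sketch (not part of the original module): builds the layer on a
# hypothetical (batch, time, dim_d) input and checks the flattened-window shape
# inferred from call() above. The helper name and shapes are assumptions.
def _example_mouving_windows_embedding():
    x = tf.random.normal((8, 12, 2))  # (batch=8, time=12, dim_d=2)
    # 5 windows of length 4 with stride 2 consume (5 - 1) * 2 + 4 = 12 timesteps.
    mwe = Mouving_Windows_Embedding(size_window=4, n_windows=5, step=2, dim_d=2)
    out = mwe(x)
    # Each window is flattened over (size_window, dim_d, dim_chan): shape (8, 5, 8).
    return out

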
class Mouving_Window_Embedding(layers.Layer):
    def __init__(self, sub_seq_size, size_window, dim_out=1, padding=1, seed=None):
        super(Mouving_Window_Embedding, self).__init__()
        self.sub_seq_size = sub_seq_size
        self.dim_out = dim_out
        self.size_window = size_window
        self.padding = padding
        self.seed = seed

    def call(self, inputs, mode="encoder"):
        """Extract sliding sub-sequences of length sub_seq_size and flatten them.

        Args:
            inputs (tf.Tensor): (batch, time, features) tensor.
            mode (str, optional): "encoder" or "decoder"; decoder mode produces
                dim_out - 1 additional sub-sequences. Defaults to "encoder".

        Returns:
            tf.Tensor: (batch, size_output, sub_seq_size * features) tensor, or the
            unchanged inputs when sub_seq_size == 1.
        """
        size_output = None
        if mode == "encoder":
            size_output = self.size_window
        elif mode == "decoder":
            size_output = self.size_window + self.dim_out - 1
        else:
            print("Mode error")
        if self.sub_seq_size == 1:
            return inputs
        else:
            slide_tensor = []
            for i in range(size_output):
                slide_tensor.append(
                    inputs[
                        :, (i * self.padding): (i * self.padding) + self.sub_seq_size
                    ]
                )
            MWE_raw = K.stack(slide_tensor, axis=1)
            MWE = K.reshape(
                MWE_raw, (-1, size_output, self.sub_seq_size * inputs.shape[-1])
            )
            return MWE


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Conv2D(tf.keras.layers.Conv2D):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Dropout(tf.keras.layers.Dropout):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Conv1D(tf.keras.layers.Conv1D):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Mouving_conv_Embedding(layers.Layer):
    def __init__(
        self,
        size_window,
        n_windows,
        step=1,
        dim_d=1,
        dim_chan=1,
        use_conv2D=True,
        list_filters=None,
        list_strides=[2, 1, 1],
        list_kernels=None,
        dp=0.01,
        flag_mc=False,
        seed=None,
        **kwargs
    ):
        """Convolutional window embedding built from a small stack of valid convolutions.

        Args:
            size_window (int): length of the input window.
            n_windows (int): target number of output steps, used to infer kernel sizes
                when list_kernels is None.
            step (int, optional): stride between windows. Defaults to 1.
            dim_d (int, optional): feature dimension of the series. Defaults to 1.
            dim_chan (int, optional): channel dimension of the series. Defaults to 1.
            use_conv2D (bool, optional): use Conv2D layers rather than grouped Conv1D.
                Defaults to True.
            list_filters (list, optional): filters per convolution. Defaults to 64 each.
            list_strides (list, optional): strides per convolution. Defaults to [2, 1, 1].
            list_kernels (list, optional): kernel sizes per convolution; inferred with
                find_conv_kernel when None.
            dp (float, optional): dropout rate applied after each Conv2D. Defaults to 0.01.
            flag_mc (bool, optional): keep dropout active at inference (MC dropout).
                Defaults to False.
            seed (int, optional): handle experimental random using seed.
        """
        self.size_window = size_window
        self.n_windows = n_windows
        self.step = step
        self.dim_d = dim_d
        self.dim_chan = dim_chan
        self.use_conv2D = use_conv2D
        self.flag_mc = flag_mc
        self.seed = seed
        self.mutliscale = False
        set_global_determinism(self.seed)

        if list_filters is None:
            list_filters = [64 for i in list_strides]

        if list_kernels is None:
            list_kernels, list_strides = find_conv_kernel(
                self.size_window, n_windows, list_strides
            )
            list_filters.append(list_filters[-1])

        super().__init__(**kwargs)

        self.list_strides = list_strides
        self.list_kernels = list_kernels
        self.list_filters = list_filters

        self.layers = []
        for n, (filters, kernel, strides) in enumerate(
            zip(list_filters, list_kernels, list_strides)
        ):
            if use_conv2D:
                if n == 0:
                    kernel = (kernel, dim_d)
                else:
                    kernel = (kernel, 1)
                self.layers.append(
                    Conv2D(
                        filters,
                        kernel,
                        strides=strides,
                        padding="valid",
                        activation="relu",
                    )
                )
                if dp > 0:
                    self.layers.append(Dropout(dp, seed=add_random_state(seed, n)))

            else:
                self.layers.append(
                    Conv1D(
                        filters * dim_chan * dim_d,
                        kernel,
                        strides=strides,
                        padding="valid",
                        groups=dim_chan * dim_d,
                        activation="relu",
                    )
                )

            # self.layers.append(tf.keras.layers.BatchNormalization())
            # self.layers.append(tf.keras.layers.Dropout(dp))

        if use_conv2D:
            self.last_shape = list_filters[-1]
        else:
            self.last_shape = list_filters[-1] * self.dim_d * self.dim_chan

    def call(self, inputs):
        """Apply the convolution stack to a window of inputs.

        Args:
            inputs (tf.Tensor): (batch, size_window, dim_d[, dim_chan]) tensor.

        Returns:
            tf.Tensor: (batch, reduced_time, last_shape) embedded tensor.
        """
        output = inputs
        if len(output.shape) == 3:
            output = output[:, :, :, None]

        if not (self.use_conv2D):
            output = K.reshape(
                output, (-1, 1, self.size_window, self.dim_d * self.dim_chan)
            )

        # if (self.mutliscale):
        #     list_output = []

        for n, layer in enumerate(self.layers):
            if n == 2:  # dropout layers & end of block
                output = layer(output, training=self.flag_mc)
                # if (self.mutliscale):
                #     To do: find how to map the multiscale windows to the right final step
                #     list_output.append(output)

            else:
                output = layer(output)
        if self.use_conv2D:
            output = output[:, :, 0, :]
        else:
            output = output[:, 0, :, :]

        return output

    def get_config(self):
        dict_config = {}
        dict_config["size_window"] = self.size_window
        dict_config["n_windows"] = self.n_windows
        dict_config["step"] = self.step
        dict_config["dim_d"] = self.dim_d
        dict_config["dim_chan"] = self.dim_chan
        dict_config["list_strides"] = self.list_strides
        dict_config["list_kernels"] = self.list_kernels
        dict_config["list_filters"] = self.list_filters
        dict_config["flag_mc"] = self.flag_mc
        dict_config["use_conv2D"] = self.use_conv2D
        dict_config["seed"] = self.seed

        dict_config["layers"] = []
        for layer in self.layers:
            dict_config["layers"].append(tf.keras.utils.serialize_keras_object(layer))
        return dict_config

    @classmethod
    def from_config(cls, config):
        layers_config = config.pop("layers")
        layers = []
        for layer_config in layers_config:
            layer = tf.keras.utils.deserialize_keras_object(layer_config)
            layers.append(layer)

        obj = cls(**config)
        obj.layers = layers
        return obj
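

# Illustrative usage sketch (not part of the original module): instantiates the layer
# with explicit (arbitrary) kernels/strides/filters so that find_conv_kernel is not
# relied upon here, and dp=0.0 so no dropout sub-layers are created. It assumes
# set_global_determinism accepts seed=None, as it is called with a None default
# elsewhere in this module.
def _example_mouving_conv_embedding():
    x = tf.random.normal((4, 16, 2))  # (batch=4, size_window=16, dim_d=2)
    mce = Mouving_conv_Embedding(
        size_window=16,
        n_windows=8,
        dim_d=2,
        use_conv2D=True,
        list_filters=[32, 32, 32],
        list_strides=[2, 1, 1],
        list_kernels=[3, 3, 3],
        dp=0.0,
    )
    out = mce(x)
    # Three valid convolutions reduce the time axis 16 -> 7 -> 5 -> 3 and the first
    # (3, dim_d) kernel consumes the feature axis: expected shape (4, 3, 32).
    return out

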
class Data_embedding_TS(layers.Layer):
    def __init__(
        self,
        size_window,
        n_windows,
        step=1,
        dim_d=1,
        dim_chan=1,
        dim_out=1,
        flag_mc=False,
        seed=None,
    ):
        super(Data_embedding_TS, self).__init__()
        self.n_windows = n_windows
        self.size_window = size_window
        self.dim_d = dim_d
        self.seed = seed
        set_global_determinism(self.seed)

        if True:
            self.MWE = Mouving_Windows_Embedding(
                size_window,
                n_windows,
                step=step,
                dim_d=dim_d,
                seed=seed,
            )
        else:
            self.MWE = Mouving_conv_Embedding(
                size_window,
                n_windows,
                step=step,
                dim_d=dim_d,
                dim_chan=dim_chan,
                use_conv2D=True,
                list_filters=None,
                list_strides=[2, 1],
                list_kernels=None,
                dp=0.05,
                flag_mc=flag_mc,
                seed=seed,
            )

        self.FTE = Factice_Time_Extension(dim_out)

    def call(self, Z_enc, Y_past, extension):
        """Concatenate the window embedding of Y_past to Z_enc, then extend it in time.

        Args:
            Z_enc (tf.Tensor): (batch, n_windows, dim_z) encoded representation.
            Y_past (tf.Tensor): (batch, time, dim_d) past target values.
            extension (_type_): unused.

        Returns:
            tf.Tensor: (batch, n_windows + dim_out, dim_z + last_shape) tensor.
        """

        MWE_past = self.MWE(Y_past)
        Z_enc = K.concatenate([Z_enc, MWE_past], axis=-1)
        Z_enc = self.FTE(Z_enc)
        return Z_enc
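

# Illustrative usage sketch (not part of the original module): combines an encoded state
# Z_enc with the window embedding of past targets and extends the result by dim_out
# fictive timesteps. All shapes are assumptions consistent with the layers above, and
# set_global_determinism is assumed to accept seed=None.
def _example_data_embedding_ts():
    Y_past = tf.random.normal((8, 12, 2))  # (batch, time, dim_d)
    Z_enc = tf.random.normal((8, 5, 16))  # (batch, n_windows, dim_z)
    emb = Data_embedding_TS(size_window=4, n_windows=5, step=2, dim_d=2, dim_out=3)
    out = emb(Z_enc, Y_past, None)
    # MWE(Y_past) is (8, 5, 8); concatenation gives (8, 5, 24); the fictive time
    # extension repeats the last step dim_out=3 times: expected shape (8, 8, 24).
    return out

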
@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Factice_Time_Extension(layers.Layer):
    def __init__(self, dim_out, **kwargs):
        super().__init__(**kwargs)
        self.dim_out = dim_out

    def call(self, inputs, **kwargs):
        """Extend inputs in time by repeating the last timestep dim_out times.

        Args:
            inputs (tf.Tensor): (batch, time, features) tensor.

        Returns:
            tf.Tensor: (batch, time + dim_out, features) tensor.
        """
        last_input = inputs[:, -1, :]
        last_input_duplicated = K.repeat_elements(
            last_input[:, None, :], self.dim_out, 1
        )
        inputs_augmented = K.concatenate([inputs, last_input_duplicated], axis=1)
        return inputs_augmented

    def get_config(self):
        dict_config = {}
        dict_config["dim_out"] = self.dim_out
        return dict_config
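

# Illustrative usage sketch (not part of the original module): the last timestep of a
# (batch, time, features) tensor is repeated dim_out times along the time axis.
def _example_factice_time_extension():
    x = tf.random.normal((2, 6, 5))  # (batch=2, time=6, features=5)
    fte = Factice_Time_Extension(dim_out=3)
    out = fte(x)
    # The time axis grows from 6 to 6 + 3 = 9; the appended steps all equal x[:, -1, :].
    return out

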
# Define the way for positional encoding


class PositionalEmbedding(layers.Layer):
    def __init__(self, d_model, max_len=40, seed=None):
        super(PositionalEmbedding, self).__init__()
        self.seed = seed
        # Compute the positional encodings once in log space.
        pe = np.zeros((max_len, d_model), dtype=np.float32)

        position = np.expand_dims(np.arange(0, max_len, dtype=np.float32), 1)
        div_term = np.exp(
            np.arange(0, d_model, 2, dtype=np.float32) * -(math.log(10000.0) / d_model)
        )

        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)

        self.pe = tf.expand_dims(tf.convert_to_tensor(pe), 0)

    def call(self, inputs, **kwargs):
        """Return the sinusoidal positional encoding for the length of inputs.

        Args:
            inputs (tf.Tensor): (batch, time, ...) tensor; only its time length is used.

        Returns:
            tf.Tensor: (1, time, d_model) encoding, broadcastable over the batch axis.
        """
        # Slice to the input length so the encoding can be added to (batch, time, d_model).
        return self.pe[:, : tf.shape(inputs)[1]]
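

# Illustrative usage sketch (not part of the original module): the sinusoidal table is
# precomputed for max_len positions and returned for the length of the given inputs.
# d_model is assumed even so that the sin/cos columns interleave exactly.
def _example_positional_embedding():
    x = tf.zeros((2, 10, 16))  # (batch, time, d_model)
    pos = PositionalEmbedding(d_model=16, max_len=40)
    pe = pos(x)
    # A (1, 10, 16) encoding that broadcasts over the batch axis when added to x.
    return x + pe

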
class ValuesEmbedding(layers.Layer):
    def __init__(self, d_model, seed=None):
        super(ValuesEmbedding, self).__init__()
        self.seed = seed
        self.tokenConv = tf.keras.layers.Conv1D(
            filters=d_model, kernel_size=3, padding="causal", activation="linear"
        )
        self.activation = LeakyReLU()

    def call(self, inputs, **kwargs):
        """Project the value channels (all channels but the first) to d_model.

        Args:
            inputs (tf.Tensor): (batch, time, 1 + n_features) tensor whose first channel
                holds the time index.

        Returns:
            tf.Tensor: (batch, time, d_model) tensor.
        """
        x = self.tokenConv(inputs[:, :, 1:])
        x = self.activation(x)
        return x
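

# Illustrative usage sketch (not part of the original module): channel 0 is assumed to
# carry a time index and is skipped; the remaining channels are projected to d_model
# with a causal Conv1D followed by a LeakyReLU.
def _example_values_embedding():
    x = tf.random.normal((4, 12, 3))  # (batch, time, 1 time index + 2 value channels)
    ve = ValuesEmbedding(d_model=16)
    out = ve(x)
    # The causal convolution preserves the time axis: expected shape (4, 12, 16).
    return out

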
class FixedEmbedding(layers.Layer):
    def __init__(self, c_in, d_model, seed=None):
        super(FixedEmbedding, self).__init__()
        self.seed = seed
        w = np.zeros((c_in, d_model), dtype=np.float32)

        position = np.expand_dims(np.arange(0, c_in, dtype=np.float32), 1)
        div_term = np.exp(
            np.arange(0, d_model, 2, dtype=np.float32) * -(math.log(10000.0) / d_model)
        )

        w[:, 0::2] = np.sin(position * div_term)
        w[:, 1::2] = np.cos(position * div_term)

        w = tf.convert_to_tensor(w)
        w = tf.stop_gradient(w)
        w = tf.keras.initializers.Constant(w)
        # The sinusoidal table is meant to stay fixed, so the embedding is not trainable.
        self.emb = tf.keras.layers.Embedding(
            c_in, d_model, embeddings_initializer=w, trainable=False
        )

    def call(self, inputs, **kwargs):
        """Look up fixed sinusoidal embeddings for integer indices.

        Args:
            inputs (tf.Tensor): (batch, time) tensor of indices in [0, c_in).

        Returns:
            tf.Tensor: (batch, time, d_model) tensor.
        """
        embedding = self.emb(inputs)
        return embedding
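

# Illustrative usage sketch (not part of the original module): integer positions are
# mapped to a frozen sinusoidal table of size (c_in, d_model); d_model is assumed even
# and the index values are assumed to stay below c_in.
def _example_fixed_embedding():
    idx = tf.constant([[0, 1, 2, 11]])  # (batch=1, time=4) integer positions
    fe = FixedEmbedding(c_in=12, d_model=16)
    out = fe(idx)
    # Each position is looked up in the fixed table: expected shape (1, 4, 16).
    return out

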
class TemporalEmbedding(layers.Layer):
    def __init__(self, d_model, seq_len, seed=None):
        super(TemporalEmbedding, self).__init__()
        self.time_embed = FixedEmbedding(seq_len, d_model)
        self.seed = seed
        # self.minute_embed = FixedEmbedding(60, d_model)
        # self.hour_embed = FixedEmbedding(24, d_model)
        # self.weekday_embed = FixedEmbedding(7, d_model)
        # self.day_embed = FixedEmbedding(32, d_model)
        # self.month_embed = FixedEmbedding(13, d_model)

    def call(self, inputs, **kwargs):
        """Embed the time-index channel (channel 0) with a fixed sinusoidal table.

        Args:
            inputs (tf.Tensor): (batch, time, 1 + n_features) tensor whose first channel
                holds the time index in [0, seq_len).

        Returns:
            tf.Tensor: (batch, time, d_model) tensor.
        """
        # x = x.long()
        return self.time_embed(inputs[:, :, 0])


class DataEmbedding_ITS(layers.Layer):
    def __init__(self, d_model, dropout=0.1, seq_len=96, seed=None):
        super(DataEmbedding_ITS, self).__init__()
        self.seq_len = seq_len
        self.value_embedding = ValuesEmbedding(d_model=d_model)
        self.position_embedding = PositionalEmbedding(d_model=d_model, max_len=seq_len)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, seq_len=seq_len)
        self.ctx_embedding = tf.keras.layers.Dense(d_model)
        self.dropout = tf.keras.layers.Dropout(dropout, seed=seed)
        self.seed = seed

    def call(self, inputs, x_mark=None, **kwargs):
        """Sum the value, positional and temporal embeddings, then apply dropout.

        Args:
            inputs (tf.Tensor): (batch, seq_len, 1 + n_features) tensor whose first
                channel holds the time index and remaining channels the values.
            x_mark (_type_, optional): unused. Defaults to None.

        Returns:
            tf.Tensor: (batch, seq_len, d_model) embedded tensor.
        """
        x = (
            self.value_embedding(inputs)
            + self.position_embedding(inputs)
            + self.temporal_embedding(inputs)
        )

        return self.dropout(x)
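

# Illustrative usage sketch (not part of the original module): the first input channel is
# assumed to carry the integer time index in [0, seq_len), the remaining channels the
# observed values; the positional table built above is assumed to cover seq_len positions.
def _example_data_embedding_its():
    batch, seq_len, d_model = 4, 12, 16
    time_index = tf.tile(
        tf.range(seq_len, dtype=tf.float32)[None, :, None], (batch, 1, 1)
    )
    values = tf.random.normal((batch, seq_len, 2))
    x = tf.concat([time_index, values], axis=-1)  # (4, 12, 3)
    emb = DataEmbedding_ITS(d_model=d_model, dropout=0.1, seq_len=seq_len)
    out = emb(x)
    # Value, positional and temporal embeddings all map to (4, 12, 16) and are summed.
    return out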