Coverage for uqmodels/modelization/DL_estimator/data_embedding.py: 50% (223 statements). coverage.py v7.10.6, created at 2025-09-05 14:29 +0000.
import math

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from keras.layers import LeakyReLU
from tensorflow.keras import layers

from uqmodels.modelization.DL_estimator.utils import (
    find_conv_kernel,
    set_global_determinism,
)

from ...utils import add_random_state
# tf.keras.utils.get_custom_objects().clear()


# Code restructuring: layer-preprocessing functionality for NN (Transformer?)


class Mouving_Windows_Embedding(layers.Layer):
    def __init__(
        self, size_window, n_windows=1, step=1, dim_d=1, dim_chan=1, seed=None, **kwargs
    ):
        """Embed a series as a stack of flattened sliding windows.

        Args:
            size_window (int): length of each sliding window.
            n_windows (int, optional): number of windows to extract. Defaults to 1.
            step (int, optional): stride between consecutive windows. Defaults to 1.
            dim_d (int, optional): feature dimension of the series. Defaults to 1.
            dim_chan (int, optional): number of channels. Defaults to 1.
            seed (int, optional): handle experimental random using seed.
        """
        super(Mouving_Windows_Embedding, self).__init__()
        self.size_window = size_window
        self.n_windows = n_windows
        self.step = step
        self.dim_d = dim_d
        self.dim_chan = dim_chan
        self.seed = seed
        self.last_shape = self.size_window * self.dim_d * self.dim_chan

    def call(self, inputs, mode="encoder"):
        """Stack n_windows sliding windows and flatten each one.

        Args:
            inputs: tensor of shape (batch, time, dim_d) or
                (batch, time, dim_d, dim_chan).
            mode (str, optional): unused. Defaults to "encoder".

        Returns:
            Tensor of shape (batch, n_windows, size_window * dim_d * dim_chan).
        """
        output = inputs
        if len(output.shape) == 3:
            output = output[:, :, :, None]

        slide_tensor = []
        for i in range(self.n_windows):
            # Slice the channel-expanded tensor so the expansion above is
            # actually used; the flattened result below is unchanged.
            slide_tensor.append(
                output[:, (i * self.step) : (i * self.step) + self.size_window]
            )
        MWE_raw = K.stack(slide_tensor, axis=1)
        MWE = K.reshape(
            MWE_raw, (-1, self.n_windows, self.size_window * self.dim_d * self.dim_chan)
        )

        return MWE
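

# Usage sketch (illustrative, not part of the original module): shapes only.
#
#   mwe = Mouving_Windows_Embedding(size_window=8, n_windows=5, step=2, dim_d=3)
#   x = tf.random.normal((4, 16, 3))   # (batch, time, dim_d)
#   z = mwe(x)                         # -> (4, 5, 8 * 3 * 1) == (4, 5, 24)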


class Mouving_Window_Embedding(layers.Layer):
    def __init__(self, sub_seq_size, size_window, dim_out=1, padding=1, seed=None):
        super(Mouving_Window_Embedding, self).__init__()
        self.sub_seq_size = sub_seq_size
        self.dim_out = dim_out
        self.size_window = size_window
        self.padding = padding
        self.seed = seed

    def call(self, inputs, mode="encoder"):
        """Extract flattened sub-sequences of length sub_seq_size.

        Args:
            inputs: tensor of shape (batch, time, features).
            mode (str, optional): "encoder" yields size_window steps,
                "decoder" yields size_window + dim_out - 1. Defaults to "encoder".

        Returns:
            Tensor of shape (batch, size_output, sub_seq_size * features).
        """
        if mode == "encoder":
            size_output = self.size_window
        elif mode == "decoder":
            size_output = self.size_window + self.dim_out - 1
        else:
            # Fail fast rather than leaving size_output unbound.
            raise ValueError("mode must be 'encoder' or 'decoder', got %r" % mode)
        if self.sub_seq_size == 1:
            return inputs
        else:
            slide_tensor = []
            for i in range(size_output):
                slide_tensor.append(
                    inputs[
                        :, (i * self.padding) : (i * self.padding) + self.sub_seq_size
                    ]
                )
            MWE_raw = K.stack(slide_tensor, axis=1)
            MWE = K.reshape(
                MWE_raw, (-1, size_output, self.sub_seq_size * inputs.shape[-1])
            )
            return MWE
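

# Usage sketch (illustrative, not part of the original module): with
# sub_seq_size=4, size_window=6, padding=1, each step is a flattened
# sub-sequence of size 4 * 3 = 12; "decoder" mode adds dim_out - 1 steps.
#
#   mwe = Mouving_Window_Embedding(sub_seq_size=4, size_window=6, dim_out=2)
#   z_enc = mwe(tf.random.normal((2, 9, 3)), mode="encoder")    # -> (2, 6, 12)
#   z_dec = mwe(tf.random.normal((2, 10, 3)), mode="decoder")   # -> (2, 7, 12)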


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Conv2D(tf.keras.layers.Conv2D):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Dropout(tf.keras.layers.Dropout):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Conv1D(tf.keras.layers.Conv1D):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Mouving_conv_Embedding(layers.Layer):
    def __init__(
        self,
        size_window,
        n_windows,
        step=1,
        dim_d=1,
        dim_chan=1,
        use_conv2D=True,
        list_filters=None,
        list_strides=[2, 1, 1],
        list_kernels=None,
        dp=0.01,
        flag_mc=False,
        seed=None,
        **kwargs
    ):
        """Embed a window of series values with a stack of strided convolutions.

        Args:
            size_window (int): length of the input window.
            n_windows (int): target number of output steps.
            step (int, optional): stride between windows. Defaults to 1.
            dim_d (int, optional): feature dimension. Defaults to 1.
            dim_chan (int, optional): number of channels. Defaults to 1.
            use_conv2D (bool, optional): use Conv2D blocks rather than grouped
                Conv1D blocks. Defaults to True.
            list_filters (list, optional): filters per conv layer. Defaults to 64 each.
            list_strides (list, optional): strides per conv layer.
            list_kernels (list, optional): kernel sizes per conv layer; inferred
                with find_conv_kernel when None.
            dp (float, optional): dropout rate. Defaults to 0.01.
            flag_mc (bool, optional): keep dropout active at inference (MC dropout).
            seed (int, optional): handle experimental random using seed.
        """
        self.size_window = size_window
        self.n_windows = n_windows
        self.step = step
        self.dim_d = dim_d
        self.dim_chan = dim_chan
        self.use_conv2D = use_conv2D
        self.flag_mc = flag_mc
        self.seed = seed
        self.multiscale = False
        set_global_determinism(self.seed)

        if list_filters is None:
            list_filters = [64 for i in list_strides]

        if list_kernels is None:
            list_kernels, list_strides = find_conv_kernel(
                self.size_window, n_windows, list_strides
            )
            list_filters.append(list_filters[-1])

        super().__init__(**kwargs)

        self.list_strides = list_strides
        self.list_kernels = list_kernels
        self.list_filters = list_filters

        self.layers = []
        for n, (filters, kernel, strides) in enumerate(
            zip(list_filters, list_kernels, list_strides)
        ):
            if use_conv2D:
                if n == 0:
                    kernel = (kernel, dim_d)
                else:
                    kernel = (kernel, 1)
                self.layers.append(
                    Conv2D(
                        filters,
                        kernel,
                        strides=strides,
                        padding="valid",
                        activation="relu",
                    )
                )
                if dp > 0:
                    self.layers.append(Dropout(dp, seed=add_random_state(seed, n)))

            else:
                self.layers.append(
                    Conv1D(
                        filters * dim_chan * dim_d,
                        kernel,
                        strides=strides,
                        padding="valid",
                        groups=dim_chan * dim_d,
                        activation="relu",
                    )
                )

        # self.layers.append(tf.keras.layers.BatchNormalization())
        # self.layers.append(tf.keras.layers.Dropout(dp))

        if use_conv2D:
            self.last_shape = list_filters[-1]
        else:
            self.last_shape = list_filters[-1] * self.dim_d * self.dim_chan

    def call(self, inputs):
        """Apply the convolution stack and squeeze the contracted axis.

        Args:
            inputs: tensor of shape (batch, time, dim_d) or
                (batch, time, dim_d, dim_chan).

        Returns:
            Tensor of shape (batch, n_windows, last_shape).
        """
        output = inputs
        if len(output.shape) == 3:
            output = output[:, :, :, None]

        if not (self.use_conv2D):
            output = K.reshape(
                output, (-1, 1, self.size_window, self.dim_d * self.dim_chan)
            )

        # if self.multiscale:
        #     list_output = []

        for layer in self.layers:
            if isinstance(layer, Dropout):
                # Dropout closes a block; keep it active at inference when
                # flag_mc is set (MC dropout).
                output = layer(output, training=self.flag_mc)
                # if self.multiscale:
                #     TODO: find how to affect the multiscale window to the good final step
                #     list_output.append(output)
            else:
                output = layer(output)
        if self.use_conv2D:
            output = output[:, :, 0, :]
        else:
            output = output[:, 0, :, :]

        return output

    def get_config(self):
        dict_config = {}
        dict_config["size_window"] = self.size_window
        dict_config["n_windows"] = self.n_windows
        dict_config["step"] = self.step
        dict_config["dim_d"] = self.dim_d
        dict_config["dim_chan"] = self.dim_chan
        dict_config["list_strides"] = self.list_strides
        dict_config["list_kernels"] = self.list_kernels
        dict_config["list_filters"] = self.list_filters
        dict_config["flag_mc"] = self.flag_mc
        dict_config["use_conv2D"] = self.use_conv2D
        dict_config["seed"] = self.seed

        dict_config["layers"] = []
        for layer in self.layers:
            dict_config["layers"].append(tf.keras.utils.serialize_keras_object(layer))
        return dict_config

    @classmethod
    def from_config(cls, config):
        layers_config = config.pop("layers")
        layers = []
        for layer_config in layers_config:
            layer = tf.keras.utils.deserialize_keras_object(layer_config)
            layers.append(layer)

        obj = cls(**config)
        obj.layers = layers
        return obj
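

# Usage sketch (illustrative, not part of the original module): output length
# depends on the kernels/strides chosen by find_conv_kernel, which is expected
# to contract size_window down to n_windows. A get_config / from_config
# round-trip relies on the custom-object registrations above.
#
#   mce = Mouving_conv_Embedding(size_window=32, n_windows=8, dim_d=3)
#   z = mce(tf.random.normal((4, 32, 3)))     # -> (4, 8, mce.last_shape)
#   clone = Mouving_conv_Embedding.from_config(mce.get_config())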


class Data_embedding_TS(layers.Layer):
    def __init__(
        self,
        size_window,
        n_windows,
        step=1,
        dim_d=1,
        dim_chan=1,
        dim_out=1,
        flag_mc=False,
        seed=None,
    ):
        super(Data_embedding_TS, self).__init__()
        self.n_windows = n_windows
        self.size_window = size_window
        self.dim_d = dim_d
        self.seed = seed
        set_global_determinism(self.seed)

        self.MWE = Mouving_Windows_Embedding(
            size_window,
            n_windows,
            step=step,
            dim_d=dim_d,
            seed=seed,
        )
        # Alternative convolutional embedding, kept for reference:
        # self.MWE = Mouving_conv_Embedding(
        #     size_window,
        #     n_windows,
        #     step=step,
        #     dim_d=dim_d,
        #     dim_chan=dim_chan,
        #     use_conv2D=True,
        #     list_filters=None,
        #     list_strides=[2, 1],
        #     list_kernels=None,
        #     dp=0.05,
        #     flag_mc=flag_mc,
        #     seed=seed,
        # )

        self.FTE = Factice_Time_Extension(dim_out)

    def call(self, Z_enc, Y_past, extension):
        """Concatenate window embeddings of Y_past to Z_enc, then extend in time.

        Args:
            Z_enc: encoded representation of shape (batch, n_windows, d_enc).
            Y_past: past observations to embed with self.MWE.
            extension: unused.

        Returns:
            Tensor of shape (batch, n_windows + dim_out, d_enc + MWE.last_shape).
        """
        MWE_past = self.MWE(Y_past)
        Z_enc = K.concatenate([Z_enc, MWE_past], axis=-1)
        Z_enc = self.FTE(Z_enc)
        return Z_enc
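

# Usage sketch (illustrative, not part of the original module): past values
# are window-embedded, concatenated feature-wise onto the encoded state, and
# the result is extended in time by dim_out repeated last steps.
#
#   emb = Data_embedding_TS(size_window=8, n_windows=5, step=2, dim_d=3, dim_out=4)
#   z_enc = tf.random.normal((2, 5, 16))
#   y_past = tf.random.normal((2, 16, 3))
#   z = emb(z_enc, y_past, None)       # -> (2, 5 + 4, 16 + 24)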


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Factice_Time_Extension(layers.Layer):
    def __init__(self, dim_out, **kwargs):
        super().__init__(**kwargs)
        self.dim_out = dim_out

    def call(self, inputs, **kwargs):
        """Extend the time axis by repeating the last step dim_out times.

        Args:
            inputs: tensor of shape (batch, time, features).

        Returns:
            Tensor of shape (batch, time + dim_out, features).
        """
        last_input = inputs[:, -1, :]
        last_input_duplicated = K.repeat_elements(
            last_input[:, None, :], self.dim_out, 1
        )
        inputs_augmented = K.concatenate([inputs, last_input_duplicated], axis=1)
        return inputs_augmented

    def get_config(self):
        dict_config = {}
        dict_config["dim_out"] = self.dim_out
        return dict_config


# Positional encoding layers.


class PositionalEmbedding(layers.Layer):
    def __init__(self, d_model, max_len=40, seed=None):
        super(PositionalEmbedding, self).__init__()
        self.seed = seed
        # Compute the sinusoidal positional encodings once, in log space:
        # pe[pos, 2i] = sin(pos / 10000^(2i / d_model)),
        # pe[pos, 2i + 1] = cos(pos / 10000^(2i / d_model)).
        pe = np.zeros((max_len, d_model), dtype=np.float32)

        position = np.expand_dims(np.arange(0, max_len, dtype=np.float32), 1)
        div_term = np.exp(
            np.arange(0, d_model, 2, dtype=np.float32) * -(math.log(10000.0) / d_model)
        )

        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)

        self.pe = tf.expand_dims(tf.convert_to_tensor(pe), 0)

    def call(self, inputs, **kwargs):
        """Return positional encodings matching the time length of inputs.

        Args:
            inputs: tensor of shape (batch, time, features); only the time
                length is used, and it must not exceed max_len.

        Returns:
            Tensor of shape (1, time, d_model), broadcastable over the batch.
        """
        # Truncate the precomputed table to the actual sequence length.
        return self.pe[:, : tf.shape(inputs)[1], :]
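

# Sanity-check sketch (illustrative, not part of the original module): at
# position 0 every sine channel is 0 and every cosine channel is 1, matching
# the formulas above.
#
#   pos = PositionalEmbedding(d_model=16, max_len=40)
#   pe = pos(tf.zeros((1, 10, 16)))    # -> (1, 10, 16)
#   pe[0, 0, 0::2]                     # all zeros: sin(0)
#   pe[0, 0, 1::2]                     # all ones: cos(0)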


class ValuesEmbedding(layers.Layer):
    def __init__(self, d_model, seed=None):
        super(ValuesEmbedding, self).__init__()
        self.seed = seed
        self.tokenConv = tf.keras.layers.Conv1D(
            filters=d_model, kernel_size=3, padding="causal", activation="linear"
        )
        self.activation = LeakyReLU()

    def call(self, inputs, **kwargs):
        """Project the value columns to d_model with a causal convolution.

        Args:
            inputs: tensor of shape (batch, time, 1 + features); column 0 is
                the time index consumed by TemporalEmbedding and is skipped here.

        Returns:
            Tensor of shape (batch, time, d_model).
        """
        x = self.tokenConv(inputs[:, :, 1:])
        x = self.activation(x)
        return x


class FixedEmbedding(layers.Layer):
    def __init__(self, c_in, d_model, seed=None):
        super(FixedEmbedding, self).__init__()
        self.seed = seed
        w = np.zeros((c_in, d_model), dtype=np.float32)

        position = np.expand_dims(np.arange(0, c_in, dtype=np.float32), 1)
        div_term = np.exp(
            np.arange(0, d_model, 2, dtype=np.float32) * -(math.log(10000.0) / d_model)
        )

        w[:, 0::2] = np.sin(position * div_term)
        w[:, 1::2] = np.cos(position * div_term)

        # Freeze the sinusoidal table so it is never updated during training.
        w = tf.keras.initializers.Constant(tf.convert_to_tensor(w))
        self.emb = tf.keras.layers.Embedding(
            c_in, d_model, embeddings_initializer=w, trainable=False
        )

    def call(self, inputs, **kwargs):
        """Look up the fixed sinusoidal embedding for integer indices.

        Args:
            inputs: integer tensor of indices in [0, c_in).

        Returns:
            Tensor with a trailing d_model axis.
        """
        embedding = self.emb(inputs)
        return embedding


class TemporalEmbedding(layers.Layer):
    def __init__(self, d_model, seq_len, seed=None):
        super(TemporalEmbedding, self).__init__()
        self.time_embed = FixedEmbedding(seq_len, d_model)
        self.seed = seed
        # self.minute_embed = FixedEmbedding(60, d_model)
        # self.hour_embed = FixedEmbedding(24, d_model)
        # self.weekday_embed = FixedEmbedding(7, d_model)
        # self.day_embed = FixedEmbedding(32, d_model)
        # self.month_embed = FixedEmbedding(13, d_model)

    def call(self, inputs, **kwargs):
        """Embed the time-index column (column 0) of the inputs.

        Args:
            inputs: tensor of shape (batch, time, 1 + features) whose first
                column holds integer time indices in [0, seq_len).

        Returns:
            Tensor of shape (batch, time, d_model).
        """
        return self.time_embed(inputs[:, :, 0])


class DataEmbedding_ITS(layers.Layer):
    def __init__(self, d_model, dropout=0.1, seq_len=96, seed=None):
        super(DataEmbedding_ITS, self).__init__()
        self.seq_len = seq_len
        self.value_embedding = ValuesEmbedding(d_model=d_model)
        # max_len=seq_len so the positional table covers the full sequence.
        self.position_embedding = PositionalEmbedding(d_model=d_model, max_len=seq_len)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, seq_len=seq_len)
        self.ctx_embedding = tf.keras.layers.Dense(d_model)
        self.dropout = tf.keras.layers.Dropout(dropout, seed=seed)
        self.seed = seed

    def call(self, inputs, x_mark=None, **kwargs):
        """Sum value, positional, and temporal embeddings, then apply dropout.

        Args:
            inputs: tensor of shape (batch, time, 1 + features); column 0 is
                the time index, the remaining columns are values.
            x_mark (optional): unused. Defaults to None.

        Returns:
            Tensor of shape (batch, time, d_model).
        """
        x = (
            self.value_embedding(inputs)
            + self.position_embedding(inputs)
            + self.temporal_embedding(inputs)
        )

        return self.dropout(x)
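

# Usage sketch (illustrative, not part of the original module): column 0
# carries integer time indices for TemporalEmbedding, the remaining columns
# carry values for ValuesEmbedding; the three embeddings are summed.
#
#   emb = DataEmbedding_ITS(d_model=16, seq_len=24)
#   t = tf.cast(tf.range(24)[None, :, None], tf.float32)   # time-index column
#   v = tf.random.normal((1, 24, 3))                        # value columns
#   x = tf.concat([t, v], axis=-1)                          # (1, 24, 4)
#   emb(x).shape                                            # -> (1, 24, 16)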