Coverage for uqmodels/modelization/DL_estimator/lstm_ed.py: 91% (109 statements)
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import layers

from uqmodels.modelization.DL_estimator.data_embedding import (
    Factice_Time_Extension,
    Mouving_conv_Embedding,
    Mouving_Windows_Embedding,
)
from uqmodels.modelization.DL_estimator.neural_network_UQ import NN_UQ
from uqmodels.modelization.DL_estimator.metalayers import mlp
from uqmodels.modelization.DL_estimator.utils import set_global_determinism
from uqmodels.modelization.DL_estimator.data_generator import Folder_Generator
from uqmodels.utils import add_random_state, stack_and_roll

# Basic memory module


def build_lstm_stacked(
    size_window=20,
    n_windows=5,
    step=1,
    dim_target=3,
    dim_chan=1,
    dim_horizon=5,
    dim_ctx=18,
    dim_z=200,
    dp=0.05,
    dp_rec=0.03,
    type_output=None,
    num_lstm_enc=1,
    num_lstm_dec=1,
    k_reg=(10e-6, 10e-6),
    layers_enc=[75, 150, 75],
    layers_dec=[150, 75],
    list_strides=[2, 1],
    list_kernels=None,
    list_filters=None,
    with_ctx_input=True,
    with_convolution=True,
    dim_dyn=None,
    random_state=None,
    **kwarg
):
48 """Builder for LSTM ED UQ with convolutif preprocessing for lag values
50 Args:
51 size_window (int, optional): Size of window for lag values. Defaults to 10.
52 n_windows (int, optional): Number of window in past. Defaults to 5.
53 step (int, optional): step between windows. Defaults to 1.
54 dim_target (int, optional): dimension of TS. Defaults to 1.
55 dim_chan (int, optional): Number of channel of TS. Defaults to 1.
56 dim_horizon (int, optional): futur_horizon to predict. Defaults to 3.
57 dim_ctx (int, optional): Number of ctx_features. Defaults to 20.
58 dim_z (int, optional): Size of latent sapce. Defaults to 100.
59 layers_enc (list, optional):size of MLP preprocessing
60 (after concatenation of past values embeding + ctx) Defaults to [150].
61 layers_dec (list, optional): size of MLP interpretor. Defaults to 2.
62 dp (float, optional): dropout. Defaults to 0.05.
63 dp_rec (float, optional): transformer dropout. Defaults to 0.1.
64 k_reg (tuple, optional): _description_. Defaults to (0.00001, 0.00001).
65 with_positional_embedding (bool, optional): _description_. Defaults to False.
66 with_ctx_input (bool, optional): Expect ctx features in addition to lag. Defaults to True.
67 with_convolution (bool, optional): use convolution rather than
68 whole lag values in the windows. Defaults to True.
69 type_output (_type_, optional): mode of UQ (see NN_UQ). Defaults to None.
70 random_state (bool): handle experimental random using seed.
73 Returns:
74 transformer : multi-step forecaster with UQ
75 """
    set_global_determinism(random_state)

    if dim_dyn is None:
        dim_dyn = dim_target

    residuals_link = False
    flag_mc = 0
    if type_output in ["BNN", "MC_Dropout"]:
        flag_mc = 1

    # Embedding interpretor
    Interpretor = mlp(
        dim_in=dim_z,
        dim_out=dim_target,
        layers_size=layers_dec,
        dp=dp,
        type_output=type_output,
        name="Interpretor",
        random_state=random_state,
    )
97 # Encoder LSTM
98 Encoders = []
99 for i in range(num_lstm_enc):
100 Encoders.append(
101 layers.LSTM(
102 dim_z,
103 name="LSTM_enc_" + str(i),
104 return_sequences=True,
105 return_state=True,
106 activation="sigmoid",
107 recurrent_dropout=dp_rec,
108 dropout=dp,
109 kernel_regularizer=tf.keras.regularizers.l1_l2(
110 l1=k_reg[0],
111 l2=k_reg[1],
112 ),
113 seed=add_random_state(random_state, 100 + i),
114 )
115 )

    # Decoder LSTM
    Decoders = []
    for i in range(num_lstm_dec):
        Decoders.append(
            layers.LSTM(
                dim_z,
                name="LSTM_dec_" + str(i),
                return_sequences=True,
                stateful=False,
                return_state=True,
                activation="sigmoid",
                recurrent_dropout=dp_rec,
                dropout=dp,
                kernel_regularizer=tf.keras.regularizers.l1_l2(
                    l1=k_reg[0], l2=k_reg[1]
                ),
                seed=add_random_state(random_state, 200 + i),
            )
        )

    outputs = []

    # Input definition
    list_inputs = []

    if with_ctx_input:
        CTX_inputs = layers.Input(shape=(n_windows, dim_ctx), name="LT")
        list_inputs.append(CTX_inputs)

    Y_past = layers.Input(shape=(size_window, dim_dyn), name="ST")
    list_inputs.append(Y_past)

    # Preprocessing layers
    if with_convolution:
        MWE = Mouving_conv_Embedding(
            size_window,
            n_windows,
            step=step,
            dim_d=dim_dyn,
            dim_chan=dim_chan,
            use_conv2D=True,
            list_strides=list_strides,
            list_filters=list_filters,
            list_kernels=list_kernels,
            dp=0.05,
            flag_mc=flag_mc,
            seed=add_random_state(random_state, 300),
        )
    else:
        MWE = Mouving_Windows_Embedding(
            size_window, n_windows, step=step, dim_d=dim_dyn, dim_chan=dim_chan
        )

    FTE = Factice_Time_Extension(dim_horizon)

    dim_embedding = MWE.last_shape
    if with_ctx_input:
        dim_embedding += dim_ctx

    Embeddor_ctx = mlp(
        dim_in=dim_embedding,
        dim_out=None,
        layers_size=layers_enc,
        dp=dp,
        name="Embeddor",
        regularizer_W=k_reg,
        flag_mc=flag_mc,
        random_state=add_random_state(random_state, 500),
    )

    # Preprocessing computation
    Data = MWE(Y_past)

    # Concatenate with ctx features
    if with_ctx_input:
        Data = layers.Concatenate(axis=-1)([CTX_inputs, Data])

    # Factice time augmentation (currently unnecessary, but can be useful for an extended prediction horizon)
    Embedding = layers.TimeDistributed(Embeddor_ctx)(Data)

    # Encoder part
    Z_enc = Embedding
    state = None
    for Encoder in Encoders:
        Z_enc, H_enc, C_enc = Encoder(Embedding, initial_state=state)
        state = H_enc, C_enc

        if residuals_link:
            Z_enc = Z_enc + Embedding

    Z_enc = FTE(Z_enc)

    # Latent embedding of each state (Z_t) and last memory states (H and C)
    Z_enc = layers.Dropout(dp, seed=add_random_state(random_state, 501))(
        Z_enc, training=flag_mc
    )
    H_enc = layers.Dropout(dp, seed=add_random_state(random_state, 502))(
        H_enc, training=flag_mc
    )
    C_enc = layers.Dropout(dp, seed=add_random_state(random_state, 503))(
        C_enc, training=flag_mc
    )
    state = H_enc, C_enc

    # Decoder part: training inference without loop
    for Decoder in Decoders:
        Z_dec, H_dec, C_dec = Decoder(Z_enc, initial_state=state)
        state = H_dec, C_dec

        if residuals_link:
            Z_dec = Z_dec + Z_enc

    outputs_training = layers.TimeDistributed(Interpretor)(
        layers.Dropout(dp)(Z_dec[:, -dim_horizon:, :], training=flag_mc)
    )

    # Inference loop
    # For i = 0, Z_enc is a tensor [Batch, size_window, dim_z] -> stacked LSTM
    # Else, Z_enc is a tensor [Batch, 1, dim_z]
    if False:
        Z_enc_inference = Z_enc[:, :-dim_horizon]
        for i in range(dim_horizon):
            Z_dec, H_dec, C_dec = Decoder(Z_enc_inference)
            if residuals_link:
                Z_dec = Z_dec + Z_enc_inference

            output = Interpretor(layers.Dropout(dp)(Z_dec[:, -1, :], training=flag_mc))
            outputs.append(output)

            # if i != (dim_horizon) - 1:
            #     Data = Data_embedding(
            #         inputs_lt, Y_past, outputs, "encoder")
            #     Embedding = layers.TimeDistributed(Embeddor_ctx)(Data)
            #     Z_enc_inference, H_enc, C_enc = Encoder(Embedding)
            #     if residuals_link:
            #         Z_enc_inference = Z_enc_inference + Embedding

        outputs = layers.Lambda(lambda x: K.stack(x, axis=1))(outputs)

    if False:  # Autoreg
        pass
        # list_input = [inputs_lt, Y_past, Y_futur]
        # if not (loop_learning):
        #     tf.keras.Model(list_input, outputs_training)
        # model = tf.keras.Model(list_input, outputs)
    else:
        model = tf.keras.Model(list_inputs, outputs_training)

    return model
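

# Illustrative sketch (not part of the original module): how the builder above
# can be called directly. The argument values are assumptions chosen for
# demonstration; with with_ctx_input=True the resulting Keras model expects a
# context input of shape (batch, n_windows, dim_ctx) and a lag input of shape
# (batch, size_window, dim_dyn).
def _demo_build_lstm_stacked():
    """Hedged usage example of build_lstm_stacked; not called at import time."""
    model = build_lstm_stacked(
        dim_ctx=18,
        dim_dyn=3,
        dim_target=3,
        dim_horizon=5,
        type_output="MC_Dropout",
        random_state=0,
    )
    model.summary()
    return model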


class Lstm_ED_UQ(NN_UQ):
    def __init__(
        self,
        model_parameters,
        factory_parameters=dict(),
        training_parameters=dict(),
        type_output=None,
        rescale=False,
        n_ech=5,
        train_ratio=0.9,
        name="",
        random_state=None,
    ):
283 """LSTM_ED : Neural network with UQ using NN_UQ wrapper
285 Args:
286 model_parameters (_type_): _description_
287 factory_parameters (_type_, optional): _description_. Defaults to dict().
288 training_parameters (_type_, optional): _description_. Defaults to dict().
289 type_output (_type_, optional): _description_. Defaults to None.
290 rescale (bool, optional): _description_. Defaults to False.
291 n_ech (int, optional): _description_. Defaults to 8.
292 train_ratio (float, optional): _description_. Defaults to 0.9.
293 name (str, optional): _description_. Defaults to "Lstm_stacked".
294 random_state (bool): handle experimental random using seed.
295 """
        if random_state is not None:
            print("Warning: non-deterministic behaviour may remain even with a random state")

        super().__init__(
            model_initializer=build_lstm_stacked,
            model_parameters=model_parameters,
            factory_parameters=factory_parameters,
            training_parameters=training_parameters,
            type_output=type_output,
            rescale=rescale,
            n_ech=n_ech,
            train_ratio=train_ratio,
            name=name,
            random_state=random_state,
        )

    def factory(self, X, y, mask=None, only_fit_scaler=False, **kwarg):
        model_params = self.model_parameters
        factory_params = self.factory_parameters

        with_ctx_input = model_params["with_ctx_input"]

        step = 1
        if "step" in model_params.keys():
            step = model_params["step"]

        X_none = False
        if X is None:
            X_none = True

        if X_none:
            inputs = None
        else:
            if with_ctx_input:
                # X is expected to be a (ctx_features, lag_values) pair.
                X, X_lag = X
                X, X_lag, mask = super().factory(X, X_lag, mask)
                if only_fit_scaler:
                    return None

                X_lt = stack_and_roll(
                    X,
                    model_params["n_windows"],
                    lag=factory_params["factory_lag_lt"],
                    step=step,
                )
                X_st = stack_and_roll(
                    X_lag,
                    model_params["size_window"],
                    lag=factory_params["factory_lag_st"] - 1,
                    step=step,
                )
                inputs = [X_lt, X_st]
            else:
                X, _, mask = super().factory(X, None, mask)
                if only_fit_scaler:
                    return None

                X_lag = X
                X_st = stack_and_roll(
                    X,
                    model_params["size_window"],
                    lag=factory_params["factory_lag_st"] - 1,
                    step=step,
                )
                inputs = [X_st]

        new_y = None
        if y is not None:
            _, new_y, mask = super().factory(None, y, mask)
            new_y = stack_and_roll(
                y,
                model_params["dim_horizon"],
                lag=model_params["dim_horizon"] - 1,
                step=step,
            )

        # Cast to tuple
        if isinstance(inputs, list):
            inputs = tuple(inputs)

        return inputs, new_y, mask

    def Build_generator(self, X, y, batch=32, shuffle=True, train=True):
        return Folder_Generator(
            X,
            y,
            self,
            batch=batch,
            shuffle=shuffle,
            train=train,
            random_state=self.random_state,
        )


def get_params_dict(
    dim_ctx,
    dim_dyn,
    dim_target,
    size_window=20,
    n_windows=5,
    dim_horizon=5,
    step=1,
    dim_chan=1,
    dp=0.05,
    dp_rec=0.05,
    dim_z=50,
    k_reg=(10e-7, 10e-7),
    num_lstm_enc=1,
    num_lstm_dec=1,
    layers_enc=[150, 75],
    layers_dec=[200, 125, 75],
    list_strides=[2, 1, 1, 1],
    list_filters=[128, 128, 128],
    list_kernels=None,
    with_convolution=True,
    with_ctx_input=True,
    n_ech=3,
    type_output="MC_Dropout",
):
    dict_params = {
        "dim_ctx": dim_ctx,
        "dim_dyn": dim_dyn,
        "dim_target": dim_target,
        "dim_chan": dim_chan,
        "size_window": size_window,
        "n_windows": n_windows,
        "dim_horizon": dim_horizon,
        "type_output": type_output,
        "num_lstm_enc": num_lstm_enc,
        "num_lstm_dec": num_lstm_dec,
        "step": step,
        "dim_z": dim_z,
        "dp": dp,
        "dp_rec": dp_rec,
        "k_reg": k_reg,
        "layers_enc": layers_enc,
        "layers_dec": layers_dec,
        "list_strides": list_strides,
        "list_kernels": list_kernels,
        "list_filters": list_filters,
        "with_convolution": with_convolution,
        "n_ech": n_ech,
        "with_ctx_input": with_ctx_input,
    }
    return dict_params
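

# Illustrative end-to-end sketch (not part of the original module): wiring
# get_params_dict into the Lstm_ED_UQ wrapper. The dimensions and the
# factory_parameters values are assumptions chosen for demonstration; the
# factory_lag_lt / factory_lag_st keys are the ones read by Lstm_ED_UQ.factory.
def _demo_lstm_ed_uq():
    """Hedged usage example of Lstm_ED_UQ; not called at import time."""
    params = get_params_dict(dim_ctx=18, dim_dyn=3, dim_target=3)
    estimator = Lstm_ED_UQ(
        model_parameters=params,
        factory_parameters={"factory_lag_lt": 1, "factory_lag_st": 1},  # placeholder lags
        type_output="MC_Dropout",
        n_ech=3,
        random_state=None,
    )
    return estimator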