Coverage for uqmodels/modelization/DL_estimator/lstm_ed.py: 90%
105 statements
coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from keras.layers import LSTM, Dropout, Input, Lambda, TimeDistributed

from uqmodels.modelization.DL_estimator.data_embedding import (
    Factice_Time_Extension,
    Mouving_conv_Embedding,
    Mouving_Windows_Embedding,
)
from uqmodels.modelization.DL_estimator.neural_network_UQ import (
    NN_UQ,
    get_training_parameters,
    get_UQEstimator_parameters,
    mlp,
)
from uqmodels.modelization.DL_estimator.utils import (
    Folder_Generator,
    set_global_determinism,
)
from uqmodels.utils import add_random_state, apply_mask, stack_and_roll

# Basic memory module


def build_lstm_stacked(
    size_window=20,
    n_windows=5,
    step=1,
    dim_target=3,
    dim_chan=1,
    dim_horizon=5,
    dim_ctx=18,
    dim_z=200,
    dp=0.05,
    dp_rec=0.03,
    type_output=None,
    num_lstm_enc=1,
    num_lstm_dec=1,
    k_reg=(10e-6, 10e-6),
    layers_enc=[75, 150, 75],
    layers_dec=[150, 75],
    list_strides=[2, 1],
    list_kernels=None,
    list_filters=None,
    with_ctx_input=True,
    with_convolution=True,
    dim_dyn=None,
    random_state=None,
    **kwarg
):
    """Builder for an LSTM encoder-decoder with UQ and convolutional preprocessing of lag values.

    Args:
        size_window (int, optional): Size of the window of lag values. Defaults to 20.
        n_windows (int, optional): Number of past windows. Defaults to 5.
        step (int, optional): Step between windows. Defaults to 1.
        dim_target (int, optional): Dimension of the target time series. Defaults to 3.
        dim_chan (int, optional): Number of channels of the time series. Defaults to 1.
        dim_horizon (int, optional): Future horizon to predict. Defaults to 5.
        dim_ctx (int, optional): Number of contextual features. Defaults to 18.
        dim_z (int, optional): Size of the latent space. Defaults to 200.
        layers_enc (list, optional): Layer sizes of the MLP preprocessing
            (after concatenation of the past-values embedding + ctx). Defaults to [75, 150, 75].
        layers_dec (list, optional): Layer sizes of the MLP interpretor. Defaults to [150, 75].
        dp (float, optional): Dropout rate. Defaults to 0.05.
        dp_rec (float, optional): Recurrent dropout rate. Defaults to 0.03.
        k_reg (tuple, optional): (l1, l2) kernel regularization. Defaults to (1e-05, 1e-05).
        with_ctx_input (bool, optional): Expect ctx features in addition to lag values. Defaults to True.
        with_convolution (bool, optional): Use a convolutional embedding rather than
            the whole lag values in the windows. Defaults to True.
        type_output (str, optional): Mode of UQ (see NN_UQ). Defaults to None.
        random_state (int, optional): Seed used to control experimental randomness.

    Returns:
        model: LSTM encoder-decoder multi-step forecaster with UQ.
    """
    set_global_determinism(random_state)

    if dim_dyn is None:
        dim_dyn = dim_target

    residuals_link = False
    flag_mc = 0
    if type_output in ["BNN", "MC_Dropout"]:
        flag_mc = 1

    # Embedding_interpretor
    Interpretor = mlp(
        dim_in=dim_z,
        dim_out=dim_target,
        layers_size=layers_dec,
        dp=dp,
        type_output=type_output,
        name="Interpretor",
        random_state=random_state,
    )

    # Encoder LSTM
    Encoders = []
    for i in range(num_lstm_enc):
        Encoders.append(
            LSTM(
                dim_z,
                name="LSTM_enc_" + str(i),
                return_sequences=True,
                return_state=True,
                activation="sigmoid",
                recurrent_dropout=dp_rec,
                dropout=dp,
                kernel_regularizer=tf.keras.regularizers.l1_l2(
                    l1=k_reg[0],
                    l2=k_reg[1],
                ),
                seed=add_random_state(random_state, 100 + i),
            )
        )

    # Decoder LSTM
    Decoders = []
    for i in range(num_lstm_dec):
        Decoders.append(
            LSTM(
                dim_z,
                name="LSTM_dec_" + str(i),
                return_sequences=True,
                stateful=False,
                return_state=True,
                activation="sigmoid",
                recurrent_dropout=dp_rec,
                dropout=dp,
                kernel_regularizer=tf.keras.regularizers.l1_l2(
                    l1=k_reg[0], l2=k_reg[1]
                ),
                seed=add_random_state(random_state, 200 + i),
            )
        )

    outputs = []

    # Input definition
    list_inputs = []

    if with_ctx_input:
        CTX_inputs = Input(shape=(n_windows, dim_ctx), name="LT")
        list_inputs.append(CTX_inputs)

    Y_past = Input(shape=(size_window, dim_dyn), name="ST")
    list_inputs.append(Y_past)

    # Preprocessing layers
    if with_convolution:
        MWE = Mouving_conv_Embedding(
            size_window,
            n_windows,
            step=step,
            dim_d=dim_dyn,
            dim_chan=dim_chan,
            use_conv2D=True,
            list_strides=list_strides,
            list_filters=list_filters,
            list_kernels=list_kernels,
            dp=0.05,
            flag_mc=flag_mc,
            seed=add_random_state(random_state, 300),
        )

    else:
        MWE = Mouving_Windows_Embedding(
            size_window, n_windows, step=step, dim_d=dim_dyn, dim_chan=dim_chan
        )

    FTE = Factice_Time_Extension(dim_horizon)

    dim_embedding = MWE.last_shape
    if with_ctx_input:
        dim_embedding += dim_ctx

    Embeddor_ctx = mlp(
        dim_in=dim_embedding,
        dim_out=None,
        layers_size=layers_enc,
        dp=dp,
        name="Embeddor",
        regularizer_W=k_reg,
        flag_mc=flag_mc,
        random_state=add_random_state(random_state, 500),
    )

    # Preprocessing computation
    Data = MWE(Y_past)
    # Concatenate with ctx features
    if with_ctx_input:
        Data = K.concatenate([CTX_inputs, Data], axis=-1)

    # Fictive time extension (not needed here, but useful for an extended prediction horizon)
    Embedding = TimeDistributed(Embeddor_ctx)(Data)

    # Encoder part
    Z_enc = Embedding
    state = None
    for Encoder in Encoders:
        Z_enc, H_enc, C_enc = Encoder(Embedding, initial_state=state)
        state = H_enc, C_enc

        if residuals_link:
            Z_enc = Z_enc + Embedding

    Z_enc = FTE(Z_enc)

    # Latent embedding of each state (Z_t) and last memory state (H and C)
    Z_enc = Dropout(dp, seed=add_random_state(random_state, 501))(
        Z_enc, training=flag_mc
    )
    H_enc = Dropout(dp, seed=add_random_state(random_state, 502))(
        H_enc, training=flag_mc
    )
    C_enc = Dropout(dp, seed=add_random_state(random_state, 503))(
        C_enc, training=flag_mc
    )
    state = H_enc, C_enc

    # Decoder part: training inference without a loop
    for Decoder in Decoders:
        Z_dec, H_dec, C_dec = Decoder(Z_enc, initial_state=state)
        state = H_dec, C_dec

        if residuals_link:
            Z_dec = Z_dec + Z_enc

    outputs_training = TimeDistributed(Interpretor)(
        Dropout(dp)(Z_dec[:, -dim_horizon:, :], training=flag_mc)
    )

    # Inference loop
    # For i = 0, Z_enc is a tensor [Batch, size_window, dim_z] -> stacked LSTM
    # Else Z_enc is a tensor [Batch, 1, dim_z]
    if False:
        Z_enc_inference = Z_enc[:, :-dim_horizon]
        for i in range(dim_horizon):
            Z_dec, H_dec, C_dec = Decoder(Z_enc_inference)
            if residuals_link:
                Z_dec = Z_dec + Z_enc_inference

            output = Interpretor(Dropout(dp)(Z_dec[:, -1, :], training=flag_mc))
            outputs.append(output)

            # if i != (dim_horizon) - 1:
            #     Data = Data_embedding(
            #         inputs_lt, Y_past, outputs, "encoder")
            #     Embedding = TimeDistributed(Embeddor_ctx)(Data)
            #     Z_enc_inference, H_enc, C_enc = Encoder(Embedding)
            #     if residuals_link:
            #         Z_enc_inference = Z_enc_inference + Embedding

        outputs = Lambda(lambda x: K.stack(x, axis=1))(outputs)

    if False:  # Autoreg
        pass
        # list_input = [inputs_lt, Y_past, Y_futur]
        # if not (loop_learning):
        #     tf.keras.Model(list_input, outputs_training)
        # model = tf.keras.Model(list_input, outputs)
    else:
        model = tf.keras.Model(list_inputs, outputs_training)

    return model
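
A minimal usage sketch of the builder. It is illustrative only: the batch size and the "MC_Dropout" mode are assumptions, the convolution settings mirror get_params_dict further below, and the content of the last output axis depends on type_output (see NN_UQ).

# Hypothetical sketch: build the Keras model and run one forward pass.
model = build_lstm_stacked(
    size_window=20,
    n_windows=5,
    dim_target=3,
    dim_ctx=18,
    dim_horizon=5,
    type_output="MC_Dropout",
    list_strides=[2, 1, 1, 1],      # conv settings borrowed from get_params_dict
    list_filters=[128, 128, 128],
    random_state=0,
)
ctx = np.random.randn(8, 5, 18).astype("float32")   # (batch, n_windows, dim_ctx) -> input "LT"
lags = np.random.randn(8, 20, 3).astype("float32")  # (batch, size_window, dim_dyn) -> input "ST"
pred = model([ctx, lags])  # (batch, dim_horizon, ...); last axis depends on type_output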

class Lstm_ED_UQ(NN_UQ):
    def __init__(
        self,
        model_parameters,
        factory_parameters=dict(),
        training_parameters=dict(),
        type_output=None,
        rescale=False,
        n_ech=5,
        train_ratio=0.9,
        name="",
        random_state=None,
    ):
        """LSTM_ED: neural network with UQ, wrapped by NN_UQ.

        Args:
            model_parameters (dict): Parameters passed to build_lstm_stacked.
            factory_parameters (dict, optional): Parameters for the data factory. Defaults to dict().
            training_parameters (dict, optional): Parameters for training. Defaults to dict().
            type_output (str, optional): Mode of UQ (see NN_UQ). Defaults to None.
            rescale (bool, optional): Rescale inputs/outputs. Defaults to False.
            n_ech (int, optional): Number of samples drawn for UQ (see NN_UQ). Defaults to 5.
            train_ratio (float, optional): Train/validation split ratio. Defaults to 0.9.
            name (str, optional): Name of the estimator. Defaults to "".
            random_state (int, optional): Seed used to control experimental randomness.
        """
        if random_state is not None:
            print("Warning: non-deterministic behaviour can remain even with a fixed random_state")

        super().__init__(
            model_initializer=build_lstm_stacked,
            model_parameters=model_parameters,
            factory_parameters=factory_parameters,
            training_parameters=training_parameters,
            type_output=type_output,
            rescale=rescale,
            n_ech=n_ech,
            train_ratio=train_ratio,
            name=name,
            random_state=random_state,
        )

    def factory(self, X, y, mask=None, only_fit_scaler=False, **kwarg):
        """Turn raw series into the rolled windows expected by the model inputs."""
        model_params = self.model_parameters
        factory_params = self.factory_parameters

        with_ctx_input = model_params["with_ctx_input"]

        step = 1
        if "step" in model_params.keys():
            step = model_params["step"]

        X_none = False
        if X is None:
            X_none = True

        if X_none:
            inputs = None
        else:
            if with_ctx_input:
                X, X_lag = X
                X, X_lag, mask = super().factory(X, X_lag, mask)
                if only_fit_scaler:
                    return None
                X_lt = stack_and_roll(
                    X,
                    model_params["n_windows"],
                    lag=factory_params["factory_lag_lt"],
                    step=step,
                )

                X_st = stack_and_roll(
                    X_lag,
                    model_params["size_window"],
                    lag=factory_params["factory_lag_st"] - 1,
                    step=step,
                )

                inputs = [X_lt, X_st]
            else:
                X, _, mask = super().factory(X, None, mask)
                if only_fit_scaler:
                    return None
                X_lag = X
                X_st = stack_and_roll(
                    X,
                    model_params["size_window"],
                    lag=factory_params["factory_lag_st"] - 1,
                    step=step,
                )
                inputs = [X_st]

        new_y = None
        if y is not None:
            _, new_y, mask = super().factory(None, y, mask)
            new_y = stack_and_roll(
                y,
                model_params["dim_horizon"],
                lag=model_params["dim_horizon"] - 1,
                step=step,
            )

        return inputs, new_y, mask

    def Build_generator(self, X, y, batch=32, shuffle=True, train=True):
        return Folder_Generator(
            X,
            y,
            self,
            batch=batch,
            shuffle=shuffle,
            train=train,
            random_state=self.random_state,
        )
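
A hedged sketch of the data factory. The shapes below are indicative and rest on assumptions: get_params_dict (defined right after) supplies the model_parameters, the NN_UQ constructor only stores the configuration, and the factory_lag_lt / factory_lag_st values are illustrative.

# Hypothetical sketch: turn raw series into the rolled windows fed to the model.
params = get_params_dict(dim_ctx=18, dim_dyn=3, dim_target=3)
estimator = Lstm_ED_UQ(
    model_parameters=params,
    factory_parameters={"factory_lag_lt": 0, "factory_lag_st": 1},  # illustrative lags
    type_output="MC_Dropout",
)
X_ctx = np.random.randn(500, 18)  # contextual features
Y = np.random.randn(500, 3)       # multivariate target series
inputs, y_seq, mask = estimator.factory((X_ctx, Y), Y)
# inputs[0]: context windows, roughly (n, n_windows, dim_ctx)   -> input "LT"
# inputs[1]: lagged values,   roughly (n, size_window, dim_dyn) -> input "ST"
# y_seq:     horizon targets, roughly (n, dim_horizon, dim_target)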

def get_params_dict(
    dim_ctx,
    dim_dyn,
    dim_target,
    size_window=20,
    n_windows=5,
    dim_horizon=5,
    step=1,
    dim_chan=1,
    dp=0.05,
    dp_rec=0.05,
    dim_z=50,
    k_reg=(10e-7, 10e-7),
    num_lstm_enc=1,
    num_lstm_dec=1,
    layers_enc=[150, 75],
    layers_dec=[200, 125, 75],
    list_strides=[2, 1, 1, 1],
    list_filters=[128, 128, 128],
    list_kernels=None,
    with_convolution=True,
    with_ctx_input=True,
    n_ech=3,
    type_output="MC_Dropout",
):
    """Gather build_lstm_stacked hyperparameters into a model_parameters dict."""
    dict_params = {
        "dim_ctx": dim_ctx,
        "dim_dyn": dim_dyn,
        "dim_target": dim_target,
        "dim_chan": dim_chan,
        "size_window": size_window,
        "n_windows": n_windows,
        "dim_horizon": dim_horizon,
        "type_output": type_output,
        "num_lstm_enc": num_lstm_enc,
        "num_lstm_dec": num_lstm_dec,
        "step": step,
        "dim_z": dim_z,
        "dp": dp,
        "dp_rec": dp_rec,
        "k_reg": k_reg,
        "layers_enc": layers_enc,
        "layers_dec": layers_dec,
        "list_strides": list_strides,
        "list_kernels": list_kernels,
        "list_filters": list_filters,
        "with_convolution": with_convolution,
        "n_ech": n_ech,
        "with_ctx_input": with_ctx_input,
    }
    return dict_params
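
For completeness, a sketch of a configuration without contextual input; the fit/predict calls are assumptions about the NN_UQ interface (their exact signatures live in neural_network_UQ, not in this file), so they are shown commented.

# Hypothetical sketch without contextual features (with_ctx_input=False):
# the factory then expects a single lagged series instead of a (ctx, lag) pair.
params = get_params_dict(dim_ctx=0, dim_dyn=3, dim_target=3,
                         with_ctx_input=False, with_convolution=False)
estimator = Lstm_ED_UQ(
    model_parameters=params,
    factory_parameters={"factory_lag_st": 1},  # only the short-term lag is read here
    type_output="MC_Dropout",
)
Y = np.random.randn(500, 3)
inputs, y_seq, mask = estimator.factory(Y, Y)  # inputs == [X_st]
# Assumed NN_UQ interface for the rest of the flow:
# estimator.fit(Y, Y)
# pred, UQ = estimator.predict(Y)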