Coverage for uqmodels/modelization/DL_estimator/metalayers.py: 24%
375 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
1import numpy as np
2import tensorflow as tf
3import tensorflow.keras.backend as K
4from keras import Model, layers
5from keras.layers import RNN, Dense, Dropout, Lambda, Layer, LSTMCell, TimeDistributed
7from uqmodels.modelization.DL_estimator.utils import set_global_determinism
9from ...utils import add_random_state, generate_random_state, get_fold_nstep
11# EDL head
12# https://github.com/aamini/evidential-deep-learning/blob/main/evidential_deep_learning/layers/dense.py
14# tf.keras.utils.get_custom_objects().clear()
@tf.keras.utils.register_keras_serializable(package="UQModels_layers")
class EDLProcessing(Layer):
    """Turn a raw 4-parameter evidential (EDL) head into constrained parameters.

    The last axis of the input is split into four equal parts, read as
    (mu, logv, logalpha, logbeta). ``mu`` is passed through unchanged, the
    other three go through a softplus plus an offset, and the four parts are
    concatenated back along the last axis.

    Args:
        min_logvar: Value stored on the layer and serialised by ``get_config``.
            It is not read by ``call``.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, min_logvar=-6, **kwargs):
        super().__init__(**kwargs)
        self.min_logvar = min_logvar

    def compute_output_shape(self, input_shape):
        """Return ``input_shape``: the layer keeps the shape of its input."""
        return input_shape

    def call(self, x):
        """Apply the EDL constraints to ``x``.

        Args:
            x: Tensor whose last axis can be split into 4 equal parts.

        Returns:
            Tensor of the same shape holding (mu, v, alpha, beta) concatenated
            on the last axis.
        """
        mu, logv, logalpha, logbeta = tf.split(x, 4, axis=-1)
        # 10e-6 (i.e. 1e-5) keeps v and beta strictly positive.
        v = tf.nn.softplus(logv) + 10e-6
        # softplus(.) + 1 keeps alpha strictly above 1.
        alpha = tf.nn.softplus(logalpha) + 1
        beta = tf.nn.softplus(logbeta) + 10e-6
        return tf.concat([mu, v, alpha, beta], axis=-1)

    def get_config(self):
        """Return the layer config, including the base ``Layer`` entries.

        Merging with ``super().get_config()`` keeps name/dtype/trainable when
        the layer is saved and reloaded.
        """
        config = super().get_config()
        config.update({"min_logvar": self.min_logvar})
        return config
@tf.keras.utils.register_keras_serializable(package="UQModels_layers")
class ProbabilisticProcessing(Layer):
    """Clamp the second half of a 2-parameter head to a fixed interval.

    The last axis of the input is split into two equal parts, read as
    (mu, logsigma). ``mu`` is passed through unchanged and ``logsigma`` is
    limited to ``[min_logvar, max_logvar]``.

    Args:
        min_logvar: Lower bound applied to the second half of the input.
        max_logvar: Upper bound applied to the second half of the input.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, min_logvar=-10, max_logvar=10, **kwargs):
        super().__init__(**kwargs)
        self.min_logvar = min_logvar
        self.max_logvar = max_logvar

    def compute_output_shape(self, input_shape):
        """Return ``input_shape``: the layer keeps the shape of its input."""
        return input_shape

    def call(self, x):
        """Apply the clamping to ``x``.

        Args:
            x: Tensor whose last axis can be split into 2 equal parts.

        Returns:
            Tensor of the same shape holding (mu, clamped logsigma)
            concatenated on the last axis.
        """
        mu, logsigma = tf.split(x, 2, axis=-1)
        # Values that are not strictly above the lower bound become the bound.
        logsigma = tf.where(logsigma > self.min_logvar, logsigma, self.min_logvar)
        # Values that are not strictly below the upper bound become the bound.
        logsigma = tf.where(logsigma < self.max_logvar, logsigma, self.max_logvar)
        return tf.concat([mu, logsigma], axis=-1)

    def get_config(self):
        """Return the layer config, including the base ``Layer`` entries.

        Merging with ``super().get_config()`` keeps name/dtype/trainable when
        the layer is saved and reloaded.
        """
        config = super().get_config()
        config.update(
            {"min_logvar": self.min_logvar, "max_logvar": self.max_logvar}
        )
        return config
def mlp(
    dim_in=10,
    dim_out=1,
    layers_size=[100, 50],
    name="",
    dp=0.01,
    with_mc_dp=True,
    type_output=None,
    logvar_min=-10,
    regularizer_W=(0.00001, 0.00001),
    shape_2D=None,
    shape_2D_out=None,
    random_state=None,
    **kwargs,
):
    """Generate a Keras MLP model usable as a preprocessing or head sub-part.

    Args:
        dim_in (int): Input dimension. When ``shape_2D`` is given the input is
            declared with ``shape_2D`` and reshaped to ``(-1, dim_in)``.
        dim_out (int or None): Output dimension. If None (and ``type_output``
            selects no head) the output is the last hidden layer.
        layers_size (list of int, optional): Hidden layer sizes. Not mutated
            here. Defaults to [100, 50].
        name (str, optional): Suffix used in layer and model names.
        dp (float, optional): Dropout rate; dropout layers are only inserted
            when ``dp > 0``. Defaults to 0.01.
        with_mc_dp (bool, optional): If True the dropout layers are called
            with a truthy ``training`` flag, otherwise with ``training=None``.
        type_output (str or None, optional): Head selector among
            None (plain Dense), "MC_Dropout"/"Deep_ensemble" (2 parameters per
            output), "EDL" (4 parameters per output), "classif" (softmax).
        logvar_min (int, optional): Passed as first argument to the
            EDL / probabilistic processing layer.
        regularizer_W (tuple, optional): (l1, l2) coefficients applied to the
            hidden Dense kernels. Defaults to (0.00001, 0.00001).
        shape_2D (tuple or None, optional): 2D input shape. Defaults to None.
        shape_2D_out (tuple or None, optional): If given, the output is
            reshaped to ``(-1, shape_2D_out[1] * n_param, shape_2D_out[0])``
            and its last two axes are swapped.
        random_state: Seed handle forwarded to ``set_global_determinism`` and
            to the dropout seeds.
        **kwargs: Ignored.

    Returns:
        tf.keras.Model: model named ``"MLP_" + name``.
    """
    set_global_determinism(random_state)

    reg_l1l2 = tf.keras.regularizers.l1_l2(l1=regularizer_W[0], l2=regularizer_W[1])

    # training=None leaves the decision to Keras; a truthy flag forces dropout.
    flag_mc = 1 if with_mc_dp else None

    if shape_2D is None:
        # Keras expects a shape tuple; keep accepting an already-built shape.
        shape_in = dim_in if isinstance(dim_in, (tuple, list)) else (dim_in,)
        inputs = tf.keras.layers.Input(shape=shape_in, name="input_" + name)
        output = inputs
    else:
        inputs = tf.keras.layers.Input(
            shape=(shape_2D[0], shape_2D[1]), name="input_" + name
        )
        output = tf.keras.layers.Lambda(lambda x: K.reshape(x, shape=(-1, dim_in)))(
            inputs
        )

    for n, units in enumerate(layers_size):
        # The Dense layer is always applied; only the dropout is optional.
        # (Previously the hidden layers were skipped entirely when dp <= 0.)
        output = tf.keras.layers.Dense(
            units,
            activation=tf.keras.layers.LeakyReLU(alpha=0.01),
            name="MLP_" + str(n) + "_" + name,
            kernel_regularizer=reg_l1l2,
        )(output)
        if dp > 0:
            output = tf.keras.layers.Dropout(
                dp, seed=add_random_state(random_state, n)
            )(output, training=flag_mc)

    # Number of parameters emitted per output; 1 unless a probabilistic head
    # is selected. Initialised here so the reshape below never sees it unbound.
    n_param = 1

    if type_output == "EDL":
        n_param = 4
        edl_raw = tf.keras.layers.Dense(
            n_param * dim_out, name="EDL", activation=None
        )(output)
        output = EDLProcessing(logvar_min)(edl_raw)

    elif (type_output == "MC_Dropout") or (type_output == "Deep_ensemble"):
        n_param = 2
        prob_raw = tf.keras.layers.Dense(
            n_param * dim_out, name="Mu_logvar", activation=None
        )(output)
        output = ProbabilisticProcessing(logvar_min)(prob_raw)

    elif type_output == "classif":
        # The softmax head must become the model output (it used to be
        # built and then discarded).
        output = tf.keras.layers.Dense(
            n_param * dim_out, name="Prob", activation="softmax"
        )(output)

    elif dim_out is not None:
        output = tf.keras.layers.Dense(n_param * dim_out, name="Output_" + name)(output)

    if shape_2D_out is not None:
        output = tf.keras.layers.Lambda(
            lambda x: K.reshape(x, (-1, shape_2D_out[1] * n_param, shape_2D_out[0]))
        )(output)

        output = tf.keras.layers.Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(
            output
        )

    return tf.keras.Model(inputs, output, name="MLP_" + name)
# Improvement to do: turn moving_slice into Keras layers
# Dev: CNN-1D based preprocessing and reconstruction for independent multivariate TS (deprecated)
# Old
def stack_and_roll_layer(
    inputs, size_window, size_subseq, padding, name="", format="tf_slice"
):
    """Stack shifted sub-sequences of ``inputs`` along a new axis 1.

    The number of sub-sequences is ``get_fold_nstep(size_window, size_subseq,
    padding)``; sub-sequence ``i`` starts at index ``i * padding`` on axis 1
    and spans ``size_subseq`` steps.

    Args:
        inputs: Tensor indexed with three axes (the slicing used here assumes
            rank 3 - TODO confirm the expected layout with callers).
        size_window: Forwarded to ``get_fold_nstep``.
        size_subseq: Length of each sub-sequence.
        padding: Shift between two consecutive sub-sequences.
        name (str, optional): Prefix of the Lambda layer created by the
            "np_slice" implementation. Defaults to "".
        format (str, optional): Implementation to use: "tf_slice",
            "np_slice" or "tf_map". Defaults to "tf_slice".

    Returns:
        Tensor with the sub-sequences stacked on axis 1.

    Raises:
        ValueError: If ``format`` is not one of the supported values.
    """
    n_step = get_fold_nstep(size_window, size_subseq, padding)

    if format == "tf_slice":  # tf.slice based
        windows = [
            tf.slice(inputs, [0, start, 0], [-1, size_subseq, -1])[:, None]
            for start in range(0, n_step * padding, padding)
        ]
        return tf.concat(windows, axis=1)

    if format == "np_slice":  # python-slicing based
        windows = [
            inputs[:, (i * padding) : (i * padding) + size_subseq, :][:, None]
            for i in range(n_step)
        ]
        return Lambda(lambda x: K.concatenate(x, axis=1), name=name + "_rollstack")(
            windows
        )

    if format == "tf_map":  # tf.map_fn based
        stacked = tf.map_fn(
            lambda i: inputs[:, (i * padding) : (i * padding) + size_subseq, :],
            tf.range(n_step),
            dtype=tf.float32,
        )
        # map_fn stacks on axis 0; move the window axis behind the batch axis.
        return tf.transpose(stacked, [1, 0, 2, 3])

    # A library function must not terminate the interpreter: raise instead.
    raise ValueError(
        f"Unknown format {format}. Supported formats are: tf_slice, np_slice and tf_map"
    )
def cnn_enc_1D(
    size_subseq_enc,
    dim_out,
    dim_z,
    dim_lt,
    type_output=None,
    k=32,
    dp=0.05,
    random_state=None,
    **kwarg,
):
    """Build a 1D separable-convolution encoder as a Keras model.

    Three SeparableConv1D + AveragePooling1D stages (dropout after the first
    two when ``dp > 0``), followed by Flatten and ``Dense(dim_z)``.

    Args:
        size_subseq_enc (int): First dimension of the declared input shape.
        dim_out (int): Second dimension of the declared input shape.
        dim_z (int): Units of the final Dense layer.
        dim_lt (int): Kernel sizes are ``int(dim_lt / 4)``, ``int(dim_lt / 8)``
            and ``int(dim_lt / 16)``.
        type_output (str or None, optional): "MC_Dropout" or "Deep_ensemble"
            make the dropout layers run with ``training=True``.
        k (int, optional): Filters of the first stage; the next two use
            ``2 * k``. Defaults to 32.
        dp (float, optional): Dropout rate. Defaults to 0.05.
        random_state: Seed handle for the dropout layers.
        **kwarg: Ignored.

    Returns:
        keras.Model: the encoder.
    """
    # NOTE(review): for other output types the flag is False (not None), which
    # keeps dropout disabled during fitting too - confirm this is intended.
    mc_active = type_output in ["MC_Dropout", "Deep_ensemble"]

    encoder_input = tf.keras.layers.Input(
        shape=(size_subseq_enc, dim_out), name="st"
    )

    # Stage 1
    x = layers.SeparableConv1D(
        k,
        int(dim_lt / 4),
        strides=1,
        depth_multiplier=10,
        activation="relu",
        padding="same",
    )(encoder_input)
    x = layers.AveragePooling1D(5, padding="same")(x)
    if dp > 0:
        x = tf.keras.layers.Dropout(dp, seed=random_state)(x, training=mc_active)

    # Stage 2
    x = layers.SeparableConv1D(
        k * 2,
        int(dim_lt / 8),
        strides=1,
        depth_multiplier=10,
        activation="relu",
        padding="same",
    )(x)
    x = layers.AveragePooling1D(2, padding="same")(x)
    if dp > 0:
        stage2_seed = add_random_state(random_state, 1)
        x = tf.keras.layers.Dropout(dp, seed=stage2_seed)(x, training=mc_active)

    # Stage 3 (no dropout)
    x = layers.SeparableConv1D(
        k * 2,
        int(dim_lt / 16),
        strides=1,
        depth_multiplier=10,
        activation="relu",
        padding="same",
    )(x)
    x = layers.AveragePooling1D(2, padding="same")(x)

    # Projection to the latent size
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(dim_z)(x)
    return Model(encoder_input, x)
def cnn_dec_1D(
    size_subseq_dec=10,
    dim_out=10,
    dim_z=100,
    type_output=None,
    k=32,
    dp=0.05,
    random_state=None,
    **kwarg,
):
    """Build a 1D transposed-convolution decoder as a Keras model.

    The latent vector gets a length-1 time axis, goes through two
    Conv1DTranspose layers (strides 5 then 2, dropout after each when
    ``dp > 0``) and a sigmoid SeparableConv1D, then through the optional
    probabilistic / EDL processing layer, and is finally flattened per sample.

    Args:
        size_subseq_dec (int, optional): Used for the final reshape size.
        dim_out (int, optional): Number of output variables. Defaults to 10.
        dim_z (int, optional): Size of the latent input. Defaults to 100.
        type_output (str or None, optional): "MC_Dropout"/"Deep_ensemble"
            give 2 channels per output and dropout with ``training=True``;
            "EDL" gives 4 channels per output; otherwise 1.
        k (int, optional): Filters of the transposed convolutions.
        dp (float, optional): Dropout rate. Defaults to 0.05.
        random_state: Seed handle for the dropout layers.
        **kwarg: Ignored.

    Returns:
        tf.keras.Model: decoder named "conv_lag_dec", whose output is reshaped
        to ``(-1, size_subseq_dec * dim_out * dim_chan_out)``.
    """
    dim_chan_out = 1
    flag_mc = False
    if type_output in ["MC_Dropout", "Deep_ensemble"]:
        dim_chan_out = 2
        flag_mc = True
    elif type_output == "EDL":
        dim_chan_out = 4

    # Keras documents `shape` as a tuple; `(dim_z)` was a bare int.
    inputs = layers.Input(shape=(dim_z,), name="st")
    output = tf.keras.layers.Lambda(lambda x: x[:, None, :])(inputs)

    output = layers.Conv1DTranspose(
        k, 10, strides=5, activation="relu", padding="same"
    )(output)
    if dp > 0:
        output = tf.keras.layers.Dropout(dp, seed=random_state)(
            output, training=flag_mc
        )

    output = layers.Conv1DTranspose(
        k, 50, strides=2, activation="relu", padding="same"
    )(output)
    if dp > 0:
        output = tf.keras.layers.Dropout(dp, seed=add_random_state(random_state, 1))(
            output, training=flag_mc
        )

    output = layers.SeparableConv1D(
        dim_out * dim_chan_out,
        10,
        strides=1,
        depth_multiplier=10,
        activation="sigmoid",
        padding="same",
    )(output)

    # Head post-processing (both layers are built with their default bounds).
    if type_output in ["MC_Dropout", "Deep_ensemble"]:
        output = ProbabilisticProcessing()(output)
    elif type_output == "EDL":
        # See aamini/evidential-deep-learning
        output = EDLProcessing()(output)

    output = tf.keras.layers.Lambda(
        lambda x: K.reshape(x, (-1, size_subseq_dec * dim_out * dim_chan_out))
    )(output)
    return tf.keras.Model(inputs, output, name="conv_lag_dec")
399# Dev : Preprocessing CNN based representation layers for 2D TS without spatial structure
def cnn_enc(
    size_subseq_enc,
    dim_out,
    dim_chan=1,
    k1=10,
    reduction_x1=8,
    reduction_x2=1,
    dim_z=100,
    dp=0.02,
    random_state=None,
    **kwarg,
):
    """Deprecated 2D-convolution encoder (kept for backward compatibility).

    The input is reshaped to
    ``(-1, size_subseq_enc, int(dim_out / dim_chan), dim_chan)`` and goes
    through three Conv2D layers (average pooling and batch normalisation after
    the first two), then Flatten, ``Dense(dim_z)`` and, when ``dp > 0``, a
    dropout layer that is always called with ``training=True``.

    Args:
        size_subseq_enc (int): First dimension of the declared input shape.
        dim_out (int): Second dimension of the declared input shape.
        dim_chan (int, optional): Number of channels used for the reshape.
        k1 (int, optional): Base number of filters (3*k1, k1, 2*k1).
        reduction_x1 (int, optional): Kernel height of the convolutions.
        reduction_x2 (int, optional): Kernel width of the first convolution
            and pooling width of the first pooling.
        dim_z (int, optional): Units of the final Dense layer.
        dp (float, optional): Dropout rate. Defaults to 0.02.
        random_state: Seed of the final dropout layer.
        **kwarg: Ignored.

    Returns:
        tf.keras.Model: encoder named "conv_lag_enc".
    """
    keras_layers = tf.keras.layers
    space_dim = int(dim_out / dim_chan)

    encoder_input = keras_layers.Input(shape=(size_subseq_enc, dim_out), name="st")
    x = keras_layers.Lambda(
        lambda t: K.reshape(t, (-1, size_subseq_enc, space_dim, dim_chan))
    )(encoder_input)

    # Block 1: conv over (time, reduction_x2) + pooling + batch norm
    x = keras_layers.Conv2D(
        k1 * 3,
        (reduction_x1, reduction_x2),
        strides=1,
        activation="relu",
    )(x)
    x = keras_layers.AveragePooling2D((2, reduction_x2), padding="same")(x)
    x = keras_layers.BatchNormalization()(x)

    # Block 2: conv spanning the whole reshaped space axis
    x = keras_layers.Conv2D(
        k1,
        (reduction_x1, space_dim),
        strides=1,
        activation="relu",
    )(x)
    x = keras_layers.AveragePooling2D((2, 1), padding="same")(x)
    x = keras_layers.BatchNormalization()(x)

    # Block 3: conv over time only
    x = keras_layers.Conv2D(
        k1 * 2, (reduction_x1, 1), strides=1, activation="relu"
    )(x)

    x = keras_layers.Flatten()(x)
    x = keras_layers.Dense(dim_z)(x)
    if dp > 0:
        x = keras_layers.Dropout(dp, seed=random_state)(x, training=True)
    return tf.keras.Model(encoder_input, x, name="conv_lag_enc")
def cnn_dec(
    size_subseq_dec,
    dim_out,
    dim_chan=1,
    type_output=None,
    k1=10,
    min_logvar=-6,
    dim_z=100,
    dp=0.01,
    random_state=None,
    **kwarg,
):
    """Deprecated 2D transposed-convolution decoder.

    The latent vector gets two length-1 axes, goes through two
    Conv2DTranspose + BatchNormalization stages (dropout always called with
    ``training=True`` when ``dp > 0``), a 1x1 linear Conv2DTranspose with
    ``dim_chan_out`` filters, the optional probabilistic / EDL processing
    layer, and is finally flattened per sample.

    Args:
        size_subseq_dec (int): Used for the kernel heights and final reshape.
        dim_out (int): Number of output variables.
        dim_chan (int, optional): Number of channels. Defaults to 1.
        type_output (str or None, optional): "MC_Dropout"/"Deep_ensemble"
            give 2 output channels, "EDL" gives 4, otherwise 1.
        k1 (int, optional): Base number of filters (2*k1 then k1).
        min_logvar (int, optional): First argument of the processing layer.
        dim_z (int, optional): Size of the latent input. Defaults to 100.
        dp (float, optional): Dropout rate. Defaults to 0.01.
        random_state: Seed handle for the dropout layers.
        **kwarg: Ignored.

    Returns:
        tf.keras.Model: decoder named "conv_lag_dec", whose output is reshaped
        to ``(-1, size_subseq_dec * dim_out * dim_chan_out)``.
    """
    dim_chan_out = 1
    if type_output in ["MC_Dropout", "Deep_ensemble"]:
        dim_chan_out = 2
    elif type_output == "EDL":
        dim_chan_out = 4

    dim_out_reshape = int(dim_out / dim_chan)
    # Keras documents `shape` as a tuple; `(dim_z)` was a bare int.
    inputs = tf.keras.layers.Input(shape=(dim_z,), name="st")
    output = tf.keras.layers.Lambda(lambda x: x[:, None, None, :])(inputs)

    # The two kernel heights are floor and ceil of (size_subseq_dec + 1) / 2.
    output = tf.keras.layers.Conv2DTranspose(
        k1 * 2,
        (int(np.floor((size_subseq_dec + 1) / 2)), dim_out_reshape * dim_chan),
        strides=(1, 1),
        activation="relu",
    )(output)
    output = tf.keras.layers.BatchNormalization()(output)
    if dp > 0:
        output = tf.keras.layers.Dropout(dp, seed=random_state)(output, training=True)

    output = tf.keras.layers.Conv2DTranspose(
        k1,
        (int(np.ceil((size_subseq_dec + 1) / 2)), 1),
        strides=(1, 1),
        activation="relu",
    )(output)
    output = tf.keras.layers.BatchNormalization()(output)
    if dp > 0:
        output = tf.keras.layers.Dropout(dp, seed=add_random_state(random_state, 1))(
            output, training=True
        )

    output = tf.keras.layers.Conv2DTranspose(dim_chan_out, (1, 1), activation="linear")(
        output
    )

    # Probabilistic head post-processing
    if type_output in ["MC_Dropout", "Deep_ensemble"]:
        output = ProbabilisticProcessing(min_logvar)(output)
    elif type_output == "EDL":
        # See aamini/evidential-deep-learning
        output = EDLProcessing(min_logvar)(output)

    output = tf.keras.layers.Lambda(
        lambda x: K.reshape(x, (-1, size_subseq_dec * dim_out * dim_chan_out))
    )(output)

    return tf.keras.Model(inputs, output, name="conv_lag_dec")
def conv_block_1D(
    inputs,
    dim_chan,
    filters=32,
    kernel=2,
    strides=2,
    dp=0.02,
    flag_mc=False,
    random_state=None,
):
    """Apply a grouped causal Conv1D, batch normalisation and optional dropout.

    Args:
        inputs: Tensor fed to the convolution.
        dim_chan (int): Number of groups; the convolution has
            ``filters * dim_chan`` output filters.
        filters (int, optional): Filters per group. Defaults to 32.
        kernel (int, optional): Kernel size. Defaults to 2.
        strides (int, optional): Stride. Defaults to 2.
        dp (float, optional): Dropout rate; no dropout layer when ``dp <= 0``.
        flag_mc (bool, optional): ``training`` flag passed to the dropout.
        random_state: Seed of the dropout layer.

    Returns:
        Output tensor of the block.
    """
    grouped_conv = tf.keras.layers.Conv1D(
        filters * dim_chan,
        kernel,
        strides=strides,
        padding="causal",
        groups=dim_chan,
        activation="relu",
    )
    x = tf.keras.layers.BatchNormalization()(grouped_conv(inputs))
    if dp <= 0:
        return x
    return tf.keras.layers.Dropout(dp, seed=random_state)(x, training=flag_mc)
def conv_block_2D(
    inputs,
    dim_chan,
    filters=32,
    kernel=5,
    strides=(2, 1),
    dp=0.02,
    flag_mc=False,
    random_state=None,
):
    """Apply a valid-padded Conv2D, batch normalisation and optional dropout.

    Args:
        inputs: Tensor fed to the convolution.
        dim_chan: Not used by this block (kept for signature parity with
            ``conv_block_1D``).
        filters (int, optional): Number of filters. Defaults to 32.
        kernel (int or tuple, optional): Kernel size. Defaults to 5.
        strides (tuple, optional): Strides. Defaults to (2, 1).
        dp (float, optional): Dropout rate; no dropout layer when ``dp <= 0``.
        flag_mc (bool, optional): ``training`` flag passed to the dropout.
        random_state: Seed of the dropout layer.

    Returns:
        Output tensor of the block.
    """
    conv = tf.keras.layers.Conv2D(
        filters, kernel, strides=strides, padding="valid", activation="relu"
    )
    x = tf.keras.layers.BatchNormalization()(conv(inputs))
    if dp <= 0:
        return x
    return tf.keras.layers.Dropout(dp, seed=random_state)(x, training=flag_mc)
# Current version: builds a configurable CNN_ENC structure
def cnn_enc_bis(
    size_subseq_enc=60,
    dim_target=52,
    dim_chan=4,
    list_filters=[64, 64, 32],
    list_kernels=[(10, 3), 10, 10],
    list_strides=[(2, 1), (2, 1), (2, 1)],
    type_output=None,
    block="2D",
    dim_z=200,
    dp=0.02,
    random_state=None,
    **kwarg,
):
    """Build a configurable CNN encoder sub-part of a deep learning predictor.

    The input of shape ``(size_subseq_enc, dim_target)`` receives a trailing
    channel axis (``dim_chan == 1``) or is reshaped to
    ``(-1, size_subseq_enc, int(dim_target / dim_chan), dim_chan)``. It then
    goes through one convolution block per
    ``(list_filters, list_kernels, list_strides)`` triple, followed by
    Flatten, ``Dense(dim_z)`` and an optional dropout.

    Args:
        size_subseq_enc (int, optional): First dimension of the input shape.
        dim_target (int, optional): Second dimension of the input shape.
        dim_chan (int, optional): Number of channels. Defaults to 4.
        list_filters (list, optional): Filters of each block.
        list_kernels (list, optional): Kernel of each block.
        list_strides (list, optional): Strides of each block.
        type_output (str or None, optional): "MC_Dropout" makes the dropout
            layers run with ``training=True``; otherwise ``training=False``.
        block (str, optional): "2D" uses ``conv_block_2D``, "1D" uses
            ``conv_block_1D``; any other value adds no block.
        dim_z (int, optional): Units of the final Dense layer.
        dp (float, optional): Rate of the final dropout only; the blocks use
            their own default rate.
        random_state: Seed handle for the dropout layers.
        **kwarg: Ignored.

    Returns:
        tf.keras.Model: encoder named "conv_lag_enc".
    """
    mc_active = type_output in ["MC_Dropout"]
    space_dim = int(dim_target / dim_chan)
    n_blocks = len(list_filters)

    # The declared input is the same whatever the number of channels.
    encoder_input = tf.keras.layers.Input(
        shape=(size_subseq_enc, dim_target), name="st"
    )
    if dim_chan == 1:
        x = tf.keras.layers.Lambda(lambda t: K.expand_dims(t, axis=-1))(encoder_input)
    else:
        x = tf.keras.layers.Lambda(
            lambda t: K.reshape(t, (-1, size_subseq_enc, space_dim, dim_chan))
        )(encoder_input)

    block_specs = zip(list_filters, list_kernels, list_strides)
    for idx, (n_filters, kernel_size, stride) in enumerate(block_specs):
        if block == "2D":
            x = conv_block_2D(
                x,
                space_dim,
                filters=n_filters,
                kernel=kernel_size,
                strides=stride,
                flag_mc=mc_active,
                random_state=add_random_state(random_state, 1 + idx),
            )
        elif block == "1D":
            x = conv_block_1D(
                x,
                space_dim,
                filters=n_filters,
                kernel=kernel_size,
                strides=stride,
                flag_mc=mc_active,
                random_state=add_random_state(random_state, 1 + n_blocks + idx),
            )

    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(dim_z)(x)
    if dp > 0:
        x = tf.keras.layers.Dropout(dp, seed=random_state)(x, training=mc_active)
    return tf.keras.Model(encoder_input, x, name="conv_lag_enc")
def Tconv_block_1D(
    inputs,
    dim_out,
    filters=32,
    kernel=2,
    strides=2,
    dp=0.02,
    flag_mc=False,
    random_state=None,
):
    """Apply a Conv1DTranspose, batch normalisation and optional dropout.

    Args:
        inputs: Tensor fed to the transposed convolution.
        dim_out (int): Passed as ``groups``; the layer has
            ``filters * dim_out`` output filters.
        filters (int, optional): Filters per group. Defaults to 32.
        kernel (int, optional): Kernel size. Defaults to 2.
        strides (int, optional): Stride. Defaults to 2.
        dp (float, optional): Dropout rate; no dropout layer when ``dp <= 0``
            (same convention as the other convolution blocks of this module).
        flag_mc (bool, optional): ``training`` flag passed to the dropout.
        random_state: Seed of the dropout layer.

    Returns:
        Output tensor of the block.
    """
    output = tf.keras.layers.Conv1DTranspose(
        filters * dim_out,
        kernel,
        strides=strides,
        padding="same",
        groups=dim_out,
        activation="relu",
    )(inputs)
    output = tf.keras.layers.BatchNormalization()(output)
    if dp > 0:
        output = tf.keras.layers.Dropout(dp, seed=random_state)(
            output, training=flag_mc
        )
    return output
def Tconv_block_2D(
    inputs,
    dim_out,
    filters=32,
    kernel=5,
    strides=(2, 1),
    dp=0.02,
    flag_mc=False,
    random_state=None,
):
    """Apply a Conv2DTranspose, batch normalisation and optional dropout.

    Args:
        inputs: Tensor fed to the transposed convolution.
        dim_out (int): Kernel width; the kernel is ``(kernel, dim_out)``.
        filters (int, optional): Number of filters. Defaults to 32.
        kernel (int, optional): Kernel height. Defaults to 5.
        strides (tuple, optional): Strides. Defaults to (2, 1).
        dp (float, optional): Dropout rate; no dropout layer when ``dp <= 0``.
        flag_mc (bool, optional): ``training`` flag passed to the dropout.
        random_state: Seed of the dropout layer.

    Returns:
        Output tensor of the block.
    """
    transposed_conv = tf.keras.layers.Conv2DTranspose(
        filters, (kernel, dim_out), strides=strides, activation="relu"
    )
    x = tf.keras.layers.BatchNormalization()(transposed_conv(inputs))
    if dp <= 0:
        return x
    return tf.keras.layers.Dropout(dp, seed=random_state)(x, training=flag_mc)
def cnn_dec_bis(
    size_subseq_dec,
    dim_out,
    dim_chan=1,
    type_output=None,
    min_logvar=-6,
    list_filters=[64, 64],
    strides=(1, 1),
    list_kernels=[4, 4],
    dim_z=200,
    random_state=None,
    **kwarg,
):
    """Build a configurable CNN decoder sub-part of a deep learning predictor.

    The latent vector gets two length-1 axes and goes through one
    ``Tconv_block_2D`` per ``(list_filters, list_kernels)`` pair (the first
    block uses a kernel width of ``int(dim_out / size_subseq_dec)``, the
    others a width of 1). A 1x1 linear Conv2DTranspose with ``dim_chan_out``
    filters follows, then the optional probabilistic / EDL processing layer
    and a reshape to ``(-1, size_subseq_dec, dim_space * dim_chan_out)``.

    Args:
        size_subseq_dec (int): Second dimension of the reshaped output.
        dim_out (int): Used to derive ``dim_space``.
        dim_chan (int, optional): Number of channels. Defaults to 1.
        type_output (str or None, optional): "MC_Dropout"/"Deep_ensemble"
            double the output channels, "EDL" multiplies them by 4. Only
            "MC_Dropout" makes the dropout layers run with ``training=True``.
        min_logvar (int, optional): First argument of the processing layer.
        list_filters (list, optional): Filters of each block.
        strides (tuple, optional): Strides shared by all blocks.
        list_kernels (list, optional): Kernel height of each block.
        dim_z (int, optional): Size of the latent input. Defaults to 200.
        random_state: Seed handle for the dropout layers.
        **kwarg: Ignored.

    Returns:
        tf.keras.Model: decoder named "conv_lag_dec". A warning is printed
        when the second axis of the last two layers' outputs differs.
    """
    flag_mc = type_output in ["MC_Dropout"]

    dim_chan_out = 1 * dim_chan
    if type_output in ["MC_Dropout", "Deep_ensemble"]:
        dim_chan_out = 2 * dim_chan
    elif type_output == "EDL":
        dim_chan_out = 4 * dim_chan

    dim_space = int(dim_out / size_subseq_dec)
    # Keras documents `shape` as a tuple; `(dim_z)` was a bare int.
    inputs = tf.keras.layers.Input(shape=(dim_z,), name="st")
    output = tf.keras.layers.Lambda(lambda x: x[:, None, None, :])(inputs)

    for n, (filters, kernel) in enumerate(zip(list_filters, list_kernels)):
        # Only the first block expands the width axis.
        d_tar = dim_space if n == 0 else 1
        output = Tconv_block_2D(
            output,
            d_tar,
            filters=filters,
            kernel=kernel,
            strides=strides,
            flag_mc=flag_mc,
            random_state=add_random_state(random_state, n),
        )

    output = tf.keras.layers.Conv2DTranspose(dim_chan_out, (1, 1), activation="linear")(
        output
    )

    # Probabilistic head post-processing
    if type_output in ["MC_Dropout", "Deep_ensemble"]:
        output = ProbabilisticProcessing(min_logvar)(output)
    elif type_output == "EDL":
        # See aamini/evidential-deep-learning
        output = EDLProcessing(min_logvar)(output)

    output = tf.keras.layers.Lambda(
        lambda x: K.reshape(x, (-1, size_subseq_dec, dim_space * dim_chan_out))
    )(output)

    dec = tf.keras.Model(inputs, output, name="conv_lag_dec")
    # If the reshape changed the length of axis 1, the batch axis absorbed the
    # difference: the chosen kernels/strides do not match size_subseq_dec.
    if dec.layers[-2].output.shape[1] != dec.layers[-1].output.shape[1]:
        print("Warning : inadequate deconvolution window size : model will crash")

    return dec
795# Dev : Preprocessing CNN based representation layers for 2D TS without spatial structure
def dense2D_enc_dec(
    size_subseq_enc,
    size_subseq_dec,
    dim_in,
    dim_out,
    layers_size=[100, 50],
    dim_z=100,
    dp=0.05,
    enc_only=False,
    type_output=None,
    random_state=None,
):
    """Build an MLP encoder and (optionally) an MLP decoder for 2D inputs.

    The encoder flattens an input of shape ``(size_subseq_enc, dim_in)`` and
    applies ``mlp`` with hidden sizes ``layers_size + [dim_z]``. The decoder
    takes a ``dim_z`` embedding, applies ``mlp`` with the encoder sizes
    reversed and reshapes the result to ``(-1, size_subseq_dec, channels)``.

    Args:
        size_subseq_enc (int): First dimension of the encoder input.
        size_subseq_dec (int): Second dimension of the decoder output.
        dim_in (int): Second dimension of the encoder input.
        dim_out (int): Number of decoded variables per step.
        layers_size (list of int, optional): Hidden sizes. The list is
            copied, never modified. Defaults to [100, 50].
        dim_z (int, optional): Embedding size. Defaults to 100.
        dp (float, optional): Dropout rate passed to ``mlp``.
        enc_only (bool, optional): If True only the encoder is built.
        type_output (str or None, optional): Head type passed to the decoder
            ``mlp``; "EDL" gives 4 channels per variable and
            "MC_Dropout"/"Deep_ensemble" give 2.
        random_state: Seed handle passed to ``mlp``.

    Returns:
        tuple: ``(enc, dec)`` Keras models, or ``(enc, None)`` when
        ``enc_only`` is True.
    """
    inputs = tf.keras.layers.Input(shape=(size_subseq_enc, dim_in), name="st")
    inputs_flatten = Lambda(lambda x: K.reshape(x, (-1, size_subseq_enc * dim_in)))(
        inputs
    )

    # Work on a copy: appending in place used to grow the caller's list (and
    # the shared default list) by one `dim_z` entry on every call.
    layers_size_enc = list(layers_size) + [dim_z]

    mlp_enc = mlp(
        size_subseq_enc * dim_in,
        dim_out=None,
        layers_size=layers_size_enc,
        name="",
        dp=dp,
        type_output=None,
        random_state=random_state,
    )(inputs_flatten)
    enc = tf.keras.Model(inputs, mlp_enc, name="MLP_enc")

    if enc_only:
        return (enc, None)

    # Keras documents `shape` as a tuple; `(dim_z)` was a bare int.
    inputs = tf.keras.layers.Input(shape=(dim_z,), name="embedding")
    z_flatten = Lambda(lambda x: K.reshape(x, (-1, dim_z)))(inputs)
    mlp_dec = mlp(
        dim_z,
        dim_out=size_subseq_dec * dim_out,
        # Mirror of the encoder sizes, `dim_z` included, as a single call of
        # the previous implementation produced through list aliasing.
        layers_size=layers_size_enc[::-1],
        name="",
        dp=dp,
        type_output=type_output,
        random_state=add_random_state(random_state, 100),
    )(z_flatten)

    # Channels per variable must match what `mlp` emits for this head type;
    # "Deep_ensemble" uses the same 2-parameter head as "MC_Dropout".
    if type_output == "EDL":
        n_param = 4
    elif type_output in ("MC_Dropout", "Deep_ensemble"):
        n_param = 2
    else:
        n_param = 1
    dim_out_dec = dim_out * n_param

    reshape_dec = Lambda(lambda x: K.reshape(x, (-1, size_subseq_dec, dim_out_dec)))(
        mlp_dec
    )
    dec = tf.keras.Model(inputs, reshape_dec, name="MLP_dec")
    return (enc, dec)
def moving_slice_map(inputs, n_step, padding, kick_off=0, depth_slice=1):
    """Extract ``n_step`` windows along axis 1 and concatenate them on axis 1.

    Window ``i`` starts at ``kick_off + i * padding`` and spans
    ``depth_slice`` steps. When ``depth_slice > 1`` each window receives a new
    axis 1 before concatenation, so windows end up stacked rather than joined.

    Args:
        inputs (tf.Tensor): Input tensor; rank-3 inputs are sliced with three
            indices, any other rank with four.
        n_step (int): Number of windows.
        padding (int): Shift between two consecutive windows.
        kick_off (int, optional): Start of the first window. Defaults to 0.
        depth_slice (int, optional): Length of each window. Defaults to 1.

    Returns:
        Tensor produced by a Lambda layer concatenating the windows on axis 1.
    """
    # Trailing axes are taken whole: one for rank-3 inputs, two otherwise.
    n_trailing = 1 if len(inputs.shape) == 3 else 2
    window_size = [-1, depth_slice] + [-1] * n_trailing

    windows = []
    for start in range(kick_off, kick_off + n_step * padding, padding):
        window = tf.slice(inputs, [0, start] + [0] * n_trailing, window_size)
        windows.append(window[:, None] if depth_slice > 1 else window)

    return Lambda(lambda x: K.concatenate(x, axis=1))(windows)
class Moving_slice_layer(Layer):
    """Layer that applies ``moving_slice_map`` to its input.

    Args:
        n_step (int): Number of windows.
        padding (int): Shift between two consecutive windows.
        kick_off (int, optional): Start of the first window. Defaults to 0.
        depth_slice (int, optional): Length of each window. Defaults to 1.
        ignore_futur (int or None, optional): If truthy, the input is first
            cut to ``input[:, :ignore_futur]``.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(
        self, n_step, padding, kick_off=0, depth_slice=1, ignore_futur=None, **kwargs
    ):
        super().__init__(**kwargs)
        self.n_step = n_step
        self.padding = padding
        self.kick_off = kick_off
        self.depth_slice = depth_slice
        self.ignore_futur = ignore_futur

    def build(self, input_shape):
        super().build(input_shape)

    def call(self, input_data):
        """Generate the window sequence with ``moving_slice_map``.

        Args:
            input_data: Tensor to slice along axis 1.

        Returns:
            Tensor of windows as produced by ``moving_slice_map``.
        """
        if self.ignore_futur:
            input_data = input_data[:, : self.ignore_futur]
        return moving_slice_map(
            input_data, self.n_step, self.padding, self.kick_off, self.depth_slice
        )

    def compute_output_shape(self, input_shape):
        """Return the shape produced by ``call``.

        ``moving_slice_map`` only adds a per-window axis when
        ``depth_slice > 1``; with ``depth_slice == 1`` the windows are joined
        directly on axis 1. (The two cases used to be swapped here.)
        """
        n_windows = len(
            range(
                self.kick_off, self.kick_off + self.n_step * self.padding, self.padding
            )
        )
        window_dims = (self.depth_slice,) if self.depth_slice > 1 else ()
        return (input_shape[0], n_windows) + window_dims + tuple(input_shape[2:])

    def get_config(self):
        """Return the constructor arguments merged with the base config."""
        config = super().get_config()
        config.update(
            {
                "n_step": self.n_step,
                "padding": self.padding,
                "kick_off": self.kick_off,
                "depth_slice": self.depth_slice,
                "ignore_futur": self.ignore_futur,
            }
        )
        return config
class Double_Moving_slice_layer(Layer):
    """Layer that applies ``moving_slice_map`` at several start offsets.

    For each of the ``n_step_out`` outer steps, ``moving_slice_map`` is run
    with ``kick_off = i * padding_out``; the results are stacked on a new
    axis 1.

    Args:
        n_step_out (int): Number of outer steps.
        n_step_in (int): Number of windows per outer step.
        padding_out (int): Shift between two outer steps.
        padding_in (int): Shift between two windows of one outer step.
        depth_slice (int, optional): Length of each window. Defaults to 1.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(
        self, n_step_out, n_step_in, padding_out, padding_in, depth_slice=1, **kwargs
    ):
        super().__init__(**kwargs)
        self.n_step_out = n_step_out
        self.n_step_in = n_step_in
        self.padding_out = padding_out
        self.padding_in = padding_in
        self.depth_slice = depth_slice

    def build(self, input_shape):
        super().build(input_shape)

    def call(self, input_data):
        """Build the nested window sequence.

        Args:
            input_data: Tensor to slice along axis 1.

        Returns:
            Tensor with the outer steps on axis 1 and the inner windows on
            axis 2.
        """
        z_slice_out = []
        for i in range(self.n_step_out):
            z_slice_in = moving_slice_map(
                input_data,
                self.n_step_in,
                self.padding_in,
                kick_off=i * self.padding_out,
                depth_slice=self.depth_slice,
            )
            # New axis 1 so the outer steps can be stacked.
            z_slice_out.append(z_slice_in[:, None])
        return Lambda(lambda x: K.concatenate(x, axis=1))(z_slice_out)

    def compute_output_shape(self, input_shape):
        """Return the shape produced by ``call``.

        ``moving_slice_map`` only adds a per-window axis when
        ``depth_slice > 1``. (The two cases used to be swapped here.)
        """
        n_out = len(range(0, self.n_step_out * self.padding_out, self.padding_out))
        n_in = len(range(0, self.n_step_in * self.padding_in, self.padding_in))
        window_dims = (self.depth_slice,) if self.depth_slice > 1 else ()
        return (input_shape[0], n_out, n_in) + window_dims + tuple(input_shape[2:])

    def get_config(self):
        """Return the constructor arguments merged with the base config."""
        config = super().get_config()
        config.update(
            {
                "n_step_out": self.n_step_out,
                "n_step_in": self.n_step_in,
                "padding_out": self.padding_out,
                "padding_in": self.padding_in,
                "depth_slice": self.depth_slice,
            }
        )
        return config
# Modelling: code related to building an LSTM encoder-decoder model with multi-horizon prediction
# Processing layers: LSTM cell modifications to handle state passing.
class LSTMCellReturnCellState(LSTMCell):
    """LSTM cell whose step output is the hidden and cell states concatenated."""

    def call(self, inputs, states, training=None):
        """Run one LSTM step and expose both states in the output.

        Args:
            inputs: Step input forwarded to ``LSTMCell.call``.
            states: Previous states forwarded to ``LSTMCell.call``.
            training (optional): Forwarded to ``LSTMCell.call``.

        Returns:
            tuple: ``(concat([h, c], axis=1), [h, c])`` where ``h`` and ``c``
            are the two states returned by the parent cell.
        """
        # The parent's own step output is discarded; only the states are used.
        _, new_states = super().call(inputs, states, training=training)
        [hidden, cell] = new_states
        joint_output = tf.concat([hidden, cell], axis=1)
        return joint_output, [hidden, cell]
class LSTMCellMidsize(LSTMCell):
    """LSTM cell built for half of the declared input width.

    Used when the incoming tensor carries hidden and cell states side by side
    (width ``dim_z * 2``) while the cell itself consumes ``dim_z`` features.
    """

    def build(self, input_shape):
        """Build the cell as if the last input dimension were halved."""
        half_width = int(input_shape[-1] / 2)
        super().build((None, half_width))
1034# Processing layers : RNN cell
class RNN_states_in_inputs(RNN):
    """RNN that reads its initial state from the input tensor.

    The input's last axis is split in two halves ``(h, c)``; the last step of
    each half becomes the initial state and only ``h`` is fed as the sequence.
    """

    def call(
        self, inputs, mask=None, training=None, initial_state=None, constants=None
    ):
        """Split ``inputs`` into sequence and initial state, then run the RNN.

        Args:
            inputs: Tensor whose last axis holds ``h`` and ``c`` side by side.
            mask (optional): Forwarded to ``RNN.call``.
            training (optional): Forwarded to ``RNN.call``.
            initial_state (optional): Ignored; the state is always derived
                from ``inputs``.
            constants (optional): Forwarded to ``RNN.call``.

        Returns:
            Result of ``RNN.call`` on the ``h`` half.
        """
        hidden_seq, cell_seq = tf.split(inputs, 2, axis=-1)
        # Initial state = last time step of each half.
        start_state = (hidden_seq[:, -1], cell_seq[:, -1])
        return super().call(
            hidden_seq,
            initial_state=start_state,
            mask=mask,
            training=training,
            constants=constants,
        )
def LSTM_EProcessing(
    n_step,
    dim_in,
    dim_z,
    flag_mc,
    dp=0.05,
    dp_r=0.02,
    l1_l2_reg=(0.0000001, 0.0000001),
    random_state=None,
):
    """Build the encoder processing block that captures the dynamics.

    Input ``(batch, n_step, dim_in)``, output ``(batch, n_step, dim_z * 2)``
    with the LSTM hidden and cell states concatenated on the last axis.

    Args:
        n_step (int): Number of time steps of the input.
        dim_in (int): Input feature dimension.
        dim_z (int): Latent dimension (Dense units and LSTM units).
        flag_mc: ``training`` flag passed to the dropout applied on the LSTM
            output (the dropout on the Dense projection gets no flag).
        dp (float, optional): Dropout rate. Defaults to 0.05.
        dp_r (float, optional): Recurrent dropout rate. Defaults to 0.02.
        l1_l2_reg (tuple, optional): (l1, l2) kernel regularisation
            coefficients. Defaults to (0.0000001, 0.0000001).
        random_state: Seed handle for the cell and dropout layers.

    Returns:
        Model: Keras model named "EProcessing".
    """
    kernel_reg = tf.keras.regularizers.l1_l2(l1=l1_l2_reg[0], l2=l1_l2_reg[1])
    encoder_cell = LSTMCellReturnCellState(
        int(dim_z),
        name="LSTM_enc_0",
        activation="sigmoid",
        recurrent_dropout=dp_r,
        seed=random_state,
        kernel_regularizer=kernel_reg,
    )
    encoder_rnn = tf.keras.layers.RNN(encoder_cell, return_sequences=True)

    block_input = tf.keras.layers.Input(
        shape=(n_step, dim_in), name="input_EProcessing"
    )

    # Per-step projection to the latent size, then dropout.
    projected = TimeDistributed(Dense(dim_z))(block_input)
    projected = Dropout(dp, seed=add_random_state(random_state, 100))(projected)

    # Recurrent encoding; each step outputs [h, c].
    encoded = encoder_rnn(projected)
    encoded = Dropout(dp, seed=add_random_state(random_state, 101))(
        encoded, training=flag_mc
    )

    # Pad the projection with zeros so the skip connection leaves the
    # cell-state half untouched.
    padded_projection = tf.keras.layers.Lambda(
        lambda x: K.concatenate([x, x * 0])
    )(projected)

    block_output = tf.keras.layers.Lambda(
        lambda x: tf.keras.layers.Add()([x[0], x[1]]), name="skip_EProcessing"
    )([encoded, padded_projection])

    return tf.keras.Model(block_input, block_output, name="EProcessing")
def LSTM_DProcessing(
    n_step,
    dim_z,
    flag_mc,
    dp=0.05,
    dp_r=0.02,
    l1_l2_reg=(0.0000001, 0.0000001),
    random_state=None,
):
    """Build the decoder processing block that makes the temporal projection.

    Input ``(batch, n_step, dim_z * 2)`` (hidden and cell states side by
    side), output ``(batch, n_step, dim_z)``.

    Args:
        n_step (int): Number of time steps of the input.
        dim_z (int): Latent dimension (LSTM units).
        flag_mc: ``training`` flag passed to the dropout layer.
        dp (float, optional): Dropout rate. Defaults to 0.05.
        dp_r (float, optional): Recurrent dropout rate. Defaults to 0.02.
        l1_l2_reg (tuple, optional): (l1, l2) kernel regularisation
            coefficients. Defaults to (0.0000001, 0.0000001).
        random_state: Seed handle for the cell and dropout layers.

    Returns:
        Model: Keras model named "DProcessing".
    """
    lstm_cell_dec = LSTMCellMidsize(
        int(dim_z),
        name="LSTM_dec_0",
        activation="sigmoid",
        recurrent_dropout=dp_r,
        seed=random_state,
        kernel_regularizer=tf.keras.regularizers.l1_l2(
            l1=l1_l2_reg[0], l2=l1_l2_reg[1]
        ),
    )

    lstm_dec = RNN_states_in_inputs(lstm_cell_dec, return_sequences=True)

    DProcessing_in = tf.keras.layers.Input(
        shape=(n_step, dim_z * 2), name="input_DProcessing"
    )
    # Derive the seed with the two-argument helper used everywhere else in
    # this module; `random_state + 100` failed for random_state=None.
    Z_lstm_output = Dropout(dp, seed=add_random_state(random_state, 100))(
        lstm_dec(DProcessing_in), training=flag_mc
    )
    # Skip connection with the first dim_z channels of the input.
    DProcessing_out = tf.keras.layers.Lambda(
        lambda x: tf.keras.layers.Add()([x[0], x[1][:, :, :dim_z]]),
        name="skip_DProcessing",
    )([Z_lstm_output, DProcessing_in])

    return tf.keras.Model(DProcessing_in, DProcessing_out, name="DProcessing")
class Add_query_to_Z_Processing_with_state(Layer):
    """Mix a query into the first ``dim_z`` channels of a latent tensor.

    Args:
        dim_z (int): Number of leading channels that are combined with the
            query, and units of the Dense layer applied to them.
        **kwargs: Forwarded to ``keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, dim_z, **kwargs):
        # Initialise the base layer first so the Dense sub-layer assigned
        # below is registered on a fully set-up layer.
        super().__init__(**kwargs)
        self.dim_z = dim_z
        self.layer = Dense(dim_z)

    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[1], input_shape[2], self.dim_z * 2)

    def call(self, ZProcessing, Query):
        """Combine ``Query`` with the leading channels of ``ZProcessing``.

        Args:
            ZProcessing: Tensor indexed with four axes; its last axis is cut
                at ``dim_z``.
            Query: Tensor concatenated on the last axis with the first
                ``dim_z`` channels of ``ZProcessing``.

        Returns:
            Tensor made of the Dense projection of
            ``[ZProcessing[..., :dim_z], Query]`` followed by the untouched
            channels ``ZProcessing[..., dim_z:]``.
        """
        Z = tf.concat([ZProcessing[:, :, :, : self.dim_z], Query], axis=-1)
        # Dense is applied independently on the two middle axes.
        new_Z = TimeDistributed(TimeDistributed(self.layer), name="wtf")(Z)
        Z = tf.concat([new_Z, ZProcessing[:, :, :, self.dim_z :]], axis=-1)
        return Z
def get_cnn_enc_params(dim_target, size_subseq_enc=1, dim_z=50, random_state=None):
    """Return a parameter dict for instantiating a ``cnn_enc_bis`` encoder.

    Args:
        dim_target: Dimension of the motifs to convolve.
        size_subseq_enc (int, optional): Length of the motifs to convolve.
        dim_z (int, optional): Latent dimension.
        random_state: Seed handle stored in the dict.

    Returns:
        dict: the builder function under "builder" plus its keyword arguments.
    """
    return dict(
        builder=cnn_enc_bis,
        size_subseq_enc=size_subseq_enc,
        dim_target=dim_target,
        dim_chan=1,
        list_filters=[64, 64, 32],
        list_kernels=[10, 10, 10],
        block="2D",
        dim_z=dim_z,
        dp=0.05,
        random_state=random_state,
    )
def get_cnn_dec_params(dim_target, size_subseq_dec=1, dim_z=50, random_state=None):
    """Return a parameter dict for instantiating a ``cnn_dec_bis`` decoder.

    Args:
        dim_target: Dimension of the motifs to convolve.
        size_subseq_dec (int, optional): Length of the motifs to convolve.
        dim_z (int, optional): Latent dimension.
        random_state: Seed handle stored in the dict.

    Returns:
        dict: the builder function under "builder" plus its keyword arguments.
    """
    # NOTE(review): the dict carries "dim_target" while cnn_dec_bis declares a
    # required `dim_out` parameter - confirm the caller supplies `dim_out`.
    return dict(
        builder=cnn_dec_bis,
        dim_z=dim_z,
        size_subseq_dec=size_subseq_dec,
        dim_target=dim_target,
        dim_chan=1,
        list_filters=[64, 64],
        strides=(2, 1),
        list_kernels=[4, 4],
        min_logvar=-10,
        random_state=random_state,
    )