"""Transformer encoder-decoder estimator with uncertainty quantification (uqmodels.modelization.DL_estimator.transformer_ed)."""

import tensorflow as tf
from keras.layers import TimeDistributed
from tensorflow.keras import Input, layers
from uqmodels.modelization.DL_estimator.data_embedding import (
    Factice_Time_Extension,
    Mouving_conv_Embedding,
    Mouving_Windows_Embedding,
    PositionalEmbedding,
)
from uqmodels.modelization.DL_estimator.neural_network_UQ import (
    NN_UQ)
from uqmodels.modelization.DL_estimator.metalayers import mlp
from uqmodels.modelization.DL_estimator.utils import set_global_determinism
from uqmodels.modelization.DL_estimator.data_generator import Folder_Generator
from uqmodels.utils import add_random_state, stack_and_roll


@tf.keras.utils.register_keras_serializable(package="UQModels_layers")
class MultiHeadAttention(tf.keras.layers.MultiHeadAttention):
    """Alias of keras MultiHeadAttention registered under the
    "UQModels_layers" package so saved models deserialize it by that name."""

    pass
@tf.keras.utils.register_keras_serializable(package="UQModels_layers")
class LayerNormalization(tf.keras.layers.LayerNormalization):
    """Alias of keras LayerNormalization registered under the
    "UQModels_layers" package so saved models deserialize it by that name."""

    pass
@tf.keras.utils.register_keras_serializable(package="UQModels_layers")
class Dropout(tf.keras.layers.Dropout):
    """Alias of keras Dropout registered under the "UQModels_layers"
    package so saved models deserialize it by that name."""

    pass
# Serializable alias of the standard Dense layer
@tf.keras.utils.register_keras_serializable(package="UQModels_layers")
class Dense(tf.keras.layers.Dense):
    """Alias of keras Dense registered under the "UQModels_layers"
    package so saved models deserialize it by that name."""

    pass
# Transformer Encoder Layer
@tf.keras.utils.register_keras_serializable(package="UQModels_layers")
class TransformerEncoder(layers.Layer):
    """Transformer Encoder Layer from https://keras.io/examples/audio/transformer_asr/

    Self-attention block followed by a two-layer feed-forward network, each
    with residual connection, dropout and layer normalization.

    Args:
        dim_z (int): dimension of the latent representation (key dim and
            output dim of the feed-forward projection).
        num_heads (int): number of attention heads.
        feed_forward_dim (int): hidden size of the feed-forward network.
        dp_rec (float): dropout rate; dropout is skipped entirely when 0.
        flag_mc (bool): if True, dropout stays active at inference time
            (MC-Dropout uncertainty estimation).
        random_state (int or None): seed for deterministic behaviour.
    """

    def __init__(
        self,
        dim_z,
        num_heads,
        feed_forward_dim,
        dp_rec=0.1,
        flag_mc=False,
        random_state=None,
        **kwargs
    ):
        super().__init__()
        self.dim_z = dim_z
        self.num_heads = num_heads
        self.feed_forward_dim = feed_forward_dim
        self.dp_rec = dp_rec
        self.flag_mc = flag_mc
        self.random_state = random_state
        set_global_determinism(self.random_state)
        # Layers instanciation
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=dim_z)
        self.dense1 = Dense(feed_forward_dim, activation="relu")
        self.dense2 = Dense(dim_z)
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        # Distinct seeds so the two dropout masks are independent.
        self.dropout1 = Dropout(dp_rec, seed=self.random_state)
        self.dropout2 = Dropout(dp_rec, seed=add_random_state(self.random_state, 1))

    def call(self, inputs, training=None):
        """Apply self-attention then feed-forward, each with residual + norm.

        Args:
            inputs: tensor of shape (batch, seq, dim_z) — assumed; confirm
                against callers.
            training (bool or None): Keras training flag; None is treated
                as False.

        Returns:
            Tensor with the same shape as `inputs`.
        """
        if training is None:
            training = False
        attn_output = self.att(inputs, inputs)
        if self.dp_rec > 0:
            # `|` (not `or`) so the expression also works if `training`
            # is a tensor; flag_mc forces dropout on for MC sampling.
            attn_output = self.dropout1(attn_output, training=training | self.flag_mc)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.dense2(self.dense1(out1))
        if self.dp_rec > 0:
            ffn_output = self.dropout2(ffn_output, training=training | self.flag_mc)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Serialize constructor arguments and sub-layers.

        NOTE(review): sub-layers are serialized explicitly here (instead of
        being rebuilt from the scalar arguments alone) so that trained
        sub-layer configurations round-trip — keep keys in sync with
        `from_config`.
        """
        config = {
            "dim_z": self.dim_z,
            "num_heads": self.num_heads,
            "feed_forward_dim": self.feed_forward_dim,
            "dp_rec": self.dp_rec,
            "flag_mc": self.flag_mc,
            "random_state": self.random_state,
            "att": tf.keras.utils.serialize_keras_object(self.att),
            "layernorm1": tf.keras.utils.serialize_keras_object(self.layernorm1),
            "layernorm2": tf.keras.utils.serialize_keras_object(self.layernorm2),
            "dense1": tf.keras.utils.serialize_keras_object(self.dense1),
            "dense2": tf.keras.utils.serialize_keras_object(self.dense2),
        }
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer, restoring the serialized sub-layers."""
        # Pop sub-layer payloads first so the remaining config matches
        # the __init__ signature.
        att = config.pop("att")
        layernorm1 = config.pop("layernorm1")
        layernorm2 = config.pop("layernorm2")
        dropout1 = config.pop("dropout1")
        dropout2 = config.pop("dropout2")
        dense1 = config.pop("dense1")
        dense2 = config.pop("dense2")
        obj = cls(**config)
        obj.att = tf.keras.utils.deserialize_keras_object(att)
        obj.layernorm1 = tf.keras.utils.deserialize_keras_object(layernorm1)
        obj.layernorm2 = tf.keras.utils.deserialize_keras_object(layernorm2)
        obj.dropout1 = tf.keras.utils.deserialize_keras_object(dropout1)
        obj.dropout2 = tf.keras.utils.deserialize_keras_object(dropout2)
        obj.dense1 = tf.keras.utils.deserialize_keras_object(dense1)
        obj.dense2 = tf.keras.utils.deserialize_keras_object(dense2)
        return obj
# Transformer Decoder Layer (causal self-attention + cross-attention on encoder output)
@tf.keras.utils.register_keras_serializable(package="UQModels_layers")
class TransformerDecoder(layers.Layer):
    """Transformer Decoder Layer from https://keras.io/examples/audio/transformer_asr/

    Causally-masked self-attention over the target, cross-attention on the
    encoder output, then a feed-forward network — each sub-block with
    dropout, residual connection and layer normalization.

    Args:
        dim_z (int): dimension of the latent representation.
        dim_horizon (int): number of future steps; used to build the
            causal attention mask.
        num_heads (int): number of attention heads.
        feed_forward_dim (int): hidden size of the feed-forward network.
        dp_rec (float): dropout rate.
        flag_mc (bool): if True, dropout stays active at inference time
            (MC-Dropout uncertainty estimation).
        random_state (int or None): seed for deterministic behaviour.
    """

    def __init__(
        self,
        dim_z,
        dim_horizon,
        num_heads,
        feed_forward_dim,
        dp_rec=0.1,
        flag_mc=False,
        random_state=None,
        **kwargs
    ):
        super().__init__()
        self.dim_z = dim_z
        self.dim_horizon = dim_horizon
        self.num_heads = num_heads
        self.feed_forward_dim = feed_forward_dim
        self.dp_rec = dp_rec
        self.flag_mc = flag_mc
        self.random_state = random_state
        set_global_determinism(self.random_state)
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)
        self.self_att = MultiHeadAttention(num_heads=num_heads, key_dim=dim_z)
        self.enc_att = MultiHeadAttention(num_heads=num_heads, key_dim=dim_z)
        # Distinct seeds so the three dropout masks are independent.
        self.self_dropout = Dropout(dp_rec, seed=random_state)
        self.enc_dropout = Dropout(dp_rec, seed=add_random_state(random_state, 1))
        self.ffn_dropout = Dropout(dp_rec, seed=add_random_state(random_state, 2))
        self.dense1 = Dense(feed_forward_dim, activation="relu")
        self.dense2 = Dense(dim_z)

    def causal_attention_mask(self, batch_size, n_dest, n_src, dim_horizon, dtype):
        """Masks the upper half of the dot product matrix in self attention.

        This prevents flow of information from future tokens to current
        token. 1's in the lower triangle, counting from the lower right
        corner. The `len_past` prefix rows all share the last past index so
        past positions attend to the whole past, while the `dim_horizon`
        forecast rows unlock one extra position each.
        """
        len_past = n_dest - dim_horizon
        i = tf.concat(
            [
                tf.zeros(len_past, dtype=tf.int32) + len_past - 1,
                tf.range(dim_horizon) + len_past,
            ],
            0,
        )[:, None]
        j = tf.range(n_src)
        m = (i) >= (j - n_src + n_dest)
        mask = tf.cast(m, dtype)
        mask = tf.reshape(mask, [1, n_dest, n_src])
        # Tile the (1, n_dest, n_src) mask along the (dynamic) batch axis.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(mask, mult)

    def call(self, enc_out, target, training=None):
        """Decode `target` attending causally to itself and to `enc_out`.

        Args:
            enc_out: encoder output tensor (batch, seq_enc, dim_z) — assumed;
                confirm against callers.
            target: target embedding tensor (batch, seq, dim_z).
            training (bool or None): Keras training flag; None is treated
                as False.

        Returns:
            Tensor with the same shape as `target`.
        """
        if training is None:
            training = False
        input_shape = tf.shape(target)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        causal_mask = self.causal_attention_mask(
            batch_size, seq_len, seq_len, self.dim_horizon, tf.bool
        )
        # `|` (not `or`) so the expression also works if `training` is a
        # tensor; flag_mc forces dropout on for MC sampling.
        target_att = self.self_att(target, target, attention_mask=causal_mask)
        target_norm = self.layernorm1(
            target + self.self_dropout(target_att, training=training | self.flag_mc)
        )
        enc_out = self.enc_att(target_norm, enc_out)
        enc_out_norm = self.layernorm2(
            self.enc_dropout(enc_out, training=training | self.flag_mc) + target_norm
        )
        ffn_out = self.dense2(self.dense1(enc_out_norm))
        ffn_out_norm = self.layernorm3(
            enc_out_norm + self.ffn_dropout(ffn_out, training=training | self.flag_mc)
        )
        return ffn_out_norm

    def get_config(self):
        """Serialize constructor arguments and sub-layers (keys must stay
        in sync with `from_config`)."""
        config = {
            "dim_z": self.dim_z,
            "dim_horizon": self.dim_horizon,
            "num_heads": self.num_heads,
            "feed_forward_dim": self.feed_forward_dim,
            "dp_rec": self.dp_rec,
            "flag_mc": self.flag_mc,
            "random_state": self.random_state,
            "layernorm1": tf.keras.utils.serialize_keras_object(self.layernorm1),
            "layernorm2": tf.keras.utils.serialize_keras_object(self.layernorm2),
            "layernorm3": tf.keras.utils.serialize_keras_object(self.layernorm3),
            "self_att": tf.keras.utils.serialize_keras_object(self.self_att),
            "enc_att": tf.keras.utils.serialize_keras_object(self.enc_att),
            "self_dropout": tf.keras.utils.serialize_keras_object(self.self_dropout),
            "enc_dropout": tf.keras.utils.serialize_keras_object(self.enc_dropout),
            "ffn_dropout": tf.keras.utils.serialize_keras_object(self.ffn_dropout),
            "dense1": tf.keras.utils.serialize_keras_object(self.dense1),
            "dense2": tf.keras.utils.serialize_keras_object(self.dense2),
        }
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer, restoring the serialized sub-layers."""
        # Pop sub-layer payloads first so the remaining config matches
        # the __init__ signature.
        layernorm1 = config.pop("layernorm1")
        layernorm2 = config.pop("layernorm2")
        layernorm3 = config.pop("layernorm3")
        self_att = config.pop("self_att")
        enc_att = config.pop("enc_att")
        self_dropout = config.pop("self_dropout")
        enc_dropout = config.pop("enc_dropout")
        ffn_dropout = config.pop("ffn_dropout")
        dense1 = config.pop("dense1")
        dense2 = config.pop("dense2")
        obj = cls(**config)
        obj.layernorm1 = tf.keras.utils.deserialize_keras_object(layernorm1)
        obj.layernorm2 = tf.keras.utils.deserialize_keras_object(layernorm2)
        obj.layernorm3 = tf.keras.utils.deserialize_keras_object(layernorm3)
        obj.self_att = tf.keras.utils.deserialize_keras_object(self_att)
        obj.enc_att = tf.keras.utils.deserialize_keras_object(enc_att)
        obj.self_dropout = tf.keras.utils.deserialize_keras_object(self_dropout)
        obj.enc_dropout = tf.keras.utils.deserialize_keras_object(enc_dropout)
        obj.ffn_dropout = tf.keras.utils.deserialize_keras_object(ffn_dropout)
        obj.dense1 = tf.keras.utils.deserialize_keras_object(dense1)
        obj.dense2 = tf.keras.utils.deserialize_keras_object(dense2)
        return obj
# Model builder: convolutive embedding -> transformer encoder/decoder -> interpretor head
def build_transformer(
    size_window=10,
    n_windows=5,
    step=1,
    dim_target=1,
    dim_chan=1,
    dim_horizon=3,
    dim_ctx=20,
    dim_z=100,
    num_heads=2,
    num_feed_forward=128,
    num_layers_enc=3,
    num_layers_dec=2,
    layers_enc=[150],
    layers_dec=[150, 75],
    dp=0.05,
    dp_rec=0.03,
    k_reg=(0.00001, 0.00001),
    list_strides=[2, 1],
    list_filters=None,
    list_kernels=None,
    dim_dyn=None,
    with_positional_embedding=False,
    with_ctx_input=True,
    with_convolution=True,
    type_output=None,
    random_state=None,
    **kwargs
):
    """Builder for Transformer ED with convolutive preprocessing

    Args:
        size_window (int, optional): Size of window for lag values. Defaults to 10.
        n_windows (int, optional): Number of window in past. Defaults to 5.
        step (int, optional): step between windows. Defaults to 1.
        dim_target (int, optional): dimension of TS. Defaults to 1.
        dim_chan (int, optional): Number of channel of TS. Defaults to 1.
        dim_horizon (int, optional): futur_horizon to predict. Defaults to 3.
        dim_ctx (int, optional): Number of ctx_features. Defaults to 20.
        dim_z (int, optional): Size of latent space. Defaults to 100.
        num_heads (int, optional): num of heads transformer. Defaults to 2.
        num_feed_forward (int, optional): feed_forward transformer dimension.
            Defaults to 128.
        num_layers_enc (int, optional): num of transformer enc block. Defaults to 3.
        num_layers_dec (int, optional): num of transformer dec block. Defaults to 2.
        layers_enc (list, optional): sizes of MLP preprocessing (after
            concatenation of past values embedding + ctx). Defaults to [150].
        layers_dec (list, optional): sizes of MLP interpretor. Defaults to [150, 75].
        dp (float, optional): dropout. Defaults to 0.05.
        dp_rec (float, optional): transformer dropout. Defaults to 0.03.
        k_reg (tuple, optional): (l1, l2) kernel regularization of the
            embedding MLP. Defaults to (0.00001, 0.00001).
        list_strides / list_filters / list_kernels: convolutive embedding
            hyper-parameters forwarded to Mouving_conv_Embedding.
        dim_dyn (int, None): size of dyn inputs; if None, assumed equal to
            dim_target.
        with_positional_embedding (bool, optional): add a static positional
            embedding on windows. Defaults to False.
        with_ctx_input (bool, optional): Expect ctx features in addition to
            lag. Defaults to True.
        with_convolution (bool, optional): use convolution rather than whole
            lag values in the windows. Defaults to True.
        type_output (_type_, optional): mode of UQ (see NN_UQ). Defaults to None.
        random_state (int, None): handle experimental random using seed.

    Returns:
        transformer : multi-step forecaster with UQ
    """
    if dim_dyn is None:
        dim_dyn = dim_target

    # MC-Dropout / BNN keep dropout active at inference.
    flag_mc = 0
    if type_output in ["BNN", "MC_Dropout"]:
        flag_mc = 1

    set_global_determinism(random_state)

    # Embedding_interpretor: maps decoder latent states to target values.
    Interpretor = mlp(
        dim_in=dim_z,
        dim_out=dim_target,
        layers_size=layers_dec,
        dp=dp,
        type_output=type_output,
        name="Interpretor",
        random_state=random_state,
    )

    Pos_Embeddor = None
    if with_positional_embedding:
        Pos_Embeddor = PositionalEmbedding(dim_z, max_len=size_window + dim_horizon - 1)

    # Input definition
    list_input = []
    if with_ctx_input:
        CTX_inputs = Input(shape=(n_windows, dim_ctx), name="encoder_inputs")
        list_input.append(CTX_inputs)

    Y_past_in = Input(shape=(size_window, dim_dyn), name="past_inputs")
    list_input.append(Y_past_in)
    Y_past = Y_past_in

    # Preprocessing layers definition
    if with_convolution:
        MWE = Mouving_conv_Embedding(
            size_window,
            n_windows,
            step=step,
            dim_d=dim_dyn,
            dim_chan=dim_chan,
            use_conv2D=True,
            list_strides=list_strides,
            list_filters=list_filters,
            list_kernels=list_kernels,
            dp=0.05,
            flag_mc=flag_mc,
            seed=add_random_state(random_state, 100),
        )
    else:
        MWE = Mouving_Windows_Embedding(
            size_window,
            n_windows,
            step=step,
            dim_d=dim_dyn,
            dim_chan=dim_chan,
            seed=add_random_state(random_state, 100),
        )

    FTE = Factice_Time_Extension(dim_horizon)

    # BUGFIX: build a new list instead of layers_enc.append(dim_z), which
    # mutated the shared default argument on every call.
    layers_enc = list(layers_enc) + [dim_z]
    dim_embedding = MWE.last_shape
    if with_ctx_input:
        dim_embedding += dim_ctx

    Embeddor_ctx = mlp(
        dim_in=dim_embedding,
        dim_out=None,
        layers_size=layers_enc,
        dp=dp,
        name="Embeddor",
        regularizer_W=k_reg,
        random_state=add_random_state(random_state, 200),
    )

    # Preprocessing computation
    Data = MWE(Y_past)

    # Concat with ctx features
    if with_ctx_input:
        Data = layers.Concatenate(axis=-1)([CTX_inputs, Data])

    # Factice time augmentation (actually useless but can be usefull for
    # extended predict horizon)
    Data = FTE(Data)
    Embedding = TimeDistributed(Embeddor_ctx)(Data)

    # Static Pe that encode window position
    if Pos_Embeddor:
        Pe_Embedding = Pos_Embeddor(Embedding)
        Embedding = Embedding + Pe_Embedding

    # Encode past information (drop the dim_horizon factice future steps).
    enc_out = Embedding[:, :(-dim_horizon), :]
    encoder = []
    for i in range(num_layers_enc):
        # BUGFIX: feed_forward_dim was hard-coded to 50 while the documented
        # num_feed_forward parameter was silently swallowed by **kwargs.
        encoder.append(
            TransformerEncoder(
                dim_z,
                num_heads,
                feed_forward_dim=num_feed_forward,
                dp_rec=dp_rec,
                flag_mc=flag_mc,
                random_state=add_random_state(random_state, 300 + i),
            )
        )
        enc_out = encoder[-1](enc_out)

    # For learning :
    decoder = []
    dec_out = enc_out
    for i in range(num_layers_dec):
        decoder.append(
            TransformerDecoder(
                dim_z=dim_z,
                dim_horizon=dim_horizon,
                feed_forward_dim=num_feed_forward,
                num_heads=num_heads,
                dp_rec=dp_rec,
                flag_mc=flag_mc,
                random_state=add_random_state(random_state, 400 + i),
            )
        )
        dec_out = decoder[-1](dec_out, Embedding)

    # Keep only the dim_horizon forecast steps and map them to target space.
    outputs = TimeDistributed(Interpretor)(dec_out[:, -(dim_horizon):])
    model = tf.keras.Model(list_input, outputs, name="model")
    return model
class Transformer_ED_UQ(NN_UQ):
    """Transformer_ED for forecasting with UQ : see build_transformer to check model parameters"""

    def __init__(
        self,
        model_parameters,
        factory_parameters={"factory_lag_lt": 0, "factory_lag_st": 0},
        training_parameters=dict(),
        type_output=None,
        rescale=False,
        n_ech=5,
        train_ratio=0.9,
        name="Lstm_stacked",
        random_state=None,
    ):
        """Initialization

        Args:
            model_parameters (dict): parameters forwarded to build_transformer
                (see get_params_dict for the expected keys).
            factory_parameters (dict, optional): lags used by `factory` to
                align long-term/short-term windows. Defaults to
                {'factory_lag_lt': 0, 'factory_lag_st': 0}.
                NOTE(review): mutable default argument — shared across
                instances if mutated; consider None + fallback.
            training_parameters (dict, optional): training configuration
                forwarded to NN_UQ. Defaults to dict().
            type_output (str, optional): mode of UQ (see NN_UQ). Defaults to None.
            rescale (bool, optional): rescale inputs/outputs. Defaults to False.
            n_ech (int, optional): number of MC samples. Defaults to 5.
            train_ratio (float, optional): train/validation split ratio.
                Defaults to 0.9.
            name (str, optional): estimator name. Defaults to "Lstm_stacked".
            random_state (int or None): handle experimental random using seed.
        """
        if (random_state) is not None:
            print("Warning : issues non-deterministic behaviour even with random state")

        super().__init__(
            model_initializer=build_transformer,
            model_parameters=model_parameters,
            factory_parameters=factory_parameters,
            training_parameters=training_parameters,
            type_output=type_output,
            rescale=rescale,
            n_ech=n_ech,
            train_ratio=train_ratio,
            name=name,
            random_state=random_state,
        )

    def factory(self, X, y, mask=None, only_fit_scaler=False, **kwarg):
        """Turn raw (ctx, lag) series into the stacked/rolled model inputs.

        Args:
            X: either a (ctx, lag) pair when the model was built with
                with_ctx_input, or a single array otherwise; None to only
                build targets.
            y: target series, or None.
            mask: optional sample mask forwarded to NN_UQ.factory.
            only_fit_scaler (bool): if True, only fit the parent scaler and
                return None.

        Returns:
            (inputs, new_y, mask) where inputs is a list of stacked windows
            ([X_lt, X_st] with ctx input, [X_st] otherwise) and new_y holds
            dim_horizon-step targets.
        """
        model_params = self.model_parameters
        factory_params = self.factory_parameters
        with_ctx_input = model_params["with_ctx_input"]
        step = 1
        if "step" in model_params.keys():
            step = model_params["step"]

        X_none = False
        if X is None:
            X_none = True

        if X_none:
            inputs = None
        else:
            if with_ctx_input:
                # X is a (ctx, lag) pair: scale both through the parent.
                X, X_lag = X
                X, X_lag, mask = super().factory(X, X_lag, mask)
                if only_fit_scaler:
                    return None
                # Long-term context windows (one row per past window).
                X_lt = stack_and_roll(
                    X,
                    model_params["n_windows"],
                    lag=factory_params["factory_lag_lt"],
                    step=step,
                )
                # Short-term lag-value windows.
                X_st = stack_and_roll(
                    X_lag,
                    model_params["size_window"],
                    lag=factory_params["factory_lag_st"] - 1,
                    step=step,
                )
                inputs = [X_lt, X_st]
            else:
                X, _, _ = super().factory(X, None, mask)
                if only_fit_scaler:
                    return None
                X_lag = X
                X_st = stack_and_roll(
                    X,
                    model_params["size_window"],
                    lag=factory_params["factory_lag_st"] - 1,
                    step=step,
                )
                inputs = [X_st]

        new_y = None
        if y is not None:
            # Scale targets, then stack the next dim_horizon steps per sample.
            _, y, _ = super().factory(None, y, mask)
            new_y = stack_and_roll(
                y,
                model_params["dim_horizon"],
                lag=model_params["dim_horizon"] - 1,
                step=step,
            )
        return inputs, new_y, mask

    def Build_generator(self, X, y, batch=32, shuffle=True, train=True):
        """Wrap (X, y) in a Folder_Generator tied to this estimator.

        Args:
            X: model inputs (as produced by `factory`).
            y: targets.
            batch (int): batch size. Defaults to 32.
            shuffle (bool): shuffle samples each epoch. Defaults to True.
            train (bool): training mode flag of the generator. Defaults to True.

        Returns:
            Folder_Generator instance seeded with self.random_state.
        """
        return Folder_Generator(
            X,
            y,
            self,
            batch=batch,
            shuffle=shuffle,
            train=train,
            random_state=self.random_state,
        )
def get_params_dict(
    dim_ctx,
    dim_dyn,
    dim_target,
    dim_chan=1,
    size_window=20,
    n_windows=5,
    dim_horizon=5,
    dim_z=50,
    dp=0.05,
    dp_rec=0.02,
    num_heads=2,
    num_feed_forward=128,
    num_layers_enc=3,
    num_layers_dec=2,
    layers_enc=[75, 150, 75],
    layers_dec=[200, 125, 75],
    list_strides=[2, 1, 1, 1],
    list_filters=[128, 128, 128],
    list_kernels=None,
    with_convolution=True,
    with_ctx_input=True,
    n_ech=3,
    type_output="MC_Dropout",
    random_state=None,
):
    """Assemble the model-parameter dict expected by build_transformer.

    All keyword arguments are passed through unchanged; `step` is fixed to 1
    and `k_reg` to (10e-6, 10e-6).

    Returns:
        dict: keyword arguments for build_transformer / Transformer_ED_UQ.
    """
    return dict(
        dim_ctx=dim_ctx,
        size_window=size_window,
        n_windows=n_windows,
        dim_horizon=dim_horizon,
        dim_target=dim_target,
        dim_chan=dim_chan,
        step=1,
        dim_z=dim_z,
        dp=dp,
        dp_rec=dp_rec,
        dim_dyn=dim_dyn,
        type_output=type_output,
        num_heads=num_heads,
        num_feed_forward=num_feed_forward,
        num_layers_enc=num_layers_enc,
        num_layers_dec=num_layers_dec,
        k_reg=(10e-6, 10e-6),
        layers_enc=layers_enc,
        layers_dec=layers_dec,
        list_strides=list_strides,
        list_filters=list_filters,
        list_kernels=list_kernels,
        with_convolution=with_convolution,
        with_ctx_input=with_ctx_input,
        n_ech=n_ech,
        random_state=random_state,
    )