import keras.backend as K
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from keras.layers import Input, Lambda, TimeDistributed
from keras.models import Model
from uqmodels.modelization.DL_estimator.metalayers import (
Add_query_to_Z_Processing_with_state,
Double_Moving_slice_layer,
LSTM_DProcessing,
LSTM_EProcessing,
Moving_slice_layer,
mlp,
)
from uqmodels.modelization.UQEstimator import UQEstimator
from uqmodels.utils import Extract_dict, apply_mask, cut, get_fold_nstep, stack_and_roll
tf.keras.backend.set_floatx("float32")
# Dev: prototype of a deep neural network for dynamic capture.
# LSTM ED class: TODO, make Lstm (LSTM E-D) inherit from NNvar.
class Lstm(UQEstimator):
    # LSTM E-D forecaster over an ensemble of sequences (e.g. days) of multidimensional
    # elements (time steps over stations). The model learns on daily sequence objects
    # through a recurrent process.
    # y_encoder output : (batch_size, dim_z)
    # y_decoder output : (batch_size, size_subseq_dec, dim_target * n_parameters)
    # __init__ does not instantiate the NN; it only stores the parameters and
    # builders needed for later instantiation.
def __init__(
self,
model_parameters,
model_specifications,
architecture_parameters,
training_parameters=dict(),
):
# State variable
super().__init__(
model_specifications["name"], "var_A&E", model_specifications["rescale"]
)
self.initialized = False
self.history = []
self.type_output = model_specifications["type_output"]
self.model_specifications = model_specifications
self.model_parameters = model_parameters
        # Training defaults; the per-phase lists (epochs, b_s, l_r) drive the
        # multi-phase fit schedule. "generator" and "callbacks" are included
        # here so that fit() can fall back on them.
        self.training_parameters = {
            "epochs": [1000, 1000],
            "b_s": [100, 20],
            "l_r": [0.01, 0.005],
            "verbose": 1,
            "list_loss": ["mse"],
            "metrics": None,
            "param_loss": None,
            "redundancy": 1,
            "generator": True,
            "callbacks": None,
        }
        for key in training_parameters.keys():
            self.training_parameters[key] = training_parameters[key]
list_keys = [
"size_window_past",
"size_window_futur",
"size_subseq_dec",
"size_subseq_enc",
"padding_past",
"padding_futur",
]
(
size_window_past,
size_window_futur,
size_subseq_dec,
size_subseq_enc,
padding_past,
padding_futur,
) = Extract_dict(model_parameters, list_keys)
        if ((size_window_past % padding_past) != 0) or (
            (size_window_futur - size_subseq_dec) % padding_futur != 0
        ):
            raise ValueError("size_window, size_subseq and padding are incompatible")
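        # Illustrative compatible setting (an assumed example, not a requirement):
        # size_window_past=20 with padding_past=5 satisfies 20 % 5 == 0, and
        # size_window_futur=12, size_subseq_dec=4, padding_futur=4 satisfies
        # (12 - 4) % 4 == 0; padding_past=3 would fail, since the past window
        # could not be folded into whole strides.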
# Parameter variable
list_keys = ["dim_lt", "dim_target", "dim_target_in", "dim_z", "dp", "dp_r"]
dim_lt, dim_target, dim_target_in, dim_z, dp, dp_r = Extract_dict(
self.model_parameters, list_keys
)
dict_to_add = {
"dyn_encoder": {},
"ctx_decoder": {"dim_in": 10}, # Unused for the moment
"ctx_encoder": {
"dim_in": dim_lt,
"dp": dp,
"type_output": None,
"dim_out": None,
},
"y_encoder": {
"dp": dp,
"type_output": None,
"dim_out": None,
},
"y_decoder": {
"dim_in": dim_z,
"dp": dp,
"type_output": self.type_output,
"dim_out": size_subseq_dec * dim_target,
},
}
        # Fill in any missing subpart with a default MLP builder
for subpart in ["ctx_encoder", "y_encoder", "dyn_encoder", "y_decoder"]:
if subpart not in architecture_parameters.keys():
architecture_parameters[subpart] = {"builder": mlp, "name": subpart}
for subpart in architecture_parameters.keys():
for add_key in dict_to_add[subpart].keys():
add_values = dict_to_add[subpart][add_key]
architecture_parameters[subpart][add_key] = add_values
self.architecture_parameters = architecture_parameters
self.model = None
self.model_enc = None
self.model_pred = None
    # Model instantiation from the stored parameters and builders.
def model_initializer(self):
        # New functionality, disabled: perform ctx + dynamic prediction.
with_ctx_predictor = False
list_keys = [
"dim_lt",
"dim_target_in",
"dim_target",
"dim_z",
"dim_st",
"dp",
"dp_r",
]
flag_mc = None
if (self.type_output == "MC_Dropout") or (self.type_output == "MCDP"):
flag_mc = 1
dim_lt, dim_target_in, _dim_target, dim_z, dim_st, dp, dp_r = Extract_dict(
self.model_parameters, list_keys
)
        if dp_r is None:
            # Fall back to the standard dropout rate when no recurrent dropout is given.
            dp_r = dp
list_keys = [
"with_loop",
"only_futur",
"with_dyn_feature",
"with_static_feature",
"with_query",
]
(
with_loop,
only_futur,
with_dyn_feature,
with_static_feature,
with_query,
) = Extract_dict(self.model_specifications, list_keys)
list_keys = [
"size_window_past",
"size_window_futur",
"size_subseq_dec",
"size_subseq_enc",
"padding_past",
"padding_futur",
]
(
size_window_past,
size_window_futur,
size_subseq_dec,
size_subseq_enc,
padding_past,
padding_futur,
) = Extract_dict(self.model_parameters, list_keys)
        # Instantiation of the y_encoder subpart
y_encoder = self.architecture_parameters["y_encoder"]["builder"](
**self.architecture_parameters["y_encoder"]
)
        # Instantiation of the y_decoder subpart
y_decoder = self.architecture_parameters["y_decoder"]["builder"](
**self.architecture_parameters["y_decoder"]
)
n_step_futur = get_fold_nstep(size_window_futur, size_subseq_dec, padding_futur)
n_step_past = get_fold_nstep(
size_window_past + size_subseq_enc, size_subseq_enc, padding_past
)
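        # Worked example (assuming get_fold_nstep counts strided folds as
        # (length - size) // padding + 1): with size_window_futur=12,
        # size_subseq_dec=4 and padding_futur=4, the decoder unrolls
        # n_step_futur = (12 - 4) // 4 + 1 = 3 steps; with size_window_past=20,
        # size_subseq_enc=4 and padding_past=4, the encoder consumes
        # n_step_past = (20 + 4 - 4) // 4 + 1 = 6 folded subsequences.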
if with_ctx_predictor:
# ctx_decoder = self.architecture_parameters["ctx_decoder"]["builder"](
# **self.architecture_parameters["ctx_decoder"]
# )
self.architecture_parameters["dim_in"] = dim_z
# Main layer definition.
# Factory : Contextual preprocessing layers.
z_embedding_past = []
list_input = []
# Contextual features (dim_t,dim_lt,dim_n) -> (dim_t,dim_lt,batch size).
if with_static_feature or with_query:
            # Instantiation of the ctx_encoder subpart
ctx_encoder = self.architecture_parameters["ctx_encoder"]["builder"](
**self.architecture_parameters["ctx_encoder"]
)
full_query = size_window_past + 1
ignore_futur = None
if with_query:
ignore_futur = full_query
full_query = full_query + size_window_futur
Input_query = Input(shape=(full_query, dim_lt), name="f_static_past")
list_input.append(Input_query)
            # Definition of the static encoder part
if with_static_feature:
                # Select and fold the past part of the query
past_selected_query = Moving_slice_layer(
n_step_past, padding_past, ignore_futur=ignore_futur
)(Input_query)
                # Encode the folded past query
Z_past_query = TimeDistributed(ctx_encoder, name="TD_ctx_encoder")(
past_selected_query
)
z_embedding_past.append(Z_past_query)
if with_query:
                # Select and double-fold the future query for prediction
futur_query = Double_Moving_slice_layer(
n_step_past,
n_step_futur,
padding_past,
padding_futur,
depth_slice=1,
)(Input_query)
Z_futur_query = TimeDistributed(
TimeDistributed(ctx_encoder), name="TD2_futur_query_encoder"
)(futur_query)
# Dynamic features -> (dim_t,size_window_futur,dim_target,dim_n) ->
# (dim_t,size_window_futur,dim_target, batch size)
if with_dyn_feature:
dyn_encoder = self.architecture_parameters["dyn_encoder"]["builder"](
**self.architecture_parameters["dyn_encoder"]
)
x_dyn_past = Input(shape=(size_window_past, dim_st), name="f_dyn_past")
list_input.append(x_dyn_past)
            # Reformatting as a 3D tensor (sequence of subsequences)
T_dyn_past = Moving_slice_layer(
n_step_past, padding_past, depth_slice=size_subseq_enc
)(x_dyn_past)
            # Dynamic past embedding
z_dyn_past = TimeDistributed(dyn_encoder, name="TD_dyn_encoder")(T_dyn_past)
z_embedding_past.append(z_dyn_past[:, 1:, :])
x_lag_past = Input(
shape=(size_window_past + size_subseq_enc, dim_target_in),
name="f_lag_past",
)
list_input.append(x_lag_past)
        # Reformatting as a 3D tensor (sequence of subsequences)
x_lag_stacked = Moving_slice_layer(
n_step_past, padding_past, depth_slice=size_subseq_enc
)(x_lag_past)
z_lag_past = TimeDistributed(y_encoder, name="TD_y_encoder")(x_lag_stacked)
z_embedding_past.append(z_lag_past)
z_lag_futur = z_lag_past
        # Preprocessing interconnection: concatenate past ctx + dyn representations
input_Processing = Lambda(lambda x: K.concatenate(x), name="Concat_past_z")(
z_embedding_past
)
        # Instantiation of the Encoder block
EProcessing = LSTM_EProcessing(
n_step_past,
input_Processing.shape[-1],
dim_z,
flag_mc,
dp=dp,
dp_r=dp_r,
l1_l2_reg=(0.0000001, 0.0000001),
)
        # Instantiation of the Decoder block
DProcessing = LSTM_DProcessing(
n_step_futur,
dim_z,
flag_mc,
dp=dp,
dp_r=dp_r,
l1_l2_reg=(0.0000001, 0.0000001),
)
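        # The two blocks form the recurrent encoder-decoder core: EProcessing
        # summarizes the folded past embeddings into latent states of size dim_z,
        # and DProcessing unrolls those states over the decoding steps. A minimal
        # shape sketch (assumed conventions): input_Processing of shape
        # (batch, n_step_past, dim_embedding) -> Z_processing of shape
        # (batch, n_step_past, dim_z).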
        # Improvement: better fusion / attention mechanism between feature types.
        # Currently, fusion is realized with basic dense layers.
# Encoding
Z_processing = EProcessing(input_Processing)
        # Mode without an explicit loop
if not with_loop:
            # Duplication of Z_processing to feed the DProcessing block
T_lstm_enc = Lambda(
lambda x: K.repeat_elements(x[:, :, None, :], n_step_futur, 2),
name="repeat_h1",
)(Z_processing)
print("log : mode_init", T_lstm_enc.shape, Z_futur_query.shape)
            # Add the query information to each future prediction step
if with_query:
T_enc_futur = Add_query_to_Z_Processing_with_state(dim_z)(
T_lstm_enc, Z_futur_query
)
            # No query information
            else:
                T_enc_futur = T_lstm_enc
            print("log : mode_init", T_lstm_enc.shape, T_enc_futur.shape)
# Apply DProcessing block : For each step_representation (including present-step)
if not only_futur:
z_lstm_dec = TimeDistributed(DProcessing, name="TD_lstm_dec")(
T_enc_futur
)
print("z_lstm_dec", z_lstm_dec.shape)
                # Decode Z_DProcessing for the past windows and the future window
output = TimeDistributed(
TimeDistributed(y_decoder), name="TD_TD_y_decoder"
)(z_lstm_dec)
print("TD_TD_y_decoder", output.shape)
# if with_ctx_predictor:
# print(Z_futur_query.shape)
# output_ctx = TimeDistributed(TimeDistributed(
# ctx_decoder), name='TD_TD_ctx_decoder')(Z_futur_query)
# output = output_ctx+output
# Apply & DProcessing & y_decoder only for present-step_representation
z_lstm_dec_futur = DProcessing(T_enc_futur[:, -1, :, :])
print("z_lstm_dec_futur", z_lstm_dec_futur.shape)
output_pred_NN = TimeDistributed(y_decoder)(z_lstm_dec_futur)
print("output_pred_NN", output_pred_NN.shape)
# if with_ctx_predictor:
# output_pred_NN_bis = TimeDistributed(
# ctx_decoder)(Z_futur_query[:, -1, :, :])
# output_pred_NN = output_pred_NN+output_pred_NN_bis
# Loop learning
else:
prediction = []
for i in range(1): # Init loop
if with_query:
T_enc_futur_next = Lambda(
lambda x: K.concatenate(x)[:, :, None, :], name="Concat_T_futur"
)(
[
Z_processing,
Z_futur_query[:, :, 0],
z_lag_futur[:, :, -1],
]
)
else:
T_enc_futur_next = Lambda(
lambda x: K.concatenate(x)[:, :, None, :], name="Concat_T_futur"
)([Z_processing, z_lag_futur[:, :, -1]])
z_lstm_dec = DProcessing(T_enc_futur_next)
pred_temp = TimeDistributed(y_decoder)(z_lstm_dec[:, :, 0])
prediction.append(pred_temp[:, :, None])
z_lag_futur = TimeDistributed(y_encoder)(pred_temp)
for i in range(n_step_futur - 1):
if with_query:
T_enc_futur_next = Lambda(
lambda x: K.concatenate(x)[:, :, None, :], name="Concat_T_futur"
)(
[
Z_processing[:, i: n_step_past + i],
z_lag_futur[:, :, 0, :],
z_lstm_dec[:, :, 0, :],
]
)
else:
T_enc_futur_next = Lambda(
lambda x: K.concatenate(x)[:, :, None, :], name="Concat_T_futur"
)([z_lag_futur[:, :, 0, :], z_lstm_dec[:, :, 0, :]])
z_lstm_dec = TimeDistributed(DProcessing)(T_enc_futur_next)
pred_temp = TimeDistributed(y_decoder)(z_lstm_dec[:, :, 0])
prediction.append(pred_temp[:, :, None])
z_lag_futur = TimeDistributed(y_encoder)(pred_temp)
            output = Lambda(
                lambda x: K.concatenate(x, axis=2), name="Concat_output"
            )(prediction)
# M : FULL model
if size_subseq_dec == 0:
output = output[:, :, :, :]
output_pred_NN = output_pred_NN[:, :, :]
if not (only_futur):
self.model = Model(list_input, output)
self.model_pred = Model(list_input, output_pred_NN)
else:
self.model = Model(list_input, output_pred_NN)
self.model_enc = Model(list_input, Z_processing)
        # Mcut partial model : recovery of the encoded embedding.
        # self.modelcut = Model(inputs, c_h1)
    # Fit procedure; maybe partly redundant with the NN fitting procedure : refactoring needed.
def fit(
self,
Inputs,
Targets=None,
validation_data=None,
epochs=None,
steps_per_epoch=None,
b_s=None,
l_r=None,
list_loss=None,
param_loss=None,
shuffle=True,
sample_weight=None,
verbose=None,
metrics=None,
callbacks=None,
generator=None,
validation_freq=1,
**kwargs
):
if generator is None:
generator = self.training_parameters["generator"]
if epochs is None:
epochs = self.training_parameters["epochs"]
if callbacks is None:
callbacks = self.training_parameters["callbacks"]
if b_s is None:
b_s = self.training_parameters["b_s"]
if l_r is None:
l_r = self.training_parameters["l_r"]
if list_loss is None:
list_loss = self.training_parameters["list_loss"]
if param_loss is None:
param_loss = self.training_parameters["param_loss"]
if metrics is None:
metrics = self.training_parameters["metrics"]
if verbose is None:
verbose = self.training_parameters["verbose"]
if not (self.initialized):
self.model_initializer()
self.initialized = True
        # Training loop
if validation_data is None:
np.random.seed(0)
if isinstance(Inputs, list):
train = np.random.rand(len(Inputs[0])) < 0.9
else:
train = np.random.rand(len(Inputs)) < 0.9
test = np.invert(train)
validation_data = (apply_mask(Inputs, test), apply_mask(Targets, test))
Targets = apply_mask(Targets, train)
Inputs = apply_mask(Inputs, train)
if not (hasattr(self, "scaler")):
_, _, _ = self.factory(Inputs, Targets, only_fit_scaler=True)
for n, loss in enumerate(list_loss):
for i, batch_size in enumerate(b_s):
if param_loss is not None:
loss_ = loss(param_loss[n])
else:
loss_ = loss
print("l_r", l_r[i])
self.model.compile(
optimizer=tf.optimizers.Nadam(learning_rate=l_r[i]),
loss=loss_,
metrics=metrics,
)
                # Instantiate generators
if generator:
In_ = self.Build_generator(
Inputs,
Targets,
batch_min=batch_size,
shuffle=shuffle,
train=True,
)
Tar_ = None
validation_data_ = self.Build_generator(
validation_data[0],
validation_data[1],
batch_min=batch_size,
shuffle=False,
train=False,
)
if steps_per_epoch is None:
steps_per_epoch = In_.__len__()
validation_steps = validation_data_.__len__()
batch_size = None
else:
In_ = Inputs
Tar_ = Targets
validation_data_ = validation_data
validation_steps = None
steps_per_epoch = None
self.history.append(
self.model.fit(
In_,
Tar_,
validation_data=validation_data_,
epochs=epochs[i],
steps_per_epoch=steps_per_epoch,
validation_steps=validation_steps,
batch_size=batch_size,
# sample_weight=sample_weight,
shuffle=shuffle,
callbacks=callbacks,
validation_freq=validation_freq,
verbose=verbose,
)
)
self.save("", "lstm_tmp")
        return self.history
def predict(self, Inputs, n_ech=6, mask_h=0, mask_m=[0], generator=None, **kwargs):
"""Predict procedure Maybe partly redundant to NN fiting procedure : Refactoring needed
Args:
Inputs (List of NN features): Model inputs
n_ech (int, optional): Number of MC-DP inferences.
Returns:
output: Meta-model output tuple of (Prediction,Var_A,Var_E)
"""
if generator is None:
generator = self.training_parameters["generator"]
if generator:
Inputs = self.Build_generator(
Inputs, Inputs[1], batch_min=5000, shuffle=False, train=False
)
dim_out = self.model_parameters["dim_target"]
only_futur = self.model_specifications["only_futur"]
if only_futur:
model = self.model
else:
model = self.model_pred
var_a, var_e, output = None, None, None
if self.type_output is None:
pred = model.predict(Inputs)[:, mask_h, mask_m]
var_a, var_e = pred * 0 + 1, pred * 0
UQ = np.concatenate([var_a[None, :], var_e[None, :]], axis=0)
elif self.type_output == "EDL":
out = model.predict(Inputs)[:, mask_h, mask_m]
gamma, vu, alpha, beta = tf.split(out, 4, -1)
if hasattr(gamma, "numpy"):
gamma = gamma.numpy()
pred = gamma
var_a = beta / (alpha - 1)
var_e = beta / (vu * (alpha - 1))
UQ = np.concatenate([var_a[None, :], var_e[None, :]], axis=0)
elif self.type_output == "MC_Dropout":
if generator:
pred = []
var_a = []
var_e = []
                for Inputs_gen, _ in Inputs:  # per batch: n_ech inferences, then aggregate
output = []
for i in range(n_ech):
output.append(model.predict(Inputs_gen)[:, mask_h, mask_m])
output = np.array(output)
pred.append(output[:, :, :, :dim_out].mean(axis=0))
var_a.append(np.exp(output[:, :, :, dim_out:]).mean(axis=0))
var_e.append(output[:, :, :, :dim_out].var(axis=0))
pred = np.concatenate(pred, axis=0)
var_a = np.concatenate(var_a, axis=0)
var_e = np.concatenate(var_e, axis=0)
UQ = np.concatenate([var_a[None, :], var_e[None, :]], axis=0)
else:
output = []
for i in range(n_ech):
output.append(model.predict(Inputs)[:, mask_h, mask_m])
output = np.array(output)
pred = output[:, :, :, :dim_out].mean(axis=0)
var_a = np.exp(output[:, :, :, dim_out:]).mean(axis=0)
var_e = output[:, :, :, :dim_out].var(axis=0)
UQ = np.concatenate([var_a[None, :], var_e[None, :]], axis=0)
_, pred = self._format(None, pred, "inverse_transform")
_, UQ = self._format(None, UQ, "inverse_transform", mode_UQ=True)
return (pred, UQ)
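    # Illustrative consumption of predict's (pred, UQ) output (a sketch, assuming
    # a Gaussian decomposition where UQ[0] is the aleatoric and UQ[1] the
    # epistemic variance):
    #   pred, UQ = estimator.predict(Inputs)
    #   total_std = np.sqrt(UQ[0] + UQ[1])
    #   lower, upper = pred - 1.96 * total_std, pred + 1.96 * total_std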
def _format(self, X, y, type_transform, mode_UQ=None):
"""Formatting features : Rework needed be : would be call to supermethod of NN_UQ
Args:
X (_type_): Input to reformate
y (_type_): Target or output (Pred or var) to reforma
fit (bool, optional): True -> fit scaler
mode (_type_, optional): Specify the nature of y than impact reformatting (if pred or var)
flag_inverse (bool, optional): Specify if normalisation or inverse_normalisation
Returns:
_type_: tupple (X,y)
"""
        if self.rescale and y is not None:
            shape = y.shape
            if len(y.shape) == 3:
print("log format : strange_reshape :", y.shape)
size_motif = y.shape[1]
y_new = [
super(Lstm, self)._format(X, y[:, i, :], type_transform, mode_UQ)[
1
][:, None, :]
for i in range(size_motif)
]
y = np.concatenate(y_new, axis=1)
else:
X, y = super(Lstm, self)._format(X, y, type_transform, mode_UQ)
if y.shape != shape:
y = y.reshape(shape)
y = np.squeeze(y)
return X, y
def factory(
self,
X,
y,
mask=None,
cut_param=None,
fit_rescale=True,
causality_remove=None,
redundancy=None,
only_fit_scaler=False,
):
"""Feature factory Reshape and redundundization (Moving window embedding representation)
Args:
X (_type_): X list contains (X_ctx, X_seq)
y (_type_): Raw Y
mask (_type_, optional): Mask of non-data
cut_param (_type_, optional): cut paramaters on y distrubution
Returns:
(Inputs,targets,mask): model Inputs, Targets and mask.
"""
X_ctx, X_seq = X
with_static_feature = self.model_specifications["with_static_feature"]
only_futur = self.model_specifications["only_futur"]
with_query = self.model_specifications["with_query"]
if redundancy is None:
redundancy = self.training_parameters["redundancy"]
if y is None:
print("Factory log : Predict mode")
redundancy = 1
if cut_param is None:
y_cut = y
else:
min_cut, max_cut = cut_param
y_cut = cut(y, min_cut, max_cut)
X_seq = cut(X_seq, min_cut, max_cut)
X_ctx, y_cut = self._format(X_ctx, y_cut, type_transform="fit_transform")
X_ctx, X_seq = self._format(X_ctx, X_seq, type_transform="fit_transform")
if only_fit_scaler:
return ([X_ctx, X_seq], y_cut, None)
list_keys = [
"size_window_past",
"size_window_futur",
"size_subseq_dec",
"size_subseq_enc",
"padding_past",
"padding_futur",
]
(
size_window_past,
size_window_futur,
size_subseq_dec,
size_subseq_enc,
padding_past,
padding_futur,
) = Extract_dict(self.model_parameters, list_keys)
        parameters = self.model_parameters
# Factory ctx_features !
horizon = size_window_past + 1
Inputs = []
if with_static_feature:
lag = 0
if "factory_lag_lt" in paramaters.keys():
lag = paramaters["factory_lag_lt"]
if "factory_lag_target" in paramaters.keys():
lag = +paramaters["factory_lag_target"]
if not (with_query):
horizon = size_window_past + 1
x_lt = stack_and_roll(X_ctx, horizon, lag=lag)
Inputs.append(x_lt)
else:
horizon_f = size_window_futur
x_query = stack_and_roll(
X_ctx, horizon + horizon_f, lag=lag + horizon_f
)
if causality_remove is not None:
for i in range(1, horizon_f):
x_query[:, horizon + i, causality_remove] = x_query[
:, horizon, causality_remove
]
Inputs.append(x_query)
# Factory lag_features :
horizon = size_window_past + size_subseq_enc
lag = 0
if "factory_lag_st" in paramaters.keys():
lag = paramaters["factory_lag_st"]
x_lag_lstm = stack_and_roll(X_seq, horizon, lag=lag - 1)
Inputs.append(x_lag_lstm)
# Factory Target :
lag = 0
if "factory_lag_target" in paramaters.keys():
lag = paramaters["factory_lag_target"]
horizon = size_window_past + size_window_futur
y_lstm = stack_and_roll(y_cut, horizon, lag=size_window_futur - lag - 1)
y_deep = double_roll(
y_lstm,
size_window_past,
size_window_futur,
size_subseq_dec,
padding_past,
padding_futur,
only_futur,
)
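        # Per double_roll (defined at module level below), y_deep has shape
        # (n_samples, n_step_Zenc, n_step_Zdec, size_subseq_dec, dim_target), or
        # (n_samples, n_step_Zdec, size_subseq_dec, dim_target) when only_futur is set.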
Targets = [y_deep.astype(np.float32)]
if redundancy < 1:
selection = np.ones(len(Inputs[0])) == 1
removed = np.random.choice(
np.nonzero(selection)[0],
int(len(Inputs[0]) * (1 - redundancy)),
replace=False,
)
selection[removed] = False
Inputs = apply_mask(Inputs, selection)
Targets = apply_mask(Targets, selection)
# mask = apply_mask(mask, selection)
return (Inputs, Targets, mask)
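    # Illustrative call (a sketch; the variable names are assumptions):
    #   Inputs, Targets, mask = estimator.factory([X_ctx, X_seq], y)
    # yields the folded model inputs and the double-rolled targets that the
    # training generator feeds to the network.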
def modify_dropout(self, dp):
"""Method to modify dp : have to be extend as API modification to model features.
Args:
dp (_type_): ddropout rate.
"""
# For dropout, as rate is implictly saved in layers, we create new model with new dropout and transfer weight
# Maybe be optimized in only modify layers paramters !
self.model_parameters["dp"] = dp
self.model.save_weights("test")
self.model_initializer()
self.model.load_weights("test")
def get_params(self):
        # Get parameters; refactoring needed to follow scikit-learn standards.
dict_param = {
"model_parameters": self.model_parameters,
"architecture_parameters": self.architecture_parameters,
"model_specifications": self.model_specifications,
"training_parameters": self.training_parameters,
}
return dict_param
def save(self, path, name=None):
# save model
if name:
self.model.save_weights(path + name)
else:
self.model.save_weights(path + "Lstm_ed_" + self.name)
def load(self, path, name=None):
# load saved model
if name:
self.model.load_weights(path + name)
else:
self.model.load_weights(path + "Lstm_ed_" + self.name)
def reset(self):
# reset model
if hasattr(self, "model"):
del self.model
if hasattr(self, "model_enc"):
del self.model_enc
if hasattr(self, "model_pred"):
del self.model_pred
self.initialized = False
def delete(self):
# delete model
if hasattr(self, "model"):
del self.model
if hasattr(self, "model_enc"):
del self.model_enc
if hasattr(self, "model_pred"):
del self.model_pred
del self.model_parameters
del self.architecture_parameters
del self.model_specifications
del self.training_parameters
def Build_generator(self, X, y, batch_min=32, shuffle=True, train=True):
return LSTM_ED_Generator(
X, y, self, batch_min=batch_min, shuffle=shuffle, train=train
)
def plot_metrics(self, name_loss="val_loss"):
"""Plot metrics values recovers form tensorflow metrics callback
Args:
name_loss (str, optional):metrics to visualisze.
"""
metrics = [history.history[name_loss] for history in self.history]
phase = [len(i) for i in metrics]
metrics = np.concatenate(metrics)
plt.figure()
plt.plot(metrics)
        for i in np.cumsum(phase):
            plt.vlines(i, metrics.min(), metrics.max(), ls="--", label="end phase")
plt.ylim(
np.quantile(metrics, 0.005) + 0.01 * np.abs(np.quantile(metrics, 0.99)),
np.quantile(metrics, 0.995) - 0.01 * np.abs(np.quantile(metrics, 0.01)),
)
plt.show()
class LSTM_ED_Generator(tf.keras.utils.Sequence):
def __init__(self, X, y, metamodel, batch_min=64, shuffle=True, train=True):
self.X_ctx = X[0]
self.X_seq = X[1]
self.y = y
self.lenX = self.X_ctx.shape[0]
self.train = train
self.seed = 0
self.shuffle = shuffle
self.redundancy = metamodel.training_parameters["redundancy"]
self.batch_min = batch_min
# self.scaler = metamodel.scaler
self.factory = metamodel.factory
self._format = metamodel._format
self.rescale = metamodel.rescale
self.causality_remove = None
self.model_parameters = metamodel.model_parameters
self.model_specifications = metamodel.model_specifications
self.size_subseq_enc = metamodel.model_parameters["size_subseq_enc"]
self.size_window_past = metamodel.model_parameters["size_window_past"]
self.size_window_futur = metamodel.model_parameters["size_window_futur"]
if not (self.train):
set_off = 0
div = self.lenX // self.batch_min
mod = (self.lenX % self.batch_min) / self.batch_min
else:
set_off = self.size_window_futur
div = (self.lenX - set_off) // self.batch_min
mod = 0
self.len_ = div + int(np.ceil(mod))
if shuffle:
self.indices = np.arange(self.len_)
np.random.shuffle(self.indices)
def load(self, idx):
idx = idx * self.batch_min
seuil_min = max(0, idx - (self.size_subseq_enc + self.size_window_past))
size_full_seq = (
self.size_subseq_enc
+ self.size_window_past
+ self.size_window_futur
+ self.batch_min
)
if not (self.train):
seuil_max = max(
self.size_subseq_enc
+ self.size_window_past
+ self.batch_min
- seuil_min,
idx + self.batch_min + self.size_window_futur,
)
seuil_max = min(self.lenX, seuil_max)
if (seuil_max - seuil_min) < size_full_seq:
diff = size_full_seq - (seuil_max - seuil_min)
seuil_min = max(0, seuil_min - diff)
else:
seuil_max = max(
size_full_seq + seuil_min, idx + self.batch_min + self.size_window_futur
)
return (
[self.X_ctx[seuil_min:seuil_max], self.X_seq[seuil_min:seuil_max]],
self.y[seuil_min:seuil_max],
)
def __len__(self):
return self.len_
def __getitem__(self, idx):
if self.shuffle:
idx = self.indices[idx]
x, y = self.load(idx)
redundancy = 1
if self.train:
redundancy = None
        Inputs, Outputs, _ = self.factory(
x, y, causality_remove=None, fit_rescale=False, redundancy=redundancy
)
        set_off = self.size_window_futur
seq_len = len(Inputs[0])
selection = np.zeros(len(Inputs[0])) == 1
if idx * self.batch_min < (self.size_subseq_enc + self.size_window_past):
selection[idx * self.batch_min: idx * self.batch_min + self.batch_min] = (
True
)
else:
corr = 0
if (idx + 1) * self.batch_min + set_off > self.lenX:
corr = ((idx + 1) * self.batch_min + set_off) - self.lenX
selection[
seq_len - self.batch_min - set_off + corr: seq_len - set_off + corr
] = True
Inputs = apply_mask(Inputs, selection)
        Outputs = apply_mask(Outputs, selection)
        return Inputs, Outputs
# shuffles the dataset at the end of each epoch
def on_epoch_end(self):
if self.shuffle:
np.random.shuffle(self.indices)
def double_roll(
y,
size_window_past,
size_window_futur,
size_subseq_dec,
padding_past,
padding_futur,
only_futur=False,
):
    """Fold a rolled target tensor into per-encoder-step / per-decoder-step subsequences of length size_subseq_dec."""
n_step_Zdec = get_fold_nstep(size_window_futur, size_subseq_dec, padding_futur)
n_step_Zenc = get_fold_nstep(size_window_past, n_step_Zdec, padding_past) + 1
if (padding_futur == size_subseq_dec) & (padding_past == size_window_futur):
y_n = y.reshape((y.shape[0], -1, size_subseq_dec, y.shape[-1]))
y_n = y_n.reshape(
(y.shape[0], n_step_Zenc, n_step_Zdec, size_subseq_dec, y.shape[-1])
)
if only_futur:
y_n = y_n[:, -1]
else:
if only_futur:
y_n = np.zeros((y.shape[0], n_step_Zdec, size_subseq_dec, y.shape[-1]))
for j in range(n_step_Zdec):
begin = (n_step_Zenc - 1) * padding_past + j * padding_futur
end = begin + size_subseq_dec
y_n[:, j, :, :] = y[:, begin:end]
else:
y_n = np.zeros(
(y.shape[0], n_step_Zenc, n_step_Zdec, size_subseq_dec, y.shape[-1])
)
for i in range(n_step_Zenc):
for j in range(n_step_Zdec):
begin = i * padding_past + j * padding_futur
end = begin + size_subseq_dec
y_n[:, i, j, :, :] = y[:, begin:end]
return y_n
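

# Minimal runnable sketch of the moving-window ("stack and roll") embedding
# that the feature factory relies on. _toy_stack_and_roll is an illustrative
# stand-in written for this example, not the library's stack_and_roll; it
# assumes the convention that row t gathers the `horizon` values ending at t
# (optionally shifted back by `lag`).
def _toy_stack_and_roll(x, horizon, lag=0):
    # Stack each time step with its (horizon - 1) predecessors: (T, d) -> (T, horizon, d).
    T, d = x.shape
    out = np.zeros((T, horizon, d), dtype=x.dtype)
    for t in range(T):
        for h in range(horizon):
            src = t - (horizon - 1 - h) - lag
            if 0 <= src < T:
                out[t, h] = x[src]
    return out


if __name__ == "__main__":
    # Toy check: 10 time steps of a 2-d series, windows of length 4.
    x_toy = np.arange(20, dtype=np.float32).reshape(10, 2)
    windows = _toy_stack_and_roll(x_toy, horizon=4)
    print(windows.shape)  # (10, 4, 2)
    print(windows[5, :, 0])  # the four lagged values feeding step 5: [4. 6. 8. 10.]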