Coverage for uqmodels/modelization/DL_estimator/utils.py: 77%
137 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
1import os
2import random
4import numpy as np
5import tensorflow as tf
7from uqmodels.utils import apply_mask
def identity(*args):
    """Return the positional arguments unchanged, packed in a tuple."""
    passthrough = args
    return passthrough
def sum_part_prod(array):
    """Sum the products of every proper prefix of ``array``.

    array = [k1, ..., kn]
    return (1 + k1 + k1k2 + ... + k1..k(n-1))

    The leading 1 is the product of the empty prefix; the full product
    k1..kn is not part of the sum. An empty ``array`` gives 0.
    """
    return sum(np.prod(array[:stop]) for stop in range(len(array)))
def size_post_conv(w, l_k, l_st):
    """Size left after a chain of convolutions (with padding=valid).

    w : size of the input window
    l_k : list of kernel sizes, one per layer
    l_st : list of strides, one per layer

    Layers are paired with ``zip``, so the shorter list decides how many
    layers are applied. With no layer at all, ``w`` is returned as is.
    """
    size = w
    for kernel, stride in zip(l_k, l_st):
        span = size - kernel + 1
        size = np.ceil(span / stride)
    return size
def find_conv_kernel(window_initial, size_final, list_strides):
    """Pick kernel sizes so that stacked convolutions shrink a window to a target size.

    Every layer except the last shares one kernel size; the last layer gets
    its own kernel, computed from the size that remains after the preceding
    layers (see ``size_post_conv``).

    window_initial : size of the input window
    size_final : size wanted after the last convolution
    list_strides : list of strides, one per convolution layer

    return (list_kernel, list_strides)

    Raises ValueError when the computed last kernel is smaller than 1.
    """
    inner_strides = list_strides[:-1]
    val = sum_part_prod(inner_strides)
    if val == 0:
        # Single layer: there is no shared kernel to solve for, and dividing
        # by val would give inf/nan, which int() cannot convert. This value is
        # only a placeholder; the last-kernel assignment below overwrites it.
        kernel = 1
    else:
        float_kernel = (
            size_final * np.prod(inner_strides) - window_initial
        ) / val - 1
        kernel = int(max(np.floor(-float_kernel) - 1, 1))

    before_last_size = size_post_conv(
        window_initial, [kernel] * len(inner_strides), inner_strides
    )
    # NOTE(review): this inverts the size_post_conv relation exactly only when
    # the last stride is 1 -- TODO confirm the intent for larger last strides.
    last_kernel = (before_last_size - size_final + 1) / list_strides[-1]

    if last_kernel < 1:
        raise ValueError("Incompatible list_strides values")

    list_kernel = [kernel] * len(list_strides)
    list_kernel[-1] = int(last_kernel)
    return (list_kernel, list_strides)
class Generator:
    """Serve (X, y) as consecutive mini-batches, optionally in shuffled order.

    Batch ``idx`` covers rows ``[idx * batch_min, (idx + 1) * batch_min)``.
    ``__len__`` uses floor division, so a trailing partial batch is dropped.
    """

    def __init__(self, X, y, batch_min=64, shuffle=True, random_state=None):
        self.X = X
        self.y = y
        # NOTE(review): __getitem__ calls the factory as
        # factory(x, y, fit_rescale=False) and unpacks three values; the
        # default `identity` accepts no keyword and returns its arguments
        # only, so a compatible factory must be assigned before iterating.
        self.factory = identity
        self.shuffle = shuffle
        self.batch_min = batch_min
        self.random_state = random_state

    def load(self, idx):
        """Return the slices of X and y that make up batch number ``idx``."""
        # Scale the batch number exactly once to get the first row.
        start = idx * self.batch_min
        stop = start + self.batch_min
        return (self.X[start:stop], self.y[start:stop])

    def __len__(self):
        """Number of complete batches."""
        return self.X.shape[0] // self.batch_min

    def __getitem__(self, idx):
        """Load batch ``idx`` and pass it through the factory."""
        x, y = self.load(idx)
        Inputs, Ouputs, _ = self.factory(x, y, fit_rescale=False)
        return Inputs, Ouputs

    def __call__(self):
        """Yield every batch once, then signal the end of the epoch."""
        order = list(range(len(self)))
        if self.shuffle:
            # A dedicated RNG makes random_state effective and leaves the
            # global `random` state untouched.
            random.Random(self.random_state).shuffle(order)
        for i in order:
            yield self[i]
        # Runs once the consumer has exhausted all batches.
        self.on_epoch_end()

    def on_epoch_end(self):
        """Hook called after the last batch; does nothing by default."""
        pass
def set_seeds(seed=None):
    """Seed `random`, TensorFlow and NumPy, and export PYTHONHASHSEED.

    Does nothing when ``seed`` is None.
    """
    if seed is None:
        return

    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    tf.random.set_seed(seed)
    np.random.seed(seed)
def set_global_determinism(seed=None):
    """Seed the random generators and set the TF determinism environment flags.

    When ``seed`` is not None, calls ``set_seeds(seed)`` and sets the
    TF_DETERMINISTIC_OPS and TF_CUDNN_DETERMINISTIC environment variables
    to "1".

    NOTE(review): this listing had lost its indentation; the two TF_*
    assignments are assumed to belong to the ``if seed is not None`` branch
    -- confirm against the original file.
    """
    if seed is not None:
        set_seeds(seed=seed)

        os.environ["TF_DETERMINISTIC_OPS"] = "1"
        os.environ["TF_CUDNN_DETERMINISTIC"] = "1"

        # Thread-count limits, currently disabled:
        # tf.config.threading.set_inter_op_parallelism_threads(1)
        # tf.config.threading.set_intra_op_parallelism_threads(1)
class Folder_Generator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that slices (X, y) into batches and runs the metamodel factory on each.

    For every batch a longer sequence is loaded (past window + batch + future
    horizon), passed through ``metamodel.factory``, and then masked so that
    only part of the factory output is returned.

    NOTE(review): ``super().__init__()`` is never called -- check whether the
    installed Keras version expects it for ``Sequence`` subclasses.
    """

    def __init__(
        self, X, y, metamodel, batch=64, shuffle=True, train=True, random_state=None
    ):
        # X is indexed as X[0] and X[1] below, i.e. it is used as a pair of
        # arrays; y is used as a single array.
        self.X = X
        self.y = y
        self.random_state = random_state
        # Number of rows, taken from X when available, else from y.
        # NOTE(review): if both X and y are None, len_ is never set and the
        # n_batch computation below fails.
        if X is not None:
            self.len_ = X[0].shape[0]
        elif y is not None:
            self.len_ = y.shape[0]

        self.train = train
        self.seed = 0  # not read anywhere in this class
        self.shuffle = shuffle
        self.batch = batch

        # self.scaler = metamodel.scaler
        self.factory = metamodel.factory
        self._format = metamodel._format
        self.rescale = metamodel.rescale

        self.causality_remove = None  # not read anywhere in this class
        self.model_parameters = metamodel.model_parameters
        self.past_horizon = metamodel.model_parameters["size_window"]
        self.futur_horizon = (
            metamodel.model_parameters["dim_horizon"]
            * metamodel.model_parameters["step"]
        )
        # Length of the raw sequence loaded for one batch.
        self.size_seq = self.past_horizon + self.futur_horizon + self.batch
        self.size_window_futur = 1  # not read anywhere in this class

        # Last batch may be partial, hence the ceil.
        self.n_batch = int(np.ceil(self.len_ / self.batch))
        self.indices = np.arange(self.n_batch)

    def load(self, idx):
        """Load the raw sequence needed to build batch number ``idx``.

        The slice starts ``past_horizon`` rows before the first row of the
        batch (clipped at 0) and spans at least ``size_seq`` rows.

        Returns ``([X[0] slice, X[1] slice], y slice)``; the X part is
        ``[None, None]`` when X is None, and the y part is None when y is None.
        """
        # From here on idx is a row offset, not a batch number.
        idx = idx * self.batch

        idx_min = max(0, idx - self.past_horizon)
        idx_max = max(self.size_seq + idx_min, idx + self.futur_horizon)
        # Handle the last batch: when the window overshoots the data, also load
        # the end of the previous batch so the window stays complete.
        if idx > 0:
            idx_min = max(idx_min - max(0, idx_max - self.len_), 0)
        y_batch = None

        if self.y is not None:
            y_batch = self.y[idx_min:idx_max]

        if self.X is None:
            return ([None, None], y_batch)

        else:
            return ([self.X[0][idx_min:idx_max], self.X[1][idx_min:idx_max]], y_batch)

    def __len__(self):
        """Number of batches per epoch."""
        return self.n_batch

    def __getitem__(self, idx):
        """Get a batch: load its sequence, apply the factory, keep the relevant part.

        Args:
            idx: position of the batch; remapped through ``self.indices``
                when shuffling is enabled.

        Returns:
            ``(Inputs, Ouputs)`` as produced by the factory, after
            ``apply_mask`` has been applied with the selection mask.
        """
        if self.shuffle:
            # NOTE(review): the global NumPy RNG is reseeded and the indices
            # reshuffled on every call, not once per epoch -- confirm that
            # this is intended.
            np.random.seed(self.random_state)
            np.random.shuffle(self.indices)
            idx = self.indices[idx]

        x, y = self.load(idx)

        # No-op branch.
        if self.train:
            pass
        Inputs, Ouputs, _ = self.factory(x, y, fit_rescale=False)
        # Boolean mask, all False to start with, one entry per row of Inputs[0].
        selection = np.zeros(len(Inputs[0])) == 1

        # Only read by the predict branch below, which recomputes them first.
        idx_min = max(0, idx * self.batch - self.past_horizon)
        idx_max = max(
            self.size_seq + idx_min, idx * self.batch + self.batch + self.futur_horizon
        )

        if self.train:
            padding_test = 0
            # Keep everything between the past window and the future horizon.
            # NOTE(review): if futur_horizon is 0 the stop bound is -0 and the
            # slice is empty -- confirm that futur_horizon is always > 0.
            selection[self.past_horizon : -self.futur_horizon] = True
            # Expression statement: the result is discarded.
            padding_test + self.past_horizon - self.futur_horizon
        else:  # predict mode, including the last-batch case

            idx_min = max(0, idx * self.batch - self.past_horizon)
            idx_max = max(
                self.size_seq + idx_min,
                idx * self.batch + self.batch + self.futur_horizon,
            )

            if idx == 0:
                if self.batch >= self.len_:
                    # One batch covers all the data: keep every row.
                    selection[0:] = True
                else:
                    # First batch: drop the trailing past + future rows.
                    selection[: -self.past_horizon - self.futur_horizon] = True

            else:
                # Handle the last batch: skip the rows by which the window
                # overshoots the data, and never fewer than futur_horizon.
                padding_test = max(self.futur_horizon, idx_max - self.len_)

                selection[padding_test + self.past_horizon :] = True

        # apply_mask comes from uqmodels.utils; presumably it keeps the
        # entries where selection is True -- verify against its definition.
        Inputs = apply_mask(Inputs, selection)
        Ouputs = apply_mask(Ouputs, selection)
        return Inputs, Ouputs

    # shuffles the dataset at the end of each epoch
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)