Coverage for uqmodels/modelization/DL_estimator/utils.py: 77%

137 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 14:29 +0000

1import os 

2import random 

3 

4import numpy as np 

5import tensorflow as tf 

6 

7from uqmodels.utils import apply_mask 

8 

9 

def identity(*args):
    """Return the positional arguments unchanged, packed in a tuple."""
    return args

12 

13 

def sum_part_prod(array):
    """Sum the products of every proper prefix of ``array``.

    For ``array = [k1, ..., kn]`` the result is
    ``1 + k1 + k1*k2 + ... + k1*...*k(n-1)``: the empty prefix contributes 1
    (``np.prod`` of an empty slice) and the full product ``k1*...*kn`` is
    never added. An empty ``array`` yields 0.
    """
    return sum(np.prod(array[:stop]) for stop in range(len(array)))

23 

24 

def size_post_conv(w, l_k, l_st):
    """Compute the length left after a stack of convolutions (padding=valid).

    Args:
        w: initial window size.
        l_k: kernel size of each successive layer.
        l_st: stride of each successive layer, paired with ``l_k`` by position.

    Returns:
        The size after the last layer (``w`` itself when no layer is given).
    """
    size = w
    for kernel, stride in zip(l_k, l_st):
        # Number of valid kernel positions, then one output every `stride`.
        n_positions = size - kernel + 1
        size = np.ceil(n_positions / stride)
    return size

35 

36 

def find_conv_kernel(window_initial, size_final, list_strides):
    """Pick kernel sizes so a conv stack maps ``window_initial`` towards ``size_final``.

    Every layer but the last shares one kernel size; the last layer gets its
    own kernel computed from the size reached just before it.

    Args:
        window_initial: size of the input window.
        size_final: targeted size after the last layer.
        list_strides: stride of each layer.

    Returns:
        Tuple ``(list_kernel, list_strides)``; ``list_strides`` is returned
        as received.

    Raises:
        ValueError: if the kernel computed for the last layer is below 1.
    """
    inner_strides = list_strides[:-1]

    # NOTE(review): with a single stride `inner_strides` is empty and
    # sum_part_prod returns 0, so the division below has a zero divisor.
    weight = sum_part_prod(inner_strides)
    raw_kernel = (size_final * np.prod(inner_strides) - window_initial) / weight - 1
    kernel = int(max(np.floor(-raw_kernel) - 1, 1))

    # Size reached once all the shared-kernel layers have been applied.
    penultimate_size = size_post_conv(
        window_initial, [kernel] * len(inner_strides), inner_strides
    )
    last_kernel = (penultimate_size - size_final + 1) / list_strides[-1]

    if last_kernel < 1:
        raise ValueError("Incompatible list_strides values")

    list_kernel = [kernel] * len(list_strides)
    list_kernel[-1] = int(last_kernel)
    return (list_kernel, list_strides)

60 

61 

class Generator:
    """Callable batch provider yielding consecutive slices of ``(X, y)``.

    Calling the instance returns a Python generator that walks over every
    full batch once, optionally in shuffled order, and then triggers
    ``on_epoch_end``.
    """

    def __init__(self, X, y, batch_min=64, shuffle=True, random_state=None):
        """Store the data and the iteration settings.

        Args:
            X: inputs, sliced along their first axis (must expose ``shape``).
            y: targets, sliced with the same bounds as ``X``.
            batch_min: number of rows per batch.
            shuffle: whether batches are visited in random order.
            random_state: seed of the batch-order shuffle (None: unseeded).
        """
        self.X = X
        self.y = y
        # NOTE(review): `identity` accepts no keyword argument and returns
        # its inputs as a 2-tuple, while __getitem__ calls
        # factory(x, y, fit_rescale=False) and unpacks three values; the
        # factory presumably has to be replaced before batches are requested.
        self.factory = identity
        self.shuffle = shuffle
        self.batch_min = batch_min
        self.random_state = random_state

    def load(self, idx):
        """Return the raw ``(X, y)`` rows of batch number ``idx``."""
        # The batch index is converted to a row offset exactly once.
        seuil_min = idx * self.batch_min
        seuil_max = seuil_min + self.batch_min
        return (self.X[seuil_min:seuil_max], self.y[seuil_min:seuil_max])

    def __len__(self):
        """Number of full batches; a trailing partial batch is dropped."""
        return self.X.shape[0] // self.batch_min

    def __getitem__(self, idx):
        """Load batch ``idx`` and pass it through ``self.factory``."""
        x, y = self.load(idx)
        Inputs, Ouputs, _ = self.factory(x, y, fit_rescale=False)
        return Inputs, Ouputs

    def __call__(self):
        """Yield every batch once, then call ``on_epoch_end``."""
        order = list(range(len(self)))
        if self.shuffle:
            # A dedicated RNG seeded with random_state makes the order
            # reproducible without touching the global `random` state.
            random.Random(self.random_state).shuffle(order)
        for batch_idx in order:
            yield self[batch_idx]
        # Reached only once the whole epoch has been consumed.
        self.on_epoch_end()

    def on_epoch_end(self):
        """Hook run after a full pass over the batches; no-op by default."""
        pass

97 

98 

def set_seeds(seed=None):
    """Seed Python hashing, ``random``, TensorFlow and NumPy with ``seed``.

    Does nothing when ``seed`` is None.
    """
    if seed is None:
        return

    os.environ["PYTHONHASHSEED"] = str(seed)
    for seeder in (random.seed, tf.random.set_seed, np.random.seed):
        seeder(seed)

105 

106 

def set_global_determinism(seed=None):
    """Seed every RNG through ``set_seeds`` and request deterministic TF ops.

    Does nothing when ``seed`` is None.
    """
    # NOTE(review): the indentation of this function was reconstructed; the
    # environment flags are assumed to sit under the `seed is not None`
    # guard together with the seeding call - confirm.
    if seed is None:
        return

    set_seeds(seed=seed)

    for flag in ("TF_DETERMINISTIC_OPS", "TF_CUDNN_DETERMINISTIC"):
        os.environ[flag] = "1"

    # tf.config.threading.set_inter_op_parallelism_threads(1)
    # tf.config.threading.set_intra_op_parallelism_threads(1)

116 

117 

class Folder_Generator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` serving batches cut from an ordered dataset.

    For each batch a slice of the data, widened by the past and future
    horizons, is loaded and passed to the metamodel ``factory``; a boolean
    mask then selects which rows of the factory output are returned.
    """

    def __init__(
        self, X, y, metamodel, batch=64, shuffle=True, train=True, random_state=None
    ):
        """Store the data and copy the needed settings from ``metamodel``.

        Args:
            X: pair of arrays indexed as ``X[0]`` and ``X[1]``, or None.
            y: target array, or None.
            metamodel: object exposing ``factory``, ``_format``, ``rescale``
                and ``model_parameters`` (keys ``size_window``,
                ``dim_horizon`` and ``step`` are read).
            batch: number of rows per batch.
            shuffle: whether the batch order is shuffled.
            train: selects the training or the prediction row mask.
            random_state: seed of the batch-order shuffle (None: unseeded).

        Raises:
            ValueError: if both ``X`` and ``y`` are None.
        """
        super().__init__()
        self.X = X
        self.y = y
        self.random_state = random_state
        if X is not None:
            self.len_ = X[0].shape[0]
        elif y is not None:
            self.len_ = y.shape[0]
        else:
            # Without data the number of batches cannot be computed.
            raise ValueError("X and y cannot both be None")

        self.train = train
        self.seed = 0
        self.shuffle = shuffle
        self.batch = batch

        # self.scaler = metamodel.scaler
        self.factory = metamodel.factory
        self._format = metamodel._format
        self.rescale = metamodel.rescale

        self.causality_remove = None
        self.model_parameters = metamodel.model_parameters
        self.past_horizon = metamodel.model_parameters["size_window"]
        self.futur_horizon = (
            metamodel.model_parameters["dim_horizon"]
            * metamodel.model_parameters["step"]
        )
        self.size_seq = self.past_horizon + self.futur_horizon + self.batch
        self.size_window_futur = 1

        self.n_batch = int(np.ceil(self.len_ / self.batch))
        self.indices = np.arange(self.n_batch)

        # Dedicated RNG: the batch order is drawn once here and once per
        # epoch end, so every batch is served exactly once per epoch and the
        # global NumPy random state is left untouched.
        self._rng = np.random.RandomState(random_state)
        if self.shuffle:
            self._rng.shuffle(self.indices)

    def load(self, idx):
        """Load the rows needed to build batch number ``idx``.

        The slice starts ``past_horizon`` rows before the batch (clipped at
        0) and spans at least ``size_seq`` rows.

        Returns:
            ``([X[0] slice, X[1] slice], y slice)``; the X entries are None
            when ``X`` is None and the y slice is None when ``y`` is None.
        """
        idx = idx * self.batch

        idx_min = max(0, idx - self.past_horizon)
        idx_max = max(self.size_seq + idx_min, idx + self.futur_horizon)
        # Last batch: also load the end of the previous batch so that the
        # slice keeps its length when it would run past the data.
        if idx > 0:
            idx_min = max(idx_min - max(0, idx_max - self.len_), 0)

        y_batch = None
        if self.y is not None:
            y_batch = self.y[idx_min:idx_max]

        if self.X is None:
            return ([None, None], y_batch)

        return ([self.X[0][idx_min:idx_max], self.X[1][idx_min:idx_max]], y_batch)

    def __len__(self):
        """Number of batches (the last one may be partial)."""
        return self.n_batch

    def __getitem__(self, idx):
        """Build the batch served at position ``idx`` of the current order.

        The sequence is loaded, passed through ``self.factory`` and masked
        so that only the relevant rows remain.

        Args:
            idx: position in the current (possibly shuffled) batch order.

        Returns:
            Tuple ``(Inputs, Ouputs)`` after masking.
        """
        # Translate the position into a batch number; the order only changes
        # in __init__ and on_epoch_end, never between two items.
        idx = self.indices[idx]

        x, y = self.load(idx)
        Inputs, Ouputs, _ = self.factory(x, y, fit_rescale=False)
        selection = np.zeros(len(Inputs[0]), dtype=bool)

        if self.train:
            selection[self.past_horizon : -self.futur_horizon] = True
        elif idx == 0:
            if self.batch >= self.len_:
                selection[:] = True
            else:
                selection[: -self.past_horizon - self.futur_horizon] = True
        else:
            # Prediction on a later batch: skip the rows that overlap the
            # previous batch when the slice was shifted back (last batch).
            idx_min = max(0, idx * self.batch - self.past_horizon)
            idx_max = max(
                self.size_seq + idx_min,
                idx * self.batch + self.batch + self.futur_horizon,
            )
            padding_test = max(self.futur_horizon, idx_max - self.len_)
            selection[padding_test + self.past_horizon :] = True

        Inputs = apply_mask(Inputs, selection)
        Ouputs = apply_mask(Ouputs, selection)
        return Inputs, Ouputs

    def on_epoch_end(self):
        """Draw a new batch order at the end of each epoch when shuffling."""
        if self.shuffle:
            self._rng.shuffle(self.indices)