Coverage for uqmodels / modelization / DL_estimator / data_embedding.py: 56%

224 statements  


import math

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from keras.layers import LeakyReLU
from tensorflow.keras import layers

from uqmodels.modelization.DL_estimator.utils import (
    find_conv_kernel,
    set_global_determinism,
)

from ...utils import add_random_state

# tf.keras.utils.get_custom_objects().clear()


# Code restructuring: layer-preprocessing functionality for NNs (Transformer?)

class Mouving_Windows_Embedding(layers.Layer):
    def __init__(
        self, size_window, n_windows=1, step=1, dim_d=1, dim_chan=1, seed=None, **kwargs
    ):
        """Embed a series as a sequence of flattened moving windows.

        Args:
            size_window (int): length of each window.
            n_windows (int, optional): number of windows to extract. Defaults to 1.
            step (int, optional): stride between consecutive window starts. Defaults to 1.
            dim_d (int, optional): feature dimension of the input. Defaults to 1.
            dim_chan (int, optional): number of channels. Defaults to 1.
            seed (int, optional): handle experimental random using seed.
        """
        super(Mouving_Windows_Embedding, self).__init__()
        self.size_window = size_window
        self.n_windows = n_windows
        self.step = step
        self.dim_d = dim_d
        self.dim_chan = dim_chan
        self.seed = seed
        self.last_shape = self.size_window * self.dim_d * self.dim_chan

    def call(self, inputs, mode="encoder"):
        """Stack n_windows moving windows and flatten each of them.

        Args:
            inputs: tensor of shape (batch, time, dim_d) or
                (batch, time, dim_d, dim_chan).
            mode (str, optional): unused here. Defaults to "encoder".

        Returns:
            Tensor of shape (batch, n_windows, size_window * dim_d * dim_chan).
        """
        output = inputs
        if len(output.shape) == 3:
            # Add a singleton channel axis so the reshape below is consistent.
            output = output[:, :, :, None]

        slide_tensor = []
        for i in range(self.n_windows):
            slide_tensor.append(
                output[:, (i * self.step): (i * self.step) + self.size_window]
            )
        MWE_raw = K.stack(slide_tensor, axis=1)
        MWE = K.reshape(
            MWE_raw, (-1, self.n_windows, self.size_window * self.dim_d * self.dim_chan)
        )

        return MWE
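
# Illustrative usage sketch (not from the original module; the example values
# are assumptions): shows the shape contract of Mouving_Windows_Embedding. The
# input must cover (n_windows - 1) * step + size_window time steps.
def _demo_mouving_windows_embedding():
    mwe = Mouving_Windows_Embedding(size_window=4, n_windows=3, step=2, dim_d=5)
    x = tf.zeros((8, 8, 5))  # (batch, time, dim_d)
    z = mwe(x)  # each window is flattened to size_window * dim_d * dim_chan
    assert z.shape == (8, 3, 4 * 5 * 1)
    return z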

class Mouving_Window_Embedding(layers.Layer):
    def __init__(self, sub_seq_size, size_window, dim_out=1, padding=1, seed=None):
        """Moving-window embedding with mode-dependent output length.

        Args:
            sub_seq_size (int): length of each sub-sequence window.
            size_window (int): number of windows produced in encoder mode.
            dim_out (int, optional): forecast horizon; decoder mode produces
                dim_out - 1 extra windows. Defaults to 1.
            padding (int, optional): stride between window starts. Defaults to 1.
            seed (int, optional): handle experimental random using seed.
        """
        super(Mouving_Window_Embedding, self).__init__()
        self.sub_seq_size = sub_seq_size
        self.dim_out = dim_out
        self.size_window = size_window
        self.padding = padding
        self.seed = seed

    def call(self, inputs, mode="encoder"):
        """Stack moving windows; the number of windows depends on mode.

        Args:
            inputs: tensor of shape (batch, time, features).
            mode (str, optional): "encoder" or "decoder". Defaults to "encoder".

        Returns:
            Tensor of shape (batch, size_output, sub_seq_size * features),
            or the unchanged inputs when sub_seq_size == 1.
        """
        if mode == "encoder":
            size_output = self.size_window
        elif mode == "decoder":
            size_output = self.size_window + self.dim_out - 1
        else:
            # Unknown mode: fail explicitly rather than crashing later.
            raise ValueError("mode must be 'encoder' or 'decoder', got %r" % mode)
        if self.sub_seq_size == 1:
            return inputs
        else:
            slide_tensor = []
            for i in range(size_output):
                slide_tensor.append(
                    inputs[
                        :, (i * self.padding): (i * self.padding) + self.sub_seq_size
                    ]
                )
            MWE_raw = K.stack(slide_tensor, axis=1)
            MWE = K.reshape(
                MWE_raw, (-1, size_output, self.sub_seq_size * inputs.shape[-1])
            )
            return MWE
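
# Illustrative sketch (assumed values, not from the original code): in decoder
# mode the layer emits dim_out - 1 extra windows, so the input must be long
# enough for size_window + dim_out - 1 window starts.
def _demo_mouving_window_embedding():
    mwe = Mouving_Window_Embedding(sub_seq_size=3, size_window=5, dim_out=2, padding=1)
    x = tf.zeros((4, 9, 2))  # (batch, time, features)
    z_enc = mwe(x, mode="encoder")  # 5 windows
    z_dec = mwe(x, mode="decoder")  # 5 + 2 - 1 = 6 windows
    assert z_enc.shape == (4, 5, 3 * 2) and z_dec.shape == (4, 6, 3 * 2)
    return z_enc, z_dec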

# Thin registered subclasses so that the layers nested inside
# Mouving_conv_Embedding can be (de)serialized under the
# "UQModels_data_embedding" package name.
@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Conv2D(tf.keras.layers.Conv2D):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Dropout(tf.keras.layers.Dropout):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Conv1D(tf.keras.layers.Conv1D):
    pass

@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Mouving_conv_Embedding(layers.Layer):
    def __init__(
        self,
        size_window,
        n_windows,
        step=1,
        dim_d=1,
        dim_chan=1,
        use_conv2D=True,
        list_filters=None,
        list_strides=[2, 1, 1],
        list_kernels=None,
        dp=0.01,
        flag_mc=False,
        seed=None,
        **kwargs
    ):
        """Convolutional moving-window embedding.

        Args:
            size_window (int): length of the input window.
            n_windows (int): number of embedded steps to produce.
            step (int, optional): stride between windows. Defaults to 1.
            dim_d (int, optional): feature dimension. Defaults to 1.
            dim_chan (int, optional): number of channels. Defaults to 1.
            use_conv2D (bool, optional): use Conv2D blocks with dropout rather
                than grouped Conv1D blocks. Defaults to True.
            list_filters (list, optional): filters per conv layer. Defaults to
                64 for each stride.
            list_strides (list, optional): strides per conv layer.
            list_kernels (list, optional): kernel sizes per conv layer;
                inferred with find_conv_kernel when None.
            dp (float, optional): dropout rate. Defaults to 0.01.
            flag_mc (bool, optional): keep dropout active at inference time
                (MC dropout). Defaults to False.
            seed (int, optional): handle experimental random using seed.
        """
        # Initialize the parent Layer first so attribute tracking is set up.
        super().__init__(**kwargs)

        self.size_window = size_window
        self.n_windows = n_windows
        self.step = step
        self.dim_d = dim_d
        self.dim_chan = dim_chan
        self.use_conv2D = use_conv2D
        self.flag_mc = flag_mc
        self.seed = seed
        self.multiscale = False
        set_global_determinism(self.seed)

        if list_filters is None:
            list_filters = [64 for i in list_strides]

        if list_kernels is None:
            list_kernels, list_strides = find_conv_kernel(
                self.size_window, n_windows, list_strides
            )
            list_filters.append(list_filters[-1])

        self.list_strides = list_strides
        self.list_kernels = list_kernels
        self.list_filters = list_filters

        self.layers = []
        for n, (filters, kernel, strides) in enumerate(
            zip(list_filters, list_kernels, list_strides)
        ):
            if use_conv2D:
                # First kernel spans the feature axis; later ones keep it flat.
                if n == 0:
                    kernel = (kernel, dim_d)
                else:
                    kernel = (kernel, 1)
                self.layers.append(
                    Conv2D(
                        filters,
                        kernel,
                        strides=strides,
                        padding="valid",
                        activation="relu",
                    )
                )
                if dp > 0:
                    self.layers.append(Dropout(dp, seed=add_random_state(seed, n)))

            else:
                # Grouped Conv1D: one group per (channel, feature) pair.
                self.layers.append(
                    Conv1D(
                        filters * dim_chan * dim_d,
                        kernel,
                        strides=strides,
                        padding="valid",
                        groups=dim_chan * dim_d,
                        activation="relu",
                    )
                )

        # self.layers.append(tf.keras.layers.BatchNormalization())
        # self.layers.append(tf.keras.layers.Dropout(dp))

        if use_conv2D:
            self.last_shape = list_filters[-1]
        else:
            self.last_shape = list_filters[-1] * self.dim_d * self.dim_chan

    def call(self, inputs):
        """Apply the convolutional stack to the input window.

        Args:
            inputs: tensor of shape (batch, time, dim_d) or
                (batch, time, dim_d, dim_chan).

        Returns:
            Tensor of shape (batch, n_windows, last_shape).
        """
        output = inputs
        if len(output.shape) == 3:
            output = output[:, :, :, None]

        if not (self.use_conv2D):
            output = K.reshape(
                output, (-1, 1, self.size_window, self.dim_d * self.dim_chan)
            )

        # if (self.multiscale):
        #     list_output = []

        for layer in self.layers:
            if isinstance(layer, Dropout):
                # Dispatch on layer type so flag_mc (MC dropout) reaches the
                # dropout layers, which alternate with the conv layers.
                output = layer(output, training=self.flag_mc)
                # if (self.multiscale):
                #     TODO: route multiscale windows to the right final step
                #     list_output.append(output)

            else:
                output = layer(output)
        if self.use_conv2D:
            output = output[:, :, 0, :]
        else:
            output = output[:, 0, :, :]

        return output

    def get_config(self):
        dict_config = {}
        dict_config["size_window"] = self.size_window
        dict_config["n_windows"] = self.n_windows
        dict_config["step"] = self.step
        dict_config["dim_d"] = self.dim_d
        dict_config["dim_chan"] = self.dim_chan
        dict_config["list_strides"] = self.list_strides
        dict_config["list_kernels"] = self.list_kernels
        dict_config["list_filters"] = self.list_filters
        dict_config["flag_mc"] = self.flag_mc
        dict_config["use_conv2D"] = self.use_conv2D
        dict_config["seed"] = self.seed

        dict_config["layers"] = []
        for layer in self.layers:
            dict_config["layers"].append(tf.keras.utils.serialize_keras_object(layer))
        return dict_config

    @classmethod
    def from_config(cls, config):
        layers_config = config.pop("layers")
        layers = []
        for layer_config in layers_config:
            layer = tf.keras.utils.deserialize_keras_object(layer_config)
            layers.append(layer)

        obj = cls(**config)
        obj.layers = layers
        return obj
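
# Serialization round-trip sketch (illustrative; the layer sizes are
# assumptions). Mouving_conv_Embedding and its Conv/Dropout wrappers are
# registered, so a config produced by get_config can rebuild the layer,
# including its internal conv stack.
def _demo_mouving_conv_embedding_roundtrip():
    emb = Mouving_conv_Embedding(size_window=16, n_windows=8, dim_d=3, seed=0)
    config = emb.get_config()
    emb_restored = Mouving_conv_Embedding.from_config(config)
    assert len(emb_restored.layers) == len(emb.layers)
    return emb_restored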

class Data_embedding_TS(layers.Layer):
    def __init__(
        self,
        size_window,
        n_windows,
        step=1,
        dim_d=1,
        dim_chan=1,
        dim_out=1,
        flag_mc=False,
        seed=None,
    ):
        super(Data_embedding_TS, self).__init__()
        self.n_windows = n_windows
        self.size_window = size_window
        self.dim_d = dim_d
        self.seed = seed
        set_global_determinism(self.seed)

        # Moving-window embedding; the convolutional variant is kept below as
        # a commented-out alternative.
        self.MWE = Mouving_Windows_Embedding(
            size_window,
            n_windows,
            step=step,
            dim_d=dim_d,
            seed=seed,
        )
        # self.MWE = Mouving_conv_Embedding(
        #     size_window,
        #     n_windows,
        #     step=step,
        #     dim_d=dim_d,
        #     dim_chan=dim_chan,
        #     use_conv2D=True,
        #     list_filters=None,
        #     list_strides=[2, 1],
        #     list_kernels=None,
        #     dp=0.05,
        #     flag_mc=flag_mc,
        #     seed=seed,
        # )

        self.FTE = Factice_Time_Extension(dim_out)

    def call(self, Z_enc, Y_past, extension):
        """Concatenate window embeddings of Y_past to Z_enc, then extend time.

        Args:
            Z_enc: encoded representation of shape (batch, n_windows, d_enc).
            Y_past: past observations fed to the moving-window embedding.
            extension: unused (kept for signature compatibility).

        Returns:
            Tensor extended by dim_out duplicated final time steps.
        """
        MWE_past = self.MWE(Y_past)
        Z_enc = K.concatenate([Z_enc, MWE_past], axis=-1)
        Z_enc = self.FTE(Z_enc)
        return Z_enc
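
# End-to-end sketch for Data_embedding_TS (the shapes are assumptions): window
# embeddings of the past are concatenated to the encoded features, then the
# time axis is padded with dim_out duplicated steps for decoding.
def _demo_data_embedding_ts():
    emb = Data_embedding_TS(size_window=4, n_windows=3, step=2, dim_d=5, dim_out=2)
    z_enc = tf.zeros((8, 3, 10))  # (batch, n_windows, d_enc)
    y_past = tf.zeros((8, 8, 5))  # (batch, time, dim_d)
    z = emb(z_enc, y_past, None)
    assert z.shape == (8, 3 + 2, 10 + 4 * 5)
    return z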

@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Factice_Time_Extension(layers.Layer):
    def __init__(self, dim_out, **kwargs):
        super().__init__(**kwargs)
        self.dim_out = dim_out

    def call(self, inputs, **kwargs):
        """Extend the time axis by repeating the last step dim_out times.

        Args:
            inputs: tensor of shape (batch, time, features).

        Returns:
            Tensor of shape (batch, time + dim_out, features).
        """
        last_input = inputs[:, -1, :]
        last_input_duplicated = K.repeat_elements(
            last_input[:, None, :], self.dim_out, 1
        )
        inputs_augmented = K.concatenate([inputs, last_input_duplicated], axis=1)
        return inputs_augmented

    def get_config(self):
        dict_config = {}
        dict_config["dim_out"] = self.dim_out
        return dict_config
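
# Minimal sketch of Factice_Time_Extension (the values are assumptions): the
# last time step is duplicated dim_out times, a cheap way to give a decoder
# dummy future positions to work on.
def _demo_factice_time_extension():
    fte = Factice_Time_Extension(dim_out=3)
    x = tf.reshape(tf.range(8, dtype=tf.float32), (1, 4, 2))
    y = fte(x)  # (1, 4 + 3, 2); steps 4..6 repeat step 3
    assert y.shape == (1, 7, 2)
    return y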

# Positional encoding layers



class PositionalEmbedding(layers.Layer):
    def __init__(self, d_model, max_len=40, seed=None):
        super(PositionalEmbedding, self).__init__()
        self.seed = seed
        # Compute the positional encodings once in log space.
        pe = np.zeros((max_len, d_model), dtype=np.float32)

        position = np.expand_dims(np.arange(0, max_len, dtype=np.float32), 1)
        div_term = np.exp(
            np.arange(0, d_model, 2, dtype=np.float32) * -(math.log(10000.0) / d_model)
        )

        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)

        self.pe = tf.expand_dims(tf.convert_to_tensor(pe), 0)

    def call(self, inputs, **kwargs):
        """Return the sinusoidal positional encoding for the input length.

        Args:
            inputs: tensor of shape (batch, time, ...); only the time length
                is used, and it must not exceed max_len.

        Returns:
            Tensor of shape (1, time, d_model), broadcastable over the batch.
        """
        # Truncate the precomputed table to the input length so the encoding
        # broadcasts over any sequence length up to max_len.
        return self.pe[:, : tf.shape(inputs)[1]]
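
# Sanity-check sketch for the sinusoidal table (illustrative values): position
# 0 encodes to sin(0) = 0 on even columns and cos(0) = 1 on odd columns.
def _demo_positional_embedding():
    pos = PositionalEmbedding(d_model=8, max_len=16)
    pe = pos(tf.zeros((2, 10, 8)))  # truncated to the input length
    assert pe.shape == (1, 10, 8)
    np.testing.assert_allclose(pe[0, 0, 0::2], 0.0, atol=1e-6)
    np.testing.assert_allclose(pe[0, 0, 1::2], 1.0, atol=1e-6)
    return pe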

class ValuesEmbedding(layers.Layer):
    def __init__(self, d_model, seed=None):
        super(ValuesEmbedding, self).__init__()
        self.seed = seed
        self.tokenConv = tf.keras.layers.Conv1D(
            filters=d_model, kernel_size=3, padding="causal", activation="linear"
        )
        self.activation = LeakyReLU()

    def call(self, inputs, **kwargs):
        """Embed the value channels with a causal convolution.

        Args:
            inputs: tensor of shape (batch, time, 1 + n_values); channel 0 is
                assumed to hold the time index and is skipped here.

        Returns:
            Tensor of shape (batch, time, d_model).
        """
        x = self.tokenConv(inputs[:, :, 1:])
        x = self.activation(x)
        return x

class FixedEmbedding(layers.Layer):
    def __init__(self, c_in, d_model, seed=None):
        super(FixedEmbedding, self).__init__()
        self.seed = seed
        w = np.zeros((c_in, d_model), dtype=np.float32)

        position = np.expand_dims(np.arange(0, c_in, dtype=np.float32), 1)
        div_term = np.exp(
            np.arange(0, d_model, 2, dtype=np.float32) * -(math.log(10000.0) / d_model)
        )

        w[:, 0::2] = np.sin(position * div_term)
        w[:, 1::2] = np.cos(position * div_term)

        w = tf.convert_to_tensor(w)
        # The table is fixed: mark the embedding non-trainable so no gradients
        # flow into it (a bare tf.stop_gradient(w) here would be a no-op).
        w = tf.keras.initializers.Constant(w)
        self.emb = tf.keras.layers.Embedding(
            c_in, d_model, embeddings_initializer=w, trainable=False
        )

    def call(self, inputs, **kwargs):
        """Look up the fixed sinusoidal embedding.

        Args:
            inputs: integer tensor of indices in [0, c_in).

        Returns:
            Tensor of shape inputs.shape + (d_model,).
        """
        embedding = self.emb(inputs)
        return embedding
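
# Lookup sketch for FixedEmbedding (assumed sizes): indices map to frozen
# sinusoidal rows, so the same index always yields the same vector.
def _demo_fixed_embedding():
    fe = FixedEmbedding(c_in=24, d_model=6)
    idx = tf.constant([[0, 1, 23]])
    emb = fe(idx)  # (1, 3, 6)
    assert emb.shape == (1, 3, 6)
    return emb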

class TemporalEmbedding(layers.Layer):
    def __init__(self, d_model, seq_len, seed=None):
        super(TemporalEmbedding, self).__init__()
        self.time_embed = FixedEmbedding(seq_len, d_model, seed=seed)
        self.seed = seed
        # self.minute_embed = FixedEmbedding(60, d_model)
        # self.hour_embed = FixedEmbedding(24, d_model)
        # self.weekday_embed = FixedEmbedding(7, d_model)
        # self.day_embed = FixedEmbedding(32, d_model)
        # self.month_embed = FixedEmbedding(13, d_model)

    def call(self, inputs, **kwargs):
        """Embed the time-index channel with a fixed sinusoidal table.

        Args:
            inputs: tensor of shape (batch, time, features); channel 0 holds
                the integer time index.

        Returns:
            Tensor of shape (batch, time, d_model).
        """
        # x = x.long()
        return self.time_embed(inputs[:, :, 0])

class DataEmbedding_ITS(layers.Layer):
    def __init__(self, d_model, dropout=0.1, seq_len=96, seed=None):
        super(DataEmbedding_ITS, self).__init__()
        self.seq_len = seq_len
        self.value_embedding = ValuesEmbedding(d_model=d_model)
        # Size the positional table to the sequence length (the default
        # max_len of 40 would not cover the default seq_len of 96).
        self.position_embedding = PositionalEmbedding(d_model=d_model, max_len=seq_len)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, seq_len=seq_len)
        self.ctx_embedding = tf.keras.layers.Dense(d_model)
        self.dropout = tf.keras.layers.Dropout(dropout, seed=seed)
        self.seed = seed

    def call(self, inputs, x_mark=None, **kwargs):
        """Sum the value, positional and temporal embeddings, with dropout.

        Args:
            inputs: tensor of shape (batch, time, 1 + n_values); channel 0
                holds the integer time index, the rest the observed values.
            x_mark (optional): unused. Defaults to None.

        Returns:
            Tensor of shape (batch, time, d_model).
        """
        x = (
            self.value_embedding(inputs)
            + self.position_embedding(inputs)
            + self.temporal_embedding(inputs)
        )

        return self.dropout(x)
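

# End-to-end sketch for DataEmbedding_ITS (the channel layout is an assumption
# consistent with ValuesEmbedding/TemporalEmbedding above: channel 0 carries
# the integer time index, the remaining channels the observed values).
def _demo_data_embedding_its():
    d_model, seq_len, n_values = 16, 12, 3
    emb = DataEmbedding_ITS(d_model=d_model, dropout=0.1, seq_len=seq_len, seed=0)
    t = tf.range(seq_len, dtype=tf.float32)[None, :, None]  # time index
    v = tf.random.normal((1, seq_len, n_values))            # values
    x = tf.concat([t, v], axis=-1)                          # (1, 12, 4)
    z = emb(x)
    assert z.shape == (1, seq_len, d_model)
    return z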