Coverage for uqmodels/modelization/DL_estimator/data_embedding.py: 50%


import math

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import layers
from tensorflow.keras.layers import LeakyReLU

from uqmodels.modelization.DL_estimator.utils import (
    find_conv_kernel,
    set_global_determinism,
)

from ...utils import add_random_state

# tf.keras.utils.get_custom_objects().clear()


# Code restructuring: layer-preprocessing functionality for NN (Transformer?)



class Mouving_Windows_Embedding(layers.Layer):
    def __init__(
        self, size_window, n_windows=1, step=1, dim_d=1, dim_chan=1, seed=None, **kwargs
    ):
        """Sliding-windows embedding that stacks flattened windows of the inputs.

        Args:
            size_window (int): Length of each window.
            n_windows (int, optional): Number of windows to extract. Defaults to 1.
            step (int, optional): Stride between consecutive windows. Defaults to 1.
            dim_d (int, optional): Feature dimension of the inputs. Defaults to 1.
            dim_chan (int, optional): Number of channels. Defaults to 1.
            seed (int, optional): handle experimental random using seed.
        """
        super(Mouving_Windows_Embedding, self).__init__(**kwargs)
        self.size_window = size_window
        self.n_windows = n_windows
        self.step = step
        self.dim_d = dim_d
        self.dim_chan = dim_chan
        self.seed = seed
        self.last_shape = self.size_window * self.dim_d * self.dim_chan

    def call(self, inputs, mode="encoder"):
        """Extract n_windows sliding windows and flatten each one.

        Args:
            inputs: tensor of shape (batch, time, dim_d) or
                (batch, time, dim_d, dim_chan).
            mode (str, optional): unused. Defaults to "encoder".

        Returns:
            Tensor of shape (batch, n_windows, size_window * dim_d * dim_chan).
        """
        output = inputs
        if len(output.shape) == 3:
            output = output[:, :, :, None]

        slide_tensor = []
        for i in range(self.n_windows):
            # Slice the channel-expanded tensor rather than the raw inputs.
            slide_tensor.append(
                output[:, (i * self.step) : (i * self.step) + self.size_window]
            )
        MWE_raw = K.stack(slide_tensor, axis=1)
        MWE = K.reshape(
            MWE_raw, (-1, self.n_windows, self.size_window * self.dim_d * self.dim_chan)
        )

        return MWE
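

# Illustrative usage sketch, not part of the original module; the shapes below
# are assumed. With n_windows=8, step=4 and size_window=16, the input must span
# at least (8 - 1) * 4 + 16 = 44 time steps.
def _demo_mouving_windows_embedding():
    mwe = Mouving_Windows_Embedding(size_window=16, n_windows=8, step=4, dim_d=3)
    x = tf.zeros((2, 44, 3))  # (batch, time, dim_d)
    return mwe(x)  # shape (2, 8, 16 * 3 * 1) = (2, 8, 48)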



class Mouving_Window_Embedding(layers.Layer):
    def __init__(self, sub_seq_size, size_window, dim_out=1, padding=1, seed=None):
        super(Mouving_Window_Embedding, self).__init__()
        self.sub_seq_size = sub_seq_size
        self.dim_out = dim_out
        self.size_window = size_window
        self.padding = padding
        self.seed = seed

    def call(self, inputs, mode="encoder"):
        """Extract overlapping sub-sequences of length sub_seq_size.

        Args:
            inputs: tensor of shape (batch, time, features).
            mode (str, optional): "encoder" or "decoder"; decoder mode extracts
                dim_out - 1 extra windows. Defaults to "encoder".

        Returns:
            Tensor of shape (batch, size_output, sub_seq_size * features).
        """
        if mode == "encoder":
            size_output = self.size_window
        elif mode == "decoder":
            size_output = self.size_window + self.dim_out - 1
        else:
            raise ValueError("mode must be 'encoder' or 'decoder', got %r" % mode)
        if self.sub_seq_size == 1:
            return inputs
        else:
            slide_tensor = []
            for i in range(size_output):
                slide_tensor.append(
                    inputs[
                        :, (i * self.padding) : (i * self.padding) + self.sub_seq_size
                    ]
                )
            MWE_raw = K.stack(slide_tensor, axis=1)
            MWE = K.reshape(
                MWE_raw, (-1, size_output, self.sub_seq_size * inputs.shape[-1])
            )
            return MWE
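

# Illustrative sketch with assumed numbers: sub_seq_size=4 and padding=1 yield
# size_window overlapping sub-sequences, each flattened to
# sub_seq_size * features values; the input must span at least
# size_window + sub_seq_size - 1 = 13 steps.
def _demo_mouving_window_embedding():
    mwe = Mouving_Window_Embedding(sub_seq_size=4, size_window=10, padding=1)
    x = tf.zeros((2, 13, 5))  # (batch, time, features)
    return mwe(x, mode="encoder")  # shape (2, 10, 4 * 5) = (2, 10, 20)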



@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Conv2D(tf.keras.layers.Conv2D):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Dropout(tf.keras.layers.Dropout):
    pass


@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Conv1D(tf.keras.layers.Conv1D):
    pass



@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Mouving_conv_Embedding(layers.Layer):
    def __init__(
        self,
        size_window,
        n_windows,
        step=1,
        dim_d=1,
        dim_chan=1,
        use_conv2D=True,
        list_filters=None,
        list_strides=[2, 1, 1],
        list_kernels=None,
        dp=0.01,
        flag_mc=False,
        seed=None,
        **kwargs
    ):
        """Convolutional sliding-window embedding.

        Args:
            size_window (int): Length of the input window.
            n_windows (int): Number of output steps to produce.
            step (int, optional): Stride between windows. Defaults to 1.
            dim_d (int, optional): Feature dimension of the inputs. Defaults to 1.
            dim_chan (int, optional): Number of channels. Defaults to 1.
            use_conv2D (bool, optional): Use Conv2D blocks with dropout rather
                than grouped Conv1D blocks. Defaults to True.
            list_filters (list, optional): Filters per conv block. Defaults to None.
            list_strides (list, optional): Strides per conv block. Defaults to [2, 1, 1].
            list_kernels (list, optional): Kernel sizes per conv block; inferred
                from size_window and n_windows by find_conv_kernel when None.
            dp (float, optional): Dropout rate. Defaults to 0.01.
            flag_mc (bool, optional): Keep dropout stochastic at inference
                (MC dropout). Defaults to False.
            seed (int, optional): handle experimental random using seed.
        """
        # Call the Layer constructor first so attribute tracking is set up.
        super().__init__(**kwargs)

        self.size_window = size_window
        self.n_windows = n_windows
        self.step = step
        self.dim_d = dim_d
        self.dim_chan = dim_chan
        self.use_conv2D = use_conv2D
        self.dp = dp
        self.flag_mc = flag_mc
        self.seed = seed
        self.mutliscale = False
        set_global_determinism(self.seed)

        if list_filters is None:
            list_filters = [64 for i in list_strides]

        if list_kernels is None:
            list_kernels, list_strides = find_conv_kernel(
                self.size_window, n_windows, list_strides
            )
            list_filters.append(list_filters[-1])

        self.list_strides = list_strides
        self.list_kernels = list_kernels
        self.list_filters = list_filters

        self.layers = []
        for n, (filters, kernel, strides) in enumerate(
            zip(list_filters, list_kernels, list_strides)
        ):
            if use_conv2D:
                # First block convolves across the feature axis, later blocks
                # only along time.
                if n == 0:
                    kernel = (kernel, dim_d)
                else:
                    kernel = (kernel, 1)
                self.layers.append(
                    Conv2D(
                        filters,
                        kernel,
                        strides=strides,
                        padding="valid",
                        activation="relu",
                    )
                )
                if dp > 0:
                    self.layers.append(Dropout(dp, seed=add_random_state(seed, n)))

            else:
                self.layers.append(
                    Conv1D(
                        filters * dim_chan * dim_d,
                        kernel,
                        strides=strides,
                        padding="valid",
                        groups=dim_chan * dim_d,
                        activation="relu",
                    )
                )

            # self.layers.append(tf.keras.layers.BatchNormalization())
            # self.layers.append(tf.keras.layers.Dropout(dp))

        if use_conv2D:
            self.last_shape = list_filters[-1]
        else:
            self.last_shape = list_filters[-1] * self.dim_d * self.dim_chan

    def call(self, inputs):
        """Apply the convolutional blocks to an input sequence.

        Args:
            inputs: tensor of shape (batch, time, dim_d) or
                (batch, time, dim_d, dim_chan).

        Returns:
            Tensor of shape (batch, n_windows, last_shape).
        """
        output = inputs
        if len(output.shape) == 3:
            output = output[:, :, :, None]

        if not (self.use_conv2D):
            output = K.reshape(
                output, (-1, 1, self.size_window, self.dim_d * self.dim_chan)
            )

        # if (self.mutliscale):
        #     list_output = []

        for n, layer in enumerate(self.layers):
            if isinstance(layer, Dropout):
                # Dropout layers: the training flag follows flag_mc so dropout
                # stays active at inference for MC-dropout sampling.
                output = layer(output, training=self.flag_mc)
                # if (self.mutliscale):
                #     To do: find how to route the multiscale window to the
                #     right final step.
                #     list_output.append(output)

            else:
                output = layer(output)
        if self.use_conv2D:
            output = output[:, :, 0, :]
        else:
            output = output[:, 0, :, :]

        return output

    def get_config(self):
        dict_config = {}
        dict_config["size_window"] = self.size_window
        dict_config["n_windows"] = self.n_windows
        dict_config["step"] = self.step
        dict_config["dim_d"] = self.dim_d
        dict_config["dim_chan"] = self.dim_chan
        dict_config["list_strides"] = self.list_strides
        dict_config["list_kernels"] = self.list_kernels
        dict_config["list_filters"] = self.list_filters
        dict_config["dp"] = self.dp
        dict_config["flag_mc"] = self.flag_mc
        dict_config["use_conv2D"] = self.use_conv2D
        dict_config["seed"] = self.seed

        dict_config["layers"] = []
        for layer in self.layers:
            dict_config["layers"].append(tf.keras.utils.serialize_keras_object(layer))
        return dict_config

    @classmethod
    def from_config(cls, config):
        layers_config = config.pop("layers")
        layers = []
        for layer_config in layers_config:
            layer = tf.keras.utils.deserialize_keras_object(layer_config)
            layers.append(layer)

        obj = cls(**config)
        obj.layers = layers
        return obj
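

# Hedged sketch: the kernel sizes come from find_conv_kernel, whose exact
# output is internal, so the time axis of the result is only expected (not
# guaranteed here) to equal n_windows; last_shape gives the feature size.
def _demo_mouving_conv_embedding():
    mce = Mouving_conv_Embedding(size_window=32, n_windows=8, dim_d=3)
    x = tf.zeros((2, 32, 3))  # (batch, size_window, dim_d)
    return mce(x)  # expected shape (2, 8, mce.last_shape)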



class Data_embedding_TS(layers.Layer):
    def __init__(
        self,
        size_window,
        n_windows,
        step=1,
        dim_d=1,
        dim_chan=1,
        dim_out=1,
        flag_mc=False,
        seed=None,
    ):
        super(Data_embedding_TS, self).__init__()
        self.n_windows = n_windows
        self.size_window = size_window
        self.dim_d = dim_d
        self.seed = seed
        set_global_determinism(self.seed)

        self.MWE = Mouving_Windows_Embedding(
            size_window,
            n_windows,
            step=step,
            dim_d=dim_d,
            seed=seed,
        )

        # Alternative convolutional embedding (disabled):
        # self.MWE = Mouving_conv_Embedding(
        #     size_window,
        #     n_windows,
        #     step=step,
        #     dim_d=dim_d,
        #     dim_chan=dim_chan,
        #     use_conv2D=True,
        #     list_filters=None,
        #     list_strides=[2, 1],
        #     list_kernels=None,
        #     dp=0.05,
        #     flag_mc=flag_mc,
        #     seed=seed,
        # )

        self.FTE = Factice_Time_Extension(dim_out)

    def call(self, Z_enc, Y_past, extension):
        """Concatenate the windowed past values to the encoding, then extend it in time.

        Args:
            Z_enc: encoded representation of shape (batch, n_windows, d_enc).
            Y_past: past target values to window-embed.
            extension: currently unused.

        Returns:
            Tensor of shape (batch, n_windows + dim_out, d_enc + window features).
        """

        MWE_past = self.MWE(Y_past)
        Z_enc = K.concatenate([Z_enc, MWE_past], axis=-1)
        Z_enc = self.FTE(Z_enc)
        return Z_enc
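

# Illustrative sketch with assumed shapes: the windowed past values are
# concatenated feature-wise to the encoding, then the last step is repeated
# dim_out times by Factice_Time_Extension.
def _demo_data_embedding_ts():
    emb = Data_embedding_TS(size_window=16, n_windows=8, step=4, dim_d=3, dim_out=2)
    z_enc = tf.zeros((2, 8, 10))  # (batch, n_windows, d_enc)
    y_past = tf.zeros((2, 44, 3))  # 44 = (8 - 1) * 4 + 16
    return emb(z_enc, y_past, None)  # shape (2, 8 + 2, 10 + 48) = (2, 10, 58)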



@tf.keras.utils.register_keras_serializable(package="UQModels_data_embedding")
class Factice_Time_Extension(layers.Layer):
    def __init__(self, dim_out, **kwargs):
        super().__init__(**kwargs)
        self.dim_out = dim_out

    def call(self, inputs, **kwargs):
        """Extend the sequence by repeating its last time step dim_out times.

        Args:
            inputs: tensor of shape (batch, time, features).

        Returns:
            Tensor of shape (batch, time + dim_out, features).
        """
        last_input = inputs[:, -1, :]
        last_input_duplicated = K.repeat_elements(
            last_input[:, None, :], self.dim_out, 1
        )
        inputs_augmented = K.concatenate([inputs, last_input_duplicated], axis=1)
        return inputs_augmented

    def get_config(self):
        dict_config = {}
        dict_config["dim_out"] = self.dim_out
        return dict_config
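

# Minimal sketch (assumed shapes): the last time step is duplicated dim_out
# times and appended, extending the sequence axis.
def _demo_factice_time_extension():
    fte = Factice_Time_Extension(dim_out=3)
    x = tf.zeros((2, 5, 7))  # (batch, time, features)
    return fte(x)  # shape (2, 5 + 3, 7) = (2, 8, 7)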



# Positional encoding layers.


class PositionalEmbedding(layers.Layer):
    def __init__(self, d_model, max_len=40, seed=None):
        super(PositionalEmbedding, self).__init__()
        self.seed = seed
        # Compute the positional encodings once in log space.
        pe = np.zeros((max_len, d_model), dtype=np.float32)

        position = np.expand_dims(np.arange(0, max_len, dtype=np.float32), 1)
        div_term = np.exp(
            np.arange(0, d_model, 2, dtype=np.float32) * -(math.log(10000.0) / d_model)
        )

        pe[:, 0::2] = np.sin(position * div_term)
        pe[:, 1::2] = np.cos(position * div_term)

        self.pe = tf.expand_dims(tf.convert_to_tensor(pe), 0)

    def call(self, inputs, **kwargs):
        """Return the positional encodings matching the input length.

        Args:
            inputs: tensor of shape (batch, time, ...) with time <= max_len.

        Returns:
            Tensor of shape (1, time, d_model), broadcastable over the batch.
        """
        # Slice to the input length so the encoding broadcasts over the batch.
        return self.pe[:, : tf.shape(inputs)[1]]
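

# Sketch of the sinusoidal table (assumed sizes): entry (pos, 2i) holds
# sin(pos / 10000^(2i / d_model)) and entry (pos, 2i + 1) the matching cosine.
def _demo_positional_embedding():
    pos = PositionalEmbedding(d_model=8, max_len=40)
    x = tf.zeros((2, 20, 8))
    return pos(x)  # shape (1, 20, 8), broadcastable over the batch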



class ValuesEmbedding(layers.Layer):
    def __init__(self, d_model, seed=None):
        super(ValuesEmbedding, self).__init__()
        self.seed = seed
        self.tokenConv = tf.keras.layers.Conv1D(
            filters=d_model, kernel_size=3, padding="causal", activation="linear"
        )
        self.activation = LeakyReLU()

    def call(self, inputs, **kwargs):
        """Embed the value channels with a causal convolution.

        Args:
            inputs: tensor of shape (batch, time, 1 + features); channel 0 is
                the time index consumed by TemporalEmbedding and skipped here.

        Returns:
            Tensor of shape (batch, time, d_model).
        """
        x = self.tokenConv(inputs[:, :, 1:])
        x = self.activation(x)
        return x
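

# Sketch with assumed shapes: channel 0 carries the time index, so only the
# remaining channels pass through the causal convolution.
def _demo_values_embedding():
    ve = ValuesEmbedding(d_model=16)
    x = tf.zeros((2, 24, 1 + 3))  # (batch, time, time index + 3 features)
    return ve(x)  # shape (2, 24, 16)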



class FixedEmbedding(layers.Layer):
    def __init__(self, c_in, d_model, seed=None):
        super(FixedEmbedding, self).__init__()
        self.seed = seed
        w = np.zeros((c_in, d_model), dtype=np.float32)

        position = np.expand_dims(np.arange(0, c_in, dtype=np.float32), 1)
        div_term = np.exp(
            np.arange(0, d_model, 2, dtype=np.float32) * -(math.log(10000.0) / d_model)
        )

        w[:, 0::2] = np.sin(position * div_term)
        w[:, 1::2] = np.cos(position * div_term)

        # The sinusoidal table is fixed: trainable=False keeps it out of the
        # gradient updates.
        w_init = tf.keras.initializers.Constant(w)
        self.emb = tf.keras.layers.Embedding(
            c_in, d_model, embeddings_initializer=w_init, trainable=False
        )

    def call(self, inputs, **kwargs):
        """Look up fixed sinusoidal embeddings.

        Args:
            inputs: integer tensor of indices in [0, c_in).

        Returns:
            Tensor of shape inputs.shape + (d_model,).
        """
        embedding = self.emb(inputs)
        return embedding
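

# Sketch (assumed sizes): a frozen sinusoidal lookup table indexed by
# integer positions.
def _demo_fixed_embedding():
    fe = FixedEmbedding(c_in=96, d_model=16)
    idx = tf.constant([[0, 1, 2, 3]])  # indices in [0, c_in)
    return fe(idx)  # shape (1, 4, 16); the weights are not trainable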



class TemporalEmbedding(layers.Layer):
    def __init__(self, d_model, seq_len, seed=None):
        super(TemporalEmbedding, self).__init__()
        self.time_embed = FixedEmbedding(seq_len, d_model)
        self.seed = seed
        # self.minute_embed = FixedEmbedding(60, d_model)
        # self.hour_embed = FixedEmbedding(24, d_model)
        # self.weekday_embed = FixedEmbedding(7, d_model)
        # self.day_embed = FixedEmbedding(32, d_model)
        # self.month_embed = FixedEmbedding(13, d_model)

    def call(self, inputs, **kwargs):
        """Embed the time-index channel of the inputs.

        Args:
            inputs: tensor of shape (batch, time, 1 + features), with the
                integer time index in channel 0.

        Returns:
            Tensor of shape (batch, time, d_model).
        """
        return self.time_embed(inputs[:, :, 0])
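

# Sketch with assumed shapes: the integer time index in channel 0 selects a
# fixed sinusoidal code per step.
def _demo_temporal_embedding():
    te = TemporalEmbedding(d_model=16, seq_len=96)
    x = tf.zeros((2, 24, 4))  # channel 0 must hold indices in [0, seq_len)
    return te(x)  # shape (2, 24, 16)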



class DataEmbedding_ITS(layers.Layer):
    def __init__(self, d_model, dropout=0.1, seq_len=96, seed=None):
        super(DataEmbedding_ITS, self).__init__()
        self.seq_len = seq_len
        self.value_embedding = ValuesEmbedding(d_model=d_model)
        # max_len must cover seq_len so the positional table is long enough.
        self.position_embedding = PositionalEmbedding(d_model=d_model, max_len=seq_len)
        self.temporal_embedding = TemporalEmbedding(d_model=d_model, seq_len=seq_len)
        self.ctx_embedding = tf.keras.layers.Dense(d_model)
        self.dropout = tf.keras.layers.Dropout(dropout, seed=seed)
        self.seed = seed

    def call(self, inputs, x_mark=None, **kwargs):
        """Sum the value, positional, and temporal embeddings, then apply dropout.

        Args:
            inputs: tensor of shape (batch, time, 1 + features), with the
                integer time index in channel 0.
            x_mark (optional): time-feature marks; currently unused. Defaults to None.

        Returns:
            Tensor of shape (batch, time, d_model).
        """
        x = (
            self.value_embedding(inputs)
            + self.position_embedding(inputs)
            + self.temporal_embedding(inputs)
        )

        return self.dropout(x)
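

# End-to-end sketch with assumed shapes: the value, positional, and temporal
# embeddings are summed into one (batch, time, d_model) representation.
def _demo_data_embedding_its():
    emb = DataEmbedding_ITS(d_model=16, dropout=0.1, seq_len=96)
    x = tf.zeros((2, 96, 1 + 3))  # channel 0: time index; 3 value channels
    return emb(x)  # shape (2, 96, 16)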