Coverage for uqmodels/modelization/DL_estimator/metalayers.py: 24%

375 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 14:29 +0000

1import numpy as np 

2import tensorflow as tf 

3import tensorflow.keras.backend as K 

4from keras import Model, layers 

5from keras.layers import RNN, Dense, Dropout, Lambda, Layer, LSTMCell, TimeDistributed 

6 

7from uqmodels.modelization.DL_estimator.utils import set_global_determinism 

8 

9from ...utils import add_random_state, generate_random_state, get_fold_nstep 

10 

11# EDL head 

12# https://github.com/aamini/evidential-deep-learning/blob/main/evidential_deep_learning/layers/dense.py 

13 

14# tf.keras.utils.get_custom_objects().clear() 

15 

16 

17@tf.keras.utils.register_keras_serializable(package="UQModels_layers") 

18class EDLProcessing(Layer): 

19 def __init__(self, min_logvar=-6, **kwargs): 

20 self.min_logvar = min_logvar 

21 super().__init__(**kwargs) 

22 

23 def compute_output_shape(self, input_shape): 

24 return input_shape 

25 

26 def call(self, x): 

27 """Apply EDLProcessing 

28 

29 Args: 

30 x (_type_): input 

31 

32 Returns: 

33 _type_: _description_ 

34 """ 

35 mu, logv, logalpha, logbeta = tf.split(x, 4, axis=-1) 

36 v = tf.nn.softplus(logv) + 10e-6 

37 alpha = tf.nn.softplus(logalpha) + 1 

38 beta = tf.nn.softplus(logbeta) + 10e-6 

39 return tf.concat([mu, v, alpha, beta], axis=-1) 

40 

41 def get_config(self): 

42 return {"min_logvar": self.min_logvar} 

43 

44 

45@tf.keras.utils.register_keras_serializable(package="UQModels_layers") 

46class ProbabilisticProcessing(Layer): 

47 """_summary_ 

48 

49 Args: 

50 Layer (_type_): _description_ 

51 """ 

52 

53 def __init__(self, min_logvar=-10, max_logvar=10, **kwargs): 

54 self.min_logvar = min_logvar 

55 self.max_logvar = max_logvar 

56 super().__init__(**kwargs) 

57 

58 def compute_output_shape(self, input_shape): 

59 return input_shape 

60 

61 def call(self, x): 

62 """Apply ProbabilisticProcessing to x 

63 

64 Args: 

65 x (_type_): _description_ 

66 

67 Returns: 

68 _type_: _description_ 

69 """ 

70 mu, logsigma = tf.split(x, 2, axis=-1) 

71 logsigma = tf.where(logsigma > self.min_logvar, logsigma, self.min_logvar) 

72 

73 logsigma = tf.where(logsigma < self.max_logvar, logsigma, self.max_logvar) 

74 # logsigma = tf.nn.softplus(logsigma) 

75 return tf.concat([mu, logsigma], axis=-1) 

76 

77 def get_config(self): 

78 return {"min_logvar": self.min_logvar, "max_logvar": self.max_logvar} 

79 

80 

81def mlp( 

82 dim_in=10, 

83 dim_out=1, 

84 layers_size=[100, 50], 

85 name="", 

86 dp=0.01, 

87 with_mc_dp=True, 

88 type_output=None, 

89 logvar_min=-10, 

90 regularizer_W=(0.00001, 0.00001), 

91 shape_2D=None, 

92 shape_2D_out=None, 

93 random_state=None, 

94 **kwargs, 

95): 

96 """Generate a keras MLP model to make preprocessing or head subpart". 

97 

98 Args: 

99 dim_in (int): Input dimension, erase by shape_2D if 2D input_size 

100 dim_out (int or None): Input dimension, if None take the last layers_size values 

101 layers_size (list of in, optional): List of size of layers. Defaults to [100, 50]. 

102 name (str, optional): Name of model. Defaults to "". 

103 dp (float, optional): Percentage of dropout. Defaults to 0.01. 

104 type_output (_type_, optional): Specify Head last layers among 

105 ['None':pred,MC_Dropout:(Pred,var),"EDL":(Pred,mu,alpha,beta) ] 

106 logvar_min (int, optional): Cut off for small variance estimations 

107 regularizer_W (tuple, optional):Regularisation on Dense layers. Defaults to (0.00001, 0.00001). 

108 shape_2D (tupple or None, optional): if intput shape is 2D. Defaults to None. 

109 shape_2D_out: 

110 random_state (bool): handle experimental random using seed. 

111 

112 Returns: 

113 _type_: _description_ 

114 """ 

115 

116 set_global_determinism(random_state) 

117 

118 reg_l1l2 = tf.keras.regularizers.l1_l2(l1=regularizer_W[0], l2=regularizer_W[1]) 

119 

120 flag_mc = None 

121 if with_mc_dp: 

122 flag_mc = 1 

123 

124 if shape_2D is None: 

125 inputs = tf.keras.layers.Input(shape=(dim_in), name="input_" + name) 

126 output = inputs 

127 

128 else: 

129 inputs = tf.keras.layers.Input( 

130 shape=(shape_2D[0], shape_2D[1]), name="input_" + name 

131 ) 

132 output = tf.keras.layers.Lambda(lambda x: K.reshape(x, shape=(-1, dim_in)))( 

133 inputs 

134 ) 

135 

136 for n, i in enumerate(layers_size): 

137 layer = tf.keras.layers.Dense( 

138 i, 

139 activation=tf.keras.layers.LeakyReLU(alpha=0.01), 

140 name="MLP_" + str(n) + "_" + name, 

141 kernel_regularizer=reg_l1l2, 

142 ) 

143 if dp > 0: 

144 output = tf.keras.layers.Dropout( 

145 dp, seed=add_random_state(random_state, n) 

146 )(layer(output), training=flag_mc) 

147 

148 # Probablistic NN 

149 if type_output == "EDL": 

150 n_param = 4 

151 EDL_ouput = tf.keras.layers.Dense( 

152 n_param * dim_out, name="EDL", activation=None 

153 )(output) 

154 output = EDLProcessing(logvar_min)(EDL_ouput) 

155 

156 elif (type_output == "MC_Dropout") or (type_output == "Deep_ensemble"): 

157 n_param = 2 

158 Prob_output = tf.keras.layers.Dense( 

159 n_param * dim_out, name="Mu_logvar", activation=None 

160 )(output) 

161 output = ProbabilisticProcessing(logvar_min)(Prob_output) 

162 

163 elif type_output == "classif": 

164 n_param = 1 

165 tf.keras.layers.Dense(n_param * dim_out, name="Prob", activation="softmax")( 

166 output 

167 ) 

168 

169 elif dim_out is not None: 

170 n_param = 1 

171 output = tf.keras.layers.Dense(n_param * dim_out, name="Output_" + name)(output) 

172 

173 else: 

174 pass 

175 

176 if shape_2D_out is not None: 

177 output = tf.keras.layers.Lambda( 

178 lambda x: K.reshape(x, (-1, shape_2D_out[1] * n_param, shape_2D_out[0])) 

179 )(output) 

180 

181 output = tf.keras.layers.Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))( 

182 output 

183 ) 

184 

185 mlp = tf.keras.Model(inputs, output, name="MLP_" + name) 

186 return mlp 

187 

188 

189# Improvemement to do : Transform moving_slice in Keras layers 

190# Dev : Preprocessing et reconstruction basé cnn 1D for independant multivariate TS : depreciated 

191 

192# Old 

193 

194 

195def stack_and_roll_layer( 

196 inputs, size_window, size_subseq, padding, name="", format="tf_slice" 

197): 

198 """Layers that produce a stack rolled layers to produce a batch of subsequence 

199 

200 Args: 

201 inputs (_type_): layers 

202 size_window (_type_): size of subseqeuence 

203 size_subseq (_type_): _description_ 

204 padding (_type_): _description_ 

205 name (str, optional): _description_. Defaults to "". 

206 format (str, optional): _description_. Defaults to "tf_slice". 

207 

208 Returns: 

209 _type_: _description_ 

210 """ 

211 # Slice and stack tensor to produce folded representation 

212 slide_tensor = [] 

213 n_step = get_fold_nstep(size_window, size_subseq, padding) 

214 # Implementation numpy 

215 

216 if format == "tf_slice": # tf slice based 

217 n_step = get_fold_nstep(size_window, size_subseq, padding) 

218 z_slice = [] 

219 steps = range(0, n_step * padding, padding) 

220 for i, step in enumerate(steps): 

221 z_slice.append( 

222 tf.slice(inputs, [0, step, 0], [-1, size_subseq, -1])[:, None] 

223 ) 

224 x = tf.concat(z_slice, axis=1) 

225 return x 

226 elif format == "np_slice": # np slice based 

227 for i in range(n_step): 

228 slide_tensor.append( 

229 inputs[:, (i * padding) : (i * padding) + size_subseq, :][:, None] 

230 ) 

231 return Lambda(lambda x: K.concatenate(x, axis=1), name=name + "_rollstack")( 

232 slide_tensor 

233 ) 

234 

235 elif format == "tf_map": # tf map based 

236 x = tf.map_fn( 

237 lambda i: inputs[:, (i * padding) : (i * padding) + size_subseq, :], 

238 tf.range(n_step), 

239 dtype=tf.float32, 

240 ) 

241 x = tf.transpose(x, [1, 0, 2, 3]) 

242 return x 

243 

244 else: 

245 exit( 

246 f"Unknown format {format}. Supported formats are: tf_slice, np_slice and tf_map" 

247 ) 

248 

249 

250def cnn_enc_1D( 

251 size_subseq_enc, 

252 dim_out, 

253 dim_z, 

254 dim_lt, 

255 type_output=None, 

256 k=32, 

257 dp=0.05, 

258 random_state=None, 

259 **kwarg, 

260): 

261 """cnn_enc_1D layers 

262 

263 Args: 

264 size_subseq_enc (int, optional): _description_. Defaults to 100. 

265 dim_out (int, optional): _description_. Defaults to 10. 

266 dim_z (int, optional): _description_. Defaults to 100. 

267 dim_lt (int, optional): _description_. Defaults to 100. 

268 type_output (_type_, optional): _description_. Defaults to None. 

269 k (int, optional): _description_. Defaults to 32. 

270 dp (float, optional): _description_. Defaults to 0.05. 

271 random_state (bool): handle experimental random using seed. 

272 

273 Returns: 

274 _type_: _description_ 

275 """ 

276 

277 flag_mc = False 

278 if type_output in ["MC_Dropout", "Deep_ensemble"]: 

279 flag_mc = True 

280 

281 inputs = tf.keras.layers.Input(shape=(size_subseq_enc, dim_out), name="st") 

282 

283 output = layers.SeparableConv1D( 

284 k, 

285 int(dim_lt / 4), 

286 strides=1, 

287 depth_multiplier=10, 

288 activation="relu", 

289 padding="same", 

290 )(inputs) 

291 

292 output = layers.AveragePooling1D((5), padding="same")(output) 

293 if dp > 0: 

294 output = tf.keras.layers.Dropout(dp, seed=random_state)( 

295 output, training=flag_mc 

296 ) 

297 output = layers.SeparableConv1D( 

298 k * 2, 

299 int(dim_lt / 8), 

300 strides=1, 

301 depth_multiplier=10, 

302 activation="relu", 

303 padding="same", 

304 )(output) 

305 

306 output = layers.AveragePooling1D((2), padding="same")(output) 

307 if dp > 0: 

308 output = tf.keras.layers.Dropout(dp, seed=add_random_state(random_state, 1))( 

309 output, training=flag_mc 

310 ) 

311 output = layers.SeparableConv1D( 

312 k * 2, 

313 int(dim_lt / 16), 

314 strides=1, 

315 depth_multiplier=10, 

316 activation="relu", 

317 padding="same", 

318 )(output) 

319 

320 output = layers.AveragePooling1D((2), padding="same")(output) 

321 output = tf.keras.layers.Flatten()(output) 

322 output = tf.keras.layers.Dense(dim_z)(output) 

323 enc = Model(inputs, output) 

324 return enc 

325 

326 

327def cnn_dec_1D( 

328 size_subseq_dec=10, 

329 dim_out=10, 

330 dim_z=100, 

331 type_output=None, 

332 k=32, 

333 dp=0.05, 

334 random_state=None, 

335 **kwarg, 

336): 

337 """_summary_ 

338 

339 Args: 

340 size_subseq_dec (int, optional): _description_. Defaults to 10. 

341 dim_out (int, optional): _description_. Defaults to 10. 

342 dim_z (int, optional): _description_. Defaults to 100. 

343 type_output (_type_, optional): _description_. Defaults to None. 

344 k (int, optional): _description_. Defaults to 32. 

345 dp (float, optional): _description_. Defaults to 0.05. 

346 random_state (bool): handle experimental random using seed. 

347 

348 Returns: 

349 _type_: _description_ 

350 """ 

351 

352 dim_chan_out = 1 

353 flag_mc = False 

354 if type_output in ["MC_Dropout", "Deep_ensemble"]: 

355 dim_chan_out = 2 

356 flag_mc = True 

357 

358 elif type_output == "EDL": 

359 dim_chan_out = 4 

360 

361 inputs = layers.Input(shape=(dim_z), name="st") 

362 output = tf.keras.layers.Lambda(lambda x: x[:, None, :])(inputs) 

363 output = layers.Conv1DTranspose( 

364 k, (10), strides=5, activation="relu", padding="same" 

365 )(output) 

366 if dp > 0: 

367 output = tf.keras.layers.Dropout(dp, seed=random_state)( 

368 output, training=flag_mc 

369 ) 

370 output = layers.Conv1DTranspose( 

371 k, (50), strides=2, activation="relu", padding="same" 

372 )(output) 

373 if dp > 0: 

374 output = tf.keras.layers.Dropout(dp, seed=add_random_state(random_state, 1))( 

375 output, training=flag_mc 

376 ) 

377 output = layers.SeparableConv1D( 

378 dim_out * dim_chan_out, 

379 10, 

380 strides=1, 

381 depth_multiplier=10, 

382 activation="sigmoid", 

383 padding="same", 

384 )(output) 

385 # aamini/evidential-deep-learning 

386 if type_output in ["MC_Dropout", "Deep_ensemble"]: 

387 output = ProbabilisticProcessing()(output) 

388 

389 if type_output == "EDL": 

390 output = EDLProcessing()(output) 

391 

392 output = tf.keras.layers.Lambda( 

393 lambda x: K.reshape(x, (-1, size_subseq_dec * dim_out * dim_chan_out)) 

394 )(output) 

395 dec = tf.keras.Model(inputs, output, name="conv_lag_dec") 

396 return dec 

397 

398 

399# Dev : Preprocessing CNN based representation layers for 2D TS without spatial structure 

400 

401 

402def cnn_enc( 

403 size_subseq_enc, 

404 dim_out, 

405 dim_chan=1, 

406 k1=10, 

407 reduction_x1=8, 

408 reduction_x2=1, 

409 dim_z=100, 

410 dp=0.02, 

411 random_state=None, 

412 **kwarg, 

413): 

414 """Warning depreciated CNN_enc implementation 

415 

416 Args: 

417 size_subseq_enc (_type_): _description_ 

418 dim_out (_type_): _description_ 

419 dim_chan (int, optional): _description_. Defaults to 1. 

420 k1 (int, optional): _description_. Defaults to 10. 

421 reduction_x1 (int, optional): _description_. Defaults to 8. 

422 reduction_x2 (int, optional): _description_. Defaults to 1. 

423 dim_z (int, optional): _description_. Defaults to 100. 

424 dp (float, optional): _description_. Defaults to 0.02. 

425 random_state (_type_, optional): _description_. Defaults to None. 

426 

427 Returns: 

428 _type_: _description_ 

429 """ 

430 

431 dim_out_reshape = int(dim_out / dim_chan) 

432 inputs = tf.keras.layers.Input(shape=(size_subseq_enc, dim_out), name="st") 

433 

434 output = tf.keras.layers.Lambda( 

435 lambda x: K.reshape(x, (-1, size_subseq_enc, dim_out_reshape, dim_chan)) 

436 )(inputs) 

437 

438 output = tf.keras.layers.Conv2D( 

439 k1 * 3, 

440 (reduction_x1, reduction_x2), 

441 strides=1, 

442 activation="relu", 

443 )(output) 

444 

445 output = tf.keras.layers.AveragePooling2D((2, reduction_x2), padding="same")(output) 

446 

447 # output = tf.keras.layers.Dropout(dp)(output, training=True) 

448 output = tf.keras.layers.BatchNormalization()(output) 

449 

450 output = tf.keras.layers.Conv2D( 

451 k1, 

452 (reduction_x1, dim_out_reshape), 

453 strides=1, 

454 activation="relu", 

455 )(output) 

456 

457 output = tf.keras.layers.AveragePooling2D((2, 1), padding="same")(output) 

458 

459 # output = tf.keras.layers.Dropout(dp)(output, training=True) 

460 output = tf.keras.layers.BatchNormalization()(output) 

461 

462 output = tf.keras.layers.Conv2D( 

463 k1 * 2, (reduction_x1, 1), strides=1, activation="relu" 

464 )(output) 

465 

466 output = tf.keras.layers.Flatten()(output) 

467 output = tf.keras.layers.Dense(dim_z)(output) 

468 if dp > 0: 

469 output = tf.keras.layers.Dropout(dp, seed=random_state)(output, training=True) 

470 enc = tf.keras.Model(inputs, output, name="conv_lag_enc") 

471 return enc 

472 

473 

474def cnn_dec( 

475 size_subseq_dec, 

476 dim_out, 

477 dim_chan=1, 

478 type_output=None, 

479 k1=10, 

480 min_logvar=-6, 

481 dim_z=100, 

482 dp=0.01, 

483 random_state=None, 

484 **kwarg, 

485): 

486 """Warning depreciated CNN_dec implementation 

487 

488 Args: 

489 size_subseq_dec (_type_): _description_ 

490 dim_out (_type_): _description_ 

491 dim_chan (int, optional): _description_. Defaults to 1. 

492 type_output (_type_, optional): _description_. Defaults to None. 

493 k1 (int, optional): _description_. Defaults to 10. 

494 min_logvar (int, optional): _description_. Defaults to -6. 

495 dim_z (int, optional): _description_. Defaults to 100. 

496 dp (float, optional): _description_. Defaults to 0.01. 

497 random_state (_type_, optional): _description_. Defaults to None. 

498 

499 Returns: 

500 _type_: _description_ 

501 """ 

502 

503 dim_chan_out = 1 

504 if type_output in ["MC_Dropout", "Deep_ensemble"]: 

505 dim_chan_out = 2 

506 

507 elif type_output == "EDL": 

508 dim_chan_out = 4 

509 

510 dim_out_reshape = int(dim_out / dim_chan) 

511 inputs = tf.keras.layers.Input(shape=(dim_z), name="st") 

512 output = tf.keras.layers.Lambda(lambda x: x[:, None, None, :])(inputs) 

513 

514 output = tf.keras.layers.Conv2DTranspose( 

515 k1 * 2, 

516 (int(np.floor((size_subseq_dec + 1) / 2)), dim_out_reshape * dim_chan), 

517 strides=(1, 1), 

518 activation="relu", 

519 )(output) 

520 

521 output = tf.keras.layers.BatchNormalization()(output) 

522 if dp > 0: 

523 output = tf.keras.layers.Dropout(dp, seed=random_state)(output, training=True) 

524 

525 output = tf.keras.layers.Conv2DTranspose( 

526 k1, 

527 (int(np.ceil((size_subseq_dec + 1) / 2)), 1), 

528 strides=(1, 1), 

529 activation="relu", 

530 )(output) 

531 

532 output = tf.keras.layers.BatchNormalization()(output) 

533 if dp > 0: 

534 output = tf.keras.layers.Dropout(dp, seed=add_random_state(random_state, 1))( 

535 output, training=True 

536 ) 

537 

538 output = tf.keras.layers.Conv2DTranspose(dim_chan_out, (1, 1), activation="linear")( 

539 output 

540 ) 

541 

542 # Probablistic NN 

543 if type_output in ["MC_Dropout", "Deep_ensemble"]: 

544 output = ProbabilisticProcessing(min_logvar)(output) 

545 

546 # aamini/evidential-deep-learning 

547 elif type_output == "EDL": 

548 output = EDLProcessing(min_logvar)(output) 

549 

550 else: 

551 pass 

552 

553 output = tf.keras.layers.Lambda( 

554 lambda x: K.reshape(x, (-1, size_subseq_dec * dim_out * dim_chan_out)) 

555 )(output) 

556 

557 dec = tf.keras.Model(inputs, output, name="conv_lag_dec") 

558 return dec 

559 

560 

561def conv_block_1D( 

562 inputs, 

563 dim_chan, 

564 filters=32, 

565 kernel=2, 

566 strides=2, 

567 dp=0.02, 

568 flag_mc=False, 

569 random_state=None, 

570): 

571 

572 output = tf.keras.layers.Conv1D( 

573 filters * dim_chan, 

574 kernel, 

575 strides=strides, 

576 padding="causal", 

577 groups=dim_chan, 

578 activation="relu", 

579 )(inputs) 

580 output = tf.keras.layers.BatchNormalization()(output) 

581 if dp > 0: 

582 output = tf.keras.layers.Dropout(dp, seed=random_state)( 

583 output, training=flag_mc 

584 ) 

585 return output 

586 

587 

588def conv_block_2D( 

589 inputs, 

590 dim_chan, 

591 filters=32, 

592 kernel=5, 

593 strides=(2, 1), 

594 dp=0.02, 

595 flag_mc=False, 

596 random_state=None, 

597): 

598 

599 output = tf.keras.layers.Conv2D( 

600 filters, kernel, strides=strides, padding="valid", activation="relu" 

601 )(inputs) 

602 output = tf.keras.layers.BatchNormalization()(output) 

603 if dp > 0: 

604 output = tf.keras.layers.Dropout(dp, seed=random_state)( 

605 output, training=flag_mc 

606 ) 

607 return output 

608 

609 

610# Version actuelle : génération d'une structure CNN_ENC parametrable 

611 

612 

613def cnn_enc_bis( 

614 size_subseq_enc=60, 

615 dim_target=52, 

616 dim_chan=4, 

617 list_filters=[64, 64, 32], 

618 list_kernels=[(10, 3), 10, 10], 

619 list_strides=[(2, 1), (2, 1), (2, 1)], 

620 type_output=None, 

621 block="2D", 

622 dim_z=200, 

623 dp=0.02, 

624 random_state=None, 

625 **kwarg, 

626): 

627 """Produce a cnn_enn subpart of a deep learning predictor 

628 

629 Returns: 

630 _type_: _description_ 

631 """ 

632 flag_mc = False 

633 if type_output in ["MC_Dropout"]: 

634 flag_mc = True 

635 

636 dim_space = int(dim_target / dim_chan) 

637 if dim_chan == 1: 

638 inputs = tf.keras.layers.Input(shape=(size_subseq_enc, dim_target), name="st") 

639 

640 output = tf.keras.layers.Lambda(lambda x: K.expand_dims(x, axis=-1))(inputs) 

641 else: 

642 inputs = tf.keras.layers.Input(shape=(size_subseq_enc, dim_target), name="st") 

643 

644 output = tf.keras.layers.Lambda( 

645 lambda x: K.reshape(x, (-1, size_subseq_enc, dim_space, dim_chan)) 

646 )(inputs) 

647 

648 for n, (filters, kernel, strides) in enumerate( 

649 zip(list_filters, list_kernels, list_strides) 

650 ): 

651 if block == "2D": 

652 output = conv_block_2D( 

653 output, 

654 dim_space, 

655 filters=filters, 

656 kernel=kernel, 

657 strides=strides, 

658 flag_mc=flag_mc, 

659 random_state=add_random_state(random_state, 1 + n), 

660 ) 

661 

662 if block == "1D": 

663 output = conv_block_1D( 

664 output, 

665 dim_space, 

666 filters=filters, 

667 kernel=kernel, 

668 strides=strides, 

669 flag_mc=flag_mc, 

670 random_state=add_random_state(random_state, 1 + len(list_filters) + n), 

671 ) 

672 

673 output = tf.keras.layers.Flatten()(output) 

674 output = tf.keras.layers.Dense(dim_z)(output) 

675 if dp > 0: 

676 output = tf.keras.layers.Dropout(dp, seed=random_state)( 

677 output, training=flag_mc 

678 ) 

679 enc = tf.keras.Model(inputs, output, name="conv_lag_enc") 

680 return enc 

681 

682 

683def Tconv_block_1D( 

684 inputs, 

685 dim_out, 

686 filters=32, 

687 kernel=2, 

688 strides=2, 

689 dp=0.02, 

690 flag_mc=False, 

691 random_state=None, 

692): 

693 

694 output = tf.keras.layers.Conv1DTranspose( 

695 filters * dim_out, 

696 kernel, 

697 strides=strides, 

698 padding="same", 

699 groups=dim_out, 

700 activation="relu", 

701 )(inputs) 

702 output = tf.keras.layers.BatchNormalization()(output) 

703 output = tf.keras.layers.Dropout(dp, seed=random_state)(output, training=flag_mc) 

704 return output 

705 

706 

707def Tconv_block_2D( 

708 inputs, 

709 dim_out, 

710 filters=32, 

711 kernel=5, 

712 strides=(2, 1), 

713 dp=0.02, 

714 flag_mc=False, 

715 random_state=None, 

716): 

717 

718 output = tf.keras.layers.Conv2DTranspose( 

719 filters, (kernel, dim_out), strides=strides, activation="relu" 

720 )(inputs) 

721 output = tf.keras.layers.BatchNormalization()(output) 

722 if dp > 0: 

723 output = tf.keras.layers.Dropout(dp, seed=random_state)( 

724 output, training=flag_mc 

725 ) 

726 return output 

727 

728 

729def cnn_dec_bis( 

730 size_subseq_dec, 

731 dim_out, 

732 dim_chan=1, 

733 type_output=None, 

734 min_logvar=-6, 

735 list_filters=[64, 64], 

736 strides=(1, 1), 

737 list_kernels=[4, 4], 

738 dim_z=200, 

739 random_state=None, 

740 **kwarg, 

741): 

742 

743 flag_mc = False 

744 if type_output in ["MC_Dropout"]: 

745 flag_mc = True 

746 

747 dim_chan_out = 1 * dim_chan 

748 if type_output in ["MC_Dropout", "Deep_ensemble"]: 

749 dim_chan_out = 2 * dim_chan 

750 

751 elif type_output == "EDL": 

752 dim_chan_out = 4 * dim_chan 

753 

754 dim_space = int(dim_out / size_subseq_dec) 

755 inputs = tf.keras.layers.Input(shape=(dim_z), name="st") 

756 output = tf.keras.layers.Lambda(lambda x: x[:, None, None, :])(inputs) 

757 

758 for n, (filters, kernel) in enumerate(zip(list_filters, list_kernels)): 

759 d_tar = 1 

760 if n == 0: 

761 d_tar = dim_space 

762 output = Tconv_block_2D( 

763 output, 

764 d_tar, 

765 filters=filters, 

766 kernel=kernel, 

767 strides=strides, 

768 flag_mc=flag_mc, 

769 random_state=add_random_state(random_state, n), 

770 ) 

771 

772 output = tf.keras.layers.Conv2DTranspose(dim_chan_out, (1, 1), activation="linear")( 

773 output 

774 ) 

775 

776 # Probablistic NN 

777 if type_output in ["MC_Dropout", "Deep_ensemble"]: 

778 output = ProbabilisticProcessing(min_logvar)(output) 

779 

780 # aamini/evidential-deep-learning 

781 elif type_output == "EDL": 

782 output = EDLProcessing(min_logvar)(output) 

783 

784 output = tf.keras.layers.Lambda( 

785 lambda x: K.reshape(x, (-1, size_subseq_dec, dim_space * dim_chan_out)) 

786 )(output) 

787 

788 dec = tf.keras.Model(inputs, output, name="conv_lag_dec") 

789 if dec.layers[-2].output.shape[1] != dec.layers[-1].output.shape[1]: 

790 print("Warning : inadequate deconvolution window size : model will crash") 

791 

792 return dec 

793 

794 

795# Dev : Preprocessing CNN based representation layers for 2D TS without spatial structure 

796 

797 

798def dense2D_enc_dec( 

799 size_subseq_enc, 

800 size_subseq_dec, 

801 dim_in, 

802 dim_out, 

803 layers_size=[100, 50], 

804 dim_z=100, 

805 dp=0.05, 

806 enc_only=False, 

807 type_output=None, 

808 random_state=None, 

809): 

810 

811 inputs = tf.keras.layers.Input(shape=(size_subseq_enc, dim_in), name="st") 

812 inputs_flatten = Lambda(lambda x: K.reshape(x, (-1, size_subseq_enc * dim_in)))( 

813 inputs 

814 ) 

815 

816 layers_size_enc = layers_size 

817 layers_size_enc.append(dim_z) 

818 

819 mlp_enc = mlp( 

820 size_subseq_enc * dim_in, 

821 dim_out=None, 

822 layers_size=layers_size_enc, 

823 name="", 

824 dp=dp, 

825 type_output=None, 

826 random_state=random_state, 

827 )(inputs_flatten) 

828 enc = tf.keras.Model(inputs, mlp_enc, name="MLP_enc") 

829 

830 if enc_only: 

831 return (enc, None) 

832 

833 inputs = tf.keras.layers.Input(shape=(dim_z), name="embedding") 

834 z_flatten = Lambda(lambda x: K.reshape(x, (-1, dim_z)))(inputs) 

835 mlp_dec = mlp( 

836 dim_z, 

837 dim_out=size_subseq_dec * dim_out, 

838 layers_size=layers_size[::-1], 

839 name="", 

840 dp=dp, 

841 type_output=type_output, 

842 random_state=add_random_state(random_state, 100), 

843 )(z_flatten) 

844 

845 if type_output == "EDL": 

846 dim_out = dim_out * 4 

847 

848 elif type_output == "MC_Dropout": 

849 dim_out = dim_out * 2 

850 

851 reshape_dec = Lambda(lambda x: K.reshape(x, (-1, size_subseq_dec, dim_out)))( 

852 mlp_dec 

853 ) 

854 dec = tf.keras.Model(inputs, reshape_dec, name="MLP_dec") 

855 return (enc, dec) 

856 

857 

858def moving_slice_map(inputs, n_step, padding, kick_off=0, depth_slice=1): 

859 """Apply Layers on n_step slices of input with padding 

860 Args: 

861 input (TF.tensor): Input tensors 

862 Layers (Keras.model): Submodels 

863 n_step (int): N_slide 

864 padding (int): padding""" 

865 

866 steps = range(kick_off, kick_off + n_step * padding, padding) 

867 z_slice = [] 

868 for i, step in enumerate(steps): 

869 if len(inputs.shape) == 3: 

870 slice = tf.slice(inputs, [0, step, 0], [-1, depth_slice, -1]) 

871 else: 

872 slice = tf.slice(inputs, [0, step, 0, 0], [-1, depth_slice, -1, -1]) 

873 

874 if depth_slice > 1: 

875 slice = slice[:, None] 

876 

877 z_slice.append(slice) 

878 

879 z_slice = Lambda(lambda x: K.concatenate(x, axis=1))(z_slice) 

880 return z_slice 

881 

882 

883class Moving_slice_layer(Layer): 

884 """Layer that apply moving_slice_map""" 

885 

886 def __init__( 

887 self, n_step, padding, kick_off=0, depth_slice=1, ignore_futur=None, **kwargs 

888 ): 

889 self.n_step = n_step 

890 self.padding = padding 

891 self.kick_off = kick_off 

892 self.depth_slice = depth_slice 

893 self.ignore_futur = ignore_futur 

894 super().__init__(**kwargs) 

895 

896 def build(self, input_shape): 

897 super().build(input_shape) 

898 

899 def call(self, input_data): 

900 """Apply moving_slice_map to generate window sequnce 

901 

902 Args: 

903 input_data (_type_): _description_ 

904 

905 Returns: 

906 _type_: _description_ 

907 """ 

908 if self.ignore_futur: 

909 input_data = input_data[:, : self.ignore_futur] 

910 z_slice = moving_slice_map( 

911 input_data, self.n_step, self.padding, self.kick_off, self.depth_slice 

912 ) 

913 return z_slice 

914 

915 def compute_output_shape(self, input_shape): 

916 step = range( 

917 self.kick_off, self.kick_off + self.n_step * self.padding, self.padding 

918 ) 

919 if self.depth_slice > 1: 

920 output_shape = (input_shape[0], len(step), input_shape[-1]) 

921 else: 

922 output_shape = ( 

923 input_shape[0], 

924 len(step), 

925 self.depth_slice, 

926 input_shape[-1], 

927 ) 

928 return output_shape 

929 

930 

931class Double_Moving_slice_layer(Layer): 

932 """Layer that apply double moving_slice_map 

933 

934 Args: 

935 Layer (_type_): _description_ 

936 """ 

937 

938 def __init__( 

939 self, n_step_out, n_step_in, padding_out, padding_in, depth_slice=1, **kwargs 

940 ): 

941 self.n_step_out = n_step_out 

942 self.n_step_in = n_step_in 

943 self.padding_out = padding_out 

944 self.padding_in = padding_in 

945 self.depth_slice = depth_slice 

946 super().__init__(**kwargs) 

947 

948 def build(self, input_shape): 

949 super().build(input_shape) 

950 

951 def call(self, input_data): 

952 """_summary_ 

953 

954 Args: 

955 input_data (_type_): _description_ 

956 

957 Returns: 

958 _type_: _description_ 

959 """ 

960 z_slice_out = [] 

961 for i in range(self.n_step_out): 

962 kick_off = i * self.padding_out 

963 z_slice_in = moving_slice_map( 

964 input_data, 

965 self.n_step_in, 

966 self.padding_in, 

967 kick_off=kick_off, 

968 depth_slice=self.depth_slice, 

969 ) 

970 z_slice_out.append(z_slice_in[:, None]) 

971 z_slice = Lambda(lambda x: K.concatenate(x, axis=1))(z_slice_out) 

972 return z_slice 

973 

974 def compute_output_shape(self, input_shape): 

975 step_out = range(0, self.n_step_out * self.padding_out, self.padding_out) 

976 step_in = range(0, self.n_step_in * self.padding_in, self.padding_in) 

977 

978 if self.depth_slice > 1: 

979 output_shape = ( 

980 input_shape[0], 

981 len(step_out), 

982 len(step_in), 

983 input_shape[-1], 

984 ) 

985 else: 

986 output_shape = ( 

987 input_shape[0], 

988 len(step_out), 

989 len(step_in), 

990 self.depth_slice, 

991 input_shape[-1], 

992 ) 

993 return output_shape 

994 

995 

996# Modelisation ensemble de code lié à la construction d'un modèle LSTM ED à prédiction multihorizon 

997# Processing layers : LSTM cell modification to handle state passing. 

998 

999 

1000class LSTMCellReturnCellState(LSTMCell): 

1001 """Layer LSTM returning output and state jointly 

1002 

1003 Args: 

1004 LSTMCell (_type_): _description_ 

1005 """ 

1006 

1007 def call(self, inputs, states, training=None): 

1008 """_summary_ 

1009 

1010 Args: 

1011 inputs (_type_): _description_ 

1012 states (_type_): _description_ 

1013 training (_type_, optional): _description_. Defaults to None. 

1014 

1015 Returns: 

1016 _type_: _description_ 

1017 """ 

1018 outputs, [h, c] = super().call(inputs, states, training=training) 

1019 return tf.concat([h, c], axis=1), [h, c] 

1020 

1021 

1022class LSTMCellMidsize(LSTMCell): 

1023 """Hack to take into accout state in cell : size in dim_z*2 | size out dim_z 

1024 

1025 Args: 

1026 LSTMCell (_type_): _description_ 

1027 """ 

1028 

1029 def build(self, input_shape): 

1030 input_shape = (None, int(input_shape[-1] / 2)) 

1031 super().build(input_shape) 

1032 

1033 

1034# Processing layers : RNN cell 

1035 

1036 

1037class RNN_states_in_inputs(RNN): 

1038 """RNN class dispatching jointly [H,C] 

1039 

1040 Args: 

1041 RNN (_type_): _description_ 

1042 """ 

1043 

1044 def call( 

1045 self, inputs, mask=None, training=None, initial_state=None, constants=None 

1046 ): 

1047 """_summary_ 

1048 

1049 Args: 

1050 inputs (_type_): _description_ 

1051 mask (_type_, optional): _description_. Defaults to None. 

1052 training (_type_, optional): _description_. Defaults to None. 

1053 initial_state (_type_, optional): _description_. Defaults to None. 

1054 constants (_type_, optional): _description_. Defaults to None. 

1055 

1056 Returns: 

1057 _type_: _description_ 

1058 """ 

1059 h, c = tf.split(inputs, 2, axis=-1) 

1060 states = (h[:, -1], c[:, -1]) 

1061 return super().call( 

1062 h, initial_state=states, mask=mask, training=training, constants=constants 

1063 ) 

1064 

1065 

1066def LSTM_EProcessing( 

1067 n_step, 

1068 dim_in, 

1069 dim_z, 

1070 flag_mc, 

1071 dp=0.05, 

1072 dp_r=0.02, 

1073 l1_l2_reg=(0.0000001, 0.0000001), 

1074 random_state=None, 

1075): 

1076 """Encoder Processing as block aim to capture dynamic 

1077 Input (batch,n_step,dim_in) output (batch,n_step,dim_z*2) with Z_h and Z_state concatenate) 

1078 

1079 

1080 Args: 

1081 n_step (_type_): _description_ 

1082 dim_in (_type_): _description_ 

1083 dim_z (_type_): _description_ 

1084 flag_mc (_type_): _description_ 

1085 dp (float, optional): _description_. Defaults to 0.05. 

1086 dp_r (float, optional): _description_. Defaults to 0.02. 

1087 l1_l2_reg (tuple, optional): _description_. Defaults to (0.0000001, 0.0000001). 

1088 random_state (bool): handle experimental random using seed. 

1089 Returns: 

1090 Model: Keras model as LSTM Encoder block 

1091 """ 

1092 

1093 lstm_cell_enc = LSTMCellReturnCellState( 

1094 int(dim_z), 

1095 name="LSTM_enc_0", 

1096 activation="sigmoid", 

1097 recurrent_dropout=dp_r, 

1098 seed=random_state, 

1099 kernel_regularizer=tf.keras.regularizers.l1_l2( 

1100 l1=l1_l2_reg[0], l2=l1_l2_reg[1] 

1101 ), 

1102 ) 

1103 lstm_enc = tf.keras.layers.RNN(lstm_cell_enc, return_sequences=True) 

1104 

1105 EProcessing_in = tf.keras.layers.Input( 

1106 shape=(n_step, dim_in), name="input_EProcessing" 

1107 ) 

1108 Z_input_lstm = Dropout(dp, seed=add_random_state(random_state, 100))( 

1109 TimeDistributed(Dense(dim_z))(EProcessing_in) 

1110 ) 

1111 Z_lstm_output = Dropout(dp, seed=add_random_state(random_state, 101))( 

1112 lstm_enc(Z_input_lstm), training=flag_mc 

1113 ) 

1114 

1115 # Extended with 0 to not affect state part in skip connexion. 

1116 Z_input_lstm_extended = tf.keras.layers.Lambda(lambda x: K.concatenate([x, x * 0]))( 

1117 Z_input_lstm 

1118 ) 

1119 

1120 EProcessing_out = tf.keras.layers.Lambda( 

1121 lambda x: tf.keras.layers.Add()([x[0], x[1]]), name="skip_EProcessing" 

1122 )([Z_lstm_output, Z_input_lstm_extended]) 

1123 

1124 LSTM_EProcessing_ = tf.keras.Model( 

1125 EProcessing_in, EProcessing_out, name="EProcessing" 

1126 ) 

1127 

1128 return LSTM_EProcessing_ 

1129 

1130 

1131def LSTM_DProcessing( 

1132 n_step, 

1133 dim_z, 

1134 flag_mc, 

1135 dp=0.05, 

1136 dp_r=0.02, 

1137 l1_l2_reg=(0.0000001, 0.0000001), 

1138 random_state=None, 

1139): 

1140 """Decoder Processing as block : aim to make temporal projection (Usefull to hold query information) 

1141 Input (batch,n_step,dim_z*2) output (batch,n_step,dim_z) with Zdecoding latent space) 

1142 

1143 Args: 

1144 n_step (_type_): _description_ 

1145 dim_z (_type_): _description_ 

1146 flag_mc (_type_): _description_ 

1147 dp (float, optional): _description_. Defaults to 0.05. 

1148 dp_r (float, optional): _description_. Defaults to 0.02. 

1149 l1_l2_reg (tuple, optional): _description_. Defaults to (0.0000001, 0.0000001). 

1150 random_state (bool): handle experimental random using seed. 

1151 

1152 Returns: 

1153 Model: Keras model as LSTM Decoder block 

1154 """ 

1155 

1156 lstm_cell_dec = LSTMCellMidsize( 

1157 int(dim_z), 

1158 name="LSTM_dec_0", 

1159 activation="sigmoid", 

1160 recurrent_dropout=dp_r, 

1161 seed=random_state, 

1162 kernel_regularizer=tf.keras.regularizers.l1_l2( 

1163 l1=l1_l2_reg[0], l2=l1_l2_reg[1] 

1164 ), 

1165 ) 

1166 

1167 lstm_dec = RNN_states_in_inputs(lstm_cell_dec, return_sequences=True) 

1168 

1169 DProcessing_in = tf.keras.layers.Input( 

1170 shape=(n_step, dim_z * 2), name="input_DProcessing" 

1171 ) 

1172 Z_lstm_output = Dropout(dp, seed=add_random_state(random_state + 100))( 

1173 lstm_dec(DProcessing_in), training=flag_mc 

1174 ) 

1175 DProcessing_out = tf.keras.layers.Lambda( 

1176 lambda x: tf.keras.layers.Add()([x[0], x[1][:, :, :dim_z]]), 

1177 name="skip_DProcessing", 

1178 )([Z_lstm_output, DProcessing_in]) 

1179 

1180 LSTM_DProcessing_ = tf.keras.Model( 

1181 DProcessing_in, DProcessing_out, name="DProcessing" 

1182 ) 

1183 return LSTM_DProcessing_ 

1184 

1185 

1186class Add_query_to_Z_Processing_with_state(Layer): 

1187 def __init__(self, dim_z): 

1188 self.dim_z = dim_z 

1189 self.layer = Dense(dim_z) 

1190 super().__init__() 

1191 

1192 def compute_output_shape(self, input_shape): 

1193 return (input_shape[0], input_shape[1], input_shape[2], self.dim_z * 2) 

1194 

1195 def call(self, ZProcessing, Query): 

1196 """_summary_ 

1197 

1198 Args: 

1199 ZProcessing (_type_): _description_ 

1200 Query (_type_): _description_ 

1201 

1202 Returns: 

1203 _type_: _description_ 

1204 """ 

1205 Z = tf.concat([ZProcessing[:, :, :, : self.dim_z], Query], axis=-1) 

1206 new_Z = TimeDistributed(TimeDistributed(self.layer), name="wtf")(Z) 

1207 Z = tf.concat([new_Z, ZProcessing[:, :, :, self.dim_z :]], axis=-1) 

1208 return Z 

1209 

1210 

1211def get_cnn_enc_params(dim_target, size_subseq_enc=1, dim_z=50, random_state=None): 

1212 """Produce dict params that can instanciate cnn_enn_bloc 

1213 Args: 

1214 dim_target (_type_): dimension of motifs to convolute 

1215 size_subseq_enc (int, optional): length of motifs to convulute 

1216 dim_z (int, optional): latent dimension 

1217 random_state (bool): handle experimental random using seed. 

1218 

1219 """ 

1220 

1221 dict_params = { 

1222 "builder": cnn_enc_bis, 

1223 "size_subseq_enc": size_subseq_enc, 

1224 "dim_target": dim_target, 

1225 "dim_chan": 1, 

1226 "list_filters": [64, 64, 32], 

1227 "list_kernels": [10, 10, 10], 

1228 "block": "2D", 

1229 "dim_z": dim_z, 

1230 "dp": 0.05, 

1231 "random_state": random_state, 

1232 } 

1233 

1234 return dict_params 

1235 

1236 

1237def get_cnn_dec_params(dim_target, size_subseq_dec=1, dim_z=50, random_state=None): 

1238 """Produce dict params that can instanciate cnn_dec_bloc 

1239 Args: 

1240 dim_target (_type_): dimension of motifs to convolute 

1241 size_subseq_dec (int, optional): length of motifs to convulute 

1242 dim_z (int, optional): latent dimension 

1243 random_state (bool): handle experimental random using seed. 

1244 """ 

1245 dict_params = { 

1246 "builder": cnn_dec_bis, 

1247 "dim_z": dim_z, 

1248 "size_subseq_dec": size_subseq_dec, 

1249 "dim_target": dim_target, 

1250 "dim_chan": 1, 

1251 "list_filters": [64, 64], 

1252 "strides": (2, 1), 

1253 "list_kernels": [4, 4], 

1254 "min_logvar": -10, 

1255 "random_state": random_state, 

1256 } 

1257 

1258 return dict_params