Coverage for uqmodels/modelization/DL_estimator/loss.py: 22% (136 statements)


import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import callbacks

def build_MSE_loss(split=2, metric=False, var=1):
    def MSE_loss(true, pred):
        if true.shape[-1] == pred.shape[-1]:
            mu = pred
        else:
            # pred carries extra outputs (e.g. a variance head): keep the mean part
            pred_split = tf.split(pred, split, -1)
            mu = pred_split[0]

        if metric:  # Only consider 1 prediction (t+1)
            if len(mu.shape) == 3:
                loss = K.pow(true[:, 0] - mu[:, 0], 2)

            elif (
                len(mu.shape) > 3
            ):  # Only consider 1 prediction of last (future) segment = t+1
                loss = K.pow(true[:, -1, 0] - mu[:, -1, 0], 2)

            else:
                loss = K.pow(true - mu, 2)

            # RMSE, rescaled by sqrt(var) to undo target normalization
            reduce_loss = K.sqrt(K.mean(loss)) * np.sqrt(var)
        else:
            loss = K.pow(true - mu, 2)
            reduce_loss = K.mean(loss, axis=0)
        return reduce_loss

    return MSE_loss
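
# Minimal usage sketch, assuming a regression head whose last axis concatenates
# [mu, logvar] (so split=2 recovers mu); names below are illustrative:
# model.compile(
#     optimizer="adam",
#     loss=build_MSE_loss(split=2),
#     metrics=[build_MSE_loss(split=2, metric=True)],
# )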

def build_CCE_loss():
    def CCE_loss(true, pred):
        # Categorical cross-entropy: -sum over classes of true * log(pred)
        loss = -K.sum(true * K.log(pred), axis=-1)
        reduce_loss = K.mean(loss, axis=0)
        return reduce_loss

    return CCE_loss

@tf.keras.utils.register_keras_serializable(
    package="UQModels_loss", name="BNN_loss_gaussian"
)
def BNN_loss_gaussian(true, pred, alpha=0.95):
    # Heteroscedastic Gaussian NLL: pred concatenates [mu, logvar] on the last
    # axis; alpha weights the log-variance penalty.
    mu, logvar = tf.split(pred, 2, -1)
    loss = K.pow(true - mu, 2) * K.exp(-logvar) + alpha * logvar
    reduce_loss = K.mean(loss, axis=0)
    return reduce_loss

@tf.keras.utils.register_keras_serializable(
    package="UQModels_loss", name="BNN_loss_edl"
)
def BNN_loss_edl(true, pred, alpha=0.95):
    # Same NLL, with logvar derived from the Normal-Inverse-Gamma parameters:
    # the expected variance is beta / (alpha_edl - 1).
    gamma, vu, alpha_edl, beta = tf.split(pred, 4, -1)
    mu = gamma
    logvar = K.log(beta / (alpha_edl - 1))
    loss = K.pow(true - mu, 2) * K.exp(-logvar) + alpha * logvar
    reduce_loss = K.mean(loss, axis=0)
    return reduce_loss

@tf.keras.utils.register_keras_serializable(
    package="UQModels_loss", name="BNN_metric_gaussian"
)
def BNN_metric_gaussian(true, pred):
    mu, logvar = tf.split(pred, 2, -1)
    if len(mu.shape) == 3:
        mu_ = mu[:, 0]
        logvar_ = logvar[:, 0]
        true_ = true[:, 0]

    elif len(mu.shape) > 3:
        mu_ = mu[:, -1, 0]
        logvar_ = logvar[:, -1, 0]
        true_ = true[:, -1, 0]
    else:
        mu_ = mu
        logvar_ = logvar
        true_ = true

    # Empirical coverage of the 2-sigma interval (K.mean casts the booleans)
    loss = K.greater(2 * K.sqrt(K.exp(logvar_)), K.abs(true_ - mu_))
    reduce_loss = K.mean(loss)
    return reduce_loss

@tf.keras.utils.register_keras_serializable(
    package="UQModels_loss", name="BNN_metric_edl"
)
def BNN_metric_edl(true, pred):
    gamma, vu, alpha_edl, beta = tf.split(pred, 4, -1)
    mu = gamma
    logvar = K.log(beta / (alpha_edl - 1))

    if len(mu.shape) == 3:
        mu_ = mu[:, 0]
        logvar_ = logvar[:, 0]
        true_ = true[:, 0]

    elif len(mu.shape) > 3:
        mu_ = mu[:, -1, 0]
        logvar_ = logvar[:, -1, 0]
        true_ = true[:, -1, 0]
    else:
        mu_ = mu
        logvar_ = logvar
        true_ = true

    # Empirical coverage of the 2-sigma interval (K.mean casts the booleans)
    loss = K.greater(2 * K.sqrt(K.exp(logvar_)), K.abs(true_ - mu_))
    reduce_loss = K.mean(loss)
    return reduce_loss

def build_BNN_loss(alpha=0.95, metric=False, type_output="MC_Dropout"):
    def BNN_loss(true, pred):
        if (type_output == "MC_Dropout") or (type_output == "Deep_ensemble"):
            mu, logvar = tf.split(pred, 2, -1)

        elif type_output == "EDL":
            gamma, vu, alpha_edl, beta = tf.split(pred, 4, -1)
            mu = gamma
            logvar = K.log(beta / (alpha_edl - 1))
        else:
            raise ValueError("Unsupported type_output: " + str(type_output))

        if metric:
            if len(mu.shape) == 3:
                mu_ = mu[:, 0]
                logvar_ = logvar[:, 0]
                true_ = true[:, 0]

            elif len(mu.shape) > 3:
                mu_ = mu[:, -1, 0]
                logvar_ = logvar[:, -1, 0]
                true_ = true[:, -1, 0]
            else:
                mu_ = mu
                logvar_ = logvar
                true_ = true

            # Empirical coverage of the 2-sigma interval
            loss = K.greater(2 * K.sqrt(K.exp(logvar_)), K.abs(true_ - mu_))
            reduce_loss = K.mean(loss)

        else:
            loss = K.pow(true - mu, 2) * K.exp(-logvar) + alpha * logvar
            reduce_loss = K.mean(loss, axis=0)
        return reduce_loss

    return BNN_loss
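
# Minimal compile sketch, assuming an MC-Dropout network emitting [mu, logvar]
# on the last axis (the model name is illustrative):
# model.compile(
#     optimizer="adam",
#     loss=build_BNN_loss(alpha=0.95, type_output="MC_Dropout"),
#     metrics=[BNN_metric_gaussian],
# )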

# EDL LOSS
# https://github.com/aamini/evidential-deep-learning/blob/main/evidential_deep_learning/layers/dense.py

def build_EDL_loss(coeff_reg=0.95, coeff_var_pen=1):
    def NIG_NLL(y, gamma, v, alpha, beta, reduce=True):
        # Negative log-likelihood of the Normal-Inverse-Gamma evidential distribution
        twoBlambda = 2 * beta * (coeff_var_pen + v)
        nll = (
            0.5 * tf.math.log(np.pi / v)
            - alpha * tf.math.log(twoBlambda)
            + (alpha + 0.5) * tf.math.log(v * (y - gamma) ** 2 + twoBlambda)
            + tf.math.lgamma(alpha)
            - tf.math.lgamma(alpha + 0.5)
        )
        return tf.reduce_mean(nll) if reduce else nll

    def KL_NIG(mu1, v1, a1, b1, mu2, v2, a2, b2):
        # KL divergence between two Normal-Inverse-Gamma distributions
        KL = (
            0.5 * (a1 - 1) / b1 * (v2 * tf.square(mu2 - mu1))
            + 0.5 * v2 / v1
            - 0.5 * tf.math.log(tf.abs(v2) / tf.abs(v1))
            - 0.5
            + a2 * tf.math.log(b1 / b2)
            - (tf.math.lgamma(a1) - tf.math.lgamma(a2))
            + (a1 - a2) * tf.math.digamma(a1)
            - (b1 - b2) * a1 / b1
        )
        return KL

    def NIG_Reg(y, gamma, v, alpha, beta, omega=0.01, reduce=True, kl=False):
        # Evidence regularizer: penalize errors in proportion to claimed evidence
        # error = tf.stop_gradient(tf.abs(y - gamma))
        error = tf.abs(y - gamma)
        if kl:
            kl = KL_NIG(gamma, v, alpha, beta, gamma, omega, 1 + omega, beta)
            reg = error * kl
        else:
            evi = 2 * v + alpha
            reg = error * evi

        return tf.reduce_mean(reg) if reduce else reg

    def EvidentialRegressionLoss(y_true, pred):
        gamma, v, alpha, beta = tf.split(pred, 4, axis=-1)
        loss_NLL = NIG_NLL(y_true, gamma, v, alpha, beta, reduce=False)
        loss_Reg = NIG_Reg(y_true, gamma, v, alpha, beta, reduce=False)
        reduce_loss = K.mean(loss_NLL + coeff_reg * loss_Reg, axis=0)
        return reduce_loss

    return EvidentialRegressionLoss
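
# The EDL loss expects four outputs per target: gamma (mean), v > 0,
# alpha > 1 and beta > 0. A minimal output-head sketch in the spirit of the
# DenseNormalGamma layer from the repo linked above (names here are
# illustrative assumptions):
# def normal_gamma_head(units=1):
#     def to_nig_params(raw):
#         gamma, log_v, log_alpha, log_beta = tf.split(raw, 4, axis=-1)
#         v = tf.nn.softplus(log_v)              # v > 0
#         alpha = tf.nn.softplus(log_alpha) + 1  # alpha > 1, so beta/(alpha-1) exists
#         beta = tf.nn.softplus(log_beta)        # beta > 0
#         return tf.concat([gamma, v, alpha, beta], axis=-1)
#     return tf.keras.Sequential([
#         tf.keras.layers.Dense(4 * units),
#         tf.keras.layers.Lambda(to_nig_params),
#     ])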

# Callback procedure
def default_callbacks(
    min_delta=0.0001,
    earlystop_patience=60,
    reducelr_patience=30,
    reducelr_factor=0.3,
    reduce_lr_min_lr=0.000001,
    verbose=0,
):
    terminate_on_nan = callbacks.TerminateOnNaN()
    early_stop = callbacks.EarlyStopping(
        monitor="val_loss",
        min_delta=min_delta,
        patience=earlystop_patience,
        verbose=verbose,
        mode="min",
        restore_best_weights=False,
    )
    reduce_lr = callbacks.ReduceLROnPlateau(
        monitor="loss",
        min_delta=min_delta,
        factor=reducelr_factor,
        patience=reducelr_patience,
        verbose=verbose,
        mode="min",
        cooldown=0,
        min_lr=reduce_lr_min_lr,
    )
    return [early_stop, reduce_lr, terminate_on_nan]
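
# Typical wiring, assuming a compiled Keras model and held-out validation data
# (model/data names are illustrative):
# history = model.fit(
#     X_train, y_train,
#     validation_split=0.1,  # EarlyStopping monitors val_loss
#     epochs=500,
#     callbacks=default_callbacks(earlystop_patience=60, reducelr_patience=30),
# )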

# Commented-out example: resuming an interrupted training run with
# tf.keras.callbacks.BackupAndRestore (adapted from the TensorFlow docs).
# class InterruptingCallback(tf.keras.callbacks.Callback):
#     def on_epoch_begin(self, epoch, logs=None):
#         if epoch == 4:
#             raise RuntimeError('Interrupting!')
# callback = tf.keras.callbacks.BackupAndRestore(backup_dir="/tmp/backup")
# model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
# model.compile(tf.keras.optimizers.SGD(), loss='mse')
# try:
#     model.fit(np.arange(100).reshape(5, 20), np.zeros(5), epochs=10,
#               batch_size=1, callbacks=[callback, InterruptingCallback()],
#               verbose=0)
# except RuntimeError:
#     pass
# history = model.fit(np.arange(100).reshape(5, 20), np.zeros(5),
#                     epochs=10, batch_size=1, callbacks=[callback],
#                     verbose=0)
# Only 6 more epochs are run: the first training was interrupted at
# zero-indexed epoch 4, so the second run continues from epoch 4 to 9.