Coverage for uqmodels/evaluation/metrics.py: 45%

207 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 14:29 +0000

1import sys 

2from abc import ABC 

3 

4import numpy as np 

5import scipy 

6 

7import uqmodels.postprocessing.UQ_processing as UQ_proc 

8 

9from .base_metrics import average_coverage 

10 

# NOTE(review): hard-coded developer workspace paths — these only exist on one
# machine and shadow installed packages; consider moving this to environment
# configuration (PYTHONPATH / editable installs) instead.
sys.path.insert(1, "/home/kevin.pasini/Workspace/n5_benchmark")
sys.path.insert(1, "/home/kevin.pasini/Workspace/n5_puncc")
sys.path.insert(1, "/home/kevin.pasini/Workspace/n5_uqmodels")

14 

15# Metrics wrapper 

16 

17 

class Encapsulated_metrics(ABC):
    """Abstract base class for encapsulated metrics.

    Provides a uniform interface so heterogeneous metrics can be manipulated
    generically with a specified output format.
    """

    def __init__(self):
        # Default identifier; concrete subclasses override it.
        self.name = "metrics"

    def compute(self, y, output, sets, context, **kwarg):
        """Compute the metric.

        Args:
            y (array): Targets.
            output (array): Model results.
            sets (array list): Sub-sets (train, test).
            context (array): Additional information that may be used by the metric.
        """

34 

35 

def build_ctx_mask(context, list_ctx_constraint):
    """Build a boolean mask selecting samples that satisfy every context constraint.

    Args:
        context (np.array): 2-D array of contextual features (n_samples, n_features).
        list_ctx_constraint (list): triplets (ctx, min_, max_) where ctx is a
            column index of context and min_/max_ are exclusive bounds
            (either may be None to disable that side).

    Returns:
        np.array: boolean mask of shape (n_samples,), True where all active
        constraints hold. If no constraint is active, every sample is kept.
    """
    meta_flag = []
    for ctx, min_, max_ in list_ctx_constraint:
        if min_ is not None:
            meta_flag.append(context[:, ctx] > min_)
        if max_ is not None:
            meta_flag.append(context[:, ctx] < max_)
    if not meta_flag:
        # No active constraint: keep every sample. (The previous mean-based
        # reduction was undefined — NaN — on an empty flag list.)
        return np.ones(len(context), dtype=bool)
    # Logical AND across constraints (equivalent to mean(...) == 1 on booleans).
    ctx_flag = np.logical_and.reduce(meta_flag)
    return ctx_flag

45 

46 

class Generic_metric(Encapsulated_metrics):
    def __init__(
        self,
        ABmetric,
        name="Metric",
        mask=None,
        list_ctx_constraint=None,
        reduce=True,
        **kwarg
    ):
        """Wrap a plain metric function into an Encapsulated_metrics object.

        Args:
            ABmetric (function): metric function to wrap.
            name (str, optional): name of the metric. Defaults to "Metric".
            mask (optional): mask selecting a focused dimension on a
                multidimensional task. Defaults to None.
            list_ctx_constraint (optional): list of context constraints tied to
                the context information (see build_ctx_mask). Defaults to None.
            reduce (bool, optional): reduce multidimensional values using a
                mean. Defaults to True.
        """
        self.ABmetric = ABmetric
        self.name = name
        self.mask = mask
        self.list_ctx_constraint = list_ctx_constraint
        self.reduce = reduce
        self.kwarg = kwarg

    def compute(self, y, output, sets, context, **kwarg):
        """Apply the wrapped metric to each subset and return the list of scores."""
        # Keyword arguments fixed at construction time take precedence over
        # call-time ones.
        if self.kwarg:
            kwarg = self.kwarg

        ctx_mask = None
        if self.list_ctx_constraint is not None:
            ctx_mask = build_ctx_mask(context, self.list_ctx_constraint)

        scores = []
        for subset in sets:
            if ctx_mask is not None:
                # Restrict the subset to samples satisfying the constraints.
                subset = subset & ctx_mask
            scores.append(
                self.ABmetric(y, output, subset, self.mask, self.reduce, **kwarg)
            )
        return scores

89 

90 

91# Metric reworked 

92 

93 

def rmse(y, output, set_, mask, reduce, **kwarg):
    """Root mean square error metric.

    Args:
        y (np.array): Targets/observations.
        output (tuple): modeling output (y_pred, UQ); only output[0] is used.
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.

    Returns:
        val: rmse value(s).
    """
    pred = output[0]
    val = np.sqrt(np.power(pred[set_] - y[set_], 2).mean(axis=0))
    # Bug fix: `if mask:` raises "truth value is ambiguous" for array masks;
    # every other metric in this module tests `mask is None`.
    if mask is not None:
        val = val[mask]

    if reduce:
        val = val.mean()
    return val

115 

116 

def UQ_sharpness(y, output, set_, mask, reduce, type_UQ="var", **kwarg):
    """Sharpness: turn the UQ measure into 95% PIs and measure their mean width.

    Args:
        y (np.array): Targets/observations.
        output (tuple): modeling output (y_pred, UQ).
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.
        type_UQ (str, optional): nature of the UQ measure. Defaults to "var".

    Returns:
        Mean width of the 95% prediction intervals on the subset.
    """
    pred, UQ = output
    # 95% central interval: quantiles at 2.5% and 97.5%.
    y_lower, y_upper = (
        UQ_proc.process_UQmeasure_to_quantile(
            UQ, type_UQ, pred, y, type_UQ_params=None, alpha=a
        )
        for a in (0.025, 0.975)
    )

    width = (y_upper[set_] - y_lower[set_]).mean(axis=0)
    val = width if mask is None else width[mask]
    if reduce:
        val = val.mean()
    return val

147 

148 

def UQ_average_coverage(
    y, output, set_, mask, reduce, type_UQ="var", alpha=0.045, mode="UQ", **kwarg
):
    """Empirical coverage of the (1 - alpha) PIs built from the UQ measure.

    Args:
        y (np.array): Targets/observations.
        output (tuple): (y_pred, UQ) when mode == "UQ";
            (y_lower, y_upper) when mode == "KPI".
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.
        type_UQ (str, optional): nature of the UQ measure. Defaults to "var".
        alpha (float, optional): total miscoverage rate. Defaults to 0.045.
        mode (str, optional): "UQ" or "KPI". Defaults to "UQ".

    Returns:
        Empirical coverage on the subset.

    Raises:
        ValueError: if mode is neither "UQ" nor "KPI" (previously an obscure
            NameError on y_lower).
    """
    if mode == "UQ":
        pred, UQ = output
        y_lower = UQ_proc.process_UQmeasure_to_quantile(
            UQ, type_UQ, pred, y, type_UQ_params=None, alpha=alpha / 2
        )
        y_upper = UQ_proc.process_UQmeasure_to_quantile(
            UQ, type_UQ, pred, y, type_UQ_params=None, alpha=1 - (alpha / 2)
        )
    elif mode == "KPI":
        # Bounds are provided directly.
        y_lower, y_upper = output
    else:
        raise ValueError("mode must be 'UQ' or 'KPI', got %r" % (mode,))

    val = cov_metrics(y[set_], y_lower[set_], y_upper[set_])
    if mask is not None:
        val = val[mask]

    if reduce:
        val = val.mean()
    return val

189 

190 

def UQ_Gaussian_NLL(y, output, set_, mask, reduce, type_UQ="var", mode=None, **kwarg):
    """Log-likelihood of the targets under a Gaussian assumption on the UQ measure.

    Args:
        y (np.array): Targets/observations.
        output (tuple): modeling output (y_pred, UQ).
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.
        type_UQ (str, optional): nature of the UQ measure. Defaults to "var".
        mode (optional): None (full UQ), "A" (aleatoric part of UQ) or
            "E" (epistemic part of UQ).

    Returns:
        val: clipped Gaussian log-density averaged on the subset.
    """
    pred, UQ = output
    if mode is None:
        sigma = UQ_proc.process_UQmeasure_to_sigma(UQ, type_UQ, pred, y=None)
    elif mode == "A":
        sigma = np.sqrt(UQ[0])
    elif mode == "E":
        sigma = np.sqrt(UQ[1])

    # Per-point Gaussian log-density of y under N(pred, sigma^2).
    squared_err = np.square(y - pred)
    val = (
        -np.log(sigma) - 0.5 * np.log(2 * np.pi) - squared_err / (2 * np.square(sigma))
    )
    # Clip extremely unlikely points so they do not dominate the average.
    val[val < -7] = -7

    subset_mean = val[set_].mean(axis=0)
    val = subset_mean if mask is None else subset_mean[mask]
    if reduce:
        val = val.mean()
    return val

225 

226 

def UQ_heteroscedasticity_ratio(
    y, output, set_, mask, reduce, type_UQ="var", mode=None, **kwarg
):
    """Ratio of per-dimension scaled residuals to residuals scaled by the mean
    sigma, under a Gaussian assumption on the UQ measure.

    Args:
        y (np.array): Targets/observation
        output (np.array): modeling output : (y,UQ)
        set_ (list of mask): subset specification
        mask (bool array): mask the last dimension
        reduce (bool): apply reduction
        type_UQ (str, optional): nature of the UQ measure. Defaults to "var".
        mode (optional): None (full UQ), "A" (aleatoric) or "E" (epistemic).

    Returns:
        val: heteroscedasticity ratio values
    """

    pred, UQ = output
    if mode is None:
        sigma = UQ_proc.process_UQmeasure_to_sigma(UQ, type_UQ, pred, y=None)
    elif mode == "A":
        sigma = np.sqrt(UQ[0])
    elif mode == "E":
        # NOTE(review): any other mode value leaves sigma undefined (NameError).
        sigma = np.sqrt(UQ[1])
    # val = NLL_loss(y, pred, sigma)
    val = np.abs((y - pred)) / (sigma)
    # NOTE(review): sigma.mean(axis=1) assumes sigma is 2-D and that the result
    # broadcasts against |y - pred| — confirm expected shapes; keepdims=True
    # may have been intended here.
    val_temoin = np.abs((y - pred)) / (sigma.mean(axis=1))

    # NOTE(review): both quantities are non-negative, so these clamps at -6 can
    # never trigger — presumably copied from the NLL variant; dead code.
    val[val < -6] = -6
    val_temoin[val_temoin < -6] = -6

    if mask is None:
        val = (val[set_] / val_temoin[set_]).mean(axis=0)
    else:
        val = (val[set_] / val_temoin[set_]).mean(axis=0)[mask]

    if reduce:
        val = val.mean()
    return val

267 

268 

def UQ_absolute_residu_score(
    y, output, set_, mask, reduce, type_UQ="var", mode=None, **kwarg
):
    """Mean absolute residual score derived from (UQ, pred, y).

    Args:
        y (np.array): Targets/observations.
        output (tuple): modeling output (y_pred, UQ).
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.
        type_UQ (str, optional): nature of the UQ measure. Defaults to "var".
        mode (optional): unused; kept for signature compatibility.

    Returns:
        val: mean absolute residual score on the subset.
    """
    pred, UQ = output
    residu_score = UQ_proc.process_UQmeasure_to_residu(UQ, type_UQ, pred, y=y)
    abs_score = np.abs(residu_score)
    subset_mean = abs_score[set_].mean(axis=0)
    val = subset_mean if mask is None else subset_mean[mask]
    if reduce:
        val = val.mean()
    return val

297 

298 

def UQ_dEI(y, output, set_, mask, reduce, type_UQ="var_A&E", **kwarg):
    """Disentangled Epistemic Indicator: insight about model unreliability.

    Computes -0.5 * log(1 + var_A / var_E) averaged over the subset; the more
    the epistemic variance dominates, the closer the value is to 0.

    Args:
        y (np.array): Targets/observations (unused by the formula).
        output (tuple): modeling output (y_pred, (var_A, var_E)).
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.
        type_UQ (str, optional): only "var_A&E" is supported.

    Returns:
        val: array of metric values (or scalar if reduced).
    """
    if type_UQ != "var_A&E":
        print("erreur metric only compatible with type_UQ = var A&E")
    pred, (var_A, var_E) = output
    # Floor both variances to avoid division by ~0 in the ratio.
    var_A, var_E = np.maximum(var_A, 0.00001), np.maximum(var_E, 0.00001)
    val = -0.5 * np.log(1 + (var_A[set_] / var_E[set_])).mean(axis=0)
    # (Removed dead `val = val` no-op present in the original.)
    if mask is not None:
        val = val[mask]
    if reduce:
        val = val.mean()
    return val

323 

324 

325# Metric to rework 

326 

327 

def calibrate_var(
    y, output, set_, mask, reduce, type_output="all", alpha=0.955, **kwarg
):
    """Recalibrate variance components so empirical coverage matches alpha.

    Computes the ratio between the Gaussian quantile coefficient implied by
    the empirical coverage and the one for the target coverage, then rescales
    the selected variance component(s) by the squared ratio.

    NOTE(review): listed under "Metric to rework" in this module; see the
    inline notes for suspected defects.
    """
    per_rejection = 1 - alpha
    pred, (var_A, var_E) = output
    # NOTE(review): duplicated unpacking line — presumably a leftover; harmless.
    pred, (var_A, var_E) = output
    if type_output == "epistemic":
        pass
    elif type_output == "aleatoric":
        pass
    elif type_output == "all":
        # NOTE(review): expression computed and discarded — dead code.
        var_A + var_E

    # Empirical coverage measured on the full index range (set_ is ignored).
    Empirical_coverage = average_coverage(
        y, output, np.arange(len(y)), mask, reduce, type_output
    )

    # Gaussian quantile coefficients for empirical vs. target coverage.
    Empirical_coef = scipy.stats.norm.ppf(1 - ((1 - Empirical_coverage) / 2), 0, 1)
    True_coeff = scipy.stats.norm.ppf(1 - (per_rejection / 2), 0, 1)
    corr_ratio = np.power(True_coeff / Empirical_coef, 2)
    # NOTE(review): indexing output[2] conflicts with the 2-tuple unpacking
    # (pred, (var_A, var_E)) above — this branch requires output to be a flat
    # (pred, var_A, var_E) triple; confirm the expected structure.
    if type_output == "epistemic":
        new_output = pred, output[1], output[2] * corr_ratio
    elif type_output == "aleatoric":
        new_output = pred, output[1] * corr_ratio, output[2]
    elif type_output == "all":
        new_output = pred, output[1] * corr_ratio, output[2] * corr_ratio
    return new_output

355 

356 

def mae(y, output, set_, mask, reduce, **kwarg):
    """Mean absolute error metric.

    Args:
        y (np.array): Targets/observations.
        output (tuple): modeling output; only output[0] (the prediction) is used.
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.

    Returns:
        val: mae value(s).
    """
    # Only the point prediction is needed. The previous unpacking
    # `pred, (var_A, var_E) = output` demanded a nested structure it never
    # used; indexing output[0] accepts any output whose head is the prediction.
    pred = output[0]

    val = np.abs(pred[set_] - y[set_]).mean(axis=0)
    if mask is not None:
        val = val[mask]

    if reduce:
        val = val.mean()
    return val

368 

369 

def cov_metrics(y, y_lower, y_upper, **kwarg):
    """Fraction of targets falling inside [y_lower, y_upper], per dimension."""
    inside = np.logical_and(y >= y_lower, y <= y_upper)
    return inside.mean(axis=0)

372 

373 

def dEI(y, output, set_, mask, reduce, type_output="all", **kwarg):
    """Disentangled epistemic indicator: -0.5 * log(1 + var_A / var_E).

    NOTE: duplicates the formula of UQ_dEI above (legacy variant kept for
    backward compatibility).

    Args:
        y (np.array): Targets/observations (unused by the formula).
        output (tuple): modeling output (y_pred, (var_A, var_E)).
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.
        type_output (str, optional): unused; kept for signature compatibility.

    Returns:
        val: array of metric values (or scalar if reduced).
    """
    pred, (var_A, var_E) = output
    # Floor both variances to avoid division by ~0 in the ratio.
    var_A, var_E = np.maximum(var_A, 0.00001), np.maximum(var_E, 0.00001)
    val = -0.5 * np.log(1 + (var_A[set_] / var_E[set_])).mean(axis=0)
    # (Removed dead `val = val` no-op present in the original.)
    if mask is not None:
        val = val[mask]
    if reduce:
        val = val.mean()
    return val

384 

385 

def anom_score(
    y, output, set_, mask, reduce, type_output="all", min_A=0.08, min_E=0.02, **kwarg
):
    """Anomaly score mixing the prediction error with the epistemic scale.

    score = (|y - pred| + sigma_E) / (2 * sqrt(sigma_E^2 + sigma_A^2))

    Args:
        y (np.array): Targets/observations.
        output (tuple): modeling output (y_pred, (var_A, var_E)).
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.
        type_output (str, optional): unused; kept for signature compatibility.
            (The original if/elif chain over it computed and discarded
            `var_A + var_E` — dead code, removed.)
        min_A (float, optional): floor on the aleatoric std. Defaults to 0.08.
        min_E (float, optional): floor on the epistemic std. Defaults to 0.02.

    Returns:
        val: anomaly score(s) averaged on the subset.
    """
    pred, (var_A, var_E) = output
    # Standard deviations, floored to avoid degenerate (near-zero) scales.
    ind_A = np.maximum(np.sqrt(var_A), min_A)
    ind_E = np.maximum(np.sqrt(var_E), min_E)
    anom_score = (np.abs(y - pred) + ind_E) / (
        2 * np.sqrt(np.power(ind_E, 2) + np.power(ind_A, 2))
    )

    if mask is None:
        val = (anom_score[set_]).mean(axis=0)
    else:
        val = (anom_score[set_]).mean(axis=0)[mask]
    if reduce:
        val = val.mean()
    return val

412 

413 

def confidence_score(
    y, output, set_, mask, reduce, type_output="all", min_A=0.08, min_E=0.02, **kwarg
):
    """Confidence score sigma_E / sigma_A**0.75 averaged on the subset.

    NOTE(review): unlike most metrics in this module, output is unpacked as a
    flat (pred, var_A, var_E) triple rather than (pred, (var_A, var_E)) —
    confirm callers provide that structure.

    Args:
        y (np.array): Targets/observations (unused by the formula).
        output (tuple): modeling output (y_pred, var_A, var_E).
        set_ (mask): subset specification.
        mask (bool array): mask applied to the last dimension, or None.
        reduce (bool): if True, reduce the per-dimension values with a mean.
        type_output (str, optional): unused; kept for signature compatibility.
            (The original if/elif chain over it computed and discarded
            `var_A + var_E` — dead code, removed.)
        min_A (float, optional): floor on the aleatoric std. Defaults to 0.08.
        min_E (float, optional): floor on the epistemic std. Defaults to 0.02.

    Returns:
        val: confidence score(s) averaged on the subset.
    """
    pred, var_A, var_E = output
    # Standard deviations, floored to avoid degenerate (near-zero) scales.
    ind_A = np.maximum(np.sqrt(var_A), min_A)
    ind_E = np.maximum(np.sqrt(var_E), min_E)
    confidence_score = ind_E / np.power(ind_A, 0.75)

    if mask is None:
        val = (confidence_score[set_]).mean(axis=0)
    else:
        val = (confidence_score[set_]).mean(axis=0)[mask]
    if reduce:
        val = val.mean()
    return val

438 

439 

440# Visualisation tools