Coverage for uqmodels/postprocessing/custom_UQKPI_Processor.py: 20%

59 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 14:29 +0000

1from copy import deepcopy 

2 

3import numpy as np 

4 

5import uqmodels.postprocessing.anomaly_processing as anom_proc 

6import uqmodels.postprocessing.UQ_processing as UQ_proc 

7import uqmodels.postprocessing.UQKPI_Processor as UQKPI_proc 

8from uqmodels.processing import Processor 

9from uqmodels.utils import apply_middledim_reduction 

10 

11 

12class Multiscale_Anomscore_processor(UQKPI_proc.UQKPI_Processor): 

13 """Processor aiming to produce an contextual deviation anomaly score from UQmeasure prediction and observation""" 

14 

15 def __init__( 

16 self, 

17 name="Multiscale_anomscore", 

18 KPI_parameters=dict(), 

19 cache=None, 

20 random_state=None, 

21 **kwargs 

22 ): 

23 """Initialization 

24 

25 Args: 

26 name (str, optional): name. Defaults to 'Anomscore'. 

27 KPI_parameters (_type_, optional): dict of parameters link to compute_score function. 

28 Defaults to None will use predefined paramters. 

29 cache (_type_, optional): Cache manager or none 

30 """ 

31 

32 KPI_parameters_default = { 

33 "type_norm": "Nsigma_local", 

34 "q_var": 1, 

35 "d": 2, 

36 "beta": 0.001, 

37 "beta_source": 0.001, 

38 "beta_global": 0.001, 

39 "min_cut": 0.002, 

40 "max_cut": 0.998, 

41 "per_seuil": 0.999, 

42 "dim_chan": 1, 

43 "type_fusion": "mahalanobis", 

44 "filt": [0.1, 0.2, 0.5, 0.2, 0.1], 

45 } 

46 

47 for key in KPI_parameters_default.keys(): 

48 if key not in KPI_parameters.keys(): 

49 KPI_parameters[key] = KPI_parameters_default[key] 

50 

51 super().__init__( 

52 name=name, 

53 KPI_parameters=KPI_parameters, 

54 cache=cache, 

55 random_state=random_state, 

56 **kwargs 

57 ) 

58 

59 self.params_ = None 

60 

61 def _aux_proc( 

62 self, 

63 list_UQ, 

64 type_UQ, 

65 list_pred, 

66 list_y, 

67 type_UQ_params=None, 

68 ctx_mask=None, 

69 mutliscale_anom_score_params_=None, 

70 **kwargs 

71 ): 

72 """Auxialiar function that implement both fit and tranform procedure for Mutliscale_anom_score. 

73 

74 It is mainly based on the application of anom_proc.fit/compute_anom_score & 

75 anom_proc.fit/compute_score_fusion. 

76 

77 Args: 

78 UQ (np.array): UQmeasure provide by the UQestiamtor. 

79 type_UQ (str): Type_UQ of the UQestimators. 

80 pred (np.array): Prediction of the predictor or the UQestimator. 

81 y (np.array, optional): Targets/Observations, can be None if processor does't need y to fit 

82 type_UQ_params (type_UQ_params, optional): Additional information about type_UQ parameters. 

83 Defaults to None. 

84 """ 

85 

86 n_dim = len(list_y) 

87 n_chan = list_y[0].shape[2] 

88 n_step = max([len(list_y[source_id]) for source_id in range(n_dim)]) 

89 

90 S_anom_chan = np.zeros((n_step, n_dim * n_chan)) 

91 S_anom_source = np.zeros((n_step, n_dim)) 

92 list_bot = [] 

93 list_top = [] 

94 

95 mode_fit = False 

96 if mutliscale_anom_score_params_ is None: 

97 mode_fit = True 

98 mutliscale_anom_score_params_ = { 

99 "anom_score_loc_params_": [], 

100 "score_fusion_loc_params_": [], 

101 "score_fusion_agg_params_": None, 

102 } 

103 

104 for n_s in range(n_dim): 

105 pred, UQ = list_pred[n_s], list_UQ[n_s] 

106 y = list_y[n_s] 

107 

108 anom_score_loc_params_ = None 

109 score_fusion_loc_params_ = None 

110 if not mode_fit: 

111 anom_score_loc_params_ = mutliscale_anom_score_params_[ 

112 "anom_score_loc_params_" 

113 ][n_s] 

114 

115 score_fusion_loc_params_ = mutliscale_anom_score_params_[ 

116 "score_fusion_loc_params_" 

117 ][n_s] 

118 

119 # Use fact that if params_ is none then fit is computed internall 

120 (s_loc, born), anom_score_loc_params_ = anom_proc.compute_anom_score( 

121 UQ=UQ, 

122 type_UQ=type_UQ, 

123 pred=pred, 

124 y=y, 

125 type_UQ_params=type_UQ_params, 

126 ctx_mask=ctx_mask, 

127 with_born=True, 

128 params_=anom_score_loc_params_, 

129 **self.KPI_parameters, 

130 **kwargs 

131 ) 

132 mutliscale_anom_score_params_["anom_score_loc_params_"].append( 

133 anom_score_loc_params_ 

134 ) 

135 

136 # Use fact that if params_ is none then fit is computed internally 

137 s_agg, score_fusion_loc_params_ = anom_proc.compute_score_fusion( 

138 s_loc, 

139 ctx_mask=ctx_mask, 

140 params_=score_fusion_loc_params_, 

141 **self.KPI_parameters 

142 ) 

143 

144 mutliscale_anom_score_params_["score_fusion_loc_params_"].append( 

145 score_fusion_loc_params_ 

146 ) 

147 

148 mask = (np.arange(n_dim * n_chan) >= (n_s * n_chan)) & ( 

149 np.arange(n_dim * n_chan) < ((n_s + 1) * n_chan) 

150 ) 

151 

152 S_anom_chan[:, mask] = s_loc 

153 S_anom_source[:, n_s] = s_agg[:, 0] 

154 

155 list_bot.append(born[0]) 

156 list_top.append(born[1]) 

157 

158 # type_norm may be Chi² -> Exploratory works 

159 

160 score_fusion_agg_params_ = None 

161 if not mode_fit: 

162 score_fusion_agg_params_ = mutliscale_anom_score_params_[ 

163 "score_fusion_agg_params_" 

164 ] 

165 

166 S_anom_agg, score_fusion_agg_params_ = anom_proc.compute_score_fusion( 

167 S_anom_source, 

168 ctx_mask=None, 

169 beta=self.KPI_parameters["beta_global"], 

170 type_fusion=self.KPI_parameters["type_fusion"], 

171 type_norm="quantiles_global", 

172 per_seuil=0.995, 

173 d=2, 

174 filt=self.KPI_parameters["filt"], 

175 params_=score_fusion_agg_params_, 

176 ) 

177 

178 mutliscale_anom_score_params_["score_fusion_agg_params_"] = ( 

179 score_fusion_agg_params_ 

180 ) 

181 

182 if mode_fit: 

183 return mutliscale_anom_score_params_ 

184 

185 else: 

186 return S_anom_chan, S_anom_source, S_anom_agg, list_bot, list_top 

187 

188 def fit( 

189 self, 

190 list_UQ, 

191 type_UQ, 

192 list_pred, 

193 list_y, 

194 type_UQ_params=None, 

195 ctx_mask=None, 

196 **kwargs 

197 ): 

198 """Fitting procedure aim to estimate and store Multiscale_Anomscore_processor params for 

199 Multiscale_Anomscore computation 

200 

201 Args: 

202 UQ (np.array): UQmeasure provide by the UQestiamtor. 

203 type_UQ (str): Type_UQ of the UQestimators. 

204 pred (np.array): Prediction of the predictor or the UQestimator. 

205 y (np.array, optional): Targets/Observations, can be None if processor does't need y to fit 

206 type_UQ_params (type_UQ_params, optional): Additional information about type_UQ parameters. 

207 Defaults to None. 

208 """ 

209 self.params_ = self._aux_proc( 

210 list_UQ, 

211 type_UQ, 

212 list_pred, 

213 list_y, 

214 type_UQ_params=type_UQ_params, 

215 ctx_mask=ctx_mask, 

216 ) 

217 super().fit(type_UQ=type_UQ) 

218 

219 def transform( 

220 self, 

221 list_UQ, 

222 type_UQ, 

223 list_pred, 

224 list_y, 

225 type_UQ_params=None, 

226 ctx_mask=None, 

227 **kwargs 

228 ): 

229 """Transform procedure aim to transform (predictor) & UQestimator output into a Multiscale_Anomscore 

230 according to Multiscale_Anomscore_params 

231 

232 Args: 

233 UQ (np.array): UQmeasure provide by the UQestiamtor. 

234 type_UQ (str): Type_UQ of the UQestimators. 

235 pred (np.array): Prediction of the predictor or the UQestimator. 

236 y (np.array, optional): Targets/Observations, can be None if processor does't need y to fit 

237 type_UQ_params (type_UQ_params, optional): Additional information about type_UQ parameters. 

238 Defaults to None. 

239 

240 Returns: 

241 A tuple (S_anom_chan, S_anom_agg, S_anom_source, list_bot, list_top) 

242 where S_anom_chan is a mutli-dimensional providing Anom score for each channels of each source 

243 on compute_anom_score, 

244 S_anom_source is an aggregated score providing Anom score at source lvl on compute_score_fusion, 

245 S_anom_agg is an aggregated score providing 1D Anom score based on compute_score_fusion, 

246 list_bot is an anomalie lower threeshold for each chan of each sensors provided by compute_anom_score 

247 and list_top is an anomalie upper threeshold for each chan of each sensors provided by compute_anom_score 

248 """ 

249 super().transform(type_UQ=type_UQ) 

250 S_anom_chan, S_anom_agg, S_anom_source, list_bot, list_top = self._aux_proc( 

251 list_UQ, 

252 type_UQ, 

253 list_pred, 

254 list_y, 

255 type_UQ_params=type_UQ_params, 

256 ctx_mask=ctx_mask, 

257 mutliscale_anom_score_params_=self.params_, 

258 **kwargs 

259 ) 

260 return (S_anom_chan, S_anom_agg, S_anom_source, list_bot, list_top)