Coverage for uqmodels/postprocessing/custom_UQKPI_Processor.py: 20%
59 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
1from copy import deepcopy
3import numpy as np
5import uqmodels.postprocessing.anomaly_processing as anom_proc
6import uqmodels.postprocessing.UQ_processing as UQ_proc
7import uqmodels.postprocessing.UQKPI_Processor as UQKPI_proc
8from uqmodels.processing import Processor
9from uqmodels.utils import apply_middledim_reduction
class Multiscale_Anomscore_processor(UQKPI_proc.UQKPI_Processor):
    """Processor producing a contextual deviation anomaly score from UQ measures, predictions and observations.

    Scores are computed at three levels: per channel of each source, per
    source (fusion of its channels) and globally (fusion of all sources).
    """

    def __init__(
        self,
        name="Multiscale_anomscore",
        KPI_parameters=None,
        cache=None,
        random_state=None,
        **kwargs
    ):
        """Initialization.

        Args:
            name (str, optional): Processor name. Defaults to
                'Multiscale_anomscore'.
            KPI_parameters (dict, optional): Parameters forwarded to
                anom_proc.compute_anom_score / compute_score_fusion. Missing
                keys are filled with the predefined defaults below. Defaults
                to None (only predefined parameters are used). The dict
                passed by the caller is not modified.
            cache (optional): Cache manager or None.
            random_state (optional): Forwarded to the parent constructor.
            **kwargs: Forwarded to the parent constructor.
        """
        KPI_parameters_default = {
            "type_norm": "Nsigma_local",
            "q_var": 1,
            "d": 2,
            "beta": 0.001,
            "beta_source": 0.001,
            "beta_global": 0.001,
            "min_cut": 0.002,
            "max_cut": 0.998,
            "per_seuil": 0.999,
            "dim_chan": 1,
            "type_fusion": "mahalanobis",
            "filt": [0.1, 0.2, 0.5, 0.2, 0.1],
        }

        # Work on a copy: filling defaults into the caller's dict (or into a
        # shared mutable default argument) would leak state between instances.
        merged_parameters = {} if KPI_parameters is None else dict(KPI_parameters)
        for key, value in KPI_parameters_default.items():
            merged_parameters.setdefault(key, value)

        super().__init__(
            name=name,
            KPI_parameters=merged_parameters,
            cache=cache,
            random_state=random_state,
            **kwargs
        )
        # Fitted parameters; None until fit() has been called.
        self.params_ = None

    def _aux_proc(
        self,
        list_UQ,
        type_UQ,
        list_pred,
        list_y,
        type_UQ_params=None,
        ctx_mask=None,
        mutliscale_anom_score_params_=None,
        **kwargs
    ):
        """Auxiliary function implementing both the fit and transform procedures.

        It is mainly based on anom_proc.compute_anom_score and
        anom_proc.compute_score_fusion, which fit their own parameters when
        called with ``params_=None``.

        Args:
            list_UQ (list): One UQ measure per source, as provided by the
                UQ estimator.
            type_UQ (str): Type_UQ of the UQ estimators.
            list_pred (list): One prediction per source.
            list_y (list): One targets/observations array per source. The
                size of the third axis of the first element is used as the
                number of channels (assumed identical for every source --
                TODO confirm).
            type_UQ_params (optional): Additional information about type_UQ
                parameters. Defaults to None.
            ctx_mask (optional): Forwarded to the anom_proc functions.
                Defaults to None.
            mutliscale_anom_score_params_ (dict, optional): Previously fitted
                parameters. When None the function runs in fit mode.
            **kwargs: Forwarded to anom_proc.compute_anom_score.

        Returns:
            In fit mode, the dict of fitted parameters. Otherwise the tuple
            (S_anom_chan, S_anom_source, S_anom_agg, list_bot, list_top).
        """
        n_dim = len(list_y)
        n_chan = list_y[0].shape[2]
        n_step = max(len(y_source) for y_source in list_y)

        S_anom_chan = np.zeros((n_step, n_dim * n_chan))
        S_anom_source = np.zeros((n_step, n_dim))
        list_bot = []
        list_top = []

        mode_fit = mutliscale_anom_score_params_ is None
        if mode_fit:
            mutliscale_anom_score_params_ = {
                "anom_score_loc_params_": [],
                "score_fusion_loc_params_": [],
                "score_fusion_agg_params_": None,
            }

        # Column indices of S_anom_chan; hoisted since they do not depend on
        # the source being processed.
        chan_index = np.arange(n_dim * n_chan)

        for n_s in range(n_dim):
            pred, UQ = list_pred[n_s], list_UQ[n_s]
            y = list_y[n_s]

            anom_score_loc_params_ = None
            score_fusion_loc_params_ = None
            if not mode_fit:
                anom_score_loc_params_ = mutliscale_anom_score_params_[
                    "anom_score_loc_params_"
                ][n_s]
                score_fusion_loc_params_ = mutliscale_anom_score_params_[
                    "score_fusion_loc_params_"
                ][n_s]

            # Use the fact that if params_ is None the fit is computed internally.
            (s_loc, born), anom_score_loc_params_ = anom_proc.compute_anom_score(
                UQ=UQ,
                type_UQ=type_UQ,
                pred=pred,
                y=y,
                type_UQ_params=type_UQ_params,
                ctx_mask=ctx_mask,
                with_born=True,
                params_=anom_score_loc_params_,
                **self.KPI_parameters,
                **kwargs
            )

            # Use the fact that if params_ is None the fit is computed internally.
            s_agg, score_fusion_loc_params_ = anom_proc.compute_score_fusion(
                s_loc,
                ctx_mask=ctx_mask,
                params_=score_fusion_loc_params_,
                **self.KPI_parameters
            )

            # Per-source parameters are only recorded while fitting. Appending
            # during transform would grow the stored lists on every call with
            # entries that are never read back (lookups use index n_s).
            if mode_fit:
                mutliscale_anom_score_params_["anom_score_loc_params_"].append(
                    anom_score_loc_params_
                )
                mutliscale_anom_score_params_["score_fusion_loc_params_"].append(
                    score_fusion_loc_params_
                )

            # Columns of S_anom_chan that belong to the current source.
            mask = (chan_index >= (n_s * n_chan)) & (chan_index < ((n_s + 1) * n_chan))

            S_anom_chan[:, mask] = s_loc
            S_anom_source[:, n_s] = s_agg[:, 0]

            list_bot.append(born[0])
            list_top.append(born[1])

        # type_norm may be Chi² -> Exploratory works

        score_fusion_agg_params_ = None
        if not mode_fit:
            score_fusion_agg_params_ = mutliscale_anom_score_params_[
                "score_fusion_agg_params_"
            ]

        # Global fusion across sources. type_norm, per_seuil and d are fixed
        # here and are not taken from self.KPI_parameters.
        S_anom_agg, score_fusion_agg_params_ = anom_proc.compute_score_fusion(
            S_anom_source,
            ctx_mask=None,
            beta=self.KPI_parameters["beta_global"],
            type_fusion=self.KPI_parameters["type_fusion"],
            type_norm="quantiles_global",
            per_seuil=0.995,
            d=2,
            filt=self.KPI_parameters["filt"],
            params_=score_fusion_agg_params_,
        )

        mutliscale_anom_score_params_["score_fusion_agg_params_"] = (
            score_fusion_agg_params_
        )

        if mode_fit:
            return mutliscale_anom_score_params_

        return S_anom_chan, S_anom_source, S_anom_agg, list_bot, list_top

    def fit(
        self,
        list_UQ,
        type_UQ,
        list_pred,
        list_y,
        type_UQ_params=None,
        ctx_mask=None,
        **kwargs
    ):
        """Estimate and store the parameters used for Multiscale_Anomscore computation.

        Args:
            list_UQ (list): One UQ measure per source, as provided by the
                UQ estimator.
            type_UQ (str): Type_UQ of the UQ estimators.
            list_pred (list): One prediction per source.
            list_y (list): One targets/observations array per source.
            type_UQ_params (optional): Additional information about type_UQ
                parameters. Defaults to None.
            ctx_mask (optional): Forwarded to the anom_proc functions.
                Defaults to None.
            **kwargs: Accepted but currently not forwarded.
        """
        # NOTE(review): unlike transform(), **kwargs is not passed on to
        # _aux_proc here -- confirm whether this asymmetry is intended.
        self.params_ = self._aux_proc(
            list_UQ,
            type_UQ,
            list_pred,
            list_y,
            type_UQ_params=type_UQ_params,
            ctx_mask=ctx_mask,
        )
        super().fit(type_UQ=type_UQ)

    def transform(
        self,
        list_UQ,
        type_UQ,
        list_pred,
        list_y,
        type_UQ_params=None,
        ctx_mask=None,
        **kwargs
    ):
        """Transform (predictor) & UQ estimator outputs into a Multiscale_Anomscore.

        Uses the parameters stored by fit().

        Args:
            list_UQ (list): One UQ measure per source, as provided by the
                UQ estimator.
            type_UQ (str): Type_UQ of the UQ estimators.
            list_pred (list): One prediction per source.
            list_y (list): One targets/observations array per source.
            type_UQ_params (optional): Additional information about type_UQ
                parameters. Defaults to None.
            ctx_mask (optional): Forwarded to the anom_proc functions.
                Defaults to None.
            **kwargs: Forwarded to anom_proc.compute_anom_score.

        Returns:
            A tuple (S_anom_chan, S_anom_source, S_anom_agg, list_bot, list_top)
            where S_anom_chan holds the anomaly score of each channel of each
            source (from compute_anom_score),
            S_anom_source is the aggregated score at source level (from
            compute_score_fusion),
            S_anom_agg is the global aggregated score across sources (from
            compute_score_fusion),
            list_bot is the lower anomaly threshold of each source provided by
            compute_anom_score
            and list_top is the upper anomaly threshold of each source provided
            by compute_anom_score.
        """
        super().transform(type_UQ=type_UQ)
        # Names follow the order actually returned by _aux_proc; the positional
        # layout of the returned tuple is unchanged for existing callers.
        S_anom_chan, S_anom_source, S_anom_agg, list_bot, list_top = self._aux_proc(
            list_UQ,
            type_UQ,
            list_pred,
            list_y,
            type_UQ_params=type_UQ_params,
            ctx_mask=ctx_mask,
            mutliscale_anom_score_params_=self.params_,
            **kwargs
        )
        return (S_anom_chan, S_anom_source, S_anom_agg, list_bot, list_top)