Coverage for uqmodels/modelization/ML_estimator/random_forest_UQ.py: 49%
199 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
1import numpy as np
2from joblib import Parallel, delayed
3from sklearn.ensemble import RandomForestRegressor
4from sklearn.ensemble._forest import _generate_sample_indices, _get_n_samples_bootstrap
6from uqmodels.modelization.UQEstimator import UQEstimator
7from uqmodels.utils import EPSILON
10class RF_UQEstimator(UQEstimator):
11 """Uncertainty quantification approch based on "local" sub-sampling UQ estimation from Random forest
12 neighboorhood extraction"""
14 def __init__(
15 self,
16 estimator=RandomForestRegressor(),
17 pretuned=False,
18 type_UQ="var",
19 use_biais=True,
20 rescale=True,
21 n_jobs=4,
22 beta=0.1,
23 var_min=0.00001,
24 random_state=None,
25 ):
26 """Initialization
28 Args:
29 estimator (_type_, optional): RandomForestRegressor with meta-parameters
30 pretuned (bool, optional): bool flag that freeze estimator. Defaults to False.
31 type_UQ (str, optional): nature of UQmeasure. Defaults to 'var'.
32 use_biais (bool, optional): use oob biais correction. Defaults to True.
33 rescale (bool, optional): use rescale procedure. Defaults to True.
34 n_jobs (int, optional): number of jobs used for parallelization purpose. Defaults to 4.
35 beta (float, optional): miss coverage targets in case of type_UQ = quantile
36 var_min (float, optional): minimal variance. Defaults to 0.00001.
37 random_state (bool): handle experimental random using seed.
38 """
40 super().__init__(
41 name="RF_UQEstimator",
42 type_UQ=type_UQ,
43 rescale=rescale,
44 var_min=var_min,
45 random_state=random_state,
46 )
48 self.beta = beta
49 if type_UQ in ["quantile", "res_quantile"]:
50 self.list_alpha = [beta / 2, (1 - beta / 2)]
51 self.type_UQ_params = {"list_alpha": self.list_alpha}
52 self.use_biais = use_biais
53 self.list_statistics = [
54 "pred",
55 "n_obs",
56 "aleatoric",
57 "epistemic",
58 "oob_aleatoric",
59 ]
60 if self.use_biais:
61 self.list_statistics.append("biais")
63 self.pretuned = pretuned
64 self.estimator = estimator
65 if self.random_state is not None:
66 self.estimator.set_params(random_state=self.random_state)
67 self.dict_leaves_statistics = dict()
68 self.n_jobs = n_jobs
69 if self.type_UQ in ["var", "res_var"]:
70 self.list_statistics.append("var")
72 elif self.type_UQ in ["2var", "res_2var"]:
73 self.list_statistics.append("var_bot")
74 self.list_statistics.append("var_top")
76 elif self.type_UQ in ["quantile", "res_quantile"]:
77 self.list_statistics.append("Q_bot")
78 self.list_statistics.append("Q_top")
80 self.Y_shape = None
82 def _format(self, X, y, type_transform, mode_UQ=False, skip_format=False, **kwargs):
83 """data & output standarization : see UQEstimator._format
84 Args:
85 X (np.array or None): Features
86 y (np.array or None): Targets/observations or prediction or UQmeasure
87 type_transform (str): _description_
88 mode_UQ (bool, optional): True if normalise UQ. Defaults to False.
89 var_min (int, optional): _description_. Defaults to 0.
91 Returns:
92 _type_: _description_
93 """
94 X, y = super()._format(
95 X,
96 y,
97 type_transform=type_transform,
98 mode_UQ=mode_UQ,
99 skip_format=skip_format,
100 )
101 return X, y
103 def fit(self, X, y, sample_weight=None, skip_format=False, **kwargs):
104 """Train scikit RF model and then estimate and store leafs variance to uncertainty quantification purpose
105 Args:
106 X (array): Features of the training set
107 y (array): Targets/Observations of the training set
108 """
110 def aux_leaf_statistics(Y_leaf, pred_leaf, Y_oob_leaf, list_statistics, beta):
111 """Auxiliar function : Extract statistics of a leaf:
113 Args:
114 Y_leaf ([float np.array (?,dim)]): Target of train leaf's elements
115 pred_leaf ([float np.array (?,dim)]): Predict of train leaf's elements
116 Y_oob_leaf ([float np.array (?,dim)]): Target of oob(out-of-bound) leaf's elements
117 list_statistics ([list]) List of statistics to compute.
118 beta ([float]): miss-coverage target (used for quantile estimation)
120 Ouput:
121 dict_statistics : dict of extracted statistics statistics.
122 """
124 dict_statistics = dict()
126 n_obs = len(Y_leaf)
127 len(Y_oob_leaf)
128 if "n_obs" in list_statistics:
129 dict_statistics["n_obs"] = n_obs
131 # Compute biais : Error on out of "bag" sample (Part of train sample leave beside for the Tree)
132 biais = 0
133 if "biais" in list_statistics: # Moyenne du biais
134 if len(Y_oob_leaf) > 4:
135 biais = (Y_oob_leaf - Y_leaf.mean(axis=0)).mean(axis=0)
136 dict_statistics["biais"] = biais
138 if "oob_aleatoric" in list_statistics: # Variance du biais
139 dict_statistics["oob_aleatoric"] = 0
140 if len(Y_oob_leaf) > 1:
141 dict_statistics["oob_aleatoric"] = (Y_oob_leaf).var(axis=0, ddof=1)
143 # Compute : the whole bias : Mean of leafs erros on both Used train sample and Non-Used train sample.
144 # If val_pred is the tree forecast values (Y_train_leaf - val_pred) = 0 so biais = biais
146 # Debias forecast values and compute residuals in order to perform variance estimation
147 Residual = Y_leaf - Y_leaf.mean(axis=0)
149 if "pred" in list_statistics:
150 dict_statistics["pred"] = Y_leaf.mean(axis=0)
152 # Estimation of leaf residuals variance.
153 if "var" in list_statistics:
154 dict_statistics["var"] = (np.power(Y_leaf, 2) / (n_obs)).sum(axis=0)
156 # Estimation of exploratory statistics
157 if "aleatoric" in list_statistics:
158 dict_statistics["aleatoric"] = Y_leaf.var(
159 axis=0, ddof=1
160 ) # partial E[Var[X]]
162 # partial E[X] No biais because reducing with mean that doesn't take
163 # account biais (native RF predict function)
164 if "epistemic" in list_statistics:
165 dict_statistics["epistemic"] = np.power(Y_leaf.mean(axis=0), 2)
167 # Identify negative and positive residuals (for '2var' or 'quantile' estimation)
168 Residual = Y_leaf - (Y_leaf.mean(axis=0) - biais)
169 flag_res_bot = Residual <= 0
170 flag_res_top = Residual >= 0
172 # Estimation of positive and negative leaf residuals variance.
173 if "var_bot" in list_statistics:
174 # Identify negative and positive residuals
175 flag_res_bot = Residual <= 0
176 flag_res_top = Residual >= 0
177 dict_statistics["var_bot"] = EPSILON
178 dict_statistics["var_top"] = EPSILON
179 if (flag_res_bot).sum() > 2:
180 dict_statistics["var_bot"] = Residual[flag_res_bot].var(
181 axis=0, ddof=1
182 )
184 if (flag_res_top).sum() > 2:
185 dict_statistics["var_top"] = Residual[flag_res_top].var(
186 axis=0, ddof=1
187 )
189 if "Q_bot" in list_statistics:
190 # Identify negative and positive residuals
191 flag_res_bot = Residual <= 0
192 flag_res_top = Residual >= 0
193 dict_statistics["Q_bot"] = EPSILON
194 dict_statistics["Q_top"] = EPSILON
195 if (flag_res_bot).sum() > 2:
196 dict_statistics["Q_bot"] = np.quantile(
197 Residual[flag_res_bot],
198 beta * (1 + 1 / flag_res_bot.sum()),
199 axis=0,
200 )
201 if (flag_res_top).sum() > 2:
202 dict_statistics["Q_top"] = np.quantile(
203 Residual[flag_res_top],
204 np.minimum((1 - beta) * (1 + 1 / flag_res_top.sum()), 0.995),
205 axis=0,
206 )
208 return dict_statistics
210 def aux_tree_statistics(
211 num_tree,
212 n_samples,
213 max_samples,
214 y,
215 pred,
216 tree_affectation,
217 list_statistics,
218 beta,
219 simple_m,
220 bootstrap,
221 ):
222 """Extraction of statistics for each leaves of a tree
224 Args:
225 num_tree ([int]): ID of the tree
226 n_samples ([float]): Random forest paramater (used to reproduce the trainning set)
227 max_samples ([int]): Random forest parameter (used to reproduce the trainning set)
228 y ([array]): Target of training set values
229 pred ([array]): Forecast of training set values
230 tree_affectation ([érray]): Array of element leaves affectations
231 pred_true ([2D Array of float]): Forecast values !!! Non-used !!!
232 list_statistics ([list]) List of statistics to compute.
233 beta ([float]): miss-coverage target (used for quantile estimation)
234 simple_m ([object]): scikit learn decision tree model
236 Output:
237 Side effect on dict_statistics
238 """
240 # Regenerate the bootstrat training sample thank to scikit function
242 leaves = list(set(tree_affectation))
243 if bootstrap:
244 n_samples_bootstrap = _get_n_samples_bootstrap(n_samples, max_samples)
245 re_sample = _generate_sample_indices(
246 simple_m.random_state, n_samples, n_samples_bootstrap
247 )
248 inv_draw = np.ones(n_samples)
249 inv_draw[re_sample] = 0
250 oob_sample = np.repeat(np.arange(inv_draw.size), inv_draw.astype(int))
251 else:
252 # Compute leaves affectation for the tree on bootstrap sample.
254 # Identify non-used training data : oob data
255 re_sample = np.arange(n_samples)
256 inv_draw = np.ones(n_samples)
257 inv_draw[re_sample] = 0
258 oob_sample = np.repeat(np.arange(inv_draw.size), inv_draw.astype(int))
259 # Compute leaves affectation for the oob sample.
261 leaves_interest = []
262 # add key : (num_tree,num_leaf) to save leaf values.
264 # For each (visited) leaves :
265 tree_statistics = dict()
266 for num_leaf in leaves:
267 # Identify concerned bootstrap and oob observations.
268 Y_leaf = y[re_sample[tree_affectation[re_sample] == num_leaf]]
269 pred_leaf = pred[re_sample[tree_affectation[re_sample] == num_leaf]]
270 Y_oob_leaf = y[oob_sample[tree_affectation[oob_sample] == num_leaf]]
272 # Extract leaf statistics
273 tree_statistics[num_leaf] = aux_leaf_statistics(
274 Y_leaf, pred_leaf, Y_oob_leaf, list_statistics, beta
275 )
276 if (num_tree, num_leaf) in leaves_interest:
277 tree_statistics[num_leaf]["Y_leaf"] = Y_leaf
278 return (num_tree, tree_statistics)
280 beta = self.beta
281 X, y = self._format(X, y, "fit_transform", skip_format=skip_format)
282 # self.X_train = X
283 # self.Y_train = y
284 self.Y_shape = y.shape
285 list_statistics = self.list_statistics
286 model_rf = self.estimator
287 # Fit the model using scikit method
288 model_rf.fit(X, y, sample_weight=sample_weight)
289 RF_affectation = model_rf.apply(X)
290 n_estimators = int(model_rf.n_estimators)
291 pred = model_rf.predict(X)
293 # Extract subsample statistics
294 parrallel_inputs = [
295 (
296 num_tree,
297 len(y),
298 model_rf.max_samples,
299 y,
300 pred,
301 RF_affectation[:, num_tree],
302 list_statistics,
303 beta,
304 model_rf.estimators_[num_tree],
305 model_rf.bootstrap,
306 )
307 for num_tree in np.arange(n_estimators)
308 ]
310 Rf_leaves_statistics = Parallel(n_jobs=self.n_jobs)(
311 delayed(aux_tree_statistics)(*inputs) for inputs in parrallel_inputs
312 )
313 # Store leaves statistics of each tree in a dict
314 dict_leaves_statistics = dict()
315 for num_tree, dict_tree_statistics in Rf_leaves_statistics:
316 dict_leaves_statistics[num_tree] = dict_tree_statistics
317 self.dict_leaves_statistics = dict_leaves_statistics
318 self.is_fitted = True
319 return
321 def predict(self, X, beta=None, skip_format=False, **kwargs):
322 """Predict both forecast and UQ estimations values
323 Args:
324 X ([type]): Features of the data to forecast
325 beta ([type]): Miss-coverage target
327 Returns:
328 pred ([array]): Forecast values
329 UQ ([type]): UQmeasure
330 """
332 if beta is None:
333 beta = self.beta
335 X, _ = self._format(X, None, "transform", skip_format=skip_format)
337 # Call auxiliaire function that compute RF statistics from leaf subsampling.
338 pred, biais, UQ, var_A, var_E, _ = self.RF_extraction(X)
340 if self.use_biais:
341 pred = pred - biais
343 # Compute (top,bot) boundaries from (1 or 2)-var estimation
345 _, UQ = self._format(None, UQ, "inverse_transform", mode_UQ=True)
346 _, pred = self._format(None, pred, "inverse_transform")
348 return (pred, UQ)
350 def RF_extraction(self, X):
351 """Random-forest subsampling statistics extraction "
353 Args:
354 X ([array]): Features of elements.
356 Output:
357 Statistics array of shape (n_obs,dim):
358 'Pred' : Random forest forecast values
359 'Biais' : RF Biais computed as the sum of Oob Tree biais
361 'UQ' : Shape of UQ depends of the type_UQ !!!
362 IF mode=var:
363 UQ = Var RF variance computed as the sum of esiduals' variance of the leaves
364 IF mode=2-var:
365 UQ = (Var_bot,Var_top), !!! 2-uple of 2D array !!!
366 Var_bot : bot variance stimation (partial sum) as sum of negative residuals' variance of the leaves
367 Var_top : top variance stimation (partial sum) as sum of positive residuals' variance of the leaves
368 IF mode=quantile:
369 UQ = (Q_bot,Q_top), !!! 2-uple of 2D array !!!
370 'Q_bot' : Bot quantile estimation (partial sum) as ponderation of leaves' bot quantile
371 'Q_top' : Top quantile estimatuon (partial sum) as ponderation of leaves' top quantile
373 Other advanced statistics.
374 'Biais_oob' : part of Biais
375 'Var_E' : Part of total variance (Law of total variance)
376 'E_Var' : Part of total variance (Law of total variance)
377 'Var_oob' : Part of total variance (related to biais)
378 """
380 def aux_predict(
381 shape,
382 list_statistics,
383 list_RF_affectation,
384 list_dict_tree_statistics,
385 prediction,
386 ):
387 """Aggregate statistics (partial sum) of serveral trees.
388 Args:
389 shape ([tupple]): shape of statistics
390 list_statistics ([list]): list of statistcs to extract
391 list_RF_affectation ([array]): array of elements affectations for serveral trees
392 list_dict_tree_statistics ([dict]): pre-computed leaves statistics for the trees
394 Returns:
395 agg_statistics ([array]): Partials aggregated statistics (for serveral tree) of elmments to forecast
396 """
397 agg_statistics = np.zeros((shape))
398 for num, tree_affectation in enumerate(list_RF_affectation):
399 tree_statistic = list_dict_tree_statistics[num]
400 tree_statistics = tree_predict(
401 shape, list_statistics, tree_affectation, tree_statistic
402 )
403 agg_statistics += np.array(tree_statistics)
405 return agg_statistics
407 def tree_predict(shape, list_statistics, tree_affectation, tree_statistic):
408 """Compute extracted statistcs of a tree for the elements to forecast.
410 Args:
411 shape ([tupple]): shape of statistics
412 list_statistics ([type]): [description]
413 tree_affectation ([type]): array of elements affectations for the tree
414 tree_statistic ([type]): pre-computed leaves statistics for the tree
416 Returns:
417 statistics ([array]): Partials statistics for (a tree) of elmments to forecast
418 """
419 leaves = list(set(tree_affectation))
420 statistics = []
421 for n, key in enumerate(list_statistics):
422 statistics.append(np.zeros(shape[1::]))
424 for num_leaf in leaves:
425 mask = tree_affectation == num_leaf
426 for n, key in enumerate(list_statistics):
427 statistics[n][mask] = tree_statistic[num_leaf][key]
428 return statistics
430 n_trees = self.estimator.n_estimators
431 # Compute Leaves affectation array
432 RF_affectation = self.estimator.apply(X)
433 prediction = self.estimator.predict(X)
434 list_statistics = self.list_statistics
436 # Define shape of the statistic array.
437 if len(self.Y_shape) == 1:
438 shape = (len(list_statistics), len(X), 1)
440 else:
441 shape = (len(list_statistics), len(X), self.Y_shape[1])
443 parallel_partition = np.array_split(range(n_trees), self.n_jobs * 2)
445 # Split inputs of auxillar parralel tree statistics extraction
446 parallel_input = []
447 for partition in parallel_partition:
448 parallel_input.append(
449 (
450 shape,
451 list_statistics,
452 [RF_affectation[:, i] for i in partition],
453 [self.dict_leaves_statistics[i] for i in partition],
454 prediction,
455 )
456 )
458 # Extract statistcs for each tree in RF in a parralel way :
460 Predicted_statistics = Parallel(n_jobs=self.n_jobs)(
461 delayed(aux_predict)(*inputs) for inputs in parallel_input
462 )
464 # Final aggregation and normalisation for each statistics
465 Predicted_statistics = np.stack(Predicted_statistics).sum(axis=0)
467 Pred = Predicted_statistics[list_statistics.index("pred")] / n_trees
468 Biais = Pred * 0
469 var_aleatoric, var_epistemic, var_aleatoric_oob = None, None, None
470 if "biais" in list_statistics:
471 Biais = Predicted_statistics[list_statistics.index("biais")] / n_trees
473 if self.type_UQ in ["var", "res_var"]:
474 var = (
475 Predicted_statistics[list_statistics.index("var")] / (n_trees)
476 ) - np.power(Pred, 2)
477 UQ = var
479 if self.type_UQ in ["res_2var", "2var"]:
480 var_bot = Predicted_statistics[list_statistics.index("var_bot")] / n_trees
482 var_top = Predicted_statistics[list_statistics.index("var_top")] / n_trees
484 UQ = np.concatenate(
485 [np.expand_dims(i, 0) for i in [var_bot, var_top]], axis=0
486 )
488 if self.type_UQ in ["res_quantile", "quantile"]:
489 Q_bot = (
490 Pred + Predicted_statistics[list_statistics.index("Q_bot")] / n_trees
491 )
492 Q_top = (
493 Pred + Predicted_statistics[list_statistics.index("Q_top")] / n_trees
494 )
496 UQ = np.concatenate([np.expand_dims(i, 0) for i in [Q_bot, Q_top]], axis=0)
498 if self.type_UQ == "var_A&E":
499 if "aleatoric" in list_statistics:
500 var_aleatoric = (
501 Predicted_statistics[list_statistics.index("aleatoric")] / n_trees
502 )
504 if "epistemic" in list_statistics:
505 var_epistemic = Predicted_statistics[
506 list_statistics.index("epistemic")
507 ] / (n_trees) - np.power(Pred, 2)
509 if "oob_aleatoric" in list_statistics:
510 var_aleatoric_oob = (
511 Predicted_statistics[list_statistics.index("oob_aleatoric")]
512 / n_trees
513 )
515 UQ = np.concatenate(
516 [np.expand_dims(i, 0) for i in [var_aleatoric, var_epistemic]], axis=0
517 )
519 return (Pred, Biais, UQ, var_aleatoric, var_epistemic, var_aleatoric_oob)
521 def _tuning(self, X, y, n_esti=100, folds=4, params=None, **kwarg):
522 """Perform random search tuning using a given grid parameter"""
523 if not (self.pretuned):
524 if not isinstance(params, type(None)):
525 X, y = self._format(X, y, "fit_transform")
526 reg = RandomForestRegressor(random_state=0)
527 score = "neg_mean_squared_error"
528 self.estimator = super()._tuning(
529 reg, X, y, n_esti, folds, score, params
530 )
533def get_params_dict(
534 estimator=None,
535 pretuned=False,
536 type_UQ="var_A&E",
537 use_biais=True,
538 rescale=True,
539 n_jobs=4,
540 beta=0.05,
541 var_min=0.00001,
542 n_estimators=125,
543 max_depth=15,
544 min_impurity_decrease=0.00001,
545 ccp_alpha=1e-05,
546 max_features=0.9,
547 max_samples=0.7,
548 random_state=None,
549 min_samples_leaf=5,
550 min_samples_split=5,
551 **kwargs,
552):
553 """Provide a dict of paramaters to build an RF_UQEstimator
554 Args:
555 estimator (_type_, optional): RandomForestRegressor with meta-parameters
556 pretuned (bool, optional): bool flag that freeze estimator. Defaults to False.
557 type_UQ (str, optional): nature of UQmeasure. Defaults to 'var'.
558 use_biais (bool, optional): use oob biais correction. Defaults to True.
559 rescale (bool, optional): use rescale procedure. Defaults to True.
560 n_jobs (int, optional): number of jobs used for parallelization purpose. Defaults to 4.
561 beta (float, optional): miss coverage targets in case of type_UQ = quantile
562 var_min (float, optional): minimal variance. Defaults to 0.00001.
563 Returns:
564 dict_parameters
565 """
567 if estimator is None:
568 estimator = RandomForestRegressor(
569 n_estimators=n_estimators,
570 max_depth=max_depth,
571 ccp_alpha=ccp_alpha,
572 max_features=max_features,
573 max_samples=max_samples,
574 min_impurity_decrease=min_impurity_decrease,
575 min_samples_leaf=min_samples_leaf,
576 min_samples_split=min_samples_split,
577 random_state=random_state,
578 **kwargs,
579 )
581 # Specification of the UQestimator & Instanciation in a UQmodels wrapper that include post-processing
582 dict_params = {
583 "estimator": estimator,
584 "pretuned": pretuned,
585 "var_min": var_min,
586 "n_jobs": n_jobs,
587 "use_biais": use_biais,
588 "type_UQ": type_UQ,
589 "rescale": rescale,
590 "beta": beta,
591 }
592 return dict_params