Coverage for uqmodels/visualization/visualization.py: 39%
472 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 14:29 +0000
1"""
2Visualization module.
3"""
5import matplotlib as mpl
6import matplotlib.dates as mdates
7import matplotlib.pyplot as plt
8import numpy as np
9import scipy
10from matplotlib.colors import LinearSegmentedColormap
12import uqmodels.postprocessing.UQ_processing as UQ_proc
13from uqmodels.utils import compute_born, propagate
16def provide_cmap(mode="bluetored"):
17 """Generate a bluetored or a cyantopurple cutsom cmap
19 Args:
20 mode (str, optional):Values: bluetored' or 'cyantopurple '
22 return:
23 Colormap matplotlib
24 """
25 if mode == "bluetored":
26 bluetored = [
27 [0.0, (0, 0, 90)],
28 [0.05, (5, 5, 120)],
29 [0.1, (20, 20, 150)],
30 [0.15, (20, 20, 190)],
31 [0.2, (40, 40, 220)],
32 [0.25, (70, 70, 255)],
33 [0.33, (100, 100, 255)],
34 [0.36, (180, 180, 255)],
35 [0.4, (218, 218, 255)],
36 [0.45, (245, 245, 255)],
37 [0.5, (255, 253, 253)],
38 [0.55, (255, 245, 245)],
39 [0.6, (255, 218, 218)],
40 [0.63, (255, 200, 200)],
41 [0.66, (255, 160, 160)],
42 [0.7, (255, 110, 110)],
43 [0.75, (255, 70, 70)],
44 [0.8, (230, 40, 40)],
45 [0.85, (200, 20, 20)],
46 [0.9, (180, 10, 10)],
47 [0.95, (150, 5, 5)],
48 [1.0, (130, 0, 0)],
49 ]
50 bluetored_cmap = LinearSegmentedColormap.from_list(
51 "bluetored", [np.array(i[1]) / 255 for i in bluetored], N=255
52 )
53 return bluetored_cmap
54 elif mode == "cyantopurple":
55 cyantopurple = [
56 [0.0, (25, 255, 255)],
57 [0.05, (20, 250, 250)],
58 [0.1, (20, 230, 230)],
59 [0.15, (20, 220, 220)],
60 [0.2, (15, 200, 200)],
61 [0.25, (10, 170, 170)],
62 [0.3, (10, 140, 140)],
63 [0.36, (5, 80, 80)],
64 [0.4, (5, 50, 50)],
65 [0.45, (0, 30, 30)],
66 [0.5, (0, 0, 0)],
67 [0.55, (30, 0, 30)],
68 [0.6, (59, 0, 50)],
69 [0.64, (80, 0, 80)],
70 [0.7, (140, 0, 140)],
71 [0.75, (170, 0, 170)],
72 [0.8, (200, 40, 200)],
73 [0.85, (220, 20, 220)],
74 [0.9, (240, 10, 240)],
75 [0.95, (250, 5, 250)],
76 [1.0, (255, 0, 255)],
77 ]
78 else:
79 raise NameError
81 cyantopurple_cmap = LinearSegmentedColormap.from_list(
82 "cyantopurple", [np.array(i[1]) / 255 for i in cyantopurple], N=255
83 )
84 return cyantopurple_cmap
87plt.rcParams["figure.figsize"] = [8, 8]
88plt.rcParams["font.family"] = "sans-serif"
89plt.rcParams["ytick.labelsize"] = 15
90plt.rcParams["xtick.labelsize"] = 15
91plt.rcParams["axes.labelsize"] = 15
92plt.rcParams["legend.fontsize"] = 15
95def dim_1d_check(y):
96 """Reshape (n,1) 2D array to (n) 1D array else do nothing"""
97 if not isinstance(y, type(None)):
98 if not isinstance(y, tuple):
99 if len(y.shape) == 2:
100 if y.shape[1] == 1:
101 return y.reshape(-1)
102 else:
103 return (dim_1d_check(y[0]), dim_1d_check(y[1]))
104 return y
107def aux_plot_pred(ax, x, y, pred):
108 ax.plot(x, y, ls="dotted", color="black", linewidth=0.9, alpha=1)
109 ax.plot(
110 x,
111 pred,
112 "-",
113 color="darkgreen",
114 alpha=1,
115 linewidth=0.7,
116 zorder=-4,
117 label="Prediction",
118 )
120 ax.scatter(x, y, c="black", s=10, marker="x", linewidth=1, label="Observation")
123def aux_plot_anom(ax, x, y):
124 ax.scatter(
125 x, y, linewidth=1, marker="x", c="magenta", s=25, label='"Abnormal" real demand'
126 )
129def aux_plot_PIs(
130 ax,
131 x,
132 list_PIs,
133 list_alpha_PIs,
134 list_colors_PIs=None,
135 list_alpha_fig_PIs=None,
136 list_label_PIs=None,
137):
138 """Plot PIs enveloppe on ax suplot
140 Args:
141 ax (_type_): ax suplot
142 x (_type_): x_scale
143 list_PIs (_type_): list of PIs ordered from minumum to maximum: ex [PI_low_1,PI_low_2,PI_high_2,PI_high_1]
144 list_alpha (_type_): List of alpha of PIs
145 list_colors_PIs (list, optional): List of color by pair. Defaults to None -> use grey.
146 list_alpha_PIs (_type_, optional): List of color by pair. Defaults to None -> 0.2 as transparancy
147 """
148 n_couple = int(len(list_PIs) / 2)
150 for i in range(n_couple):
151 color = None
152 if list_colors_PIs is not None:
153 color = list_colors_PIs[i]
154 else:
155 color = None
157 if list_label_PIs is None:
158 label = (
159 "Predictive interval: "
160 + str(list_alpha_PIs[n_couple - i] - list_alpha_PIs[i])
161 + "%"
162 )
163 else:
164 label = list_label_PIs[i]
166 alpha = 0.1
167 if list_alpha_fig_PIs is not None:
168 alpha = list_alpha_fig_PIs[i]
170 ax.plot(x, list_PIs[i], color=color, ls="dotted", lw=1.2)
171 ax.plot(x, list_PIs[n_couple - i], color=color, ls="dotted", lw=1.2)
172 ax.fill_between(
173 x,
174 list_PIs[i],
175 list_PIs[n_couple - i],
176 color=list_colors_PIs[i],
177 alpha=alpha,
178 label=label,
179 )
182def aux_plot_conf_score(ax, x, pred, confidence_lvl, label, mode_res=False):
183 if mode_res:
184 pred = pred - pred
186 for i in range(0, int(1 + confidence_lvl.max())):
187 mask = i == confidence_lvl
188 ax.scatter(
189 x[mask],
190 pred[mask],
191 c=confidence_lvl[mask],
192 marker="D",
193 s=14,
194 edgecolors="black",
195 linewidth=0.2,
196 cmap=plt.get_cmap("RdYlGn_r", len(label) + 1),
197 vmin=0,
198 vmax=int(confidence_lvl.max()),
199 label=label[i],
200 zorder=10 + i,
201 )
204def plot_pi(
205 y,
206 y_pred,
207 y_pred_lower,
208 y_pred_upper,
209 mode_res=False,
210 f_obs=None,
211 X=None,
212 size=(12, 2),
213 name=None,
214 show_plot=True,
215):
216 y = dim_1d_check(y)
217 y_pred = dim_1d_check(y_pred)
218 y_pred_upper = dim_1d_check(y_pred_upper)
219 y_pred_lower = dim_1d_check(y_pred_lower)
221 if isinstance(f_obs, type(None)):
222 f_obs = np.arange(len(y))
224 plt.figure(figsize=(size[0], size[1]))
226 if not isinstance(y_pred, type(None)):
227 if mode_res:
228 y = y - y_pred
229 y_pred_upper = y_pred_upper - y_pred
230 y_pred_lower = y_pred_lower - y_pred
231 y_pred = y_pred - y_pred
233 plt.plot(y_pred[f_obs], "black", label="Prediction")
235 if name is not None:
236 plt.title(name)
238 plt.plot(
239 y[f_obs],
240 "darkgreen",
241 marker="X",
242 markersize=2,
243 linewidth=0,
244 label="Observation (inside PI)",
245 zorder=20,
246 )
248 plt.xlabel("X")
249 plt.ylabel("Y")
251 if not isinstance(y_pred_upper, type(None)):
252 anom = (y[f_obs] > y_pred_upper[f_obs]) | (y[f_obs] < y_pred_lower[f_obs])
254 plt.plot(
255 np.arange(len(f_obs))[anom],
256 y[f_obs][anom],
257 color="red",
258 marker="o",
259 markersize=2,
260 linewidth=0,
261 label="Observation (outside PI)",
262 zorder=20,
263 )
265 plt.plot(y_pred_upper[f_obs], "--", color="blue", linewidth=1, alpha=0.7)
266 plt.plot(y_pred_lower[f_obs], "--", color="blue", linewidth=1, alpha=0.7)
267 plt.fill_between(
268 x=np.arange(len(f_obs)),
269 y1=y_pred_upper[f_obs],
270 y2=y_pred_lower[f_obs],
271 alpha=0.2,
272 fc="b",
273 ec="None",
274 label="Prediction Interval",
275 )
276 plt.legend(loc="best")
277 if show_plot:
278 plt.show()
281def plot_prediction_interval(
282 y: np.array,
283 y_pred_lower: np.array,
284 y_pred_upper: np.array,
285 X: np.array = None,
286 y_pred: np.array = None,
287 save_path: str = None,
288 sort_X: bool = False,
289 **kwargs,
290) -> None:
291 """Plot prediction intervals whose bounds are given by y_pred_lower and y_pred_upper.
292 True values and point estimates are also plotted if given as argument.
294 Args:
295 y: label true values.
296 y_pred_lower: lower bounds of the prediction interval.
297 y_pred_upper: upper bounds of the prediction interval.
298 X <optionnal>: abscisse vector.
299 y_pred <optionnal>: predicted values.
300 kwargs: plot parameters.
301 """
303 # Figure configuration
304 if "figsize" in kwargs.keys():
305 figsize = kwargs["figsize"]
306 else:
307 figsize = (15, 6)
308 if "loc" not in kwargs.keys():
309 loc = kwargs["loc"]
310 else:
311 loc = "upper left"
312 plt.figure(figsize=figsize)
314 plt.rcParams["font.family"] = "Times New Roman"
315 plt.rcParams["ytick.labelsize"] = 15
316 plt.rcParams["xtick.labelsize"] = 15
317 plt.rcParams["axes.labelsize"] = 15
318 plt.rcParams["legend.fontsize"] = 16
320 if X is None:
321 X = np.arange(len(y))
322 elif sort_X:
323 sorted_idx = np.argsort(X)
324 X = X[sorted_idx]
325 y = y[sorted_idx]
326 y_pred = y_pred[sorted_idx]
327 y_pred_lower = y_pred_lower[sorted_idx]
328 y_pred_upper = y_pred_upper[sorted_idx]
330 if y_pred_upper is None or y_pred_lower is None:
331 miscoverage = np.array([False for _ in range(len(y))])
332 else:
333 miscoverage = (y > y_pred_upper) | (y < y_pred_lower)
335 label = "Observation" if y_pred_upper is None else "Observation (inside PI)"
336 plt.plot(
337 X[~miscoverage],
338 y[~miscoverage],
339 "darkgreen",
340 marker="X",
341 markersize=2,
342 linewidth=0,
343 label=label,
344 zorder=20,
345 )
347 label = "Observation" if y_pred_upper is None else "Observation (outside PI)"
348 plt.plot(
349 X[miscoverage],
350 y[miscoverage],
351 σ="red",
352 marker="o",
353 markersize=2,
354 linewidth=0,
355 label=label,
356 zorder=20,
357 )
358 if y_pred_upper is not None and y_pred_lower is not None:
359 plt.plot(X, y_pred_upper, "--", color="blue", linewidth=1, alpha=0.7)
360 plt.plot(X, y_pred_lower, "--", color="blue", linewidth=1, alpha=0.7)
361 plt.fill_between(
362 x=X,
363 y1=y_pred_upper,
364 y2=y_pred_lower,
365 alpha=0.2,
366 fc="b",
367 ec="None",
368 label="Prediction Interval",
369 )
371 if y_pred is not None:
372 plt.plot(X, y_pred, color="k", label="Prediction")
374 plt.xlabel("X")
375 plt.ylabel("Y")
377 if "loc" not in kwargs.keys():
378 loc = "upper left"
379 else:
380 loc = kwargs["loc"]
382 plt.legend(loc=loc)
383 if save_path:
384 plt.savefig(f"{save_path}", format="pdf")
385 else:
386 plt.show()
389def plot_sorted_pi(
390 y: np.array,
391 y_pred_lower: np.array,
392 y_pred_upper: np.array,
393 X: np.array = None,
394 y_pred: np.array = None,
395 **kwargs,
396) -> None:
397 """Plot prediction intervals in an ordered fashion (lowest to largest width),
398 showing the upper and lower bounds for each prediction.
399 Args:
400 y: label true values.
401 y_pred_lower: lower bounds of the prediction interval.
402 y_pred_upper: upper bounds of the prediction interval.
403 X <optionnal>: abscisse vector.
404 y_pred <optionnal>: predicted values.
405 kwargs: plot parameters.
406 """
408 if y_pred is None:
409 y_pred = (y_pred_upper + y_pred_lower) / 2
411 width = np.abs(y_pred_upper - y_pred_lower)
412 sorted_order = np.argsort(width)
414 # Figure configuration
415 if "figsize" in kwargs.keys():
416 figsize = kwargs["figsize"]
417 else:
418 figsize = (15, 6)
419 if "loc" not in kwargs.keys():
420 kwargs["loc"]
421 else:
422 pass
423 plt.figure(figsize=figsize)
425 if X is None:
426 X = np.arange(len(y_pred_lower))
428 # True values
429 plt.plot(
430 X,
431 y_pred[sorted_order] - y_pred[sorted_order],
432 color="black",
433 markersize=2,
434 zorder=20,
435 label="Prediction",
436 )
438 misscoverage = (y > y_pred_upper) | (y < y_pred_lower)
439 misscoverage = misscoverage[sorted_order]
441 # True values
442 plt.plot(
443 X[~misscoverage],
444 y[sorted_order][~misscoverage] - y_pred[sorted_order][~misscoverage],
445 color="darkgreen",
446 marker="o",
447 markersize=2,
448 linewidth=0,
449 zorder=20,
450 label="Observation (inside PI)",
451 )
453 plt.plot(
454 X[misscoverage],
455 y[sorted_order][misscoverage] - y_pred[sorted_order][misscoverage],
456 color="red",
457 marker="o",
458 markersize=2,
459 linewidth=0,
460 zorder=20,
461 label="Observation (outside PI)",
462 )
464 # PI Lower bound
465 plt.plot(
466 X,
467 y_pred_lower[sorted_order] - y_pred[sorted_order],
468 "--",
469 label="Prediction Interval Bounds",
470 color="blue",
471 linewidth=1,
472 alpha=0.7,
473 )
475 # PI upper bound
476 plt.plot(
477 X,
478 y_pred_upper[sorted_order] - y_pred[sorted_order],
479 "--",
480 color="blue",
481 linewidth=1,
482 alpha=0.7,
483 )
485 plt.legend()
487 plt.show()
490def visu_latent_space(grid_dim, embedding, f_obs, context_grid, context_grid_name=None):
491 fig = plt.figure(figsize=(15, 7))
492 for i in range(grid_dim[0]):
493 for j in range(grid_dim[1]):
494 ax = fig.add_subplot(
495 grid_dim[0], grid_dim[1], i * grid_dim[1] + j + 1, projection="3d"
496 )
497 if context_grid_name is not None:
498 plt.title(context_grid_name[i][j])
499 ax.scatter(
500 embedding[f_obs, 0],
501 embedding[f_obs, 1],
502 embedding[f_obs, 2],
503 c=context_grid[i][j][f_obs],
504 cmap=plt.get_cmap("jet"),
505 s=1,
506 )
509def aux_fill_area(context, **kwargs):
510 if "list_name_subset" in kwargs.keys():
511 list_name_subset = kwargs["list_name_subset"]
514def uncertainty_plot(
515 y,
516 output,
517 context=None,
518 size=(15, 5),
519 f_obs=None,
520 name="UQplot",
521 mode_res=False,
522 born=None,
523 born_bis=None,
524 dim=0,
525 confidence_lvl=None,
526 list_percent=[0.8, 0.9, 0.99, 0.999, 1],
527 env=[0.95, 0.65],
528 type_UQ="old",
529 show_plot=True,
530 with_colorbar=False,
531 **kwarg,
532):
533 if f_obs is None:
534 f_obs = np.arange(len(y))
536 ind_ctx = None
537 if "ind_ctx" in kwarg.keys():
538 ind_ctx = kwarg["ind_ctx"]
540 split_ctx = -1
541 if "split_ctx" in kwarg.keys():
542 split_ctx = kwarg["split_ctx"]
544 ylim = None
545 if "ylim" in kwarg.keys():
546 ylim = kwarg["ylim"]
548 if "compare_deg" in kwarg.keys():
549 kwarg["compare_deg"]
550 else:
551 pass
553 min_A, min_E = 0.000001, 0.000001
554 if "var_min" in kwarg.keys():
555 min_A, min_E = kwarg["var_min"]
557 if output is not None:
558 if type_UQ == "old":
559 pred, var_A, var_E = output
561 elif type_UQ == "var_A&E":
562 pred, (var_A, var_E) = output
564 var_E[var_E < min_E] = min_E
565 var_A[var_A < min_A] = min_A
567 # Post-processig PIs naifs:
568 # Post-processing epistemics naifs.
570 var_E[var_E < min_E] = min_E
571 var_A[var_A < min_A] = min_A
573 only_data = False
574 if "list_name_subset" in kwarg.keys():
575 list_name_subset = kwarg["list_name_subset"]
577 if "only_data" in kwarg.keys():
578 only_data = kwarg["only_data"]
579 if only_data:
580 name = "Data"
582 f_obs_full = np.copy(f_obs)
583 n_ctx = 1
584 if isinstance(dim, int):
585 dim = [dim]
587 if split_ctx > -1:
588 if ind_ctx is None:
589 list_ctx_ = list(set(context[f_obs, split_ctx]))
590 else:
591 list_ctx_ = ind_ctx
592 n_ctx = len(list_ctx_)
594 fig, axs = plt.subplots(len(dim), n_ctx, sharex=True, figsize=size)
595 label = None
597 for n, d in enumerate(dim):
598 for n_fig in range(n_ctx):
599 ax = axs
600 if len(dim) > 1:
601 ax = ax[n]
602 if n_ctx > 1:
603 ax = ax[n_fig]
605 if split_ctx > -1:
606 f_obs = f_obs_full[context[f_obs_full, split_ctx] == list_ctx_[n_fig]]
607 if only_data:
608 ax.scatter(
609 f_obs,
610 y[f_obs, d],
611 c="black",
612 s=10,
613 marker="x",
614 linewidth=1,
615 label="observation",
616 )
618 ax.plot(
619 f_obs,
620 y[f_obs, d],
621 ls=":",
622 color="darkgreen",
623 alpha=1,
624 linewidth=0.7,
625 zorder=-4,
626 )
627 if ylim is not None:
628 ax.set_ylim(ylim[0], ylim[1])
629 else:
630 (y.min(), y.max())
632 else:
633 born_ = None
634 if born:
635 born_ = born[0][f_obs, d], born[1][f_obs, d]
637 born_bis_ = None
638 if born_bis:
639 born_bis_ = born_bis[0][f_obs, d], born_bis[1][f_obs, d]
641 x = np.arange(len(y))
642 if "x" in kwarg.keys():
643 x = kwarg["x"]
645 if confidence_lvl is None:
646 confidence_lvl, params_ = UQ_proc.compute_Epistemic_score(
647 (var_A, var_E),
648 type_UQ="var_A&E",
649 pred=pred,
650 list_percent=list_percent,
651 params_=None,
652 )
654 aux_plot_confiance(
655 ax=ax,
656 y=y[f_obs, d],
657 pred=pred[f_obs, d],
658 var_A=var_A[f_obs, d],
659 var_E=var_E[f_obs, d],
660 born=born_,
661 born_bis=born_bis_,
662 env=env,
663 x=x[f_obs],
664 mode_res=mode_res,
665 **kwarg,
666 )
668 label = [str(i) for i in list_percent]
669 label.append(">1")
671 aux_plot_conf_score(
672 ax,
673 x[f_obs],
674 pred[f_obs, d],
675 confidence_lvl[f_obs, d],
676 label=label,
677 mode_res=mode_res,
678 )
680 if "ctx_attack" in kwarg.keys():
681 y[f_obs, d]
682 if ylim is None:
683 ylim = (y.min(), y.max())
684 dim_ctx, ctx_val = kwarg["ctx_attack"]
685 if ctx_val == -1:
686 list_ctx = list(set(context[f_obs, dim_ctx]))
687 color = plt.get_cmap("jet", len(list_name_subset))
688 list_color = [color(i) for i in range(3)]
689 for n, i in enumerate(list_ctx):
690 ax.fill_between(
691 f_obs,
692 ylim[0],
693 ylim[1],
694 where=context[f_obs, dim_ctx] == i,
695 color=list_color[int(i)],
696 alpha=0.2,
697 )
698 else:
699 ax.fill_between(
700 f_obs,
701 y.min(),
702 y.max(),
703 where=context[f_obs, dim_ctx] == 1,
704 color="yellow",
705 alpha=0.2,
706 )
708 if n_fig != 0:
709 ax.set_yticklabels([])
711 if "ctx_attack" in kwarg.keys():
712 color = plt.get_cmap("jet", len(list_name_subset))
713 for n, i in enumerate(range(len(list_name_subset))):
714 ax.fill_between(
715 [],
716 ylim[0],
717 ylim[1],
718 where=[],
719 label=list_name_subset[int(i)],
720 color=color(i),
721 alpha=0.08,
722 )
723 plt.suptitle(name)
724 plt.subplots_adjust(
725 wspace=0.03, hspace=0.03, left=0.1, bottom=0.22, right=0.90, top=0.8
726 )
727 plt.legend(frameon=True, ncol=6, fontsize=10, bbox_to_anchor=(0.5, 0, 0.38, -0.11))
728 # plt.xlim(0, 8400)
730 if label is not None:
731 cmap = [plt.get_cmap("RdYlGn_r", 7)(i) for i in np.arange(len(label))]
733 list_percent
734 bounds = np.concatenate(
735 [[0], np.cumsum(np.abs(np.array(list_percent) - 1) + 0.1)]
736 )
737 bounds = 10 * bounds / bounds.max()
739 if with_colorbar:
740 cmap = mpl.colors.ListedColormap(cmap)
741 norm = mpl.colors.BoundaryNorm(bounds, cmap.N)
742 color_ls = mpl.cm.ScalarMappable(norm=norm, cmap=cmap)
743 cbar1 = plt.colorbar(
744 color_ls,
745 pad=0.20,
746 fraction=0.10,
747 shrink=0.5,
748 anchor=(0.2, 0.0),
749 orientation="horizontal",
750 spacing="proportional",
751 )
752 cbar1.set_label("Confidence_lvl", fontsize=14)
754 ticks = (bounds + np.roll(bounds, -1)) / 2
755 ticks[-1] = 10
757 cbar1.set_ticks(ticks)
758 cbar1.set_ticklabels(label, fontsize=12)
759 plt.tight_layout()
760 if show_plot:
761 plt.show()
762 return
765def aux_plot_confiance(
766 ax,
767 y,
768 pred,
769 var_A,
770 var_E,
771 born=None,
772 born_bis=None,
773 ylim=None,
774 split_values=-1,
775 x=None,
776 mode_res=False,
777 min_A=0.08,
778 min_E=0.02,
779 env=[0.95, 0.68],
780 **kwarg,
781):
782 if x is None:
783 x = np.arange(len(y))
785 if mode_res:
786 y = y - pred
787 if born is not None:
788 born = born[0] - pred, born[1] - pred
789 pred = pred * 0
791 y_lower_A, y_upper_A = compute_born(pred, np.sqrt(var_A), 0.045)
792 y_lower_E, y_upper_E = compute_born(pred, np.sqrt(var_E / (var_A + var_E)), 0.045)
793 y_lower, y_upper = compute_born(pred, np.sqrt(var_A + var_E * 0), 0.045)
794 y_lower_N, y_upper_N = compute_born(pred, np.sqrt(var_A + var_E), 0.045)
796 ind_A = np.sqrt(var_A)
797 ind_E = np.sqrt(var_E)
798 ind_E[ind_E < min_E] = min_E
799 ind_A[ind_A < min_A] = min_A
801 if born:
802 anom_score = (y < born[0]) | (y > born[1])
803 flag_anom = anom_score > 0
805 else:
806 anom_score = (np.abs(y - pred) + 0 * ind_E) / (
807 2 * np.sqrt(np.power(ind_E, 2) + np.power(ind_A, 2))
808 )
809 flag_anom = anom_score > 1
811 aux_plot_pred(ax, x, y, pred)
812 aux_plot_anom(ax, x[flag_anom], y[flag_anom])
814 if born:
815 aux_plot_PIs(
816 ax,
817 x,
818 [born[0], born[1]],
819 list_alpha_PIs=[0.025, 0.975],
820 list_colors_PIs=["green"],
821 list_label_PIs=["Normal_limit"],
822 )
823 ax.fill_between(
824 x,
825 born[1],
826 y,
827 where=propagate(y > born[1], 0, sym=True),
828 facecolor="red",
829 alpha=0.8,
830 label="Anomaly",
831 interpolate=True,
832 zorder=-10,
833 )
835 ax.fill_between(
836 x,
837 born[0],
838 y,
839 where=propagate(y < born[0], 0),
840 interpolate=True,
841 facecolor="red",
842 alpha=0.8,
843 zorder=-10,
844 )
846 else:
847 aux_plot_PIs(
848 ax,
849 x,
850 [y_lower, y_upper],
851 list_alpha_PIs=[0.025, 0.975],
852 list_colors_PIs=["green"],
853 list_label_PIs=["2σAleatoric PIs (95%)"],
854 list_alpha_fig_PIs=[0.2,0.2],
855 )
857 aux_plot_PIs(
858 ax,
859 x,
860 [y_lower_N, y_upper_N],
861 list_alpha_PIs=[0.16, 0.84],
862 list_colors_PIs=["darkblue"],
863 list_alpha_fig_PIs=[0.1,0.1],
864 list_label_PIs=["2σTotal PIs(95%)"],
865 )
867 if born_bis:
868 aux_plot_PIs(
869 ax,
870 x,
871 [born_bis[0], born_bis[1]],
872 list_alpha_PIs=[0.025, 0.975],
873 list_colors_PIs=["teal"],
874 list_alpha_fig_PIs=[0.1,0.1],
875 list_label_PIs=["Normal_limit"],
876 )
878 if False:
879 aux_plot_PIs(
880 ax,
881 x,
882 [y_lower_A, y_upper_A],
883 list_alpha_PI=[0.025, 0.975],
884 list_colors_PIs=["blue"],
885 list_alpha_fig_PIs=[0.2,0.2],
886 list_label_PIs=["Var_A"],
887 )
889 aux_plot_PIs(
890 ax,
891 x,
892 [y_lower_E, y_upper_E],
893 [0.025, 0.975],
894 list_colors_PIs=["red"],
895 list_alpha_PIs=None,
896 list_label_PIs=["Var_E"],
897 )
899 if ylim is None:
900 ylim_ = min(y.min(), y_lower.min()), max(y.max(), y_upper.max())
901 ylim_ = ylim_[0] + np.abs(ylim_[0] * 0.05), ylim_[1] - np.abs(ylim_[1] * 0.05)
902 else:
903 ylim_ = ylim[0], ylim[1]
905 ax.set_ylim(ylim_[0], ylim_[1])
906 ax.set_xlim(-0.5 + x.min(), x.max() + 0.5)
907 # ax.legend(ncol=7)
910def plot_anom_matrice(
911 score,
912 score2=None,
913 f_obs=None,
914 true_label=None,
915 data=None,
916 x=None,
917 vmin=-3,
918 vmax=3,
919 cmap=None,
920 list_anom_ind=None,
921 figsize=(15, 6),
922 grid_spec=None,
923 x_date=False,
924 show_plot=True,
925 setup=None
926):
927 """Plot score_anomalie matrice and true label if there is.
928 Args:
929 score (_type_): Anomaly score matrice or list of Anomaly matrix
930 f_obs (_type_, optional): mask_focus
931 true_label (_type_, optional): True label or None
932 vmin (int, optional): _description_. Defaults to -3.
933 vmax (int, optional): _description_. Defaults to 3.
934 cmap (_type_, optional): _description_. Defaults to None.
935 figsize (tuple, optional): _description_. Defaults to (15, 6).
936 """
937 if isinstance(score, list):
938 len_score = len(score[0])
939 dim_score = [score_.shape[-1] for score_ in score]
940 n_score = len(score)
941 else:
942 score = [score]
943 len_score = len(score)
944 dim_score = [score[0].shape[-1]]
945 n_score = 1
947 if f_obs is None:
948 f_obs = np.arange(len_score)
950 if cmap is None:
951 cmap = provide_cmap("bluetored")
953 x_flag = True
954 if x is None:
955 x_flag = False
956 x = np.arange(len_score)
957 list_extend = [None for dim_score_ in dim_score]
959 else:
960 d0 = mdates.date2num(x[f_obs][0])
961 d1 = mdates.date2num(x[f_obs][-1])
962 list_extend = [[d0, d1, 0, dim_score_] for dim_score_ in dim_score]
964 n_fig = 3 + n_score
965 if true_label is None:
966 n_fig -= 1
968 if score2 is None:
969 n_fig -= 1
971 if data is None:
972 n_fig -= 1
974 sharey = False
975 if (
976 (n_score == 1)
977 and (true_label is not None)
978 and (dim_score[0] == true_label.shape)
979 ):
980 sharey = True
982 if grid_spec is None:
983 grid_spec = np.ones(n_fig)
985 fig, ax = plt.subplots(
986 n_fig,
987 1,
988 sharex=True,
989 sharey=sharey,
990 gridspec_kw={"height_ratios": grid_spec},
991 figsize=figsize,
992 )
994 if n_fig == 1:
995 ax.set_title("score")
997 ax.imshow(
998 score[0][f_obs].T[::-1],
999 cmap=provide_cmap("bluetored"),
1000 vmin=vmin,
1001 vmax=vmax,
1002 aspect="auto",
1003 extent=list_extend[0],
1004 interpolation=None,
1005 )
1007 if(setup) is not None:
1008 (n_chan,n_sensor) = setup
1010 for i in range(n_chan* n_sensor):
1011 a0.hlines(i, 0, len(f_obs), color="grey", lw=0.5)
1013 for i in range(n_sensor):
1014 a0.hlines(i * n_chan,0, len(f_obs), color="black", lw=1)
1017 else:
1018 ind_ax = -1
1019 for n, score_ in enumerate(score):
1020 ind_ax += 1
1021 ax[ind_ax].set_title("score")
1022 ax[ind_ax].imshow(
1023 score_[f_obs].T[::-1],
1024 cmap=provide_cmap("bluetored"),
1025 vmin=vmin,
1026 vmax=vmax,
1027 aspect="auto",
1028 extent=list_extend[n],
1029 interpolation="None",
1030 )
1032 if score2 is not None:
1033 ind_ax += 1
1034 ax[ind_ax].set_title("score")
1035 ax[ind_ax].imshow(
1036 score2[f_obs].T[::-1],
1037 cmap=provide_cmap("bluetored"),
1038 vmin=vmin,
1039 vmax=vmax,
1040 aspect="auto",
1041 extent=list_extend[0],
1042 interpolation="None",
1043 )
1045 if true_label is not None:
1046 ind_ax += 1
1047 ax[ind_ax].set_title("True_anom")
1048 ax[ind_ax].imshow(
1049 true_label[f_obs].T[::-1],
1050 cmap="Reds",
1051 aspect="auto",
1052 extent=list_extend[0],
1053 interpolation="None",
1054 )
1056 if data is not None:
1057 ind_ax += 1
1058 ax[ind_ax].set_title("data")
1059 colors = []
1060 for i in range(data.shape[1]):
1061 colors.append(plt.get_cmap("Greens", data.shape[1])(i))
1062 if list_anom_ind is not None:
1063 for n, anom_ind in enumerate(list_anom_ind):
1064 colors[anom_ind] = plt.get_cmap("Reds", len(list_anom_ind) + 4)(
1065 n + 4
1066 )
1068 for i in range(dim_score[0]):
1069 ax[ind_ax].plot(x[f_obs], data[f_obs, i], color=colors[i], lw=0.9)
1071 for i in range(dim_score[0]):
1072 mask = np.abs(score[0])[f_obs, i] > 1
1073 ax[ind_ax].scatter(
1074 x[f_obs][mask],
1075 data[f_obs, i][mask],
1076 color="red",
1077 marker="x",
1078 s=1,
1079 zorder=10,
1080 )
1082 if (true_label is not None) & (data is not None):
1083 for i in range(data.shape[1]):
1084 mask = true_label[f_obs, i] > 0
1085 ax[ind_ax].scatter(x[f_obs][mask], data[f_obs, i][mask], color="purple")
1086 if x_flag:
1087 ax[ind_ax].xaxis_date()
1088 if x_date:
1089 ax[ind_ax].xaxis.set_major_formatter(mdates.DateFormatter("%d/%m %H:%M"))
1091 fig.tight_layout()
1092 if show_plot:
1093 plt.show()
1096# Display of data curve with mean and variance.
1097def plot_var(
1098 Y,
1099 data_full,
1100 variance,
1101 impact_anom=None,
1102 anom=None,
1103 f_obs=None,
1104 dim=(400, 20, 3),
1105 g=0,
1106 res_flag=False,
1107 fig_s=(20, 3),
1108 title=None,
1109 ylim=None,
1110):
1111 def add_noise(data, noise_mult, noise_add):
1112 signal = (data * (1 + noise_mult)) + noise_add
1113 return signal
1115 dim_n, dim_t, dim_g = dim
1116 anom_pred = (np.abs(impact_anom).sum(axis=-1) > 0).astype(int) - (anom > 0).astype(
1117 int
1118 )
1120 if anom_pred.sum() < 1:
1121 anom_pred[0] = 1
1122 anom_pred[-1] = 1
1124 step = g
1125 plt.figure(figsize=fig_s)
1126 plt.title(title)
1127 # norm = Y.mean(axis=0)
1128 if anom_pred.sum() < 1:
1129 anom_pred[0] = 1
1130 anom_pred[-1] = 1
1132 ni = [100, 98, 95, 80, 50]
1133 color_full = [
1134 (0.5, 0.0, 0.5),
1135 (0.8, 0, 0),
1136 (0.8, 0.6, 0),
1137 (0, 0.8, 0),
1138 (0, 0.4, 0),
1139 (0, 0.8, 0),
1140 (0.8, 0.6, 0),
1141 (0.8, 0, 0),
1142 (0.5, 0.0, 0.5),
1143 ]
1144 color_full2 = [
1145 (0.5, 0.0, 0.5),
1146 (0.8, 0, 0),
1147 (0.8, 0.6, 0),
1148 (0, 0.8, 0),
1149 (0, 0.4, 0),
1150 (0, 0.4, 0),
1151 (0, 0.8, 0),
1152 (0.8, 0.6, 0),
1153 (0.8, 0, 0),
1154 (0.5, 0.0, 0.5),
1155 ]
1157 per_list = [0.01, 1, 2.5, 10, 25, 75, 90, 97.5, 99, 99.99]
1158 per = []
1160 res = data_full * 0
1161 if res_flag:
1162 res = data_full
1164 for i in per_list:
1165 per.append(
1166 add_noise(
1167 data_full - res,
1168 0,
1169 scipy.stats.norm.ppf((i / 100), 0, np.sqrt(variance)),
1170 ),
1171 )
1173 for i in range(len(per) - 1):
1174 plt.fill_between(
1175 np.arange(len(f_obs)),
1176 per[i][f_obs, step],
1177 per[i + 1][f_obs, step],
1178 color=color_full[i],
1179 alpha=0.20,
1180 )
1181 for i in range(len(per)):
1182 plt.plot(
1183 np.arange(len(f_obs)),
1184 per[i][f_obs, step],
1185 color=color_full2[i],
1186 linewidth=0.5,
1187 alpha=0.40,
1188 )
1189 if i > 4:
1190 plt.fill_between(
1191 [],
1192 [],
1193 [],
1194 color=color_full2[i],
1195 label=str(ni[9 - i]) + "% Coverage",
1196 alpha=0.20,
1197 )
1199 plt.plot(
1200 Y[f_obs, step] - res[f_obs, step],
1201 label="Series",
1202 color="black",
1203 linewidth=1.5,
1204 marker="o",
1205 ms=3,
1206 )
1207 flag = impact_anom[f_obs, step] != 0
1209 plt.plot(
1210 np.arange(len(f_obs))[flag],
1211 Y[f_obs, step][flag] - res[f_obs, step][flag],
1212 label="Anom",
1213 color="red",
1214 ls="",
1215 marker="X",
1216 ms=10,
1217 alpha=0.8,
1218 )
1220 if False:
1221 f_anom = np.repeat(np.abs(impact_anom)[f_obs, step] > 0, 7)
1222 for i in range(8):
1223 f_anom += np.roll(f_anom, i - 4)
1224 f_anom = f_anom > 0
1225 plt.fill_between(
1226 np.arange(len(f_obs) * 7) / 7 - (3 / 7),
1227 -1000,
1228 Y[f_obs, step].max() * 1.2,
1229 where=f_anom,
1230 facecolor="blue",
1231 alpha=0.2,
1232 label="Anomaly",
1233 zorder=-10,
1234 )
1236 if ylim is None:
1237 plt.ylim(per[0][f_obs, step].min() * 1.05, per[-1][f_obs, step].max() * 1.05)
1238 else:
1239 plt.ylim(ylim[0], ylim[1])
1240 plt.legend(ncol=7, fontsize=14)
1241 plt.xlim(0, len(f_obs))
1242 plt.tight_layout()
1243 plt.show()
1244 return (per, per_list)
1247def show_dUQ_refinement(
1248 UQ,
1249 y=None,
1250 d=0,
1251 f_obs=None,
1252 max_cut_A=0.99,
1253 q_Eratio=2,
1254 E_cut_in_var_nominal=False,
1255 A_res_in_var_atypic=False,
1256):
1257 if isinstance(UQ, tuple):
1258 UQ = np.array(UQ)
1260 if f_obs is None:
1261 f_obs = np.arange(UQ.shape[1])
1263 var_A, var_E = UQ
1264 extremum_var_TOT, ndUQ_ratio = UQ_proc.get_extremum_var_TOT_and_ndUQ_ratio(
1265 UQ,
1266 min_cut=0,
1267 max_cut=max_cut_A,
1268 var_min=0,
1269 var_max=None,
1270 factor=1,
1271 q_var=1,
1272 q_Eratio=q_Eratio,
1273 mode_multidim=True,
1274 E_cut_in_var_nominal=E_cut_in_var_nominal,
1275 A_res_in_var_atypic=A_res_in_var_atypic,
1276 )
1278 var_A_cut, var_E_res = UQ_proc.split_var_dUQ(
1279 UQ,
1280 q_var=1,
1281 q_var_e=1,
1282 ndUQ_ratio=ndUQ_ratio,
1283 E_cut_in_var_nominal=E_cut_in_var_nominal,
1284 A_res_in_var_atypic=A_res_in_var_atypic,
1285 extremum_var_TOT=extremum_var_TOT,
1286 )
1288 var_A_res = var_A - var_A_cut
1289 var_E_cut = var_E - var_E_res
1291 val = 0
1292 if y is not None:
1293 val = 1
1295 fig, ax = plt.subplots(3 + val, 1, sharex=True, figsize=(20, 5))
1296 if val == 1:
1297 ax[0].plot(y[f_obs, d : d + 1], label="true_val")
1298 ax[0 + val].plot(var_A[f_obs, d : d + 1], label="row_var_A")
1299 ax[0 + val].plot(var_A_cut[f_obs, d : d + 1], label="refined_var_A")
1300 ax[0 + val].legend()
1301 ax[1 + val].plot(var_E[f_obs, d : d + 1], label="row_var_E")
1302 ax[1 + val].plot(var_E_res[f_obs, d : d + 1], label="refined_var_E")
1303 ax[1 + val].legend()
1304 ratio = var_E[f_obs, d : d + 1] / var_A[f_obs, d : d + 1]
1305 ax[2 + val].plot(ratio / ratio.std(), label="row_ratio")
1306 refined_ratio = (var_A_res[f_obs, d : d + 1] + var_E_res[f_obs, d : d + 1]) / (
1307 var_A_cut[f_obs, d : d + 1] + var_E_cut[f_obs, d : d + 1]
1308 )
1309 ax[2 + val].plot(refined_ratio / refined_ratio.std(), label="refined_ratio")
1310 ax[2 + val].legend()
1311 print("yaya")
1312 return (fig, ax)