Coverage for tadkit / base / basedensitydetector.py: 83%
48 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-03 15:41 +0000
1from typing import Any
3import numpy as np
4import pandas as pd
5from sklearn.base import BaseEstimator, OutlierMixin
class BaseDensityOutlierDetector(BaseEstimator, OutlierMixin):
    """
    Base class for density-based outlier detection.

    Subclasses must implement:
    - _fit_density(X): fit the density model on a 2-D NumPy array.
    - _score_density(X): return one score per row of a 2-D NumPy array.

    Input is converted to a NumPy array before being handed to the
    subclass hooks. When the input is a pandas DataFrame, results are
    returned as a pandas Series carrying the same index; otherwise they
    are returned as NumPy arrays.

    A pandas Series is recognised as pandas input, but because its
    ``.values`` are 1-D it is rejected by the 2-D check like any other
    1-D input.

    Attributes
    ----------
    contamination : float
        Proportion of outliers used to place the decision threshold.
    offset_ : float or None
        Threshold subtracted from the raw scores in ``decision_function``:
        the ``100 * contamination``-th percentile of the scores of the
        training data. ``None`` until ``fit`` has been called.
    """

    contamination: float
    offset_: float | None

    def __init__(self, contamination: float = 0.1):
        """
        Parameters
        ----------
        contamination : float, default=0.1
            The proportion of outliers in the dataset. Must be in (0, 0.5).

        Raises
        ------
        ValueError
            If ``contamination`` is not strictly between 0 and 0.5.
        """
        if not 0.0 < contamination < 0.5:
            raise ValueError("contamination must be between 0 and 0.5")
        self.contamination = contamination
        self.offset_ = None

    def fit(self, X: Any, y=None):
        """
        Fit the density model and derive the decision threshold.

        Parameters
        ----------
        X : array-like or pandas.DataFrame
            Training data; must be 2-D once converted to a NumPy array.
        y : ignored
            Not used by this method.

        Returns
        -------
        self
            The fitted estimator.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D.
        """
        # Same conversion and validation as the prediction-time methods.
        X_np, index = self._convert_input(X)

        # Kept for backward compatibility with code that reads them.
        self._is_pandas_input = index is not None
        self._input_index = index

        self._fit_density(X_np)
        scores = self._score_density(X_np)
        # The lowest `contamination` fraction of training scores ends up
        # below the offset, i.e. with a negative decision value.
        self.offset_ = np.percentile(scores, 100.0 * self.contamination)
        return self

    def decision_function(self, X: Any) -> np.ndarray | pd.Series:
        """
        Return the raw scores of ``X`` shifted by the fitted offset.

        Negative values are the ones ``predict`` labels as ``-1``.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If the estimator has not been fitted (``offset_`` is unset).
        ValueError
            If ``X`` is not 2-D.
        """
        # `offset_` is pre-set to None in __init__, so sklearn's
        # check_is_fitted cannot be relied on; test the value explicitly.
        # getattr also covers subclasses that skip this class's __init__.
        offset = getattr(self, "offset_", None)
        if offset is None:
            from sklearn.exceptions import NotFittedError

            raise NotFittedError(
                f"This {type(self).__name__} instance is not fitted yet. "
                "Call 'fit' before using this estimator."
            )

        X_np, index = self._convert_input(X)
        decision = self._score_density(X_np) - offset
        return pd.Series(decision, index=index) if index is not None else decision

    def predict(self, X: Any) -> np.ndarray | pd.Series:
        """
        Label each sample ``1`` (decision >= 0) or ``-1`` (decision < 0).

        Returns a pandas Series indexed like ``X`` when
        ``decision_function`` returned a Series, otherwise a NumPy array.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If the estimator has not been fitted.
        ValueError
            If ``X`` is not 2-D.
        """
        decision = self.decision_function(X)
        labels = np.where(np.asarray(decision) < 0, -1, 1).astype(int, copy=False)
        if isinstance(decision, pd.Series):
            return pd.Series(labels, index=decision.index)
        return labels

    def score_samples(self, X: Any) -> np.ndarray | pd.Series:
        """
        Return the raw scores from ``_score_density`` without the offset.

        Returns a pandas Series indexed like ``X`` for pandas input,
        otherwise whatever ``_score_density`` returned.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D.
        """
        X_np, index = self._convert_input(X)
        scores = self._score_density(X_np)
        return pd.Series(scores, index=index) if index is not None else scores

    def _convert_input(self, X: Any) -> tuple[np.ndarray, pd.Index | None]:
        """
        Convert ``X`` to a 2-D NumPy array and extract its pandas index.

        Returns
        -------
        (X_np, index)
            ``index`` is ``X.index`` for pandas input and ``None`` otherwise.

        Raises
        ------
        ValueError
            If the converted array is not 2-D.
        """
        is_pandas = isinstance(X, (pd.DataFrame, pd.Series))
        index = X.index if is_pandas else None
        X_np = X.values if is_pandas else np.asarray(X)
        if X_np.ndim != 2:
            raise ValueError("X must be 2D array or DataFrame")
        return X_np, index

    def _fit_density(self, X: np.ndarray):
        """Fit the density model on ``X``; must be overridden by subclasses."""
        raise NotImplementedError("_fit_density must be implemented in subclass")

    def _score_density(self, X: np.ndarray) -> np.ndarray:
        """Score the rows of ``X``; must be overridden by subclasses."""
        raise NotImplementedError("_score_density must be implemented in subclass")