Coverage for tadkit / base / basedensitydetector.py: 83%

48 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-03 15:41 +0000

1from typing import Any 

2 

3import numpy as np 

4import pandas as pd 

5from sklearn.base import BaseEstimator, OutlierMixin 

6 

7 

class BaseDensityOutlierDetector(BaseEstimator, OutlierMixin):
    """
    Base class for density-based outlier detection.

    Subclasses must implement:
        - _fit_density(X)
        - _score_density(X)

    Accepts 2D array-likes and pandas DataFrames; all computation is done on
    NumPy arrays. When the input is a DataFrame, results are returned as a
    pandas Series carrying the input's index. A pandas Series is
    one-dimensional and is therefore rejected by the 2D input check.

    Samples whose score falls below the fitted threshold ``offset_`` are
    labelled as outliers (-1); all other samples are labelled as inliers (1).
    """

    contamination: float
    offset_: float | None

    def __init__(self, contamination: float = 0.1):
        """
        Parameters
        ----------
        contamination : float, default=0.1
            The proportion of outliers in the dataset. Must be in (0, 0.5).

        Raises
        ------
        ValueError
            If ``contamination`` is not strictly between 0 and 0.5.
        """
        if not 0.0 < contamination < 0.5:
            raise ValueError("contamination must be between 0 and 0.5")
        self.contamination = contamination
        # ``None`` marks the estimator as not fitted; ``fit`` replaces it.
        self.offset_ = None

    def fit(self, X: Any, y=None):
        """
        Fit the density model and derive the outlier threshold.

        Parameters
        ----------
        X : 2D array-like or DataFrame
            Training samples.
        y : ignored
            Not used by this method.

        Returns
        -------
        self
            The fitted estimator.

        Raises
        ------
        ValueError
            If ``X`` is not two-dimensional.
        """
        # Same conversion and validation as the prediction-time methods.
        X_np, index = self._convert_input(X)
        self._is_pandas_input = index is not None
        self._input_index = index

        self._fit_density(X_np)
        scores = self._score_density(X_np)
        # Threshold at the ``contamination`` quantile of the training scores.
        self.offset_ = np.percentile(scores, 100.0 * self.contamination)
        return self

    def decision_function(self, X: Any) -> np.ndarray | pd.Series:
        """
        Return the scores of ``X`` shifted by the fitted threshold.

        Negative values correspond to samples that ``predict`` labels as
        outliers.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If ``fit`` has not been called yet.
        ValueError
            If ``X`` is not two-dimensional.
        """
        self._require_fitted()
        X_np, index = self._convert_input(X)
        decision = self._score_density(X_np) - self.offset_
        return pd.Series(decision, index=index) if index is not None else decision

    def predict(self, X: Any) -> np.ndarray | pd.Series:
        """
        Label each sample as inlier (1) or outlier (-1).

        Returns
        -------
        numpy.ndarray or pandas.Series
            Integer labels; a Series with the input's index when the decision
            values are a Series.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If ``fit`` has not been called yet.
        ValueError
            If ``X`` is not two-dimensional.
        """
        decision = self.decision_function(X)
        values = np.asarray(decision)
        labels = np.ones(values.shape, dtype=int)
        labels[values < 0] = -1
        if isinstance(decision, pd.Series):
            return pd.Series(labels, index=decision.index)
        return labels

    def score_samples(self, X: Any) -> np.ndarray | pd.Series:
        """
        Return the raw scores of ``X`` as computed by ``_score_density``.

        Raises
        ------
        ValueError
            If ``X`` is not two-dimensional.
        """
        X_np, index = self._convert_input(X)
        scores = self._score_density(X_np)
        return pd.Series(scores, index=index) if index is not None else scores

    def _require_fitted(self) -> None:
        """Raise ``NotFittedError`` when no threshold has been fitted yet."""
        if getattr(self, "offset_", None) is None:
            # Imported here so the check does not rely on a module-level import.
            from sklearn.exceptions import NotFittedError

            raise NotFittedError(
                f"This {type(self).__name__} instance is not fitted yet. "
                "Call 'fit' before using this estimator."
            )

    def _convert_input(self, X: Any):
        """
        Convert ``X`` to a NumPy array and extract its pandas index, if any.

        Returns
        -------
        tuple
            ``(X_np, index)`` where ``index`` is ``None`` for non-pandas input.

        Raises
        ------
        ValueError
            If the converted array is not two-dimensional.
        """
        is_pandas = isinstance(X, (pd.DataFrame, pd.Series))
        index = X.index if is_pandas else None
        X_np = X.values if is_pandas else np.asarray(X)
        if X_np.ndim != 2:
            raise ValueError("X must be 2D array or DataFrame")
        return X_np, index

    def _fit_density(self, X: np.ndarray):
        """Fit the density model on ``X``; must be overridden by subclasses."""
        raise NotImplementedError("_fit_density must be implemented in subclass")

    def _score_density(self, X: np.ndarray) -> np.ndarray:
        """Return one score per row of ``X``; must be overridden by subclasses."""
        raise NotImplementedError("_score_density must be implemented in subclass")