Source code for tadkit.base.basedensitydetector

from typing import Any

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, OutlierMixin
from sklearn.exceptions import NotFittedError


class BaseDensityOutlierDetector(BaseEstimator, OutlierMixin):
    """
    Base class for density-based outlier detection.

    Subclasses must implement:

    - ``_fit_density(X)``
    - ``_score_density(X)``

    Pandas input is converted to a NumPy array before being handed to the
    subclass hooks. When the input is pandas, results are returned as a
    ``pd.Series`` carrying the input's index; otherwise a NumPy array is
    returned.

    Scores follow a "lower is more abnormal" orientation: ``fit`` stores in
    ``offset_`` the ``100 * contamination``-th percentile of the training
    scores, and ``predict`` labels as outliers (``-1``) the samples whose
    score falls strictly below that threshold.

    Notes
    -----
    All entry points require 2D data. A ``pd.Series`` is recognised as pandas
    input, but ``Series.values`` is one-dimensional, so it is rejected by the
    2D check; pass a single-column ``DataFrame`` instead.
    """

    contamination: float
    offset_: float | None

    def __init__(self, contamination: float = 0.1):
        """
        Parameters
        ----------
        contamination : float, default=0.1
            The proportion of outliers in the dataset. Must be in (0, 0.5).

        Raises
        ------
        ValueError
            If ``contamination`` is not strictly between 0 and 0.5.
        """
        if not 0.0 < contamination < 0.5:
            raise ValueError("contamination must be between 0 and 0.5")
        self.contamination = contamination
        # ``None`` marks the estimator as not fitted; ``fit`` overwrites it.
        self.offset_ = None

    def fit(self, X: Any, y=None):
        """
        Fit the density model and calibrate the decision threshold.

        Parameters
        ----------
        X : array-like or DataFrame
            Training data; must be 2D once converted to a NumPy array.
        y : Any, default=None
            Not used.

        Returns
        -------
        self
            The fitted estimator.

        Raises
        ------
        ValueError
            If ``X`` is not 2D.
        """
        # Share the conversion/validation path with the scoring methods so
        # that fit and predict can never disagree on what input is valid.
        X_np, index = self._convert_input(X)

        # Recorded only after validation succeeded, so a rejected input does
        # not leave stale bookkeeping on the estimator.
        self._is_pandas_input = isinstance(X, (pd.DataFrame, pd.Series))
        self._input_index = index

        self._fit_density(X_np)
        train_scores = self._score_density(X_np)
        # Threshold chosen so that roughly a ``contamination`` fraction of
        # the training scores lies below it.
        self.offset_ = np.percentile(train_scores, 100.0 * self.contamination)
        return self

    def decision_function(self, X: Any) -> np.ndarray | pd.Series:
        """
        Return the density score of each sample shifted by ``offset_``.

        Negative values correspond to samples that ``predict`` labels as
        outliers.

        Parameters
        ----------
        X : array-like or DataFrame
            Data to score; must be 2D once converted to a NumPy array.

        Returns
        -------
        np.ndarray or pd.Series
            ``_score_density(X) - offset_``; a Series indexed like ``X`` when
            ``X`` is pandas.

        Raises
        ------
        ValueError
            If ``X`` is not 2D.
        NotFittedError
            If the estimator has not been fitted.
        """
        X_np, index = self._convert_input(X)
        # Without this guard an unfitted estimator fails later with an opaque
        # ``TypeError`` from subtracting ``None``.
        if getattr(self, "offset_", None) is None:
            raise NotFittedError(
                f"This {type(self).__name__} instance is not fitted yet. "
                "Call 'fit' before using this estimator."
            )
        decision = self._score_density(X_np) - self.offset_
        if index is None:
            return decision
        return pd.Series(decision, index=index)

    def predict(self, X: Any) -> np.ndarray | pd.Series:
        """
        Label each sample as inlier (``1``) or outlier (``-1``).

        Parameters
        ----------
        X : array-like or DataFrame
            Data to label; must be 2D once converted to a NumPy array.

        Returns
        -------
        np.ndarray or pd.Series
            Integer labels: ``-1`` where the decision function is strictly
            negative, ``1`` elsewhere. A Series indexed like ``X`` when ``X``
            is pandas.

        Raises
        ------
        ValueError
            If ``X`` is not 2D.
        NotFittedError
            If the estimator has not been fitted.
        """
        decision = self.decision_function(X)
        labels = np.ones_like(decision, dtype=int)
        labels[np.asarray(decision) < 0] = -1
        if isinstance(decision, pd.Series):
            return pd.Series(labels, index=decision.index)
        return labels

    def score_samples(self, X: Any) -> np.ndarray | pd.Series:
        """
        Return the raw density score of each sample (no offset applied).

        Parameters
        ----------
        X : array-like or DataFrame
            Data to score; must be 2D once converted to a NumPy array.

        Returns
        -------
        np.ndarray or pd.Series
            The output of ``_score_density``; a Series indexed like ``X``
            when ``X`` is pandas.

        Raises
        ------
        ValueError
            If ``X`` is not 2D.
        """
        X_np, index = self._convert_input(X)
        scores = self._score_density(X_np)
        if index is None:
            return scores
        return pd.Series(scores, index=index)

    def _convert_input(self, X: Any):
        """
        Convert ``X`` to a 2D NumPy array and extract its pandas index.

        Returns
        -------
        tuple
            ``(X_np, index)`` where ``index`` is ``X.index`` for pandas input
            and ``None`` otherwise.

        Raises
        ------
        ValueError
            If the converted array is not 2D.
        """
        if isinstance(X, (pd.DataFrame, pd.Series)):
            index = X.index
            X_np = X.values
        else:
            index = None
            X_np = np.asarray(X)
        if X_np.ndim != 2:
            raise ValueError("X must be 2D array or DataFrame")
        return X_np, index

    def _fit_density(self, X: np.ndarray):
        """Fit the underlying density model on ``X``; subclass hook."""
        raise NotImplementedError("_fit_density must be implemented in subclass")

    def _score_density(self, X: np.ndarray) -> np.ndarray:
        """
        Score the rows of ``X``; subclass hook.

        Lower scores are treated as more abnormal by ``fit`` and ``predict``.
        Callers wrap the result in a Series indexed like the input, so it is
        presumably expected to hold one score per row of ``X``.
        """
        raise NotImplementedError("_score_density must be implemented in subclass")