Coverage for dqm/representativeness/metric.py: 59%

103 statements  

coverage.py v7.10.6, created at 2025-09-05 14:00 +0000

"""
This script provides functions for analyzing data distribution using chi-square tests,
goodness-of-fit tests, Kolmogorov-Smirnov tests, Shannon entropy, and confidence intervals.

Authors:
    Faouzi ADJED
    Anani DJATO

Dependencies:
    numpy
    pandas
    matplotlib
    scipy
    seaborn
    dqm.utils.twe_logger

Classes:
    DistributionAnalyzer: Class for analyzing data distribution

Functions: None

Usage: Import this module and use the DistributionAnalyzer class for distribution analysis.
"""

from typing import Optional, Tuple, Any
import pandas as pd
import numpy as np
from scipy import stats
from dqm.representativeness.utils import VariableAnalysis, DiscretisationParams
from dqm.utils.twe_logger import get_logger

logger = get_logger()
variable_analyzer = VariableAnalysis()


class DistributionAnalyzer:
    """
    Class for analyzing data distribution.

    Args:
        data (pd.Series): The data to be analyzed.
        bins (int): The number of bins for analysis.
        distribution (str): The distribution type ('normal' or 'uniform').

    Methods:
        chisquare_test: Perform the chi-square test on the provided data.
            Returns the p-value and the observed/expected frequencies per interval.

        kolmogorov: Calculate the Kolmogorov-Smirnov test for the chosen distribution.
            Returns the KS test p-value.

        shannon_entropy: Calculate Shannon entropy for the provided intervals.
            Returns the Shannon entropy.

        grte: Calculate the Granular Relative and Theoretical Entropy (GRTE) for the given data.
            Returns the calculated GRTE value and the discretized intervals.
    """


    def __init__(self, data: pd.Series, bins: int, distribution: str):
        """
        Initialize DistributionAnalyzer with the provided data and parameters.

        Args:
            data (pd.Series): The data to be analyzed.
            bins (int): The number of bins for analysis.
            distribution (str): The distribution type ('normal' or 'uniform').
        """
        self.data = data
        self.bins = bins
        self.distribution = distribution
        self.logger = get_logger()
        self.variable_analyzer = VariableAnalysis()
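
    # Usage sketch (illustrative, not from the original module): the constructor
    # only stores the series, the bin count and the target distribution; nothing
    # is computed until one of the test methods is called.
    #
    #     sample = pd.Series(np.random.default_rng(0).normal(0.0, 1.0, 500))
    #     analyzer = DistributionAnalyzer(sample, bins=10, distribution="normal")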

    def chisquare_test(
        self, *par_dist: Optional[Tuple[float, float]]
    ) -> Tuple[float, pd.DataFrame]:
        """
        Perform a chi-square test for goodness of fit.

        This method analyzes the distribution of data using a chi-square test
        for goodness of fit. It supports normal and uniform distributions.

        Args:
            *par_dist (float): Parameters of the specified distribution
                (mean and std for 'normal', min and max for 'uniform').

        Returns:
            p-value (float): The p-value from the chi-square test.
            intervals_frequencies (pd.DataFrame): The DataFrame containing
                observed and expected frequencies.
        """
        if self.data.dtypes in ("O", "bool"):
            self.logger.error("Categorical or boolean data are not processed yet.")
            return float("nan"), pd.DataFrame()

        # Create a dictionary for distribution parameters
        distribution_params = {"theory": self.distribution}

        if self.distribution == "normal":
            if len(par_dist) == 2:
                mean, std = par_dist
            else:
                mean = np.mean(self.data)
                std = np.std(self.data)
            # Update distribution parameters for the normal distribution
            distribution_params.update(
                {
                    "empirical": variable_analyzer.normal_discretization(
                        self.bins, mean, std
                    ),
                    "mean": mean,
                    "std": std,
                }
            )

        elif self.distribution == "uniform":
            if len(par_dist) == 2:
                min_value, max_value = par_dist
            else:
                min_value = np.min(self.data)
                max_value = np.max(self.data)
            # Update distribution parameters for the uniform distribution
            # (the "mean"/"std" slots carry the min and max bounds here)
            distribution_params.update(
                {
                    "empirical": variable_analyzer.uniform_discretization(
                        self.bins, min_value, max_value
                    ),
                    "mean": min_value,
                    "std": max_value,
                }
            )

        else:
            logger.error("Expecting only uniform or normal distribution")
            return float("nan"), pd.DataFrame()

        # Create an instance of DiscretisationParams and discretise the data
        discretisation_params = DiscretisationParams(self.data, distribution_params)
        intervals_frequencies = variable_analyzer.discretisation_intervals(
            discretisation_params
        )

        if sum(intervals_frequencies["exp_freq"] == 0) != 0:
            logger.error(
                "Number of intervals is too large to get acceptable expected values"
            )

        chi = stats.chisquare(
            intervals_frequencies["obs_freq"], intervals_frequencies["exp_freq"]
        )

        if chi.pvalue < 0.05:
            logger.info(
                "pvalue = %s < 0.05: Data is not following the %s distribution",
                chi.pvalue,
                self.distribution,
            )
        else:
            logger.info(
                "pvalue = %s >= 0.05: Data is following the %s distribution",
                chi.pvalue,
                self.distribution,
            )
        return float(chi.pvalue), intervals_frequencies
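
    # Illustrative call (assumed data and values, not from the original module):
    # with no explicit parameters, the mean/std (normal) or min/max (uniform) are
    # estimated from the data themselves.
    #
    #     pvalue, freqs = analyzer.chisquare_test()          # parameters estimated
    #     pvalue, freqs = analyzer.chisquare_test(0.0, 1.0)  # explicit mean and std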

    def kolmogorov(self, *par_dist: float) -> float:
        """
        Calculation of the Kolmogorov-Smirnov test for the chosen distribution.

        Args:
            *par_dist: distribution parameters (mean and std for 'normal',
                min and max for 'uniform'); should be numeric

        Returns:
            p-value (float): KS test p-value
        """
        if any(isinstance(value, (str, bool)) for value in self.data):
            logger.error("Categorical or boolean variables are not treated yet.")
            return float("nan")

        if self.distribution == "normal":
            if len(par_dist) != 2:
                logger.error("Error: Provide mean and std for normal distribution.")
                return float("nan")
            mean, std = par_dist
            k = stats.kstest(self.data, stats.norm.cdf, args=(mean, std))

        elif self.distribution == "uniform":
            if len(par_dist) != 2:
                logger.error("Error: Provide min and max for uniform distribution.")
                return float("nan")
            min_value, max_value = par_dist
            # scipy parameterises the uniform distribution by loc and scale,
            # i.e. U[loc, loc + scale], so the scale is max_value - min_value
            k = stats.kstest(
                self.data, stats.uniform.cdf, args=(min_value, max_value - min_value)
            )

        else:
            logger.error("Unsupported distribution %s", self.distribution)
            return float("nan")

        logger.info(k)

        if k.pvalue < 0.05:
            logger.info(
                "p-value = %s < 0.05 : The data is not following the %s distribution",
                k.pvalue,
                self.distribution,
            )
        else:
            logger.info(
                "p-value = %s >= 0.05 : The data is following the %s distribution",
                k.pvalue,
                self.distribution,
            )

        return float(k.pvalue)
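
    # Illustrative call (assumed values): kolmogorov always requires the two
    # distribution parameters explicitly, unlike chisquare_test.
    #
    #     ks_pvalue = analyzer.kolmogorov(0.0, 1.0)  # mean and std for 'normal'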

    def shannon_entropy(self) -> float:
        """
        Calculation of Shannon entropy.

        Args: None

        Returns:
            Shannon entropy (float) of the expected frequencies
        """
        if self.distribution == "uniform":
            min_value, max_value = np.min(self.data), np.max(self.data)
            discrete_distrib = variable_analyzer.uniform_discretization(
                self.bins, min_value, max_value
            )
            # Create a dictionary for distribution parameters
            distribution_params = {
                "theory": self.distribution,
                "empirical": discrete_distrib,
                "mean": min_value,
                "std": max_value,
            }

        elif self.distribution == "normal":
            mean, std = np.mean(self.data), np.std(self.data)
            discrete_distrib = variable_analyzer.normal_discretization(
                self.bins, mean, std
            )
            # Create a dictionary for distribution parameters
            distribution_params = {
                "theory": self.distribution,
                "empirical": discrete_distrib,
                "mean": mean,
                "std": std,
            }

        else:
            logger.error("Expecting only uniform or normal distribution")
            return float("nan")

        discretisation_params = DiscretisationParams(self.data, distribution_params)
        intervals = variable_analyzer.discretisation_intervals(discretisation_params)

        if intervals["exp_freq"].sum() == 0:
            logger.info("Expected frequencies sum to zero, leading to a division by zero")

        prob_exp = intervals["exp_freq"] / intervals["exp_freq"].sum()
        return float(stats.entropy(prob_exp))
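
    # Illustrative call: shannon_entropy takes no arguments; the distribution
    # parameters are always estimated from self.data.
    #
    #     entropy_value = analyzer.shannon_entropy()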

    def grte(self, *args: float) -> Tuple[float, Any]:
        """
        Calculates the Granular Relative and Theoretical Entropy (GRTE) for given data.

        Args:
            *args (float): Optional arguments. For 'uniform', provide start
                and end; for 'normal', provide mean and std.

        Returns:
            grte_res (float): The calculated GRTE value.
            intervals_discretized (pd.DataFrame): The discretized intervals with
                observed and expected frequencies.
        """
        # Create a dictionary for distribution parameters
        distribution_params = {"theory": self.distribution}

        # Check the specified distribution type and process accordingly
        if self.distribution == "uniform":
            min_value, max_value = (
                (args[0], args[1])
                if len(args) == 2
                else (np.min(self.data), np.max(self.data))
            )
            logger.info("start %s", min_value)
            logger.info("end %s", max_value)

            # Update distribution parameters for the uniform distribution
            distribution_params.update(
                {
                    "empirical": variable_analyzer.uniform_discretization(
                        self.bins, min_value, max_value
                    ),
                    "mean": min_value,
                    "std": max_value,
                }
            )

        elif self.distribution == "normal":
            mean, std = (
                (args[0], args[1])
                if len(args) == 2
                else (np.mean(self.data), np.std(self.data))
            )

            # Update distribution parameters for the normal distribution
            distribution_params.update(
                {
                    "empirical": variable_analyzer.normal_discretization(
                        self.bins, mean, std
                    ),
                    "mean": mean,
                    "std": std,
                }
            )

        else:
            logger.error("Expecting only uniform or normal distribution")
            return None, None

        # Create an instance of DiscretisationParams
        discretisation_params = DiscretisationParams(self.data, distribution_params)

        # Calculate the intervals for the discretized data
        intervals_discretized = variable_analyzer.discretisation_intervals(
            discretisation_params
        )

        # Compute GRTE using the entropy of expected and observed frequencies:
        # GRTE = exp(-2 * |H(expected) - H(observed)|)
        grte_res = np.exp(
            -2
            * abs(
                stats.entropy(intervals_discretized["exp_freq"])
                - stats.entropy(intervals_discretized["obs_freq"])
            )
        )

        # Return the GRTE result and the discretized intervals
        return float(grte_res), intervals_discretized
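

# Minimal end-to-end sketch (illustrative, not part of the original module): it
# exercises the four methods on synthetic, normally distributed data. The GRTE
# value lies in (0, 1], with values close to 1 indicating that the observed and
# expected entropies agree.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    sample = pd.Series(rng.normal(loc=0.0, scale=1.0, size=1_000))

    analyzer = DistributionAnalyzer(sample, bins=10, distribution="normal")

    chi_pvalue, frequencies = analyzer.chisquare_test()
    ks_pvalue = analyzer.kolmogorov(0.0, 1.0)
    entropy_value = analyzer.shannon_entropy()
    grte_value, intervals = analyzer.grte()

    logger.info("chi-square p-value: %s", chi_pvalue)
    logger.info("KS p-value: %s", ks_pvalue)
    logger.info("Shannon entropy: %s", entropy_value)
    logger.info("GRTE: %s", grte_value)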