# dqm/main.py

1 """
2 This script is the entry point for using DQM with command line and docker
3 """
4  
5 from pathlib import Path
6 import os
7 import argparse
8 import numpy as np
9 import yaml
10 import pandas as pd
11  
12 from dqm.completeness.metric import DataCompleteness
13 from dqm.diversity.metric import DiversityIndexCalculator
14 from dqm.representativeness.metric import DistributionAnalyzer
15 from dqm.domain_gap.metrics import CMD, MMD, Wasserstein, ProxyADistance, FID, KLMVN
16 from dqm.utils.twe_logger import get_logger
17 from dqm.cli.dependency import get_available_command
18  
# Init the module-level logger shared by every function in this script
logger = get_logger()
21  
22  
def load_dataframe(config_dict):
    """
    Load a pandas dataframe from the metric configuration dict passed as input.

    This config dict comes from a pipeline configuration: an example of such a
    pipeline is present in the examples/ folder. When ``config_dict["dataset"]``
    points to a directory, every file matching the optional ``extension`` filter
    is loaded recursively and concatenated; otherwise the single file is loaded
    directly.

    Args:
        config_dict (dict): Dict containing a metric configuration. Must hold a
            "dataset" key (path to a file or directory). Optional keys:
            "extension" (extension filter for directory scans, default "") and
            "separator" (csv/txt column separator, default ",").

    Returns:
        pandas.DataFrame: Raw data of the dataset (concatenation of all found
        files when the dataset is a directory).

    Raises:
        FileNotFoundError: If the dataset path does not exist.
        ValueError: If a directory dataset contains no matching data file.
    """
    # Optional configuration fields with their defaults
    extension_filter = config_dict.get("extension", "")
    separator = config_dict.get("separator", ",")
    dataset_path = config_dict["dataset"]

    # Check if dataset path exists
    if not os.path.exists(dataset_path):
        logger.exception(
            "FileNotFoundError -> The dataset %s does not exists", dataset_path
        )
        raise FileNotFoundError(dataset_path)

    # In case of a path to a directory, iterate on files and concatenate raw data
    if os.path.isdir(dataset_path):
        search_path = Path(dataset_path)

        # Search all files in folder and subfolders with the specified extension
        file_list = [str(x) for x in search_path.rglob("*." + extension_filter)]
        logger.debug("Number of files found in target folder : %s", str(len(file_list)))

        if not file_list:
            logger.exception(
                "Error, no data files have been found in the dataset folder"
            )
            raise ValueError(
                "Error, no data files have been found in the dataset folder"
            )

        # Concatenate raw data of found datafiles in a single pass: collecting
        # all frames first then calling pd.concat once avoids the quadratic
        # copying of concatenating inside the loop.
        df = pd.concat([load_raw_data(file_path, separator) for file_path in file_list])

    else:  # otherwise directly load file content as dataframe
        df = load_raw_data(dataset_path, separator)

    return df
74  
75  
76 def load_raw_data(file, separator):
77 """
78 This function load a raw data file content as a pandas dataframe
79  
80 Args:
81 file (str): Path of the file to load
82 separator (str): Separator to use when processing csv and txt format file
83  
84 Returns:
85 df (pandas.DataFrame): Output dataframe
86 """
87  
88 # Check if the passed extension of input path is supported
89 extension = file.split(".")[-1]
90 if extension not in ["csv", "txt", "xslx", "xls", "parquet", "pq"]:
91 logger.exception(
92 "The file named %s has an extension that is not supported :--> %s",
93 file,
94 extension,
95 )
96 raise ValueError
97  
98 # Call the appropriate read function
99 match extension:
100 case "csv" | "txt":
101 df = pd.read_csv(file, sep=separator)
102 case "xslx" | "xls":
103 df = pd.read_excel(file)
104 case "parquet" | "pq":
105 df = pd.read_parquet(file)
106  
107 return df
108  
109  
110 def main():
111 """
112 Main script of DQM component:
113  
114 Args:
115 pipeline_config_path (str): Path to the pipeline definition you want to apply
116 result_file_path : (str): Path the output YAML file where all computed metrics scores are stored
117 """
  • W291 Trailing whitespace
118 # TODO : insertion of new command line inside DQM, will be refactored in v 2.0, for instance
119 # We keep classical execution with a fake "legacy" command
  • W293 Blank line contains whitespace
120
  • E251 Unexpected spaces around keyword / parameter equals (in 2 places)
  • W291 Trailing whitespace
121 parser = argparse.ArgumentParser(description="Main script of DQM", add_help = False)
122 command_list = get_available_command()
123 command_list["legacy"] = None # We add a legacy command by default
124  
  • E501 Line too long (127 > 120 characters)
125 parser.add_argument("command", choices=command_list, default="legacy", nargs='?', help="Available command for your dqm-ml")
126  
127 cli_args, remaining = parser.parse_known_args(None)
128  
129 if cli_args.command != "legacy":
130 if cli_args.command in command_list and command_list[cli_args.command] is not None:
131 command_list[cli_args.command](remaining)
132 else:
133 raise ValueError(f"Unkow comand {cli_args.command}")
134 return
  • W293 Blank line contains whitespace
135
136 parser = argparse.ArgumentParser(
137 prog="dqm-ml", description="DQM-ML Pipeline client", epilog="for more informations see README"
138 )
139  
140 # For legacy cli, we continue with default cli
141 parser.add_argument(
142 "--pipeline_config_path",
143 required=True,
144 type=str,
145 help="Path to the pipeline definition where you specify each metric you want to compute and its params",
146 )
147  
148 parser.add_argument(
149 "--result_file_path",
150 required=True,
151 type=str,
152 help="Path the output YAML file where all computed metrics scores are stored",
153 )
154  
155  
156  
  • E303 Too many blank lines (3)
157 args = parser.parse_args()
158  
159 logger.info("Starting DQM-ML . .")
160  
161 # Read the pipeline configuration file
162  
163 with open(args.pipeline_config_path, "r", encoding="utf-8") as stream:
164 pipeline_config = yaml.safe_load(stream)
165  
166 # Create output diretory if it does not exist
167  
168 Path((os.sep).join(args.result_file_path.split(os.sep)[:-1])).mkdir(
169 parents=True, exist_ok=True
170 )
171  
172 logger.debug("creation directory : %s ", args.result_file_path.split(os.sep)[:-1])
173  
174 # Init output results dict, we start from the input config dict, we will just complete this dict with scores fields
175  
176 res_dict = pipeline_config.copy()
177  
178 # Loop on metrics to compute
179  
180 for idx in range(0, len(pipeline_config["pipeline_definition"])):
181 item = pipeline_config["pipeline_definition"][idx]
182  
183 # For metrics working only on tabular (all metrics excepted domain gap category)
184 if item["domain"] != "domain_gap":
185 logger.info(
186 "procesing dataset : %s for domain : %s ",
187 item["dataset"],
188 item["domain"],
189 )
190  
191 # Load dataset
192 main_df = load_dataframe(item)
193  
194 # Init list of columns on which metrics shall be computed
195 working_columns = list(main_df.columns) # By default, consider all columns
196  
197 # Overload with column_names fied, if it does exist in configuration
198 if "columns_names" in item.keys():
199 working_columns = item["columns_names"]
200  
201 # Init score field that will be filled
202 res_dict["pipeline_definition"][idx]["scores"] = {}
203  
204 # Call the corresponding metric computation functions
205 match item["domain"]:
206 case "completeness":
207 # Compute overall completness scores
208 completeness_evaluator = DataCompleteness()
209 res_dict["pipeline_definition"][idx]["scores"]["overall_score"] = (
210 completeness_evaluator.completeness_tabular(main_df)
211 )
212  
213 # Compute column specific completeness
214 for col in working_columns:
215 res_dict["pipeline_definition"][idx]["scores"][col] = (
216 completeness_evaluator.data_completion(main_df[col])
217 )
218  
219 case "diversity":
220 # Compute diversity scores
221 metric_calculator = DiversityIndexCalculator()
222  
223 for metric in item["metrics"]:
224 res_dict["pipeline_definition"][idx]["scores"][metric] = {}
225 for col in working_columns:
226 match metric:
227 case "simpson":
228 computed_score = metric_calculator.simpson(
229 main_df[col]
230 )
231 case "gini":
232 computed_score = metric_calculator.gini(
233 main_df[col]
234 )
235 case _:
236 raise ValueError(
237 "The given metric", metric, "is not implemented"
238 )
239  
240 res_dict["pipeline_definition"][idx]["scores"][metric][
241 col
242 ] = computed_score
243  
244 case "representativeness":
245 # Prepare output fields in result dict
246 for metric in item["metrics"]:
247 res_dict["pipeline_definition"][idx]["scores"][metric] = {}
248  
249 # Init analyzer
250 bins = item["bins"]
251 distribution = item["distribution"]
252  
253 # Compute representativeness
254 for col in working_columns:
255 var = main_df[col]
256 mean = np.mean(var)
257 std = np.std(var)
258 analyzer = DistributionAnalyzer(var, bins, distribution)
259  
260 for metric in item["metrics"]:
261 match metric:
262 case "chi-square":
263 pvalue, _ = analyzer.chisquare_test()
264 computed_score = pvalue
265  
266 case "kolmogorov-smirnov":
267 computed_score = analyzer.kolmogorov(mean, std)
268  
269 case "shannon-entropy":
270 computed_score = analyzer.shannon_entropy()
271  
272 case "GRTE":
273 grte_result, _ = analyzer.grte()
274 computed_score = grte_result
275  
276 case _:
277 raise ValueError(
278 "The given metric", metric, "is not implemented"
279 )
280  
281 res_dict["pipeline_definition"][idx]["scores"][metric][
282 col
283 ] = computed_score
284  
285 # Specificely for domain gap metrics . .
286 else:
287 # Init score output file
288 res_dict["pipeline_definition"][idx]["scores"] = {}
289  
290 # Iterate on metrics items
291  
292 for metric_dict in item["metrics"]:
293 config_method = metric_dict["method_config"]
294 metric = metric_dict["metric_name"]
295  
296 logger.info(
297 "procesing domain gap for metric : %s for source dataset : %s and target dataset : %s",
298 metric,
299 config_method["DATA"]["source"],
300 config_method["DATA"]["target"],
301 )
302  
303 match metric:
304 case "wasserstein":
305 wass = Wasserstein()
306 computed_score = wass.compute_1D_distance(config_method)
307  
308 case "FID":
309 fid = FID()
310 computed_score = fid.compute_image_distance(config_method)
311  
312 case "KLMVN":
313 klmvn = KLMVN()
314 computed_score = klmvn.compute_image_distance(config_method)
315  
316 case "PAD":
317 pad = ProxyADistance()
318 computed_score = pad.compute_image_distance(config_method)
319  
320 case "MMD":
321 mmd = MMD()
322 computed_score = mmd.compute(config_method)
323  
324 case "CMD":
325 cmd = CMD()
326 computed_score = cmd.compute(config_method)
327  
328 case _:
329 logger.exception(
330 "The given metric %s is not implemented", metric
331 )
332 raise ValueError
333  
334 # Add computed metric to results
335  
336 res_dict["pipeline_definition"][idx]["scores"][metric] = float(
337 computed_score
338 )
339  
340 # Export final results to yaml file
341 with open(args.result_file_path, "w+", encoding="utf-8") as ff:
342 yaml.dump(res_dict, ff, default_flow_style=False, sort_keys=False)
343  
344 logger.info("pipeline final results exported to file : %s", args.result_file_path)
345  
346  
# Script entry point when executed directly (command line / docker)
if __name__ == "__main__":
    main()