Coverage for uqmodels/processing.py: 51%
371 statements

#######################################################################################################
# Source link to base functionality for data processing

import json
import os
import pickle
from abc import ABC
from pathlib import Path

import numpy
import pandas as pd
from sklearn.base import BaseEstimator

from uqmodels.preprocessing.structure import Structure

# Exception ###################################


class EstimatorNotFitted(Exception):
    pass

# Data Loader ##################################


def split_path(path):
    """Split a path into a list of folder names using os.path.split iteratively.

    Args:
        path (str): path to split
    """
    folder_name = []
    while path != "":
        path, tail = os.path.split(path)
        folder_name.append(tail)
    return folder_name[::-1]
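
# Illustrative usage (hypothetical path, assuming "/" as separator):
# >>> split_path("run_1/metrics/values.json")
# ['run_1', 'metrics', 'values.json']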

def to_list(obj):
    """Put obj in a list, or do nothing if obj is already a list"""
    if not isinstance(obj, list):
        return [obj]
    else:
        return obj

class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, numpy.integer):
            return int(obj)
        elif isinstance(obj, numpy.floating):
            return float(obj)
        elif isinstance(obj, numpy.ndarray):
            return obj.tolist()
        elif isinstance(obj, Structure):
            return obj.toJSON()
        elif callable(obj):
            return obj.__name__
        elif hasattr(obj, "isoformat"):
            return obj.isoformat()
        else:
            try:
                return super(MyEncoder, self).default(obj)
            except BaseException:
                return obj.__name__
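
# Illustrative usage: MyEncoder lets json.dumps handle numpy scalars and arrays
# (values below are hypothetical).
# >>> json.dumps({"n": numpy.int64(3), "arr": numpy.arange(2)}, cls=MyEncoder)
# '{"n": 3, "arr": [0, 1]}'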

# Naive read and load API ##################################


def write_function(values, filename):
    """Auxiliary write function for file manipulation that handles csv / pickle / json.

    Args:
        values (obj): values to store
        filename (str): name of file

    Raises:
        FileNotFoundError: error raised when the file is not found
    """
    if isinstance(values, pd.core.frame.DataFrame):
        write_type = "pandas"
    elif str(filename).find(".json") > -1:
        write_type = "json"
    else:
        write_type = "pickle"

    if write_type == "pickle":
        filename_p = Path((str(filename) + ".p").replace(".p.p", ".p"))
        with open(filename_p, "wb") as file:
            pickle.dump(values, file)

    elif write_type == "json":
        with open(filename, "w") as fp:
            json.dump(values, fp, cls=MyEncoder)

    elif write_type == "pandas":
        filename_csv = Path((str(filename) + ".csv").replace(".csv.csv", ".csv"))
        with open(filename_csv, "w") as file:
            values.to_csv(file)

def read_function(filename):
    """Auxiliary read function for file manipulation that handles csv / pickle / json.

    Args:
        filename (str): name of file

    Raises:
        FileNotFoundError: error raised when no matching file is found

    Returns:
        values: values loaded
    """
    values = None
    filename_csv = Path((str(filename) + ".csv").replace(".csv.csv", ".csv"))
    filename_p = Path((str(filename) + ".p").replace(".p.p", ".p"))
    filename_json = Path((str(filename) + ".json").replace(".json.json", ".json"))
    flag_csv = False
    flag_p = False
    flag_json = False

    read_type = "None"
    if filename_csv.is_file():
        read_type = "pandas"
        flag_csv = True

    if filename_p.is_file():
        read_type = "pickle"
        flag_p = True

    if filename_json.is_file():
        read_type = "json"
        flag_json = True

    if flag_csv and flag_p and flag_json:
        print(
            "warning: csv/pickle/json files with the same name: "
            + str(filename)
            + ", priority to the json file"
        )

    if read_type == "pickle":
        with open(filename_p, "rb") as file:
            values = pickle.load(file)

    elif read_type == "json":
        with open(filename_json, "r") as file:
            values = json.load(file)

    elif read_type == "pandas":
        values = pd.read_csv(filename_csv)

    elif Path(filename).is_dir():
        values = str(filename)

    else:
        raise FileNotFoundError("Warning: not found", str(filename))

    return values
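
# Illustrative round trip (the "tmp_store" folder is hypothetical and must already
# exist, since write_function does not create directories):
# >>> write_function({"rmse": 0.2}, "tmp_store/metrics")   # no ".json" -> pickled as "tmp_store/metrics.p"
# >>> read_function("tmp_store/metrics")
# {'rmse': 0.2}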

def write(storing, keys, values, **kwargs):
    """Write API for file management.

    Args:
        storing (dict or str): dict storage, or global path where values are written
        keys (list of str): local path as a list of folders, with the filename as last key
        values (obj): values to write
    """
    if isinstance(storing, dict):
        mode = "dict"
    elif isinstance(storing, str):
        mode = "file"
    else:
        print("storing has to be a 'dict' or a 'str path_file'", type(storing))

    if mode == "dict":
        sub_dict = storing
        if isinstance(keys, str):
            keys = split_path(keys)

        for k in keys[:-1]:
            if k not in list(sub_dict.keys()):
                sub_dict[k] = {}
            sub_dict = sub_dict[k]
        sub_dict[keys[-1]] = values

    elif mode == "file":
        full_path = storing
        if isinstance(keys, str):
            keys = split_path(keys)

        for k in keys[:-1]:
            full_path = os.path.join(full_path, k)
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
        full_path = os.path.join(full_path, keys[-1])
        filename = Path(full_path)
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        write_function(values, filename)

def read(storing, keys, **kwargs):
    """Read API for file management.

    Args:
        storing (dict or str): dict storage, or global path of values to read
        keys (list of str): local path as a list of folders, with the filename as last key
    """
    if isinstance(storing, dict):
        mode = "dict"
    elif isinstance(storing, str):
        mode = "file"
    else:
        print("storing has to be a 'dict' or a 'str path_file'")

    if mode == "dict":
        sub_dict = storing
        if isinstance(keys, str):
            return sub_dict[keys]
        else:
            for n, k in enumerate(keys):
                if k in list(sub_dict.keys()):
                    sub_dict = sub_dict[k]
                    if n + 1 == len(keys):
                        return sub_dict
                else:
                    return None

    elif mode == "file":
        if not isinstance(storing, str):
            print("ERROR : storing is not a path")
        full_path = storing
        if isinstance(keys, str):
            full_path = os.path.join(full_path, keys)
        else:
            full_path = os.path.join(full_path, *keys)
        filename = Path(full_path)
        return read_function(filename)

    else:
        print("mode has to be 'dict' or 'file'")
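
# Illustrative usage of the write/read API in "dict" mode (a nested dict is built
# along the keys) and in "file" mode (the "my_store" folder is hypothetical):
# >>> store = {}
# >>> write(store, ["run_1", "metrics"], {"rmse": 0.2})
# >>> read(store, ["run_1", "metrics"])
# {'rmse': 0.2}
# >>> write("my_store", ["run_1", "metrics"], {"rmse": 0.2})   # pickled as my_store/run_1/metrics.p
# >>> read("my_store", ["run_1", "metrics"])
# {'rmse': 0.2}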

# Data Loader ##################################


class Data_loader(ABC):
    def __init__(self, data_loader_api=read):
        """Data loader object: aims to instantiate a generic data loader that handles a query to return selected data.

        Args:
            data_loader_api (callable, optional): loading API. Defaults to the read function defined above.
        """
        self.data_loader_api = data_loader_api  # API loader

    def load(self, dict_query):
        """Load from a dict_query that will be provided to the data_loader_api function.

        Args:
            dict_query (dict): query as a dict that contains the arguments of self.data_loader_api

        Raises:
            FileNotFoundError: error if file not found

        Returns:
            selected_data: selected data loaded by the data_loader_api function from the dict_query
        """
        # Load from data storage using data_link (or API)
        data = self.data_loader_api(**dict_query)
        # Select data from query if it cannot be done by the data provider.
        selected_data = data
        if data is None:
            raise FileNotFoundError("Error in query", dict_query)
        return selected_data
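
# Illustrative usage (reuses the hypothetical "my_store" folder from the example above):
# >>> loader = Data_loader()   # defaults to the read() API
# >>> loader.load({"storing": "my_store", "keys": ["run_1", "metrics"]})
# {'rmse': 0.2}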

# Cache ##################################


def set_query_to_cache_query(filename, with_arborescence=False, storing=None):
    """Generate a function that translates a query into a cache API query.
    Only compatible with the write/read uqmodels.processing API.

    Args:
        filename (str): name of the file used to store data & processor
        with_arborescence (bool, optional): store in a folder arborescence. Defaults to False.
        storing (str or dict, optional): default storing location. Defaults to None.

    Returns:
        query_to_cache_query: function for the cache manager object
    """

    def query_to_cache_query(query, filename=filename, values=None):
        cache_query = {"storing": storing, "keys": []}

        if "storing" in query.keys():
            cache_query["storing"] = query["storing"]

        if values is not None:
            cache_query["values"] = values

        elif "values" in query.keys():
            cache_query["values"] = query["values"]

        # Generation of the storage location

        if "keys" in query.keys():
            cache_query["keys"] = query["keys"]

        if "name" in query.keys():
            cache_query["keys"].append(query["name"])

        if "processing" in query.keys():
            for name in query["processing"]:
                cache_query["keys"].append(name)

        if filename is not None:
            cache_query["keys"].append(filename)

        if cache_query["keys"] == []:
            cache_query["keys"] = ["data"]

        if not with_arborescence:
            cache_query["keys"] = "_".join(cache_query["keys"])

        return cache_query

    return query_to_cache_query
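
# Illustrative translation of a query into a cache query (names are hypothetical):
# >>> to_cache_query = set_query_to_cache_query("features", storing="my_store")
# >>> to_cache_query({"name": "sensor_1", "processing": ["scaler"]})
# {'storing': 'my_store', 'keys': 'sensor_1_scaler_features'}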

class Cache_manager:
    def __init__(
        self,
        save_API=write,
        load_API=read,
        storing="",
        default_filename="",
        query_to_cache_query=None,
    ):
        """Cache manager object: aims to save/load results or estimators using the provided save_API & load_API,
        and query_to_cache_query to transform a query into a cache_manager query.

        Args:
            save_API (callable, optional): save API. Defaults to write.
            load_API (callable, optional): load API. Defaults to read.
            storing (str, optional): storing path. Defaults to "".
            default_filename (str, optional): filename used when none is provided. Defaults to "".
            query_to_cache_query (function, optional): function that creates a cache manager query from a query.
                Defaults to default_query_to_cache_query.
        """
        self.storing = storing
        self.default_filename = default_filename
        self.save_API = save_API
        self.load_API = load_API
        self.query_to_cache_query = query_to_cache_query

    def load(self, query, filename=None):
        """Load obj at query + filename location using load_API.

        Args:
            query (dict): query to interpret by query_to_cache_query
            filename (str, optional): filename of the object if not provided by the query. Defaults to None.

        Raises:
            FileNotFoundError: raised when no object is found at the resolved location

        Returns:
            obj: loaded object
        """
        if isinstance(query, dict):
            if "storing" not in query.keys():
                query["storing"] = self.storing

        if filename is None:
            filename = self.default_filename

        if self.query_to_cache_query is None:
            new_query = self.default_query_to_cache_query(query, filename)
        else:
            new_query = self.query_to_cache_query(query, filename)
        try:
            obj = self.load_API(**new_query)

        except (FileNotFoundError, NotADirectoryError):
            raise FileNotFoundError(new_query["storing"], new_query["keys"])

        if obj is None:
            raise FileNotFoundError(new_query["storing"], new_query["keys"])

        return obj

    def save(self, query, obj, filename=None, verbose=False):
        """Save obj at query + filename location using save_API.

        Args:
            query (dict): query to interpret by query_to_cache_query
            obj (obj): object to store by save_API
            filename (str, optional): filename of the object if not provided by the query. Defaults to None.
        """
        # Save data linked to a query on cache_link using its unique id-query
        # print('save', query, filename)

        if query != {}:

            # Use by default storing and filename
            if isinstance(query, dict):
                if "storing" not in query.keys():
                    query["storing"] = self.storing

            if filename is None:
                filename = self.default_filename

            if self.query_to_cache_query is None:
                new_query = self.default_query_to_cache_query(
                    query, filename, values=obj
                )
            else:
                new_query = self.query_to_cache_query(query, filename, values=obj)

            if verbose:
                print("save", new_query["keys"], type(new_query["values"]))
            # Replace name_of_file if specified
            self.save_API(**new_query)
        else:
            print("skip save")

    def default_query_to_cache_query(self, query, filename=None, values=None):
        """Default query_to_cache_query: creates a save/load query for the cache_manager load/save API
        functions from a query.

        Args:
            query (dict): query to interpret
            filename (str, optional): default storage filename. Defaults to None.
            values (obj or None, optional): obj to store via save_API; if None, the query targets the load_API

        Returns:
            dict_new_q: dict of parameters to provide to save_API or load_API
        """
        dict_new_q = {"storing": "", "keys": []}

        if values is not None:
            dict_new_q["values"] = values

        if "storing" in query.keys():
            dict_new_q["storing"] = query["storing"]
        else:
            print("warning : Incompatible query for default_query_to_new_query")
        if "keys" in query.keys():
            dict_new_q["keys"] = query["keys"]

        if "name" in query.keys():
            dict_new_q["keys"].append(query["name"])

        if "source" in query.keys():
            dict_new_q["keys"].append(query["source"])

        if filename is not None:
            dict_new_q["keys"].append(filename)

        if dict_new_q["keys"] == []:
            print("warning : Incompatible query for default_query_to_new_query")

        return dict_new_q
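
# Illustrative usage (the "my_store" folder and "sensor_1" name are hypothetical):
# >>> cache = Cache_manager(storing="my_store", default_filename="model")
# >>> cache.save({"name": "sensor_1"}, {"coef": [1, 2]})   # pickled as my_store/sensor_1/model.p
# >>> cache.load({"name": "sensor_1"})
# {'coef': [1, 2]}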

# Processor ##################################


class Processor:
    def __init__(self, name="processor", cache=None, update_query=None, **kwargs):
        """Processor class: aims to process data in a (fit/transform) scheme and holds a cache manager
        functionality to save/load objects.

        Args:
            name (str, optional): name of the processor. Defaults to 'processor'.
            cache (Cache_manager or None, optional): cache manager. Defaults to None: no save/load procedure.
            update_query (function, optional): function to update the query after the Processor application
                if needed. Defaults to default_update_query: no update.
        """
        self.name = name
        self.cache = cache
        self._update_query = update_query
        for key_arg in kwargs.keys():
            setattr(self, key_arg, kwargs[key_arg])
        self.is_fitted = False

    def default_update_query(self, query):
        type_ = type(query)
        if type_ == dict:
            pass
        elif type_ == str:
            pass
        else:
            print(
                "Warning: type of query not handled by update_query: define your own query update function: "
                'replaced by str "data_"'
            )
            query = "data_"
        return query

    def fit(self, data=None):
        """Fit the Processor using data.

        Args:
            data (obj, optional): data. Defaults to None.
        """
        # Fit processor using train_data
        self.is_fitted = True

    def transform(self, data=None):
        """Apply the Processor to data.

        Args:
            data (obj, optional): data. Defaults to None.
        """
        return data

    def fit_transform(self, data=None):
        """Fit the Processor and apply it to data.

        Args:
            data (obj, optional): data. Defaults to None.
        """
        self.fit(data)
        data = self.transform(data)
        return data

    def save(self, query=None, object=None, name=None):
        """Save method to store an object at query + name location using the cache_manager.

        Args:
            query (dict, optional): query parameters. Defaults to None.
            object (obj, optional): object to store. Defaults to None.
            name (str, optional): filename of the object to store. Defaults to None.
        """
        if self.cache is not None:
            if object is not None:
                self.cache.save(query, object, name)
            else:
                self.cache.save(query, self, self.name)

    def load(self, query=None, name=None):
        """Load method to load a Processor at query + name location using the cache_manager.

        Args:
            query (dict, optional): query parameters. Defaults to None.
            name (str, optional): filename of the object to load. Defaults to None.
        """
        loaded_processor = None

        if self.cache is None:
            raise FileNotFoundError

        else:
            try:
                if name is None:
                    name = self.name
                loaded_processor = self.cache.load(query, name)
                for attr, value in vars(loaded_processor).items():
                    self.__setattr__(attr, value)
                self.is_fitted = True
            except BaseException:
                raise FileNotFoundError

        return loaded_processor

    def update_query(self, query):
        """Apply the update_query function provided at init to update the query.

        Args:
            query (dict): query

        Returns:
            new_query: updated query
        """
        if self._update_query is None:
            new_query = self.default_update_query(query)
        else:
            new_query = self._update_query(query)
        return new_query

    def use_cache(self, query):
        """Use the cache manager to check if there is a cache linked to data already processed.

        Args:
            query (dict): query

        Raises:
            FileNotFoundError: cache-not-found error, caught by the method that called use_cache

        Returns:
            data: cached data if found
        """
        query = self.update_query(query)
        if self.cache is None:
            raise FileNotFoundError
        else:
            try:
                data = self.cache.load(query)
                return data

            except (FileNotFoundError, NotADirectoryError):
                raise FileNotFoundError
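
# Illustrative sketch (hypothetical subclass, not part of the library): a Processor
# subclass typically overrides fit/transform. The extra query/save_processor arguments
# mirror how the Pipeline below calls its processors.
#
#     class CenteringProcessor(Processor):
#         def __init__(self, **kwargs):
#             super().__init__(name="centering", **kwargs)
#
#         def fit(self, data=None, query=None, save_processor=False):
#             self.mean_ = data.mean(axis=0)
#             self.is_fitted = True
#
#         def transform(self, data=None, query=None):
#             return data - self.mean_
#
#     proc = CenteringProcessor()
#     centered = proc.fit_transform(numpy.random.randn(100, 3))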

# Pipeline ##################################


class Pipeline(BaseEstimator):
    # Preprocessing Pipeline: load, formalisation and cache management for each query (i.e. each similar data source)
    def __init__(
        self, data_loader=None, list_processors=[], verbose=False, skip_cache=False
    ):
        """Pipeline object: aims to apply a processing pipeline that includes a data_loader and a sequence of
        Processors, with a cache management procedure for each Processor.

        Args:
            data_loader (Data_loader): Data_loader that uses a load_API to load data
            list_processors (list of Processor): list of Processors to fit and apply
            verbose (bool, optional): use verbose mode or not. Defaults to False.
            skip_cache (bool, optional): skip the cache lookup. Defaults to False.
        """
        self.skip_cache = skip_cache
        self.data_loader = data_loader
        self.list_processors = list_processors
        self.verbose = verbose

    def fit(self, query, save_processor=False):
        """Apply a fitting procedure to the pipeline, with the data_loader and the list of processors, for a
        query or a list of queries. If there is no data_loader, provide data directly instead of a query.

        Args:
            query (query or list of query): query or list of queries to provide to the data_loader
        """
        if self.verbose:
            print("Fitting procedure")

        for q in to_list(query):
            if self.verbose:
                print("For ", q)

            if self.data_loader is None:
                data = q

            else:
                data = self.data_loader.load(q)

            if self.verbose:
                print("Data loaded")

            for n, processor in enumerate(self.list_processors):
                try:
                    processor.load(q)
                    if self.verbose:
                        print("Skip fit " + processor.name)

                except BaseException:
                    if self.verbose:
                        print("Fit " + processor.name)
                    if hasattr(processor, "transform"):  # Processor
                        processor.fit(data, q, save_processor)
                    else:  # Estimator
                        X, y = data[0], data[1]
                        processor.fit(X, y)

                # Update data using the processor
                data = processor.transform(data, q)
                # Update the query
                q = processor.update_query(q)

    def transform(self, query_or_list):
        """Apply the pipeline, with the data_loader and the list of processors, to a query or a list of queries,
        and provide a generator. If a query is not found in the cache, load the data and transform it using the
        fitted processors. If there is no data_loader, provide data directly instead of "query_or_list".

        Args:
            query_or_list (query): query or list of queries to provide to the data_loader

        Returns:
            a data generator that provides what is returned by the last Processor of the pipeline

        Yields:
            generator: generator that applies the pipeline to each query provided.
        """

        if self.verbose:
            print("Transform procedure")

        for q in to_list(query_or_list):
            # Generate the id_query

            if self.verbose:
                print("Work on ", str(q["source"]))
            # Look if there is data in some cache step
            start_marker = len(self.list_processors)

            # For each processor (in reverse order)
            for n, processor in enumerate(self.list_processors[::-1]):
                new_q = q
                try:
                    # Update the query for the current processor: start_marker - n - 1
                    # Does not consider itself, since its update_query is done in use_cache
                    for proc in self.list_processors[: start_marker - n - 1]:
                        if hasattr(proc, "update_query"):
                            new_q = proc.update_query(new_q)

                    if hasattr(processor, "use_cache"):
                        data = processor.use_cache(new_q)
                        print("Recovered data from cache:", new_q)
                        break

                except FileNotFoundError:
                    # Marker of the pipeline start:
                    start_marker += -1

            if start_marker == 0:
                print(
                    "Pipeline : No cache data found : load data and execute the whole pipeline"
                )
                # If no cache is found, load the data
                if self.data_loader is None:
                    data = q

                else:
                    data = self.data_loader.load(q)

            new_q = q
            # Apply the chain of processors
            for n, processor in enumerate(self.list_processors):
                if n < start_marker:
                    # Skip steps upstream of the cache
                    if self.verbose:
                        print("Skip " + processor.name)
                else:
                    if hasattr(processor, "transform"):
                        if self.verbose:
                            print("Transform " + processor.name)
                        data = processor.transform(data, new_q)
                        new_q = processor.update_query(new_q)

                    elif hasattr(processor, "predict"):
                        if len(data) == 2:
                            X, y = data
                        elif len(data) > 2:
                            X, y = data[0], data[1]
                        data = processor.predict(X, y)

            yield data

    def fit_transform(self, query_or_list, save_processor=False):
        """Fit-transform procedure: apply successively fit, then transform.

        Args:
            query_or_list (query or list of query): query or list of queries to apply
            save_processor (bool, optional): whether to save the fitted processors. Defaults to False.

        Returns:
            generator: data generator provided by transform
        """
        self.fit(query_or_list, save_processor)
        data_processed = self.transform(query_or_list)
        return data_processed
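
# Illustrative end-to-end sketch (reuses the hypothetical CenteringProcessor from the
# Processor sketch above; no data_loader, so raw data is passed instead of a query,
# and a query-update warning is expected since the array is not a dict query):
#
#     X = numpy.random.randn(100, 3)
#     pipeline = Pipeline(data_loader=None, list_processors=[CenteringProcessor()])
#     pipeline.fit(X)
#     for data in pipeline.transform(X):
#         print(data.shape)   # -> (100, 3), centered column-wise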