Coverage for uqmodels/processing.py: 51%

371 statements  

coverage.py v7.10.6, created at 2025-09-05 14:29 +0000

#######################################################################################################
# Base functionality for data processing

import json
import os
import pickle
from abc import ABC
from pathlib import Path

import numpy
import pandas as pd
from sklearn.base import BaseEstimator

from uqmodels.preprocessing.structure import Structure

# Exception ###################################


class EstimatorNotFitted(Exception):
    """Raised when an estimator is used before being fitted."""


# Data Loader ##################################


def split_path(path):
    """Split path into a list of folder names using os.path.split iteratively.

    Args:
        path (str): path to split
    """
    folder_name = []
    while path != "":
        path, tail = os.path.split(path)
        folder_name.append(tail)
    return folder_name[::-1]
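
# Usage sketch (illustrative; the sample path below is an assumption, not part of the module):
# >>> split_path(os.path.join("folder_a", "folder_b", "file"))
# ['folder_a', 'folder_b', 'file']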


def to_list(obj):
    """Put obj in a list, or do nothing if obj is already a list."""
    if not isinstance(obj, list):
        return [obj]
    else:
        return obj


class MyEncoder(json.JSONEncoder):
    """JSON encoder that handles numpy types, Structure objects, callables and datetime-like objects."""

    def default(self, obj):
        if isinstance(obj, numpy.integer):
            return int(obj)
        elif isinstance(obj, numpy.floating):
            return float(obj)
        elif isinstance(obj, numpy.ndarray):
            return obj.tolist()
        elif isinstance(obj, Structure):
            return obj.toJSON()
        elif callable(obj):
            return obj.__name__
        elif hasattr(obj, "isoformat"):
            return obj.isoformat()
        else:
            try:
                return super(MyEncoder, self).default(obj)
            except BaseException:
                return obj.__name__
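
# Usage sketch (illustrative; the sample payload is an assumption):
# >>> json.dumps({"score": numpy.float64(0.5), "ids": numpy.array([1, 2])}, cls=MyEncoder)
# '{"score": 0.5, "ids": [1, 2]}'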


# Naive read and load API ##################################


def write_function(values, filename):
    """Auxiliary write function for file manipulation that handles csv / pickle / json.

    Args:
        values (obj): values to store
        filename (str): name of file
    """
    if isinstance(values, pd.core.frame.DataFrame):
        write_type = "pandas"
    elif str(filename).find(".json") > -1:
        write_type = "json"
    else:
        write_type = "pickle"

    if write_type == "pickle":
        filename_p = Path((str(filename) + ".p").replace(".p.p", ".p"))
        with open(filename_p, "wb") as file:
            pickle.dump(values, file)

    elif write_type == "json":
        with open(filename, "w") as fp:
            json.dump(values, fp, cls=MyEncoder)

    elif write_type == "pandas":
        filename_csv = Path((str(filename) + ".csv").replace(".csv.csv", ".csv"))
        with open(filename_csv, "w") as file:
            values.to_csv(file)
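
# Usage sketch (illustrative; "tmp/metrics" and "tmp/table" are hypothetical locations that must already exist):
# >>> write_function({"rmse": 0.3}, "tmp/metrics")            # no ".json" suffix -> pickled to "tmp/metrics.p"
# >>> write_function(pd.DataFrame({"a": [1]}), "tmp/table")   # DataFrame -> written to "tmp/table.csv"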


def read_function(filename):
    """Auxiliary read function for file manipulation that handles csv / pickle / json.

    Args:
        filename (str): name of file

    Raises:
        FileNotFoundError: raised when no matching file or directory is found

    Returns:
        values: values loaded
    """
    values = None
    filename_csv = Path((str(filename) + ".csv").replace(".csv.csv", ".csv"))
    filename_p = Path((str(filename) + ".p").replace(".p.p", ".p"))
    filename_json = Path((str(filename) + ".json").replace(".json.json", ".json"))
    flag_csv = False
    flag_p = False
    flag_json = False

    read_type = "None"
    if filename_csv.is_file():
        read_type = "pandas"
        flag_csv = True

    if filename_json.is_file():
        read_type = "json"
        flag_json = True

    if filename_p.is_file():
        read_type = "pickle"
        flag_p = True

    if flag_csv and flag_p and flag_json:
        print(
            "warning: csv/pickle/json files with the same name: "
            + str(filename)
            + ".p, priority given to the pickle file"
        )

    if read_type == "pickle":
        with open(filename_p, "rb") as file:
            values = pickle.load(file)

    elif read_type == "json":
        with open(filename_json, "r") as file:
            values = json.load(file)

    elif read_type == "pandas":
        values = pd.read_csv(filename_csv)

    elif Path(filename).is_dir():
        values = str(filename)

    else:
        raise FileNotFoundError("Warning: not found", str(filename))

    return values
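
# Usage sketch (illustrative; reads back the hypothetical file written in the write_function example above):
# >>> read_function("tmp/metrics")   # finds "tmp/metrics.p" and unpickles it
# {'rmse': 0.3}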


def write(storing, keys, values, **kwargs):
    """Write API for file management.

    Args:
        storing (dict or str): storage dict, or global path under which values are written
        keys (str or list of str): local path as a list of folders plus the filename as last key
        values (obj): values to write
    """
    if isinstance(storing, dict):
        mode = "dict"
    elif isinstance(storing, str):
        mode = "file"
    else:
        print("storing has to be a 'dict' or a 'str path_file'", type(storing))
        return

    if mode == "dict":
        sub_dict = storing
        if isinstance(keys, str):
            keys = split_path(keys)

        for k in keys[:-1]:
            if k not in list(sub_dict.keys()):
                sub_dict[k] = {}
            sub_dict = sub_dict[k]
        sub_dict[keys[-1]] = values

    elif mode == "file":
        full_path = storing
        if isinstance(keys, str):
            keys = split_path(keys)

        for k in keys[:-1]:
            full_path = os.path.join(full_path, k)
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
        full_path = os.path.join(full_path, keys[-1])
        filename = Path(full_path)
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        write_function(values, filename)
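
# Usage sketch (illustrative; the "outputs" folder and key names are assumptions):
# >>> store = {}
# >>> write(store, ["run_1", "scores"], [0.1, 0.2])        # dict mode: nested keys
# >>> store
# {'run_1': {'scores': [0.1, 0.2]}}
# >>> write("outputs", ["run_1", "scores"], [0.1, 0.2])    # file mode: pickled to outputs/run_1/scores.p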


def read(storing, keys, **kwargs):
    """Read API for file management.

    Args:
        storing (dict or str): storage dict, or global path of the values to read
        keys (str or list of str): local path as a list of folders plus the filename as last key
    """

    if isinstance(storing, dict):
        mode = "dict"
    elif isinstance(storing, str):
        mode = "file"
    else:
        print("storing has to be a 'dict' or a 'str path_file'")
        return None

    if mode == "dict":
        sub_dict = storing
        if isinstance(keys, str):
            return sub_dict[keys]
        else:
            for n, k in enumerate(keys):
                if k in list(sub_dict.keys()):
                    sub_dict = sub_dict[k]
                    if n + 1 == len(keys):
                        return sub_dict
                else:
                    return None

    elif mode == "file":
        full_path = storing
        if isinstance(keys, str):
            full_path = os.path.join(full_path, keys)
        else:
            full_path = os.path.join(full_path, *keys)
        filename = Path(full_path)
        return read_function(filename)

    else:
        print("mode has to be 'dict' or 'file'")
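
# Usage sketch (illustrative; mirrors the write example above):
# >>> read(store, ["run_1", "scores"])
# [0.1, 0.2]
# >>> read("outputs", ["run_1", "scores"])    # reads back outputs/run_1/scores.p
# [0.1, 0.2]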


# Data Loader ##################################


class Data_loader(ABC):
    def __init__(self, data_loader_api=read):
        """Data loader object: instantiate a generic data loader that handles a query and returns the selected data.

        Args:
            data_loader_api (callable, optional): loading API. Defaults to the read function of this module.
        """
        self.data_loader_api = data_loader_api  # API loader

    def load(self, dict_query):
        """Load from a dict_query that is provided to the data_loader_api function.

        Args:
            dict_query (dict): query as a dict that contains the arguments of self.data_loader_api

        Raises:
            FileNotFoundError: error if file not found

        Returns:
            selected_data: selected data loaded by the data_loader_api function from the dict_query
        """
        # Load from data storage using data_link (or API)
        data = self.data_loader_api(**dict_query)
        # Select data from the query if it cannot be done by the data provider.
        selected_data = data
        if data is None:
            raise FileNotFoundError("Error query", dict_query)
        return selected_data
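
# Usage sketch (illustrative; reuses the hypothetical "outputs" folder of the write example above):
# >>> loader = Data_loader()                  # defaults to the read API of this module
# >>> loader.load({"storing": "outputs", "keys": ["run_1", "scores"]})
# [0.1, 0.2]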


# Cache ##################################


def set_query_to_cache_query(filename, with_arborescence=False, storing=None):
    """Generate a function that translates a query into a cache API query.
    Only compatible with the write/read uqmodels.processing API.

    Args:
        filename (str): name of file used to store data & processor
        with_arborescence (bool, optional): store in a folder arborescence. Defaults to False.
        storing (str or dict, optional): default storage location. Defaults to None.

    Returns:
        query_to_cache_query: function for the cache manager object
    """

    def query_to_cache_query(query, filename=filename, values=None):
        cache_query = {"storing": storing, "keys": []}

        if "storing" in query.keys():
            cache_query["storing"] = query["storing"]

        if values is not None:
            cache_query["values"] = values

        elif "values" in query.keys():
            cache_query["values"] = query["values"]

        # Generation of the storage location

        if "keys" in query.keys():
            cache_query["keys"] = query["keys"]

        if "name" in query.keys():
            cache_query["keys"].append(query["name"])

        if "processing" in query.keys():
            for name in query["processing"]:
                cache_query["keys"].append(name)

        if filename is not None:
            cache_query["keys"].append(filename)

        if not with_arborescence:
            new_keys = cache_query["keys"][0]
            for key in cache_query["keys"][1:]:
                new_keys = new_keys + "_" + key
            cache_query["keys"] = new_keys

        if cache_query["keys"] == []:
            cache_query["keys"] = ["data"]

        return cache_query

    return query_to_cache_query
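
# Usage sketch (illustrative; the query content is an assumption):
# >>> to_cache_query = set_query_to_cache_query(filename="data", storing="cache")
# >>> to_cache_query({"name": "dataset_1", "processing": ["scaler"]})
# {'storing': 'cache', 'keys': 'dataset_1_scaler_data'}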


class Cache_manager:
    def __init__(
        self,
        save_API=write,
        load_API=read,
        storing="",
        default_filename="",
        query_to_cache_query=None,
    ):
        """Cache manager object: save/load results or estimators using the provided save_API & load_API,
        and query_to_cache_query to transform a query into a cache_manager query.

        Args:
            save_API (callable, optional): save API. Defaults to write.
            load_API (callable, optional): load API. Defaults to read.
            storing (str, optional): storing path. Defaults to "".
            default_filename (str, optional): default filename. Defaults to "".
            query_to_cache_query (function, optional): function that creates a cache manager query from a query.
                Defaults to default_query_to_cache_query.
        """
        self.storing = storing
        self.default_filename = default_filename
        self.save_API = save_API
        self.load_API = load_API
        self.query_to_cache_query = query_to_cache_query

    def load(self, query, filename=None):
        """Load obj at query + filename location using load_API.

        Args:
            query (dict): query to interpret by query_to_cache_query
            filename (str, optional): filename of object if not provided by query. Defaults to None.

        Raises:
            FileNotFoundError: raised when no object is found at the resolved location

        Returns:
            obj: loaded object
        """
        if isinstance(query, dict):
            if "storing" not in query.keys():
                query["storing"] = self.storing

        if filename is None:
            filename = self.default_filename

        if self.query_to_cache_query is None:
            new_query = self.default_query_to_cache_query(query, filename)
        else:
            new_query = self.query_to_cache_query(query, filename)
        try:
            obj = self.load_API(**new_query)

        except (FileNotFoundError, NotADirectoryError):
            raise FileNotFoundError(new_query["storing"], new_query["keys"])

        if obj is None:
            raise FileNotFoundError(new_query["storing"], new_query["keys"])

        return obj

    def save(self, query, obj, filename=None, verbose=False):
        """Save obj at query + filename location using save_API.

        Args:
            query (dict): query to interpret by query_to_cache_query
            obj (obj): object to store by save_API
            filename (str, optional): filename of object if not provided by query. Defaults to None.
        """
        # Save data linked to a query on cache_link using its unique id-query
        if query != {}:

            # Use default storing and filename
            if isinstance(query, dict):
                if "storing" not in query.keys():
                    query["storing"] = self.storing

            if filename is None:
                filename = self.default_filename

            if self.query_to_cache_query is None:
                new_query = self.default_query_to_cache_query(
                    query, filename, values=obj
                )
            else:
                new_query = self.query_to_cache_query(query, filename, values=obj)

            if verbose:
                print("save", new_query["keys"], type(new_query["values"]))
            # Replace name_of_file if specified

            self.save_API(**new_query)
        else:
            print("skip save")

    def default_query_to_cache_query(self, query, filename=None, values=None):
        """Default query translation: create a save/load query for the cache_manager load/save API functions
        from a query.

        Args:
            query (dict): query to interpret
            filename (str, optional): default storage filename. Defaults to None.
            values (obj or None, optional): obj to store by save_API; if None, the query targets load_API

        Returns:
            dict_new_q: dict of parameters to provide to save_API or load_API
        """
        dict_new_q = {"storing": "", "keys": []}

        if values is not None:
            dict_new_q["values"] = values

        if "storing" in query.keys():
            dict_new_q["storing"] = query["storing"]
        else:
            print("warning: incompatible query for default_query_to_cache_query")
        if "keys" in query.keys():
            dict_new_q["keys"] = query["keys"]

        if "name" in query.keys():
            dict_new_q["keys"].append(query["name"])

        if "source" in query.keys():
            dict_new_q["keys"].append(query["source"])

        if filename is not None:
            dict_new_q["keys"].append(filename)

        if dict_new_q["keys"] == []:
            print("warning: incompatible query for default_query_to_cache_query")

        return dict_new_q
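
# Usage sketch (illustrative; "cache_dir" is a hypothetical path):
# >>> manager = Cache_manager(storing="cache_dir", default_filename="object")
# >>> manager.save({"keys": ["run_1"]}, {"rmse": 0.3})   # pickled to cache_dir/run_1/object.p
# >>> manager.load({"keys": ["run_1"]})
# {'rmse': 0.3}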


# Processor ##################################


class Processor:
    def __init__(self, name="processor", cache=None, update_query=None, **kwargs):
        """Processor class: processes data in a (fit/transform) scheme and holds a cache manager
        functionality to save/load objects.

        Args:
            name (str, optional): name of the processor. Defaults to 'processor'.
            cache (Cache_manager or None, optional): cache manager. Defaults to None: no save/load procedure.
            update_query (function, optional): function to update the query after the Processor is applied, if needed.
                Defaults to default_update_query: no update.
        """
        self.name = name
        self.cache = cache
        self._update_query = update_query
        for key_arg in kwargs.keys():
            setattr(self, key_arg, kwargs[key_arg])
        self.is_fitted = False

    def default_update_query(self, query):
        """Default query update: leave dict or str queries unchanged, otherwise fall back to the string "data_"."""
        type_ = type(query)
        if type_ == dict:
            pass
        elif type_ == str:
            pass
        else:
            print(
                "Warning: type of query not handled by update_query: define your own query update function: "
                'replaced by str "data_"'
            )
            query = "data_"
        return query

    def fit(self, data=None):
        """Fit the Processor using data.

        Args:
            data (obj, optional): data. Defaults to None.
        """
        # Fit processor using train_data
        self.is_fitted = True

    def transform(self, data=None):
        """Apply the Processor to data.

        Args:
            data (obj, optional): data. Defaults to None.
        """
        return data

    def fit_transform(self, data=None):
        """Fit the Processor and apply it to data.

        Args:
            data (obj, optional): data. Defaults to None.
        """
        self.fit(data)
        data = self.transform(data)
        return data

    def save(self, query=None, object=None, name=None):
        """Save method to store an object at query + name location using the cache manager.

        Args:
            query (dict, optional): query parameters. Defaults to None.
            object (obj, optional): object to store. Defaults to None.
            name (str, optional): filename of the obj to store. Defaults to None.
        """
        if self.cache is not None:
            if object is not None:
                self.cache.save(query, object, name)
            else:
                self.cache.save(query, self, self.name)

    def load(self, query=None, name=None):
        """Load method to load a Processor at query + name location using the cache manager.

        Args:
            query (dict, optional): query parameters. Defaults to None.
            name (str, optional): filename of the obj to load. Defaults to None.
        """
        loaded_processor = None

        if self.cache is None:
            raise FileNotFoundError

        else:
            try:
                if name is None:
                    name = self.name
                loaded_processor = self.cache.load(query, name)
                for property, value in vars(loaded_processor).items():
                    self.__setattr__(property, value)
                self.is_fitted = True
            except BaseException:
                raise FileNotFoundError

        return loaded_processor

    def update_query(self, query):
        """Apply the update_query function provided at init to update the query.

        Args:
            query (dict): query

        Returns:
            new_query: updated query
        """
        if self._update_query is None:
            new_query = self.default_update_query(query)
        else:
            new_query = self._update_query(query)
        return new_query

    def use_cache(self, query):
        """Use the cache manager to check whether there is a cache entry for data already processed.

        Args:
            query (dict): query

        Raises:
            FileNotFoundError: cache-not-found error, caught by the method that called use_cache

        Returns:
            data: cached data if found
        """
        query = self.update_query(query)
        if self.cache is None:
            raise FileNotFoundError
        else:
            try:
                data = self.cache.load(query)
                return data

            except (FileNotFoundError, NotADirectoryError):
                raise FileNotFoundError
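
# Usage sketch: a minimal Processor subclass (illustrative; the "Doubler" transform is an assumption):
# class Doubler(Processor):
#     def fit(self, data=None, query=None, save_processor=False):
#         self.is_fitted = True
#     def transform(self, data=None, query=None):
#         return [2 * x for x in data]
#
# >>> Doubler(name="doubler").fit_transform([1, 2, 3])
# [2, 4, 6]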


# Pipeline ##################################


class Pipeline(BaseEstimator):
    # Preprocessing Pipeline: load, formalisation and cache management for each query (i.e. each similar data source)
    def __init__(
        self, data_loader=None, list_processors=[], verbose=False, skip_cache=False
    ):
        """Pipeline object: apply a processing pipeline that includes a data_loader and a sequence of
        Processors, with a cache management procedure for each Processor.

        Args:
            data_loader (Data_loader): Data_loader that uses a load_API to load data
            list_processors (list of Processor): list of Processors to fit and apply
            verbose (bool, optional): use verbose mode or not. Defaults to False.
            skip_cache (bool, optional): skip the cache lookup. Defaults to False.
        """
        self.skip_cache = skip_cache
        self.data_loader = data_loader
        self.list_processors = list_processors
        self.verbose = verbose

    def fit(self, query, save_processor=False):
        """Apply a fitting procedure to the pipeline (data_loader plus list of processors) for a query or a
        list of queries. If no data_loader is set, provide data instead of a query.

        Args:
            query (query or list of query): query or list of queries to provide to the data_loader
            save_processor (bool, optional): save each processor after fitting. Defaults to False.
        """
        if self.verbose:
            print("Fitting procedure")

        for q in to_list(query):
            if self.verbose:
                print("For ", q)

            if self.data_loader is None:
                data = q

            else:
                data = self.data_loader.load(q)

            if self.verbose:
                print("Data loaded")

            for n, processor in enumerate(self.list_processors):
                try:
                    processor.load(q)
                    if self.verbose:
                        print("Skip fit " + processor.name)

                except BaseException:
                    if self.verbose:
                        print("Fit " + processor.name)
                    if hasattr(processor, "transform"):  # Processor
                        processor.fit(data, q, save_processor)
                    else:  # Estimator
                        X, y = data[0], data[1]
                        processor.fit(X, y)

                # Update data using the processor
                data = processor.transform(data, q)
                # Update the query
                q = processor.update_query(q)

    def transform(self, query_or_list):
        """Apply the pipeline (data_loader plus list of processors) to a query or a list of queries and provide
        a generator.
        If the query is not found in the cache, load the data and transform it using the fitted processors.
        If no data_loader is set, provide data instead of "query_or_list".

        Args:
            query_or_list (query): query or list of queries to provide to the data_loader

        Returns:
            a data generator that provides the output of the last Processor of the pipeline

        Yields:
            generator: generator that applies the pipeline to each query provided.
        """

        if self.verbose:
            print("Transform procedure")

        for q in to_list(query_or_list):
            # Generate the id_query

            if self.verbose:
                print("Work on ", str(q["source"]))
            # Look for data in some cache step
            strat_marker = len(self.list_processors)

            # For each processor (in reverse order)
            for n, processor in enumerate(self.list_processors[::-1]):
                new_q = q
                try:
                    # Update the query up to the current processor: strat_marker - n - 1
                    # Does not consider the processor itself, since its update_query is done in use_cache
                    for proc in self.list_processors[: strat_marker - n - 1]:
                        if hasattr(proc, "update_query"):
                            new_q = proc.update_query(new_q)

                    if hasattr(processor, "use_cache"):
                        data = processor.use_cache(new_q)
                        print("Recover data from cache :", new_q)
                        break

                except FileNotFoundError:
                    # Marker of the pipeline start:
                    strat_marker += -1

            if strat_marker == 0:
                print(
                    "Pipeline : No cache data found : load data and execute the whole pipeline"
                )
                # If no cache found, load data
                if self.data_loader is None:
                    data = q

                else:
                    data = self.data_loader.load(q)

            new_q = q
            # Apply the chain of processors
            for n, processor in enumerate(self.list_processors):
                if n < strat_marker:
                    # Skip steps upstream of the cache
                    if self.verbose:
                        print("Skip " + processor.name)
                else:
                    if hasattr(processor, "transform"):
                        if self.verbose:
                            print("Transform " + processor.name)
                        data = processor.transform(data, new_q)
                        new_q = processor.update_query(new_q)

                    elif hasattr(processor, "predict"):
                        if len(data) == 2:
                            X, y = data
                        if len(data) > 2:
                            X, y = data[0], data[1]
                        data = processor.predict(X, y)

            yield data

    def fit_transform(self, query_or_list, save_processor=False):
        """Fit-transform procedure: apply successively fit then transform.

        Args:
            query_or_list (query or list of query): query or list of queries to apply
            save_processor (bool, optional): save each processor after fitting. Defaults to False.

        Returns:
            generator: generator of processed data returned by transform
        """
        self.fit(query_or_list, save_processor)
        data_processed = self.transform(query_or_list)
        return data_processed
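
# Usage sketch of a full pipeline run (illustrative; it reuses the hypothetical Doubler subclass and the
# "outputs" folder from the examples above, both of which are assumptions):
# >>> write("outputs", ["run_1", "scores"], [0.1, 0.2])
# >>> pipeline = Pipeline(data_loader=Data_loader(), list_processors=[Doubler(name="doubler")])
# >>> query = {"storing": "outputs", "keys": ["run_1", "scores"], "source": "run_1"}
# >>> list(pipeline.fit_transform(query))
# [[0.2, 0.4]]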