# Source code for neural_de.transformations.transformation_pipeline

"""
Transformation pipeline for automation of multiple transformations methods
"""
import logging
from pathlib import Path
from typing import Union

import numpy as np
import yaml

from neural_de.transformations.transformation import BaseTransformation
# from neural_de import transformations
from neural_de.utils.twe_logger import log_and_raise
import importlib


def camel_to_snake(s):
    """Convert a CamelCase string ``s`` to snake_case (e.g. ``"FooBar"`` -> ``"foo_bar"``)."""
    # Prefix every uppercase letter with '_' and lowercase it, then drop the
    # leading '_' introduced by an initial capital.
    pieces = []
    for ch in s:
        if ch.isupper():
            pieces.append('_')
            pieces.append(ch.lower())
        else:
            pieces.append(ch)
    return ''.join(pieces).lstrip('_')
class TransformationPipeline(BaseTransformation):
    """
    Provides a pipeline object, to facilitate the automation of multiple transformations
    methods, and/or offer loading from a yaml file.

    You can check the example notebook **examples/Pipeline_example.ipynb** for details on the
    syntax and usage. An example of valid config file can be found in
    **examples/config/conf_user.yaml**

    Args:
        config: either a path toward a yaml configuration file, or a list of dict.
        logger: It is recommended to use the confiance.ai logger, obtainable with
            neural_de.utils.get_logger(...). If None, one logging with stdout will be provided.
    """
    def __init__(self, config: Union[str, list, Path], logger: logging.Logger = None):
        super().__init__(logger)
        if isinstance(config, list):
            # Config already provided in-memory as a list of per-transformation dicts.
            self._pipeline_conf = config
        else:
            # Avoid lazy init for the configuration in order to check consistency asap.
            self._pipeline_conf = self._read_config(config)
        # Transformation instances are created lazily on first transform() call,
        # because instantiating them (e.g. loading models) can be costly.
        self._pipeline = None
        self._logger.info("Config file loaded")

    def _read_config(self, config_path: Union[str, Path]) -> list:
        """
        Read user configuration of pipeline that contains different transformations parameters.

        Args:
            config_path: path to a yaml configuration file. See example/Pipeline_example
                for syntax specification

        Returns:
            The loaded configuration: a list of per-transformation dicts.

        Raises:
            FileNotFoundError: if ``config_path`` does not exist.
            yaml.YAMLError: if the file is not valid yaml.
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as yaml_file:
                config = yaml.safe_load(yaml_file)
        except FileNotFoundError:
            log_and_raise(self._logger, FileNotFoundError,
                          f"Config file '{config_path}' not found.")
        except yaml.YAMLError as e:
            log_and_raise(self._logger, yaml.YAMLError, f"Error parsing YAML file: {e}")
        # Fixed: message previously read "Config loaded from %s loaded" (duplicated word).
        self._logger.info("Config loaded from %s", config_path)
        return config

    def _init_pipeline(self) -> None:
        """
        Initialize every transformation method in the pipeline (once on first transform call).

        Each entry of the configuration must provide a 'name' key matching a class of
        ``neural_de.transformations`` (module name derived via ``camel_to_snake``), and may
        provide an 'init_param' dict forwarded to the class constructor.

        Raises:
            KeyError: if a configuration entry has an invalid structure.
            AttributeError: if a named transformation cannot be found.
            TypeError: if a transformation rejects its init parameters.
        """
        self._logger.info("Loading all the pipeline methods and models")
        self._pipeline = []
        try:
            # For every transformation, initialize it and store the resulting instance.
            # Initialization can be costly, so it is done only once, during first transform call.
            for transformation_conf in self._pipeline_conf:
                transformation_name: str = transformation_conf['name']
                parameters = transformation_conf.get('init_param', {})
                # Get module name corresponding to transformation
                module_transformation = importlib.import_module(
                    "neural_de.transformations." + camel_to_snake(transformation_name))
                transformation = getattr(module_transformation, transformation_name)
                self._pipeline.append(transformation(**parameters))
        except KeyError:
            # .get avoids re-raising KeyError here when 'name' itself is the missing key.
            log_and_raise(self._logger, KeyError,
                          "Invalid structure for method "
                          + str(transformation_conf.get("name", "<unnamed>")))
        except AttributeError:
            # Fixed: message was missing a space and referenced "neural.transformations".
            log_and_raise(self._logger, AttributeError,
                          "Transformation " + transformation_name
                          + " not found in neural_de.transformations")
        except TypeError:
            log_and_raise(self._logger, TypeError,
                          "Invalid call during initialization of "
                          + str(transformation_conf.get("name", "<unnamed>")))
        self._logger.info("All pipeline models successfully loaded")

    def transform(self, images: Union[list, np.ndarray]) -> Union[list, np.ndarray]:
        """
        Sequentially apply every method of the pipeline on a batch of image, and returns
        the resulting images.

        Args:
            images: Batch of images. Each image should be of a ``np.ndarray`` of
                target_shape *(h,w, channels)*

        Returns:
            Resulting batch of images, one per image provided.
        """
        # Verify if image is a valid batch
        self._check_batch_validity(images)
        # Lazy method init, as it can be costly both in term of computing power and ram.
        if self._pipeline is None:
            self._init_pipeline()
        try:
            # For each method in the pipeline
            for i, transformation in enumerate(self._pipeline):
                self._logger.info("Applying method %s to images", transformation)
                # Retrieves optional transform() parameters if any
                transformation_parameters = self._pipeline_conf[i].get('transform', {})
                # Apply the transformation to the images
                images = transformation.transform(images=images, **transformation_parameters)
        except TypeError as e:
            # Fixed: was "raise log_and_raise(...)" — log_and_raise already raises;
            # the extra "raise" would attempt "raise None" if it ever returned.
            log_and_raise(self._logger, TypeError,
                          f"Invalid call during function transform of "
                          f"'{transformation}': {e}")
        return images