Source code for monkey_wrench.input_output._models

import os
import shutil
from datetime import datetime
from pathlib import Path
from typing import Any, Generator, Literal, TypeVar

from loguru import logger
from pydantic import FilePath, NonNegativeFloat, NonNegativeInt, validate_call

from monkey_wrench.generic import ListSetTuple, Model, Pattern, StringTransformation, TransformFunction
from monkey_wrench.input_output._types import (
    AbsolutePath,
    ExistingDirectoryPath,
    ExistingFilePath,
    NewFilePath,
    OpenMode,
)
from monkey_wrench.process import MultiProcess
from monkey_wrench.query import Batches

# Generic type of the items held by a reference collection (see ``ListSetTuple[InputType]`` below).
InputType = TypeVar("InputType")
# Generic type of the objects produced by a transform function (see ``TransformFunction[ReturnType]`` below).
ReturnType = TypeVar("ReturnType")
# A further generic return type; it is not referenced in the part of the module shown here.
OtherReturnType = TypeVar("OtherReturnType")


class ExistingInputFile(Model):
    """Pydantic model describing an input file that is required to exist.

    Example:
        A text file listing the IDs of products which have already been fetched. The file is subsequently
        used to fetch the product files.
    """

    input_filepath: ExistingFilePath
class InputFile(Model):
    """Pydantic model describing an input file whose existence is not enforced during model validation.

    The path may point to an existing file, to a file that does not exist yet, or be left as ``None``.
    """

    input_filepath: ExistingFilePath | NewFilePath | None = None
class NewOutputFile(Model):
    """Pydantic model describing an output file that must not exist yet.

    Example:
        A text file in which the result of visiting a directory is stored, i.e. the collected files that
        match the determined pattern.
    """

    output_filepath: NewFilePath
class OutputFile(Model):
    """Pydantic model describing an output file whose existence is not enforced during model validation.

    The path may point to an existing file, to a file that does not exist yet, or be left as ``None``.
    """

    output_filepath: ExistingFilePath | NewFilePath | None = None
class ModelFile(Model):
    """Pydantic model describing a model file that is required to exist.

    Example:
        A ``*.pt`` file which CHIMP uses as the model when performing a retrieval.
    """

    model_filepath: ExistingFilePath
class ParentInputDirectory(Model):
    """Pydantic model describing the top-level directory under which the child (input) directories reside.

    The directory is required to exist.

    Example:
        A directory holding all SEVIRI files that have to be reprocessed using CHIMP.
    """

    parent_input_directory_path: ExistingDirectoryPath
class ParentOutputDirectory(Model):
    """Pydantic model describing the top-level directory under which the child (output) directories reside.

    The directory is required to exist.

    Example:
        A directory in which the output of CHIMP will be saved.
    """

    parent_output_directory_path: ExistingDirectoryPath
class ExistingInputDirectory(Model):
    """Pydantic model describing an input directory that is required to exist.

    Note:
        Use this model only for a flat structure. For a hierarchical tree structure, prefer
        :obj:`~monkey_wrench.input_output._models.ParentInputDirectory`, which states the directory
        structure more clearly.
    """

    input_directory: ExistingDirectoryPath
class ExistingOutputDirectory(Model):
    """Pydantic model describing an output directory that is required to exist.

    Note:
        Use this model only for a flat structure. For a hierarchical tree structure, prefer
        :obj:`~monkey_wrench.input_output._models.ParentOutputDirectory`, which states the directory
        structure more clearly.
    """

    output_directory: ExistingDirectoryPath
class FsSpecCache(Model):
    """Pydantic model describing the caching scheme of fsspec.

    Note:
        See `fsspec cache`_, to learn more about buffering and random access in `fsspec`.

    .. _fsspec cache: https://filesystem-spec.readthedocs.io/en/latest/features.html#file-buffering-and-random-access
    """

    fsspec_cache: Literal["filecache", "blockcache"] | None = None
    """How to buffer, e.g. ``"filecache"``, ``"blockcache"``, or ``None``. Defaults to ``None``.

    Warning:
        ``None`` might cause too many requests to be sent to the server!
    """

    @property
    def fsspec_cache_str(self):
        """Return the cache name prefixed with ``::``, or an empty string when no cache is configured."""
        if not self.fsspec_cache:
            return ""
        return f"::{self.fsspec_cache}"
class DatasetSaveOptions(Model):
    """Pydantic model holding the storage options with which a dataset is saved.

    The options are dataset-dependent.
    """

    dataset_save_options: dict[str, bool | str | int] = {
        "writer": "cf",
        "include_lonlats": False,
    }
    """A dictionary which includes the actual storage options.

    The default behaviour is to use ``cf`` as the writer and exclude longitude and latitude values, i.e.
    ``dataset_save_options = dict(writer="cf", include_lonlats=False)``
    """
class Writer(OutputFile):
    """Pydantic model for an ASCII file (text mode) writer."""

    open_mode: OpenMode = "w"
    """The mode using which the text file will be opened. Defaults to ``"w"``."""

    pre_writing_transformation: StringTransformation = StringTransformation()
    """The transformation before writing items to the file.

    Defaults to `StringTransformation`_, which means the items will be only trimmed.

    Note:
        The items will first be transformed according to the ``pre_writing_transformation.transform_function``,
        and then trimmed.

    .. _StringTransformation: monkey_wrench.generic.StringTransformation
    """

    on_write_catch_exceptions: tuple[type[Exception], ...] | None = None
    """Exceptions which will be caught and logged as warnings.

    Defaults to ``None``, which means all exceptions will be caught. If it is a tuple, only the given exceptions
    will be caught. As a result, in the case of an empty tuple, no exceptions will be caught.
    """

    def __get_open_mode(self, open_mode: OpenMode | None = None):
        """Return the ``open_mode`` if it is not ``None``. Otherwise return the value of ``self.open_mode``."""
        if open_mode is None:
            return self.open_mode
        return open_mode

    def prepare_output_file_for_writing(self, open_mode: OpenMode | None = None) -> None:
        """Prepare the output file depending on whether the file exists and the value of ``open_mode``.

        Note:
            - If the file exists and ``open_mode`` is ``"a"``, the file will be left as it is.
            - If the file exists and ``open_mode`` is ``"w"``, the file will be overwritten.
            - If the file does not exist, the file will be created, regardless of ``open_mode``.

        Args:
            open_mode:
                Defaults to ``None``, which means the value from ``self.open_mode`` will be used.
        """
        # Opening and immediately closing the file is enough to create or truncate it.
        with open(self.output_filepath, self.__get_open_mode(open_mode)):
            pass

    def write(
            self,
            items: ListSetTuple | Generator[Any, None, None],
            open_mode: OpenMode | None = None,
    ) -> NonNegativeInt:
        """Write items from a list/set/tuple or a generator to a text file, with one item per line.

        Examples of items are product IDs.

        This function opens a text file in the `(over)write` or `append` mode. It then writes each item from the
        provided iterable to the file. Exceptions selected by ``on_write_catch_exceptions`` are caught and logged
        as warnings, and the offending item is skipped. Any other exception is propagated to the caller.

        Args:
            items:
                An iterable of items to be written to the file.
            open_mode:
                Defaults to ``None``, which means the value from ``self.open_mode`` will be used.

        Returns:
            The number of items that are written to the file successfully.

        Raises:
            Exception:
                Any exception raised while transforming or writing an item which is not selected by
                ``on_write_catch_exceptions``.
        """
        catchable = self.on_write_catch_exceptions
        number_of_items_written = 0

        with open(self.output_filepath, self.__get_open_mode(open_mode)) as f:
            for item in items:
                try:
                    # Keep ``item`` untouched so that the warning below reports the original value.
                    line = self.pre_writing_transformation.transform_items(str(item))
                    line = self.pre_writing_transformation.trim_items(line)
                    f.write(line + "\n")
                except Exception as exception:
                    # ``None`` means "catch everything"; a tuple (possibly empty) restricts what is caught.
                    if catchable is not None and not isinstance(exception, catchable):
                        raise
                    logger.warning(
                        f"Failed attempt to write {item!r} to file {self.output_filepath}: {exception}"
                    )
                else:
                    number_of_items_written += 1

        return number_of_items_written

    def write_in_batches(self, batches: Batches, open_mode: OpenMode | None = None) -> NonNegativeInt:
        """Similar to `Writer.write()`_, but assumes that the input is in batches.

        The output file is first prepared according to ``open_mode``, and every batch is then appended to it.

        Args:
            batches:
                An iterable yielding pairs, of which the first element is the batch of items to write.
            open_mode:
                Defaults to ``None``, which means the value from ``self.open_mode`` will be used.

        Returns:
            The total number of items that are written to the file successfully.

        .. _Writer.write(): monkey_wrench.input_output._models.Writer.write
        """
        self.prepare_output_file_for_writing(self.__get_open_mode(open_mode))

        number_of_items_written = 0
        for batch, _ in batches:
            # The file has been prepared above, so each batch must be appended rather than overwrite it.
            number_of_items_written += self.write(batch, open_mode="a")
        return number_of_items_written
class Reader(ExistingInputFile):
    """Pydantic model for an ASCII file (text mode) reader."""

    post_reading_transformation: StringTransformation = StringTransformation()
    """The transformation after reading items from the file and before returning them.

    Defaults to :obj:`~monkey_wrench.generic.StringTransformation()`, which means the items will be only trimmed.

    Note:
        The items will be first trimmed and then transformed according to
        ``post_reading_transformation.transform_function``.
    """

    def read(self) -> list[Any]:
        """Read a text file and return its lines as items, one item per line.

        Examples of items are product IDs.

        Warning:
            No check is made on whether the items are valid. This is merely a convenience function for reading
            items from a text file.

        Returns:
            A list of (transformed) items, where each item corresponds to a single line in the given file.
        """
        with open(self.input_filepath, "r") as input_file:
            lines = input_file.readlines()

        # Trim first, then apply the transform function.
        trimmed = self.post_reading_transformation.trim_items(lines)
        return self.post_reading_transformation.transform_items(trimmed)
class DirectoryVisitor(ParentInputDirectory, Pattern):
    """Pydantic model for visiting files in a directory tree."""

    visitor_writer: Writer | None = None
    """If given, it will be used to write the list of visited files to a text file."""

    visitor_callback: TransformFunction[Any] | None = None
    """A function that will be called every time a match is found for a file. Defaults to ``None``."""

    reverse: bool = False
    """A boolean to determine whether to sort the files in reverse order.

    Defaults to ``False``, which means sorting is in the alphabetical order.
    """

    recursive: bool = True
    """Determines whether to recursively visit the directory tree, or just visit the top-level directory.

    Defaults to ``True``.
    """

    post_visit_transform_function: TransformFunction[ReturnType] | None = None
    """The transform function that will be applied on filepaths after visiting them.

    Defaults to ``None``, which means no transformation is applied.

    Note:
        If it is provided, the result of transformation will be returned instead of filepaths.
    """

    def __collect_files(self) -> list[Path]:
        """Return the sorted paths of all files whose names match the pattern."""
        top_directory = self.parent_input_directory_path

        if self.recursive:
            matches = [
                Path(root, filename)
                for root, _, filenames in os.walk(top_directory)
                for filename in filenames
                if self.pattern.exists_in(filename)
            ]
        else:
            # Only regular files directly inside the top-level directory are considered.
            matches = [
                candidate
                for entry in os.listdir(top_directory)
                if (candidate := Path(top_directory, entry)).is_file() and self.pattern.exists_in(entry)
            ]

        return sorted(matches, reverse=self.reverse)

    def visit(self) -> list[ReturnType] | list[Path]:
        """Visit all files in the directory, either recursively or just the top-level files.

        For every matching file the ``visitor_callback`` (if any) is invoked. The list of files is then handed
        over to the ``visitor_writer`` (if any).

        Returns:
            A sorted flat list of all file paths in the given directory that match the given pattern and have
            been treated according to the ``visitor_callback`` function. If the ``post_visit_transform_function``
            is provided, a list of transformed filepaths will be returned instead.
        """
        visited = self.__collect_files()

        if (callback := self.visitor_callback) is not None:
            for filepath in visited:
                callback(filepath)

        if (writer := self.visitor_writer) is not None:
            writer.write(visited)

        transform = self.post_visit_transform_function
        if transform is None:
            return visited
        return [transform(filepath) for filepath in visited]
[docs] class FilesIntegrityValidator(MultiProcess): """Pydantic model to verify files integrity by checking their size and comparing their list against a reference. Note: This class does two main verifications, namely checking for corrupted and missing files as follows 1- Checking that the file sizes are within some threshold from a nominal file size. 2- Checking the list of filepaths against a reference list. """ nominal_file_size: NonNegativeInt | None = None """The nominal size of files in bytes. This is used to check for corrupted files. Defaults to ``None``, which means the search for corrupted files will not be performed. """ file_size_relative_tolerance: NonNegativeFloat = 0.01 """The maximum relative difference in the size of a file, before it can be marked as corrupted. Defaults to ``0.01``, i.e. any file whose size differs by more than one percent from the nominal size, will be marked as corrupted. """ filepath_transform_function: TransformFunction[ReturnType] | None = None """A function to transform the file paths into other types of objects before comparing them against the reference. This can be e.g. a :func:`~monkey_wrench.date_time.DateTimeParser.parse` function to make datetime objects out of file paths. Defaults to ``None`` which means no transformation is performed on the file paths and they will be used as they are. """ reference_transform_function: TransformFunction[ReturnType] | None = None """A function to transform the reference items into other types of objects before using them for comparison. This can be e.g. :func:`~monkey_wrench.date_time.SeviriIDParser.parse()` to make datetime objects out of SEVIRI product IDs. Defaults to ``None`` which means no transformation is performed on the reference items and they will be used as they are. """ reference: ListSetTuple[InputType] | AbsolutePath[FilePath] | DirectoryVisitor | None = None """Reference items to compare against, used in finding the missing files. 
It can be a list/set/tuple of items, or a filepath from which the reference items can be read, or a directory visitor which can collect the reference files. Defaults to ``None`` which means the search for missing files will not be performed. """
[docs] @staticmethod def get_reference_items( reference: ListSetTuple[InputType] | AbsolutePath[FilePath] | DirectoryVisitor | None = None ) -> ListSetTuple[InputType] | None: """Return the reference items.""" match reference: case None: return None case Path(): return Reader(input_filepath=reference).read() case DirectoryVisitor(): return reference.visit() case _: return reference
def __get_reference_items( self, reference: ListSetTuple[InputType] | AbsolutePath[FilePath] | DirectoryVisitor | None = None ) -> Any: if reference is None: reference = self.reference reference = FilesIntegrityValidator.get_reference_items(reference) if reference is None: return None if self.reference_transform_function is None: return set(reference) return {self.reference_transform_function(r) for r in reference}
[docs] def file_is_corrupted(self, file_size: NonNegativeInt) -> bool: return abs(1 - file_size / self.nominal_file_size) > self.file_size_relative_tolerance
[docs] @validate_call def find_corrupted_files(self, filepaths: ListSetTuple[Path]) -> set[Path] | None: if self.nominal_file_size is None: return None file_sizes = self.run_with_results(os.path.getsize, filepaths) return {fp for fp, fs in zip(filepaths, file_sizes, strict=True) if self.file_is_corrupted(fs)}
[docs] @validate_call def transform_files(self, filepaths: ListSetTuple[Path]) -> set[Path] | None: if self.filepath_transform_function is None: return set(filepaths) return {self.filepath_transform_function(f) for f in filepaths}
[docs] @validate_call def find_missing_files( self, filepaths: ListSetTuple[Path], reference: ListSetTuple[InputType] | AbsolutePath[FilePath] | DirectoryVisitor | None = None ) -> set[Path] | None: reference = self.__get_reference_items(reference) if reference is None: return None return reference - self.transform_files(filepaths)
[docs] @validate_call def verify_files( self, filepaths: ListSetTuple[Path], reference: ListSetTuple[InputType] | AbsolutePath[FilePath] | DirectoryVisitor | None = None ) -> tuple[set[InputType] | set[ReturnType] | None, set[Path] | None]: """Check for missing and corrupted files.""" return self.find_missing_files(filepaths, reference), self.find_corrupted_files(filepaths)
class DateTimeDirectory(ParentOutputDirectory):
    """Pydantic model for datetime directories needed to store products and the input/output of CHIMP."""

    datetime_format_string: str = "%Y/%m/%d"
    """The format string to create subdirectories from the datetime object. Defaults to ``"%Y/%m/%d"``."""

    reset_child_datetime_directory: bool = False
    """Whether to remove the (child) directory first if it already exists. Defaults to ``False``.

    This might save us from issues regarding files being overwritten and corrupted.
    """

    def get_datetime_directory(self, datetime_object: datetime) -> Path:
        """Get the full path to the datetime directory (given the datetime object).

        This does not create the directory.

        Args:
            datetime_object:
                The datetime object for which the full directory path will be returned.

        Returns:
            The full path of the datetime directory.

        Example:
            >>> path = DateTimeDirectory(
            ...  datetime_format_string="%Y/%m/%d",
            ...  parent_output_directory_path=Path.home()
            ... ).get_datetime_directory(
            ...  datetime(2022, 3, 12)
            ... )
            >>> expected_path = Path.home() / Path("2022/03/12")
            >>> expected_path == path
            True
        """
        dir_path = self.parent_output_directory_path / Path(datetime_object.strftime(self.datetime_format_string))
        return dir_path

    def create_datetime_directory(self, datetime_object: datetime) -> Path:
        """Create a directory based on the datetime object.

        If ``self.reset_child_datetime_directory`` is ``True`` and the path already exists, it is removed
        (including all its contents in the case of a directory) before being created anew.

        Args:
            datetime_object:
                The datetime object to create the directory for.

        Returns:
            The full path of the (created) directory.

        Example:
            >>> path = DateTimeDirectory(
            ...  datetime_format_string="%Y/%m/%d",
            ...  parent_output_directory_path=Path.home()
            ... ).create_datetime_directory(
            ...  datetime(2022, 3, 12)
            ... )
            >>> expected_path = Path.home() / Path("2022/03/12")
            >>> expected_path.exists()
            True
            >>> expected_path == path
            True
        """
        dir_path = self.get_datetime_directory(datetime_object)

        if self.reset_child_datetime_directory and dir_path.exists():
            if dir_path.is_dir() and not dir_path.is_symlink():
                # ``Path.unlink()`` cannot remove directories; the whole tree has to be deleted instead.
                shutil.rmtree(dir_path)
            else:
                # A regular file or a symbolic link is occupying the path.
                dir_path.unlink()

        dir_path.mkdir(parents=True, exist_ok=True)
        return dir_path