monkey_wrench.query package

The package providing all utilities for querying items.

class monkey_wrench.query.CollectionMeta(*, query_string: str, snapshot_minutes: list[Annotated[int, Ge(ge=0), FieldInfo(annotation=NoneType, required=True, metadata=[Lt(lt=60)])]] | None = None)[source]

Bases: Model

Model that gathers the collection metadata.

query_string: str

A colon (:) delimited string which represents the query string for the collection on the EUMETSAT API.

Example

For SEVIRI we have: "EO:EUM:DAT:MSG:HRSEVIRI".

snapshot_minutes: list[Annotated[int, Ge(ge=0), FieldInfo(annotation=NoneType, required=True, metadata=[Lt(lt=60)])]] | None

The minutes for which we have data in an hour.

Warning

For collections to which this does not apply, keep the default value, i.e. None.

Example

For SEVIRI we have one snapshot per 15 minutes, starting from the 12th minute. As a result, we have [12, 27, 42, 57] for SEVIRI snapshots in an hour.
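
The SEVIRI values above follow from a fixed first minute and a fixed step. A minimal sketch of that arithmetic, assuming a hypothetical helper named snapshot_minutes that is not part of this package:

```python
def snapshot_minutes(first_minute: int, step: int) -> list[int]:
    """Return every minute of the hour at which a snapshot is expected.

    Hypothetical helper for illustration only; it is not part of monkey_wrench.
    """
    if not 0 <= first_minute < 60:
        raise ValueError("first_minute must be in [0, 60)")
    if step <= 0:
        raise ValueError("step must be positive")
    # Walk through the hour in fixed steps, starting at the first snapshot.
    return list(range(first_minute, 60, step))


print(snapshot_minutes(12, 15))  # [12, 27, 42, 57]
```

All returned values are non-negative and below 60, which matches the constraints in the type annotation of snapshot_minutes.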

class monkey_wrench.query.EumetsatAPI[source]

Bases: object

Static class for EUMETSAT API functionalities.

api_base_url = HttpUrl('https://api.eumetsat.int/')

The root URL of the EUMETSAT API.

credentials_env_vars: ClassVar[dict[str, str]] = {'login': 'EUMETSAT_API_LOGIN', 'password': 'EUMETSAT_API_PASSWORD'}

The keys of environment variables used to authenticate the EUMETSAT API calls.
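
As a rough sketch of how such a mapping can be used, the following reads both variables from a mapping and fails early if either is missing. The helper name read_credentials and the wording of the error message are assumptions made for illustration; the package may read the credentials differently.

```python
import os
from collections.abc import Mapping

# Same keys and variable names as credentials_env_vars above.
CREDENTIALS_ENV_VARS = {"login": "EUMETSAT_API_LOGIN", "password": "EUMETSAT_API_PASSWORD"}


def read_credentials(environ: Mapping[str, str] = os.environ) -> tuple[str, str]:
    """Return (login, password), or raise KeyError if a variable is not set.

    Hypothetical helper for illustration only.
    """
    missing = [name for name in CREDENTIALS_ENV_VARS.values() if name not in environ]
    if missing:
        raise KeyError(f"Missing environment variable(s): {', '.join(missing)}")
    return environ[CREDENTIALS_ENV_VARS["login"]], environ[CREDENTIALS_ENV_VARS["password"]]


# Usage with an explicit mapping, so the sketch runs without real credentials.
fake_environ = {"EUMETSAT_API_LOGIN": "user", "EUMETSAT_API_PASSWORD": "secret"}
print(read_credentials(fake_environ))  # ('user', 'secret')
```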

Example

On Linux, you can use the export command to set the credentials in a terminal:

export EUMETSAT_API_LOGIN=<login>;
export EUMETSAT_API_PASSWORD=<password>;

download_path_template = '{base}/data/download/1.0.0/collections/{collection}/products'

The URL template for downloading the products of a collection.

classmethod get_token() AccessToken[source]

Get a token using the credentials_env_vars.

This method returns the same token if it is still valid and issues a new one otherwise.

Returns:

A token with which the datastore can be accessed.

Note

See API key management on the eumdac website for more information.

static make_collection_url(collection: EumetsatCollection) HttpUrl[source]

Make a complete collection URL from the API base URL and the given collection (query string).

Parameters:

collection – A collection of type EumetsatCollection, e.g. EumetsatCollection.seviri for SEVIRI.

Returns:

The full collection URL from which the files can be fetched.

Example

>>> EumetsatAPI.make_collection_url(EumetsatCollection.seviri)
HttpUrl('https://api.eumetsat.int/data/download/1.0.0/collections/EO%3AEUM%3ADAT%3AMSG%3AHRSEVIRI/products')

static seviri_collection_url() HttpUrl[source]

Return the complete URL for the SEVIRI collection.
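
The collection URL can be reproduced from api_base_url, download_path_template and the query string alone. The sketch below uses only the standard library and plain strings instead of HttpUrl. It is an assumption about how the pieces fit together, not the package's implementation.

```python
from urllib.parse import quote

API_BASE_URL = "https://api.eumetsat.int/"
DOWNLOAD_PATH_TEMPLATE = "{base}/data/download/1.0.0/collections/{collection}/products"


def make_collection_url(query_string: str) -> str:
    """Fill the download template with a percent-encoded query string."""
    return DOWNLOAD_PATH_TEMPLATE.format(
        # Drop the trailing slash so that the template does not produce "//".
        base=API_BASE_URL.rstrip("/"),
        # safe="" makes quote() encode the colons as %3A as well.
        collection=quote(query_string, safe=""),
    )


print(make_collection_url("EO:EUM:DAT:MSG:HRSEVIRI"))
# https://api.eumetsat.int/data/download/1.0.0/collections/EO%3AEUM%3ADAT%3AMSG%3AHRSEVIRI/products
```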

class monkey_wrench.query.EumetsatCollection(*values)[source]

Bases: Enum

Enum class that defines the collections for the EUMETSAT datastore.

amsu = CollectionMeta(query_string='EO:EUM:DAT:METOP:AMSUL1', snapshot_minutes=None)
avhrr = CollectionMeta(query_string='EO:EUM:DAT:METOP:AVHRRL1', snapshot_minutes=None)
mhs = CollectionMeta(query_string='EO:EUM:DAT:METOP:MHSL1', snapshot_minutes=None)
seviri = CollectionMeta(query_string='EO:EUM:DAT:MSG:HRSEVIRI', snapshot_minutes=[12, 27, 42, 57])

class monkey_wrench.query.EumetsatQuery(collection: EumetsatCollection = EumetsatCollection.seviri, log_context: str = 'EUMETSAT Query')[source]

Bases: Query

__init__(collection: EumetsatCollection = EumetsatCollection.seviri, log_context: str = 'EUMETSAT Query') None[source]

Initialize an instance of the class with API credentials read from the environment variables.

This constructor method sets up a private eumdac datastore by obtaining an authentication token using the provided API login and password which are read from the environment variables.

Parameters:
  • collection – The collection to query. Defaults to EumetsatCollection.seviri, i.e. SEVIRI.

  • log_context – A string that will be used in log messages to determine the context. Defaults to 'EUMETSAT Query'.

fetch_product(product: Product, chain: Chain, output_directory: Path, sleep_time: Annotated[int, Gt(gt=0)]) Path | None[source]

Fetch the file for a single product and write the product file to disk.

Parameters:
  • product – The Product whose corresponding file will be fetched.

  • chain – Chain to apply for customization of the output file.

  • output_directory – The directory to save the file in.

  • sleep_time – Sleep time, in seconds, between requests.

Returns:

The path of the saved file on disk, or None in case of a failure.

fetch_products(search_results: SearchResults, output_directory: Path, bounding_box: BoundingBox | None = None, output_file_format: str = 'netcdf4', sleep_time: Annotated[int, Gt(gt=0)] = 10) list[Path | None][source]

Fetch all products from search results and write product files to disk.

Parameters:
  • search_results – Search results for which the files will be fetched.

  • output_directory – The directory to save the files in.

  • bounding_box – Bounding box, i.e. (north, south, west, east) limits. Defaults to None which means BoundingBox(90., -90, -180., 180) will be used.

  • output_file_format – Desired format of the output file(s). Defaults to netcdf4.

  • sleep_time – Sleep time, in seconds, between requests. Defaults to 10 seconds.

Returns:

A list of paths for the fetched files. An entry is None if the corresponding product could not be fetched.

static len(product_ids: SearchResults) int[source]

Return the number of product IDs.

query(datetime_period: DateTimePeriodStrict, polygon: Polygon | None = None) SearchResults[source]

Query product IDs in a single batch.

This method wraps around the eumdac.Collection().search() method to perform a search for product IDs within a specified time range and the polygon.

Note

For a given SEVIRI collection, an example product ID is "MSG3-SEVI-MSG15-0100-NA-20150731221240.036000000Z-NA".

Note

start_time and end_time are treated respectively as inclusive and exclusive when querying the IDs. For example, to obtain all the data up to and including 2022/12/31, we must set end_time=datetime(2023, 1, 1).
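
The inclusive start and exclusive end described above amount to a half-open interval check. A minimal sketch, assuming a hypothetical helper is_in_period that is not part of this package:

```python
from datetime import datetime


def is_in_period(timestamp: datetime, start_time: datetime, end_time: datetime) -> bool:
    """Return True if start_time <= timestamp < end_time (half-open interval)."""
    return start_time <= timestamp < end_time


start = datetime(2022, 1, 1)
end = datetime(2023, 1, 1)

# The last SEVIRI snapshot of 2022 is still inside the period ...
print(is_in_period(datetime(2022, 12, 31, 23, 57), start, end))  # True
# ... but the end datetime itself is not.
print(is_in_period(end, start, end))  # False
```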

Parameters:
  • datetime_period – The datetime period to query for.

  • polygon – An object of type Polygon.

Returns:

The results of the search, containing the product IDs found within the specified period and the polygon.

Raises:

ValueError – Refer to assert_start_time_is_before_end_time().

query_in_batches(datetime_range_in_batches: DateTimeRangeInBatches) Generator[tuple[SearchResults, int], None, None][source]

Retrieve all the product IDs, given a time range and a batch interval, fetching one batch at a time.

Parameters:

datetime_range_in_batches – The datetime range to query for.

Note

As an example, for SEVIRI, we expect to have one file (product ID) per 15 minutes, i.e. 4 files per hour or 96 files per day. If our re-analysis period is 2022/01/01 (inclusive) to 2023/01/01 (exclusive), i.e. 365 days, this results in a maximum of 35040 files.

If we split our datetime range into intervals of 30 days and fetch product IDs in batches, there is a maximum of 2880 = 96 x 30 IDs in each batch retrieved by a single request. One might need to adapt this value to avoid running into the issue of sending too many requests to the server.
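
The numbers in this note can be checked with a short sketch. The function split_into_batches is a hypothetical stand-in. It is not necessarily how DateTimeRangeInBatches splits or orders the range.

```python
from datetime import datetime, timedelta

FILES_PER_DAY = 4 * 24  # one file per 15 minutes


def split_into_batches(
    start: datetime, end: datetime, interval: timedelta
) -> list[tuple[datetime, datetime]]:
    """Split [start, end) into consecutive half-open intervals of at most `interval`."""
    batches = []
    batch_start = start
    while batch_start < end:
        # The final batch is shortened so that it does not run past `end`.
        batch_end = min(batch_start + interval, end)
        batches.append((batch_start, batch_end))
        batch_start = batch_end
    return batches


start, end = datetime(2022, 1, 1), datetime(2023, 1, 1)
batches = split_into_batches(start, end, timedelta(days=30))

print(FILES_PER_DAY * (end - start).days)  # 35040 files at most in total
print(FILES_PER_DAY * 30)                  # 2880 IDs at most per 30-day batch
print(len(batches))                        # 13 batches: 12 full ones and a 5-day remainder
```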

Yields:

A generator of 2-tuples. The first element of each tuple is the collection of products retrieved in that batch. The second element is the number of the retrieved products for that batch. The search results can be in turn iterated over to retrieve individual products.

Example

>>> from datetime import datetime, timedelta, UTC
>>>
>>> range_in_batches = DateTimeRangeInBatches(
...     start_datetime=datetime(2022, 1, 1, tzinfo=UTC),
...     end_datetime=datetime(2022, 1, 3, tzinfo=UTC),
...     batch_interval=timedelta(days=1)
... )
>>>
>>> try:
...     api = EumetsatQuery()
...     for batch, retrieved_count in api.query_in_batches(range_in_batches):
...         assert retrieved_count == batch.total_results
...         for product in batch:
...             pass
... except KeyError as e:  # If the API credentials are not set!
...     assert "environment variable" in str(e)

class monkey_wrench.query.List(items: list, datetime_parser: SeviriIDParser | FilePathParser, log_context: str = 'List')[source]

Bases: Query

A class to provide generic functionalities to query lists.

Note

This class is meant to behave as an immutable list.

Note

This class utilizes numpy.ndarray objects under the hood.

__get_indices(datetime_period: DateTimePeriod) array

Similar to query_indices(), but returns the numpy indices instead.

__init__(items: list, datetime_parser: SeviriIDParser | FilePathParser, log_context: str = 'List') None[source]

Make an instance of the class.

Parameters:
  • items – The complete list of items to query.

  • datetime_parser – A class of type DateTimeParser to enable parsing items into datetime objects.

  • log_context – A string that will be used in log messages to determine the context. Defaults to 'List'.

generate_k_sized_batches_by_index(k: Annotated[int, Gt(gt=0)], index_start: int = 0, index_end: int = -1, batches_as_python_lists: bool = True) Generator[source]

Generate batches (sub-lists) of size k and move forward by 1 index each time.

A batch consists of the item at the current index, as well as the k-1 preceding items. In other words, a batch includes k adjacent items, with the item at the current index being the last item of the batch. The next batch is retrieved by incrementing the current index by 1. As a result, two consecutive batches have k-1 items in common.

Note

Both index_start and index_end are considered as inclusive. They can be negative as well.

Note

The indices are zero-based. If index_start is less than or equal to k-1, the first batch includes items from index 0 to index k-1 (inclusive). The next batch includes indices [1, k].

Parameters:
  • k – The size of the batches. Each batch includes the current item as well as k-1 preceding items.

  • index_start – The zero-based index of the first item to start generating the batches from. Defaults to 0 and can be negative as well.

  • index_end – The zero-based index of the last item (inclusive) up to which the batches are generated. Defaults to -1 meaning the last item of the list makes the last item of the final batch.

  • batches_as_python_lists – A boolean determining whether to return each batch as a Python built-in list or as List objects. Defaults to True.

Yields:

Batches of size k. Adjacent batches overlap by k-1 items.

Raises:
  • ValueError – If index_start is greater than index_end.

  • ValueError – If k exceeds the size of the list.

  • IndexError – If normalized indices exceed the size of the List object. Refer to normalize_index().
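
The sliding-window behaviour described above can be sketched on a plain Python list. The function sliding_batches is hypothetical and reflects one reading of the notes, in which index_start and index_end refer to the last item of a batch. The real method works on List objects and may differ in detail.

```python
from collections.abc import Iterator


def _normalize(index: int, size: int) -> int:
    """Map a possibly negative index onto [0, size) or raise IndexError."""
    positive = index + size if index < 0 else index
    if not 0 <= positive < size:
        raise IndexError(f"index {index} is out of range for size {size}")
    return positive


def sliding_batches(items: list, k: int, index_start: int = 0, index_end: int = -1) -> Iterator[list]:
    """Yield batches of k adjacent items, moving forward by one index each time."""
    size = len(items)
    if k > size:
        raise ValueError("k exceeds the size of the list")
    start, end = _normalize(index_start, size), _normalize(index_end, size)
    if start > end:
        raise ValueError("index_start is greater than index_end")
    # The earliest possible "current index" is k-1, so that k-1 items precede it.
    for current in range(max(start, k - 1), end + 1):
        yield items[current - k + 1 : current + 1]


print(list(sliding_batches([0, 1, 2, 3, 4], k=3)))
# [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
```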

static len(item) int[source]

Return the number of items in the List object.

normalize_index(index: int) int[source]

Convert a negative index into its positive equivalent, or return the original index if it is non-negative.

Raises:

IndexError – If the positive index or its positive equivalent exceeds the size of the List object.
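
A minimal sketch of this normalization as a free function that takes the size explicitly. This form is an assumption; the method itself uses the size of the List object.

```python
def normalize_index(index: int, size: int) -> int:
    """Convert a negative index into its positive equivalent.

    Raises IndexError if the resulting index does not address an existing item.
    """
    positive = index + size if index < 0 else index
    if not 0 <= positive < size:
        raise IndexError(f"index {index} is out of range for size {size}")
    return positive


print(normalize_index(-1, size=5))  # 4
print(normalize_index(2, size=5))   # 2
```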

property parsed_items: ndarray

partition_in_k_sized_batches_by_index(k: Annotated[int, Gt(gt=0)], index_start: int = 0, index_end: int = -1, batches_as_python_lists: bool = True) Generator[source]

Partition the list, where the batches are of size k or less.

Note

The partition is given for all items that are in [index_start, index_end] (both inclusive).

Note

This is similar to generate_k_sized_batches_by_index(), but there are differences. First, this method generates partitions, i.e. sub-lists do not have any common items. Second, there could be one sub-list whose size is less than k. This happens when the length of available items to partition is less than k.
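
For comparison, partitioning can be sketched on a plain Python list as follows. The function partition is hypothetical and only illustrates the non-overlapping behaviour. Index validation is kept to a minimum.

```python
from collections.abc import Iterator


def partition(items: list, k: int, index_start: int = 0, index_end: int = -1) -> Iterator[list]:
    """Yield non-overlapping sub-lists of at most k items from [index_start, index_end]."""
    size = len(items)
    start = index_start + size if index_start < 0 else index_start
    end = index_end + size if index_end < 0 else index_end
    if start > end:
        raise ValueError("index_start is greater than index_end")
    # Step by k so that no item appears in more than one sub-list.
    for first in range(start, end + 1, k):
        yield items[first : min(first + k, end + 1)]


print(list(partition([0, 1, 2, 3, 4, 5, 6], k=3)))
# [[0, 1, 2], [3, 4, 5], [6]]
```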

query(datetime_period: DateTimePeriod) Self[source]

Query items from the List object, given a start datetime and an end datetime.

Parameters:

datetime_period – The datetime period to query the items from.

Returns:

A new List object including items that match the given query.

Raises:

ValueError – Refer to assert_start_time_is_before_end_time().

query_indices(datetime_period: DateTimePeriod) list[int][source]

Similar to query(), but returns the indices of items as a Python built-in list.

to_python_list() list[source]

Convert the List object into a Python built-in list object.

class monkey_wrench.query.LogMixin(log_context: str = '')[source]

Bases: object

__init__(log_context: str = '') None[source]

Make an instance of the class.

Parameters:

log_context – A string that will be used in log messages to determine the context. Defaults to an empty string.

property log_context: str

Get the log context as a string.

log_message(start_datetime: datetime, end_datetime: datetime, other: str = '') None[source]

Log a query message including the start and end datetime values as well as other information (if any).

class monkey_wrench.query.Query(*args, **kwargs)[source]

Bases: ABC, LogMixin

Abstract base class for queries.

__init__(*args, **kwargs) None[source]

Make an instance of the class.

abstractmethod static len(items: Any) Annotated[int, Ge(ge=0)][source]

Get the size (number) of items, e.g. by means of the Python built-in len() function in case of a list.

abstractmethod query(datetime_period: DateTimePeriod) Any[source]

Query the specified time period.

query_in_batches(datetime_range_in_batches: DateTimeRangeInBatches, expected_total_count: Annotated[int, Ge(ge=0)] | None = None) Generator[tuple[T, int], None, None][source]

Divide the specified time range into smaller intervals (batches) and perform queries on them.

The arguments are the same as those of DateTimeRangeInBatches. If expected_total_count is given, it will be compared with total_retrieved_count, and a warning will be logged if the two are not equal. It defaults to None, which means no comparison is made.

Yields:

The results of the queries, in the form of 2-tuples in which the first element is the items retrieved by the query() function for a batch and the second element is the number of items in that batch.

Submodules