monkey_wrench.task.ids module

This module provides Pydantic models for tasks related to product IDs.

class monkey_wrench.task.ids.IdsTaskBase(*, context: Literal[Context.product_ids], action: Action, specifications: type[Model])

Bases: TaskBase

Pydantic base model for tasks related to product IDs.

context: Literal[Context.product_ids]

class monkey_wrench.task.ids.FetchIdsSpecifications(*, output_filepath: Annotated[Path, PathType(path_type=file), AfterValidator(func=<lambda>), AfterValidator(func=ensure_path_does_not_end_with_slash)] | Annotated[Path, PathType(path_type=new), AfterValidator(func=<lambda>), AfterValidator(func=ensure_path_does_not_end_with_slash)] | None = None, open_mode: Literal['w', 'a'] = 'w', pre_writing_transformation: StringTransformation = StringTransformation(trim=True, transform_function=None), on_write_catch_exceptions: tuple[type[Exception], ...] | None = None, end_datetime: Annotated[AwareDatetime, AfterValidator(func=<lambda>)] | None = None, start_datetime: Annotated[AwareDatetime, AfterValidator(func=<lambda>)] | None = None, batch_interval: timedelta | Annotated[dict[Literal['weeks', 'days', 'hours', 'minutes', 'seconds'], float], FieldInfo(annotation=NoneType, required=True, metadata=[MinLen(min_length=1), MaxLen(max_length=5)]), AfterValidator(func=<lambda>)])

Bases: DateTimeRangeInBatches, Writer

Pydantic base model for the specifications of a fetch task.
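
According to the signature above, batch_interval accepts either a timedelta or a dictionary keyed by 'weeks', 'days', 'hours', 'minutes', or 'seconds', and start_datetime and end_datetime must be timezone-aware. The following is a minimal standard-library sketch of how such a range could be split into batches. It is not the library's implementation: the helpers to_timedelta and iter_batches are hypothetical names introduced here for illustration, and the half-open, forward-in-time batching is an assumption.

```python
from datetime import datetime, timedelta, timezone


# Hypothetical helper, not part of monkey_wrench: accept either a timedelta
# or a mapping such as {"days": 1, "hours": 12}. The keys allowed by the
# signature are all valid keyword arguments of datetime.timedelta.
def to_timedelta(interval):
    if isinstance(interval, timedelta):
        return interval
    return timedelta(**interval)


# Hypothetical helper: split [start, end) into consecutive batches.
# Assumption: batches run forward in time and the last one is clipped to end.
def iter_batches(start, end, interval):
    step = to_timedelta(interval)
    if step <= timedelta(0):
        raise ValueError("batch interval must be positive")
    current = start
    while current < end:
        upper = min(current + step, end)
        yield current, upper
        current = upper


# Timezone-aware datetimes, as required by the AwareDatetime annotation.
start = datetime(2022, 1, 1, tzinfo=timezone.utc)
end = datetime(2022, 1, 4, tzinfo=timezone.utc)

batches = list(iter_batches(start, end, {"days": 1, "hours": 12}))
for lower, upper in batches:
    print(lower.isoformat(), "->", upper.isoformat())
```

With a three-day range and an interval of one and a half days, the sketch yields two batches. The order and boundaries that the library itself uses may differ.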

class monkey_wrench.task.ids.FetchIds(*, context: Literal[Context.product_ids], action: Literal[Action.fetch], specifications: FetchIdsSpecifications)

Bases: IdsTaskBase

Pydantic base model for the fetch task.

action: Literal[Action.fetch]
specifications: FetchIdsSpecifications
perform() -> dict[str, Annotated[int, Ge(ge=0)]]

Fetch the product IDs.

monkey_wrench.task.ids.IdsTask

alias of FetchIds