Source code for derp.queue.base

"""Base interface for queue clients."""

from __future__ import annotations

import abc
import dataclasses
import enum
from datetime import timedelta
from typing import Any


[docs] class TaskState(enum.StrEnum): """State of a queued task.""" PENDING = "pending" STARTED = "started" SUCCESS = "success" FAILURE = "failure" REVOKED = "revoked" UNKNOWN = "unknown"
[docs] @dataclasses.dataclass(slots=True) class TaskStatus: """Status of a queued task.""" task_id: str state: TaskState result: Any | None = None error: str | None = None
[docs] class ScheduleType(enum.StrEnum): """Type of recurring schedule.""" CRON = "cron" INTERVAL = "interval"
[docs] @dataclasses.dataclass(slots=True, frozen=True) class Schedule: """A recurring task schedule.""" name: str task: str type: ScheduleType cron: str | None = None interval: timedelta | None = None payload: dict[str, Any] | None = None queue: str | None = None path: str | None = None
[docs] class QueueClient(abc.ABC): """Async producer-side queue client.""" supports_result: bool supports_revoke: bool supports_delay: bool
[docs] @abc.abstractmethod async def connect(self) -> None: """Connect to the queue backend."""
[docs] @abc.abstractmethod async def disconnect(self) -> None: """Disconnect from the queue backend."""
[docs] @abc.abstractmethod async def enqueue( self, task_name: str, payload: dict[str, Any] | None = None, *, task_id: str | None = None, queue: str | None = None, delay: int | timedelta | None = None, ) -> str: """Enqueue a task. Returns a task ID."""
[docs] @abc.abstractmethod async def get_status(self, task_id: str) -> TaskStatus: """Get the status of a task by ID."""