# Source code for derp.queue.base
"""Base interface for queue clients."""
from __future__ import annotations
import abc
import dataclasses
import enum
from datetime import timedelta
from typing import Any
[docs]
class TaskState(enum.StrEnum):
"""State of a queued task."""
PENDING = "pending"
STARTED = "started"
SUCCESS = "success"
FAILURE = "failure"
REVOKED = "revoked"
UNKNOWN = "unknown"
[docs]
@dataclasses.dataclass(slots=True)
class TaskStatus:
"""Status of a queued task."""
task_id: str
state: TaskState
result: Any | None = None
error: str | None = None
[docs]
class ScheduleType(enum.StrEnum):
"""Type of recurring schedule."""
CRON = "cron"
INTERVAL = "interval"
[docs]
@dataclasses.dataclass(slots=True, frozen=True)
class Schedule:
"""A recurring task schedule."""
name: str
task: str
type: ScheduleType
cron: str | None = None
interval: timedelta | None = None
payload: dict[str, Any] | None = None
queue: str | None = None
path: str | None = None
class QueueClient(abc.ABC):
    """Async producer-side queue client.

    Abstract base class: every method below is abstract, so concrete
    backends must implement all of them before they can be instantiated.
    """

    # Capability flags. They are declared without defaults, so each concrete
    # backend has to assign them itself.
    # NOTE(review): presumably these advertise whether the backend can report
    # results, revoke tasks and delay delivery -- confirm with implementations.
    supports_result: bool
    supports_revoke: bool
    supports_delay: bool

    @abc.abstractmethod
    async def connect(self) -> None:
        """Connect to the queue backend."""

    @abc.abstractmethod
    async def disconnect(self) -> None:
        """Disconnect from the queue backend."""

    @abc.abstractmethod
    async def enqueue(
        self,
        task_name: str,
        payload: dict[str, Any] | None = None,
        *,
        task_id: str | None = None,
        queue: str | None = None,
        delay: int | timedelta | None = None,
    ) -> str:
        """Enqueue a task. Returns a task ID.

        Args:
            task_name: Name of the task to enqueue.
            payload: Optional payload mapping for the task.
            task_id: Optional task ID (keyword-only).
            queue: Optional queue name (keyword-only).
            delay: Optional delay (keyword-only), as an ``int`` or a
                ``timedelta``. NOTE(review): the unit of an ``int`` delay is
                presumably seconds -- confirm with implementations.

        Returns:
            The ID of the enqueued task.
        """

    @abc.abstractmethod
    async def get_status(self, task_id: str) -> TaskStatus:
        """Get the status of a task by ID."""