derp.queue – Task Queue

Queue client and typed models.

class derp.queue.CeleryConfig[source]

Bases: _StrictModel

Configuration for Celery task queue.

broker_url: str
result_backend: str | None
task_serializer: str
result_serializer: str
task_default_queue: str
model_config = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class derp.queue.CeleryQueueClient[source]

Bases: QueueClient

Queue client backed by Celery.

supports_result: bool = True
supports_revoke: bool = True
supports_delay: bool = True
__init__(config)[source]
Parameters:

config (CeleryConfig)

property app: Celery

Expose the underlying Celery app for worker-side task registration.

async connect()[source]

Connect to the queue backend.

Return type:

None

async disconnect()[source]

Disconnect from the queue backend.

Return type:

None

async enqueue(task_name, payload=None, *, task_id=None, queue=None, delay=None)[source]

Enqueue a task. Returns a task ID.

Parameters:
Return type:

str

async get_status(task_id)[source]

Get the status of a task by ID.

Parameters:

task_id (str)

Return type:

TaskStatus

register_schedules(schedules)[source]

Register recurring schedules with Celery Beat.

Parameters:

schedules (Sequence[Schedule])

Return type:

None

get_schedules()[source]

Return the currently registered schedules.

Return type:

list[Schedule]

class derp.queue.QueueClient[source]

Bases: ABC

Async producer-side queue client.

supports_result: bool
supports_revoke: bool
supports_delay: bool
abstractmethod async connect()[source]

Connect to the queue backend.

Return type:

None

abstractmethod async disconnect()[source]

Disconnect from the queue backend.

Return type:

None

abstractmethod async enqueue(task_name, payload=None, *, task_id=None, queue=None, delay=None)[source]

Enqueue a task. Returns a task ID.

Parameters:
Return type:

str

abstractmethod async get_status(task_id)[source]

Get the status of a task by ID.

Parameters:

task_id (str)

Return type:

TaskStatus

class derp.queue.QueueConfig[source]

Bases: _StrictModel

Queue configuration.

celery: CeleryConfig | None
vercel: VercelQueueConfig | None
schedules: Sequence[ScheduleConfig]
model_config = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

exception derp.queue.QueueError[source]

Bases: Exception

Base exception for all queue errors.

__init__(message, code=None)[source]
Parameters:
  • message (str)

  • code (str | None)

exception derp.queue.QueueNotConnectedError[source]

Bases: QueueError

Raised when queue client is used before connect().

__init__(message='Queue not connected. Call connect() first.')[source]
Parameters:

message (str)

exception derp.queue.QueueProviderError[source]

Bases: QueueError

Raised when the queue backend returns an error.

__init__(message='Queue provider request failed', code=None)[source]
Parameters:
  • message (str)

  • code (str | None)

class derp.queue.Schedule[source]

Bases: object

A recurring task schedule.

name: str
task: str
type: ScheduleType
cron: str | None
interval: timedelta | None
payload: dict[str, Any] | None
queue: str | None
path: str | None
__init__(name, task, type, cron=None, interval=None, payload=None, queue=None, path=None)
Parameters:
Return type:

None

class derp.queue.ScheduleConfig[source]

Bases: _StrictModel

A single recurring task schedule.

name: str
task: str
cron: str | None
interval_seconds: float | None
payload: dict[str, Any] | None
queue: str | None
path: str | None
model_config = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class derp.queue.ScheduleType[source]

Bases: StrEnum

Type of recurring schedule.

CRON = 'cron'
INTERVAL = 'interval'
__new__(value)
class derp.queue.TaskState[source]

Bases: StrEnum

State of a queued task.

PENDING = 'pending'
STARTED = 'started'
SUCCESS = 'success'
FAILURE = 'failure'
REVOKED = 'revoked'
UNKNOWN = 'unknown'
__new__(value)
class derp.queue.TaskStatus[source]

Bases: object

Status of a queued task.

task_id: str
state: TaskState
result: Any | None
error: str | None
__init__(task_id, state, result=None, error=None)
Parameters:
Return type:

None

class derp.queue.VercelQueueClient[source]

Bases: QueueClient

Queue client backed by Vercel Queues (REST API).

supports_result: bool = False
supports_revoke: bool = False
supports_delay: bool = True
__init__(config)[source]
Parameters:

config (VercelQueueConfig)

async connect()[source]

Connect to the queue backend.

Return type:

None

async disconnect()[source]

Disconnect from the queue backend.

Return type:

None

async enqueue(task_name, payload=None, *, task_id=None, queue=None, delay=None)[source]

Enqueue a task. Returns a task ID.

Parameters:
Return type:

str

async get_status(task_id)[source]

Vercel queues do not expose per-message status.

Parameters:

task_id (str)

Return type:

TaskStatus

register_schedules(schedules)[source]

Register recurring schedules. Vercel only supports cron expressions.

Parameters:

schedules (Sequence[Schedule])

Return type:

None

get_schedules()[source]

Return the currently registered schedules.

Return type:

list[Schedule]

generate_vercel_cron_config()[source]

Generate the crons section for vercel.json.

Return type:

list[dict[str, str]]

class derp.queue.VercelQueueConfig[source]

Bases: _StrictModel

Configuration for Vercel queue (REST-based).

api_token: str
team_id: str | None
project_id: str | None
default_queue: str
model_config = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].