Source code for derp.queue.vercel
"""Vercel queue client (REST-based)."""
from __future__ import annotations
import uuid
from collections.abc import Sequence
from datetime import timedelta
from typing import Any
from urllib.parse import quote
import httpx
from derp.config import VercelQueueConfig
from derp.queue.base import QueueClient, Schedule, ScheduleType, TaskStatus
from derp.queue.exceptions import QueueNotConnectedError, QueueProviderError
VERCEL_API_BASE = "https://api.vercel.com"
[docs]
class VercelQueueClient(QueueClient):
"""Queue client backed by Vercel Queues (REST API)."""
supports_result = False
supports_revoke = False
supports_delay = True
[docs]
def __init__(self, config: VercelQueueConfig):
self._config = config
self._client: httpx.AsyncClient | None = None
self._schedules: list[Schedule] = []
[docs]
async def connect(self) -> None:
if self._client is not None:
return
self._client = httpx.AsyncClient(
base_url=VERCEL_API_BASE,
headers={
"Authorization": f"Bearer {self._config.api_token}",
"Content-Type": "application/json",
},
)
[docs]
async def disconnect(self) -> None:
if self._client is not None:
await self._client.aclose()
self._client = None
def _build_params(self) -> dict[str, str]:
params: dict[str, str] = {}
if self._config.team_id is not None:
params["teamId"] = self._config.team_id
if self._config.project_id is not None:
params["projectId"] = self._config.project_id
return params
[docs]
async def enqueue(
self,
task_name: str,
payload: dict[str, Any] | None = None,
*,
task_id: str | None = None,
queue: str | None = None,
delay: int | timedelta | None = None,
) -> str:
if self._client is None:
raise QueueNotConnectedError()
queue_name = queue or self._config.default_queue
if task_id is None:
task_id = uuid.uuid4().hex
body: dict[str, Any] = {
"task_name": task_name,
"task_id": task_id,
"payload": payload or {},
}
if delay is not None:
if isinstance(delay, timedelta):
body["delay_seconds"] = int(delay.total_seconds())
else:
body["delay_seconds"] = delay
try:
resp = await self._client.post(
f"/v1/queues/{quote(queue_name, safe='')}/messages",
json=body,
params=self._build_params(),
)
except Exception as exc:
raise QueueProviderError(str(exc) or "Failed to enqueue task") from exc
if resp.status_code != 200:
raise QueueProviderError(
f"Error connecting to Vercel API: {resp.status_code} {resp.text}",
)
return task_id
[docs]
async def get_status(self, task_id: str) -> TaskStatus:
"""Vercel queues do not expose per-message status."""
raise NotImplementedError("Vercel queues do not expose per-message status.")
[docs]
def register_schedules(self, schedules: Sequence[Schedule]) -> None:
"""Register recurring schedules. Vercel only supports cron expressions."""
for s in schedules:
if s.type == ScheduleType.INTERVAL:
raise QueueProviderError(
f"Schedule '{s.name}': Vercel cron only supports "
"cron expressions, not intervals.",
)
self._schedules = list(schedules)
[docs]
def get_schedules(self) -> list[Schedule]:
"""Return the currently registered schedules."""
return self._schedules
[docs]
def generate_vercel_cron_config(self) -> list[dict[str, str]]:
"""Generate the ``crons`` section for vercel.json."""
return [
{
"path": s.path or f"/api/cron/{s.name}",
"schedule": s.cron or "",
}
for s in self._schedules
]