# Source code for derp.queue.exceptions
"""Custom exceptions for queue integration."""
from __future__ import annotations
# [docs] -- Sphinx viewcode link marker left over from HTML extraction; not part of the module.
class QueueError(Exception):
    """Base exception for all queue errors.

    Attributes:
        message: Human-readable description; also forwarded to
            ``Exception.__init__`` so ``str(exc)`` returns it.
        code: Machine-readable error identifier. Falls back to
            ``"queue_error"`` when no code (or an empty one) is supplied.
    """

    def __init__(self, message: str, code: str | None = None) -> None:
        """Initialise the error with a message and an optional error code.

        Args:
            message: Description of what went wrong.
            code: Optional identifier for the error category. ``None`` or an
                empty string selects the default ``"queue_error"``.
        """
        super().__init__(message)
        self.message = message
        # Truthiness check (not ``is None``): an empty string also gets the default.
        self.code = code or "queue_error"
# [docs] -- Sphinx viewcode link marker left over from HTML extraction; not part of the module.
class QueueNotConnectedError(QueueError):
    """Raised when queue client is used before connect().

    The error code is always ``"queue_not_connected"``; only the message can
    be customised by the caller.
    """

    def __init__(
        self,
        message: str = "Queue not connected. Call connect() first.",
    ) -> None:
        """Initialise the error with a fixed code and an optional message.

        Args:
            message: Description shown to the caller; defaults to a hint to
                call ``connect()`` first.
        """
        super().__init__(message, code="queue_not_connected")
# [docs] -- Sphinx viewcode link marker left over from HTML extraction; not part of the module.
class QueueProviderError(QueueError):
    """Raised when the queue backend returns an error."""

    def __init__(
        self,
        message: str = "Queue provider request failed",
        code: str | None = None,
    ) -> None:
        """Initialise the error with an optional message and error code.

        Args:
            message: Description of the failure; defaults to a generic
                "request failed" message.
            code: Optional identifier for the error category. ``None`` or an
                empty string selects ``"queue_provider_error"``.
        """
        # Resolve the default here so the base class never applies its own
        # generic fallback code to a provider error.
        super().__init__(message, code=code or "queue_provider_error")