# Source code for derp.queue.exceptions

"""Custom exceptions for queue integration."""

from __future__ import annotations


class QueueError(Exception):
    """Base exception for all queue errors.

    Attributes:
        message: Human-readable description of the failure; also passed to
            ``Exception`` so ``str(exc)`` returns it.
        code: Machine-readable error identifier. Falls back to
            ``"queue_error"`` when no (or a falsy) code is supplied.
    """

    def __init__(self, message: str, code: str | None = None) -> None:
        """Initialise the error.

        Args:
            message: Description of what went wrong.
            code: Optional error identifier; defaults to ``"queue_error"``.
        """
        super().__init__(message)
        self.message = message
        # ``or`` rather than an ``is None`` check: an empty string also
        # falls back to the generic code.
        self.code = code or "queue_error"
class QueueNotConnectedError(QueueError):
    """Raised when queue client is used before connect().

    The error code is always ``"queue_not_connected"``.
    """

    def __init__(
        self,
        message: str = "Queue not connected. Call connect() first.",
    ) -> None:
        """Initialise the error.

        Args:
            message: Description of the failure; defaults to a hint telling
                the caller to call ``connect()`` first.
        """
        super().__init__(message, code="queue_not_connected")
class QueueProviderError(QueueError):
    """Raised when the queue backend returns an error.

    The error code falls back to ``"queue_provider_error"`` when no (or a
    falsy) code is supplied.
    """

    def __init__(
        self,
        message: str = "Queue provider request failed",
        code: str | None = None,
    ) -> None:
        """Initialise the error.

        Args:
            message: Description of the failure; defaults to a generic
                provider-failure message.
            code: Optional error identifier; defaults to
                ``"queue_provider_error"``.
        """
        # Resolve the fallback here so the base class never substitutes its
        # own generic "queue_error" code.
        super().__init__(message, code=code or "queue_provider_error")