Source code for derp.derp_client

"""Derp client for interacting with database, file storage, and more."""

from __future__ import annotations

import datetime
from types import TracebackType
from typing import Self

from derp.ai import AIClient
from derp.auth.base import BaseAuthClient
from derp.auth.email import EmailClient
from derp.auth.native_client import NativeAuthClient
from derp.auth.supabase_client import SupabaseAuthClient
from derp.auth.workos_client import WorkOSAuthClient
from derp.config import DerpConfig
from derp.kv.base import KVClient
from derp.kv.valkey import ValkeyClient
from derp.orm import DatabaseEngine
from derp.orm.router import ReplicaRouter
from derp.payments import PaymentsClient
from derp.queue.base import QueueClient, Schedule, ScheduleType
from derp.queue.celery import CeleryQueueClient
from derp.queue.vercel import VercelQueueClient
from derp.storage import StorageClient


[docs] class DerpClient: """Derp client for interacting with database, file storage, and more."""
[docs] def __init__(self, config: DerpConfig): self._config: DerpConfig = config self._db: DatabaseEngine = DatabaseEngine( config.database.db_url, min_size=config.database.pool_min_size, max_size=config.database.pool_max_size, statement_cache_size=config.database.statement_cache_size, ) self._replica_db: DatabaseEngine | None = ( DatabaseEngine( config.database.replica_url, min_size=( config.database.replica_pool_min_size or config.database.pool_min_size ), max_size=( config.database.replica_pool_max_size or config.database.pool_max_size ), statement_cache_size=config.database.replica_statement_cache_size, ) if config.database.replica_url is not None else None ) self._email: EmailClient | None = ( EmailClient(self._config.email) if self._config.email is not None else None ) self._storage: StorageClient | None = ( StorageClient(self._config.storage) if self._config.storage is not None else None ) self._auth: BaseAuthClient | None = None if self._config.auth is not None: if self._config.auth.native is not None: self._auth = NativeAuthClient(self._config.auth.native) elif self._config.auth.supabase is not None: self._auth = SupabaseAuthClient(self._config.auth.supabase) elif self._config.auth.workos is not None: self._auth = WorkOSAuthClient(self._config.auth.workos) self._kv: KVClient | None = ( ValkeyClient(self._config.kv.valkey) if self._config.kv is not None and self._config.kv.valkey is not None else None ) self._payments: PaymentsClient | None = ( PaymentsClient(self._config.payments) if self._config.payments is not None else None ) self._queue: QueueClient | None = None if self._config.queue is not None: if self._config.queue.celery is not None: self._queue = CeleryQueueClient(self._config.queue.celery) elif self._config.queue.vercel is not None: self._queue = VercelQueueClient(self._config.queue.vercel) if self._queue is not None and self._config.queue.schedules: self._queue.register_schedules( [ Schedule( name=sc.name, task=sc.task, type=( ScheduleType.CRON if sc.cron else ScheduleType.INTERVAL ), cron=sc.cron, interval=( datetime.timedelta(seconds=sc.interval_seconds) if sc.interval_seconds else None ), payload=sc.payload, queue=sc.queue, path=sc.path, ) for sc in self._config.queue.schedules ] ) self._ai: AIClient | None = ( AIClient(self._config.ai) if self._config.ai is not None else None ) self._router: ReplicaRouter | None = None self._in_session = False if ( self._config.auth is not None and self._config.auth.native is not None and self._email is None ): raise ValueError( "The email client needs to be configured for native authentication " "to work. Please make sure to configure `EmailConfig` when " "`NativeAuthConfig` is configured via `derp.toml` or `DerpConfig`." )
[docs] async def connect(self) -> None: """Start a session.""" await self._db.connect() if self._replica_db is not None: await self._replica_db.connect() if self._storage is not None: await self._storage.connect() if self._kv is not None: await self._kv.connect() if self._payments is not None: await self._payments.connect() if self._queue is not None: await self._queue.connect() if self._auth is not None: await self._auth.connect() self._auth.set_db(self._db) self._auth.set_email(self._email) if self._kv is not None: self._auth.set_kv(self._kv) if self._kv is not None: self._db.set_cache(self._kv) if self._ai is not None: await self._ai.connect() # Set up replica router if replica is configured if self._replica_db is not None: self._router = ReplicaRouter( self._db.pool, self._replica_db.pool, self._config.database, ) await self._router.start() self._db.set_router(self._router) self._in_session = True
[docs] async def disconnect( self, ) -> None: """End a session.""" errors: list[Exception] = [] # Stop router before closing pools if self._router is not None: try: await self._router.stop() except Exception as exc: errors.append(exc) self._db.set_router(None) self._router = None for client in [ self._db, self._replica_db, self._storage, self._kv, self._payments, self._queue, self._ai, ]: if client is not None: try: await client.disconnect() except Exception as exc: errors.append(exc) if self._auth is not None: await self._auth.disconnect() self._auth.set_db(None) self._auth.set_email(None) self._auth.set_kv(None) self._db.set_cache(None) self._in_session = False if errors: raise ExceptionGroup("errors during disconnect", errors)
async def __aenter__(self) -> Self: await self.connect() return self async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: await self.disconnect() @property def db(self) -> DatabaseEngine: """Get the database engine.""" if not self._in_session: raise ValueError("Not in a session. Call `connect()` first.") return self._db @property def email(self) -> EmailClient: """Get the email client.""" if not self._in_session: raise ValueError("Not in a session. Call `connect()` first.") if self._email is None: raise ValueError("`EmailConfig` was not passed to `DerpConfig`.") return self._email @property def storage(self) -> StorageClient: """Get the storage client.""" if not self._in_session: raise ValueError("Not in a session. Call `connect()` first.") if self._storage is None: raise ValueError("`StorageConfig` was not passed to `DerpConfig`.") return self._storage @property def auth(self) -> BaseAuthClient: """Get the auth service.""" if not self._in_session: raise ValueError("Not in a session. Call `connect()` first.") if self._auth is None: raise ValueError("`AuthConfig` was not passed to `DerpConfig`.") return self._auth @property def kv(self) -> KVClient: """Get the KV client.""" if not self._in_session: raise ValueError("Not in a session. Call `connect()` first.") if self._kv is None: raise ValueError("`KVConfig` was not passed to `DerpConfig`.") return self._kv @property def payments(self) -> PaymentsClient: """Get the payments client.""" if not self._in_session: raise ValueError("Not in a session. Call `connect()` first.") if self._payments is None: raise ValueError("`PaymentsConfig` was not passed to `DerpConfig`.") return self._payments @property def queue(self) -> QueueClient: """Get the queue client.""" if not self._in_session: raise ValueError("Not in a session. Call `connect()` first.") if self._queue is None: raise ValueError("`QueueConfig` was not passed to `DerpConfig`.") return self._queue @property def ai(self) -> AIClient: """Get the AI client.""" if not self._in_session: raise ValueError("Not in a session. Call `connect()` first.") if self._ai is None: raise ValueError("`AIConfig` was not passed to `DerpConfig`.") return self._ai @property def config(self) -> DerpConfig: """Get the Derp configuration.""" return self._config