Source code for derp.config

"""Central configuration for Derp."""

from __future__ import annotations

import os
import tomllib
from collections.abc import Sequence
from enum import StrEnum
from pathlib import Path
from typing import Any

from pydantic import BaseModel, ConfigDict, Field, ValidationError, model_validator

CONFIG_FILE = "derp.toml"
MIGRATIONS_TABLE = "derp_migrations"
DEFAULT_MIGRATIONS_DIR = "./migrations"


[docs] class ConfigError(Exception): """Configuration error."""
def _resolve_env_value( value: Any, *, _path: tuple[str, ...] = (), _env_vars: dict[tuple[str, ...], str] | None = None, _missing: list[str] | None = None, _root: bool = True, ) -> Any: if _env_vars is None: _env_vars = {} if _missing is None: _missing = [] if isinstance(value, str): if value.startswith("$"): env_name = value[1:] if not env_name: raise ConfigError("Invalid environment variable reference: '$'") env_value = os.environ.get(env_name) if env_value is None: _missing.append(env_name) return value _env_vars[_path] = env_name return env_value return value if isinstance(value, list): result = [ _resolve_env_value( item, _path=(*_path, str(i)), _env_vars=_env_vars, _missing=_missing, _root=False, ) for i, item in enumerate(value) ] elif isinstance(value, tuple): result = tuple( _resolve_env_value( item, _path=(*_path, str(i)), _env_vars=_env_vars, _missing=_missing, _root=False, ) for i, item in enumerate(value) ) elif isinstance(value, dict): result = { key: _resolve_env_value( val, _path=(*_path, key), _env_vars=_env_vars, _missing=_missing, _root=False, ) for key, val in value.items() } else: return value if _root and _missing: names = ", ".join(f"${v}" for v in _missing) raise ConfigError(f"Missing environment variables: {names}") return result class _StrictModel(BaseModel): model_config = ConfigDict(extra="forbid")
[docs] class DatabaseConfig(_StrictModel): """Database configuration.""" db_url: str replica_url: str | None = None schema_path: str migrations_dir: str = DEFAULT_MIGRATIONS_DIR introspect_schemas: Sequence[str] = ("public",) introspect_exclude_tables: Sequence[str] = (MIGRATIONS_TABLE,) ignore_rls: bool = False pool_min_size: int = 2 pool_max_size: int = 5 # Default to 0, for PgBouncer compatibility statement_cache_size: int = 0 replica_pool_min_size: int | None = None replica_pool_max_size: int | None = None # Default to asyncpg's default since replicas don't often use PgBouncer replica_statement_cache_size: int | None = None replica_max_lag_bytes: int = 1_048_576 replica_write_fence_seconds: float = 2.0 replica_lag_check_interval_seconds: float = 5.0
[docs] class EmailConfig(_StrictModel): """Configuration for email sending via SMTP.""" site_name: str site_url: str from_email: str smtp_host: str smtp_port: int smtp_user: str smtp_password: str templates_dir: str | None = None use_tls: bool = True start_tls: bool = False
[docs] class JWTConfig(_StrictModel): """Configuration for JWT tokens.""" secret: str algorithm: str = "HS256" access_token_expire_minutes: int = 15 refresh_token_expire_days: int = 7 issuer: str | None = None audience: str | None = None
[docs] class PasswordConfig(_StrictModel): """Configuration for password validation.""" min_length: int = 8 max_length: int = 128 require_uppercase: bool = False require_lowercase: bool = False require_digit: bool = False require_special: bool = False
[docs] class GoogleOAuthConfig(_StrictModel): """Configuration for Google OAuth.""" client_id: str client_secret: str redirect_uri: str scopes: Sequence[str] = ("openid", "email", "profile")
[docs] class GitHubOAuthConfig(_StrictModel): """Configuration for GitHub OAuth.""" client_id: str client_secret: str redirect_uri: str scopes: Sequence[str] = ("user:email",)
[docs] class NativeAuthConfig(_StrictModel): """Configuration for native authentication (email/password, magic link, OAuth).""" jwt: JWTConfig password: PasswordConfig = Field(default_factory=PasswordConfig) google_oauth: GoogleOAuthConfig | None = None github_oauth: GitHubOAuthConfig | None = None enable_signup: bool = True enable_confirmation: bool = True enable_magic_link: bool = False magic_link_expire_minutes: int = 60 recovery_token_expire_minutes: int = 60 confirmation_token_expire_hours: int = 24 session_expire_days: int = 30 use_kv_cache: bool = True cache_prefix: str = "derp:auth" cache_session_ttl_seconds: int = 300 cache_user_ttl_seconds: int = 300
[docs] class SupabaseConfig(_StrictModel): """Configuration for Supabase GoTrue authentication.""" url: str anon_key: str service_role_key: str jwt_secret: str redirect_uri: str | None = None
[docs] class WorkOSConfig(_StrictModel): """Configuration for WorkOS authentication.""" api_key: str client_id: str redirect_uri: str | None = None
[docs] class AuthConfig(_StrictModel): """Auth configuration — exactly one backend must be set.""" native: NativeAuthConfig | None = None supabase: SupabaseConfig | None = None workos: WorkOSConfig | None = None @model_validator(mode="after") def _check_single_backend(self) -> AuthConfig: backends = [self.native, self.supabase, self.workos] configured = sum(1 for b in backends if b is not None) if configured > 1: raise ValueError( "Only one auth backend can be configured at a time. " "Set exactly one of [auth.native], [auth.supabase], " "or [auth.workos]." ) if configured == 0: raise ValueError("At least one auth backend must be configured.") return self
[docs] class StorageConfig(_StrictModel): """Storage configuration.""" endpoint_url: str | None = None public_urls: dict[str, str] = Field(default_factory=dict) service_name: str = "s3" access_key_id: str | None = None secret_access_key: str | None = None session_token: str | None = None region: str = "auto" use_ssl: bool = True verify: bool | str = True
[docs] class PaymentsConfig(_StrictModel): """Payments configuration.""" api_key: str webhook_secret: str | None = None max_network_retries: int = 2 timeout_seconds: float = 30.0
[docs] class ValkeyMode(StrEnum): """Valkey deployment mode.""" STANDALONE = "standalone" CLUSTER = "cluster"
[docs] class ValkeyConfig(_StrictModel): """Configuration for Valkey GLIDE connections.""" addresses: Sequence[tuple[str, int]] = (("localhost", 6379),) username: str | None = None password: str | None = None use_tls: bool = False mode: ValkeyMode = ValkeyMode.STANDALONE
[docs] class KVConfig(_StrictModel): """KV configuration.""" valkey: ValkeyConfig | None = None
[docs] class CeleryConfig(_StrictModel): """Configuration for Celery task queue.""" broker_url: str result_backend: str | None = None task_serializer: str = "json" result_serializer: str = "json" task_default_queue: str = "default"
[docs] class VercelQueueConfig(_StrictModel): """Configuration for Vercel queue (REST-based).""" api_token: str team_id: str | None = None project_id: str | None = None default_queue: str = "default"
[docs] class ScheduleConfig(_StrictModel): """A single recurring task schedule.""" name: str task: str cron: str | None = None interval_seconds: float | None = None payload: dict[str, Any] | None = None queue: str | None = None path: str | None = None @model_validator(mode="after") def _check_schedule_type(self) -> ScheduleConfig: if self.cron is not None and self.interval_seconds is not None: raise ValueError( f"Schedule '{self.name}': set either 'cron' or " "'interval_seconds', not both." ) if self.cron is None and self.interval_seconds is None: raise ValueError( f"Schedule '{self.name}': must set either 'cron' or 'interval_seconds'." ) return self
[docs] class QueueConfig(_StrictModel): """Queue configuration.""" celery: CeleryConfig | None = None vercel: VercelQueueConfig | None = None schedules: Sequence[ScheduleConfig] = () @model_validator(mode="after") def _check_single_backend(self) -> QueueConfig: if self.celery is not None and self.vercel is not None: raise ValueError( "Only one queue backend can be configured at a time. " "Set either [queue.celery] or [queue.vercel], not both." ) return self
[docs] class ModalConfig(_StrictModel): """Configuration for Modal.""" token_id: str token_secret: str endpoint_url: str | None = None
[docs] class AIConfig(_StrictModel): """AI configuration for OpenAI-compatible providers.""" api_key: str base_url: str | None = None fal_api_key: str | None = None modal: ModalConfig | None = None
[docs] class DerpConfig(_StrictModel): """Derp configuration.""" database: DatabaseConfig email: EmailConfig | None = None storage: StorageConfig | None = None auth: AuthConfig | None = None kv: KVConfig | None = None payments: PaymentsConfig | None = None queue: QueueConfig | None = None ai: AIConfig | None = None _env_vars: dict[tuple[str, ...], str] = {}
[docs] @classmethod def load(cls, path: str | Path = CONFIG_FILE) -> DerpConfig: config_path = Path(path) if not config_path.exists(): raise ConfigError( f"{CONFIG_FILE} not found in current directory. " "Run 'derp init' to create one." ) with open(config_path, "rb") as f: raw = tomllib.load(f) env_vars: dict[tuple[str, ...], str] = {} data = _resolve_env_value(raw, _env_vars=env_vars) try: config = cls(**data) except ValidationError as e: raise ConfigError("Failed to load configuration.") from e config._env_vars = env_vars return config
[docs] def redacted_dump(self) -> dict: """Return config as a dict with environment variable values redacted.""" data = self.model_dump(mode="json") for path, env_name in self._env_vars.items(): target = data for key in path[:-1]: target = target[key] target[path[-1]] = f"${env_name}" return data
[docs] def create_default_config() -> str: """Return default configuration file content.""" return f"""\\ [database] db_url = "$DATABASE_URL" # Environment variable containing the database URL schema_path = "src/schema.py" # Path to your schema module # replica_url = "$REPLICA_DATABASE_URL" # Optional replica database URL migrations_dir = "{DEFAULT_MIGRATIONS_DIR}" # Directory for migration files # introspect_schemas = ["public"] # Schemas to introspect # introspect_exclude_tables = ["{MIGRATIONS_TABLE}"] # Tables to exclude # ignore_rls = false # Ignore RLS and policy changes in migrations # [email] # site_name = "My App" # Site name for email templates # site_url = "https://example.com" # Site URL for email templates # from_email = "noreply@example.com" # From email for sending emails # smtp_host = "smtp.example.com" # smtp_port = 587 # smtp_user = "$SMTP_USER" # smtp_password = "$SMTP_PASSWORD" # [storage] # endpoint_url = "https://s3.amazonaws.com" # public_urls = {{ assets = "https://assets.example.com" }} # access_key_id = "$AWS_ACCESS_KEY_ID" # secret_access_key = "$AWS_SECRET_ACCESS_KEY" # region = "us-east-1" # [auth.native.jwt] # secret = "$JWT_SECRET" # [auth.workos] # api_key = "$WORKOS_API_KEY" # client_id = "$WORKOS_CLIENT_ID" # redirect_uri = "https://yourapp.com/callback" # [auth.supabase] # url = "$SUPABASE_URL" # anon_key = "$SUPABASE_ANON_KEY" # service_role_key = "$SUPABASE_SERVICE_ROLE_KEY" # jwt_secret = "$SUPABASE_JWT_SECRET" # [kv.valkey] # addresses = [["localhost", 6379]] # # username = "$VALKEY_USERNAME" # # password = "$VALKEY_PASSWORD" # # use_tls = false # [payments] # api_key = "$STRIPE_SECRET_KEY" # webhook_secret = "$STRIPE_WEBHOOK_SECRET" # max_network_retries = 2 # timeout_seconds = 30.0 # [queue.celery] # broker_url = "$CELERY_BROKER_URL" # result_backend = "$CELERY_RESULT_BACKEND" # task_default_queue = "default" # [queue.vercel] # api_token = "$VERCEL_QUEUE_TOKEN" # team_id = "team_xxx" # project_id = "prj_xxx" # default_queue = "default" # [ai] # api_key = "$OPENAI_API_KEY" # base_url = "https://api.openai.com/v1" # Optional, for OpenAI-compatible providers """