Source code for derp.auth.password

"""Password hashing and validation."""

from __future__ import annotations

import abc
import asyncio
import concurrent.futures
import dataclasses
import re
import secrets

from etils import epy

from derp.config import PasswordConfig

with epy.lazy_imports():
    import argon2
    import argon2.exceptions as argon2_exceptions


class PasswordHasher(abc.ABC):
    """Abstract base class for password hashers.

    Subclasses provide the synchronous primitives ``hash``, ``verify`` and
    ``needs_rehash``. The ``async_*`` wrappers run those primitives on a
    thread pool owned by the instance so that a slow hash does not block
    the running event loop.

    The pool keeps worker threads alive; call :meth:`close` (or use the
    hasher as a context manager) once it is no longer needed.
    """

    def __init__(self, *, max_workers: int = 8) -> None:
        """Create the thread pool used by the async wrappers.

        Args:
            max_workers: Maximum number of worker threads, i.e. the upper
                bound on hash/verify calls running concurrently through
                ``async_hash`` / ``async_verify``.
        """
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers, thread_name_prefix="password-hasher"
        )

    @abc.abstractmethod
    def hash(self, password: str) -> str:
        """Hash a password synchronously."""

    @abc.abstractmethod
    def verify(self, password: str, hashed: str) -> bool:
        """Verify a password synchronously."""

    @abc.abstractmethod
    def needs_rehash(self, hashed: str) -> bool:
        """Check if a hash needs to be rehashed (e.g., algorithm upgrade)."""

    async def async_hash(self, password: str) -> str:
        """Hash a password without blocking the event loop.

        Runs :meth:`hash` on the instance's thread pool and awaits the
        result. Must be called from a running event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, self.hash, password)

    async def async_verify(self, password: str, hashed: str) -> bool:
        """Verify a password without blocking the event loop.

        Runs :meth:`verify` on the instance's thread pool and awaits the
        result. Must be called from a running event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor, self.verify, password, hashed
        )

    def close(self, *, wait: bool = True) -> None:
        """Shut down the thread pool and release its worker threads.

        Safe to call more than once. After closing, the async wrappers can
        no longer schedule work (the executor raises ``RuntimeError``); the
        synchronous methods are unaffected.

        Args:
            wait: If true (default), block until already-submitted work has
                finished before returning.
        """
        self._executor.shutdown(wait=wait)

    def __enter__(self) -> "PasswordHasher":
        """Return the hasher itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the thread pool on leaving the ``with`` block."""
        self.close()
class Argon2Hasher(PasswordHasher):
    """Argon2id-based implementation of ``PasswordHasher`` (recommended)."""

    def __init__(
        self,
        time_cost: int = 3,
        memory_cost: int = 65536,
        parallelism: int = 4,
        *,
        max_workers: int = 8,
    ):
        """Configure the underlying ``argon2.PasswordHasher``.

        Args:
            time_cost: Forwarded unchanged to ``argon2.PasswordHasher``.
            memory_cost: Forwarded unchanged to ``argon2.PasswordHasher``.
            parallelism: Forwarded unchanged to ``argon2.PasswordHasher``.
            max_workers: Size of the thread pool created by the base class
                for the async wrappers.
        """
        super().__init__(max_workers=max_workers)
        argon2_params = {
            "time_cost": time_cost,
            "memory_cost": memory_cost,
            "parallelism": parallelism,
        }
        self._hasher = argon2.PasswordHasher(**argon2_params)

    def hash(self, password: str) -> str:
        """Return the Argon2 hash string for ``password``."""
        digest = self._hasher.hash(password)
        return digest

    def verify(self, password: str, hashed: str) -> bool:
        """Report whether ``password`` matches the Argon2 hash ``hashed``.

        A mismatch or an unparseable hash yields ``False``; any other
        exception raised by the argon2 library propagates to the caller.
        """
        try:
            # argon2's verify takes (hash, password) -- the reverse of this
            # method's own parameter order.
            self._hasher.verify(hashed, password)
        except (
            argon2_exceptions.VerifyMismatchError,
            argon2_exceptions.InvalidHashError,
        ):
            return False
        else:
            return True

    def needs_rehash(self, hashed: str) -> bool:
        """Report whether ``hashed`` should be regenerated with current parameters."""
        outdated = self._hasher.check_needs_rehash(hashed)
        return outdated
@dataclasses.dataclass()
class PasswordValidationResult:
    """Outcome of checking a password against a password policy.

    Attributes:
        valid: Whether the password passed. ``validate_password`` sets this
            to ``True`` exactly when ``errors`` is empty.
        errors: Human-readable messages, one per failed rule.
    """

    valid: bool
    errors: list[str]
# Character-class rules, compiled once at import time.
_UPPERCASE_RE = re.compile(r"[A-Z]")
_LOWERCASE_RE = re.compile(r"[a-z]")
_DIGIT_RE = re.compile(r"\d")
# Every ASCII punctuation character counts as "special". The previous class
# omitted common symbols such as - _ + = ; ' [ ] / \ ~ and `, so passwords
# using only those were wrongly rejected. This set is a strict superset of
# the old one, so anything accepted before is still accepted.
_SPECIAL_RE = re.compile(r"""[!"#$%&'()*+,\-./:;<=>?@\[\\\]^_`{|}~]""")


def validate_password(
    config: PasswordConfig, password: str
) -> PasswordValidationResult:
    """Check ``password`` against the policy described by ``config``.

    All rules are evaluated (no short-circuit), so the result lists every
    violation at once, in a fixed order: minimum length, maximum length,
    uppercase, lowercase, digit, special character.

    Args:
        config: Policy settings. Reads ``min_length``, ``max_length``,
            ``require_uppercase``, ``require_lowercase``, ``require_digit``
            and ``require_special``.
        password: Candidate password; its length is measured with ``len``.

    Returns:
        A ``PasswordValidationResult`` whose ``valid`` flag is ``True`` when
        no rule failed and whose ``errors`` holds one message per failed rule.
    """
    errors: list[str] = []

    if len(password) < config.min_length:
        errors.append(f"Password must be at least {config.min_length} characters")
    if len(password) > config.max_length:
        errors.append(f"Password must be at most {config.max_length} characters")

    # (rule enabled?, pattern that must match, message when it does not)
    character_rules = (
        (
            config.require_uppercase,
            _UPPERCASE_RE,
            "Password must contain at least one uppercase letter",
        ),
        (
            config.require_lowercase,
            _LOWERCASE_RE,
            "Password must contain at least one lowercase letter",
        ),
        (
            config.require_digit,
            _DIGIT_RE,
            "Password must contain at least one digit",
        ),
        (
            config.require_special,
            _SPECIAL_RE,
            "Password must contain at least one special character",
        ),
    )
    errors.extend(
        message
        for required, pattern, message in character_rules
        if required and not pattern.search(password)
    )

    return PasswordValidationResult(valid=not errors, errors=errors)
def generate_secure_token(length: int = 32) -> str:
    """Return a random URL-safe token from the ``secrets`` module.

    Args:
        length: Number of random *bytes* to draw, not the number of
            characters returned. The bytes are encoded as unpadded URL-safe
            Base64, so the default of 32 bytes produces a 43-character token.

    Returns:
        The token as text made up of ``A-Z``, ``a-z``, ``0-9``, ``-`` and ``_``.
    """
    token = secrets.token_urlsafe(nbytes=length)
    return token