derp.kv – Key-Value Store

KV client interfaces and backends.

class derp.kv.KVClient[source]

Bases: ABC

Byte-level async KV client.

supports_ttl: bool
supports_scan: bool
supports_batch: bool
abstractmethod async connect()[source]

Connect to the store.

Return type:

None

abstractmethod async disconnect()[source]

Disconnect from the store.

Return type:

None

abstractmethod async get(key)[source]

Fetch a value by key.

Parameters:

key (bytes)

Return type:

bytes | None

abstractmethod async set(key, value, *, ttl=None)[source]

Set a value by key.

Parameters:
Return type:

None

abstractmethod async delete(key)[source]

Delete a key.

Parameters:

key (bytes)

Return type:

bool

abstractmethod async exists(key)[source]

Check if a key exists.

Parameters:

key (bytes)

Return type:

bool

abstractmethod async mget(keys)[source]

Fetch multiple keys.

Parameters:

keys (Sequence[bytes])

Return type:

Sequence[bytes | None]

abstractmethod async mset(items, *, ttl=None)[source]

Set multiple key/value pairs.

Parameters:
Return type:

None

abstractmethod async delete_many(keys)[source]

Delete multiple keys.

Parameters:

keys (Sequence[bytes])

Return type:

int

abstractmethod async ttl(key)[source]

Return remaining TTL in seconds, or None.

Parameters:

key (bytes)

Return type:

float | None

abstractmethod async expire(key, ttl)[source]

Set TTL for a key. Returns False if missing.

Parameters:
Return type:

bool

abstractmethod async set_nx(key, value, *, ttl=None)[source]

Set a value only if the key does not exist. Returns True if set.

Parameters:
Return type:

bool

abstractmethod async incr(key)[source]

Increment a key by 1 and return the new value.

Creates the key with value 1 if it does not exist.

Parameters:

key (bytes)

Return type:

int

abstractmethod async scan(*, prefix=None, limit=None)[source]

Iterate keys with optional prefix and limit.

Parameters:
  • prefix (bytes | None)

  • limit (int | None)

Return type:

AsyncIterator[bytes]

async guarded_get(cache_key, *, compute, ttl, lock_ttl=2.0, retry_delay=0.05)[source]

Fetch from cache with stampede protection.

On a cache miss, only one caller acquires a lock and computes the value. Other callers wait for the cache to be populated. If retries are exhausted, the caller falls through and computes directly.

The wait budget is derived from lock_ttl so waiters keep retrying for the full duration the lock could be held.

Parameters:
  • cache_key (bytes) – The cache key to read/write.

  • compute (Callable[[], Awaitable[bytes]]) – Async callable that produces the value on cache miss.

  • ttl (float) – TTL in seconds for the cached value.

  • lock_ttl (float) – TTL in seconds for the lock key.

  • retry_delay (float) – Seconds to sleep between retry attempts.

Returns:

The cached or freshly computed value.

Return type:

bytes

async idempotent_execute(*, key, compute, status_code=200, ttl=86400, key_prefix='derp:idempotency')[source]

Execute idempotently: run compute once per key.

On the first call for a given key, compute is invoked and the result is cached. Subsequent calls return the cached result without re-invoking compute. Uses guarded_get() for stampede protection.

Parameters:
  • key (str) – Idempotency key (typically from a client header).

  • compute (Callable[[], Awaitable[Any]]) – Async callable producing a JSON-serializable result.

  • status_code (int) – HTTP status code to cache alongside the body.

  • ttl (float) – Cache TTL in seconds (default 24h).

  • key_prefix (str) – KV key prefix.

Returns:

(body, status_code, is_replay)body is the deserialized result, status_code is the cached status, and is_replay is True when the cached value was used.

Return type:

tuple[Any, int, bool]

async already_processed(*, event_id, ttl=86400, key_prefix='derp:webhook')[source]

Check if an event has already been processed.

Uses set_nx() to atomically mark the event. Returns True if the event was already seen, False on first call.

Parameters:
  • event_id (str) – Unique event identifier (e.g. Stripe event ID).

  • ttl (float) – How long to remember the event (default 24h).

  • key_prefix (str) – KV key prefix.

Return type:

bool

async rate_limit(key, *, limit, window, key_prefix='derp:ratelimit')[source]

Fixed-window rate limit check.

Increments a counter for the given key. The counter resets after window seconds. Returns a RateLimitResult indicating whether the request is allowed.

Parameters:
  • key (str) – Identifier to rate limit (e.g. f"checkout:{user_id}").

  • limit (int) – Maximum number of requests per window.

  • window (float) – Window duration in seconds.

  • key_prefix (str) – KV key prefix.

Return type:

RateLimitResult

class derp.kv.RateLimitResult[source]

Bases: object

Result of a rate limit check.

allowed: bool
count: int
limit: int
remaining: int
retry_after: float | None
__init__(allowed, count, limit, remaining, retry_after)
Parameters:
Return type:

None

class derp.kv.ValkeyClient[source]

Bases: KVClient

Byte-level KV client backed by Valkey GLIDE.

supports_ttl: bool = True
supports_scan: bool = True
supports_batch: bool = True
__init__(config)[source]
Parameters:

config (ValkeyConfig)

async connect()[source]

Connect to the store.

Return type:

None

async disconnect()[source]

Disconnect from the store.

Return type:

None

property client: GlideClient | GlideClusterClient
async get(key)[source]

Fetch a value by key.

Parameters:

key (bytes)

Return type:

bytes | None

async set(key, value, *, ttl=None)[source]

Set a value by key.

Parameters:
Return type:

None

async set_nx(key, value, *, ttl=None)[source]

Set a value only if the key does not exist. Returns True if set.

Parameters:
Return type:

bool

async delete(key)[source]

Delete a key.

Parameters:

key (bytes)

Return type:

bool

async exists(key)[source]

Check if a key exists.

Parameters:

key (bytes)

Return type:

bool

async mget(keys)[source]

Fetch multiple keys.

Parameters:

keys (Sequence[bytes])

Return type:

Sequence[bytes | None]

async mset(items, *, ttl=None)[source]

Set multiple key/value pairs.

Parameters:
Return type:

None

async delete_many(keys)[source]

Delete multiple keys.

Parameters:

keys (Sequence[bytes])

Return type:

int

async ttl(key)[source]

Return remaining TTL in seconds, or None.

Parameters:

key (bytes)

Return type:

float | None

async expire(key, ttl)[source]

Set TTL for a key. Returns False if missing.

Parameters:
Return type:

bool

async incr(key)[source]

Increment a key by 1 and return the new value.

Creates the key with value 1 if it does not exist.

Parameters:

key (bytes)

Return type:

int

async scan(*, prefix=None, limit=None)[source]

Iterate keys with optional prefix and limit.

Parameters:
  • prefix (bytes | None)

  • limit (int | None)

Return type:

AsyncIterator[bytes]

class derp.kv.ValkeyConfig[source]

Bases: _StrictModel

Configuration for Valkey GLIDE connections.

addresses: Sequence[tuple[str, int]]
username: str | None
password: str | None
use_tls: bool
mode: ValkeyMode
model_config = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class derp.kv.ValkeyMode[source]

Bases: StrEnum

Valkey deployment mode.

STANDALONE = 'standalone'
CLUSTER = 'cluster'
__new__(value)