derp.kv – Key-Value Store
KV client interfaces and backends.
- class derp.kv.KVClient
Bases: ABC
Byte-level async KV client.
- abstractmethod async set_nx(key, value, *, ttl=None)
Set a value only if the key does not exist. Returns True if set.
- abstractmethod async incr(key)
Increment a key by 1 and return the new value.
Creates the key with value 1 if it does not exist.
- abstractmethod async scan(*, prefix=None, limit=None)
Iterate keys with optional prefix and limit.
- Parameters:
- Return type:
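The semantics described for set_nx(), incr(), and scan() can be illustrated with a small in-memory stand-in. This is a sketch only: InMemoryKV is a hypothetical class that does not subclass KVClient, the bytes-typed keys and the async-iterator return of scan() are assumptions (the return type is not shown above), and a real backend would perform these operations atomically on the server.

```python
import asyncio
import time
from typing import AsyncIterator, Optional


class InMemoryKV:
    """Hypothetical stand-in (not part of derp) mirroring three documented methods."""

    def __init__(self) -> None:
        # key -> (value, monotonic expiry time or None for "never expires")
        self._data: dict = {}

    def _live(self, key: bytes) -> bool:
        entry = self._data.get(key)
        if entry is None:
            return False
        _, expires_at = entry
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._data[key]  # lazily drop expired keys
            return False
        return True

    async def set_nx(self, key: bytes, value: bytes, *, ttl: Optional[float] = None) -> bool:
        # Write only when the key is absent (or expired); report whether we wrote.
        if self._live(key):
            return False
        expires_at = None if ttl is None else time.monotonic() + ttl
        self._data[key] = (value, expires_at)
        return True

    async def incr(self, key: bytes) -> int:
        # A missing key is created with value 1; an existing key keeps its expiry.
        if self._live(key):
            raw, expires_at = self._data[key]
            new_value = int(raw) + 1
        else:
            new_value, expires_at = 1, None
        self._data[key] = (str(new_value).encode(), expires_at)
        return new_value

    async def scan(
        self, *, prefix: Optional[bytes] = None, limit: Optional[int] = None
    ) -> AsyncIterator[bytes]:
        # Assumed shape: an async iterator over matching keys, capped at `limit`.
        yielded = 0
        for key in sorted(self._data):
            if limit is not None and yielded >= limit:
                return
            if not self._live(key):
                continue
            if prefix is None or key.startswith(prefix):
                yield key
                yielded += 1


async def demo():
    kv = InMemoryKV()
    first = await kv.set_nx(b"lock:a", b"1")
    second = await kv.set_nx(b"lock:a", b"2")  # key already exists
    n1 = await kv.incr(b"hits:a")
    n2 = await kv.incr(b"hits:a")
    keys = [k async for k in kv.scan(prefix=b"hits:")]
    return first, second, n1, n2, keys
```

Running asyncio.run(demo()) exercises each method once; the second set_nx() call is the one that reports the key as already present.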
- async guarded_get(cache_key, *, compute, ttl, lock_ttl=2.0, retry_delay=0.05)
Fetch from cache with stampede protection.
On a cache miss, only one caller acquires a lock and computes the value. Other callers wait for the cache to be populated. If retries are exhausted, the caller falls through and computes directly.
The wait budget is derived from lock_ttl so waiters keep retrying for the full duration the lock could be held.
- Parameters:
cache_key (bytes) – The cache key to read/write.
compute (Callable[[], Awaitable[bytes]]) – Async callable that produces the value on cache miss.
ttl (float) – TTL in seconds for the cached value.
lock_ttl (float) – TTL in seconds for the lock key.
retry_delay (float) – Seconds to sleep between retry attempts.
- Returns:
The cached or freshly computed value.
- Return type:
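The lock-then-wait flow described for guarded_get() can be sketched as follows. This is not the library's implementation: ToyKV, guarded_get_sketch, the ":lock" key suffix, and the retry count of ceil(lock_ttl / retry_delay) are all assumptions chosen to match the prose above (one caller computes, others poll the cache, and a waiter computes directly once its retries run out).

```python
import asyncio
import math
import time
from typing import Awaitable, Callable, Optional


class ToyKV:
    """Hypothetical in-memory store with TTLs; stands in for a real backend."""

    def __init__(self) -> None:
        self._data: dict = {}  # key -> (value, monotonic expiry or None)

    async def get(self, key: bytes) -> Optional[bytes]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._data[key]
            return None
        return value

    async def set(self, key: bytes, value: bytes, *, ttl: Optional[float] = None) -> None:
        self._data[key] = (value, None if ttl is None else time.monotonic() + ttl)

    async def set_nx(self, key: bytes, value: bytes, *, ttl: Optional[float] = None) -> bool:
        if await self.get(key) is not None:
            return False
        await self.set(key, value, ttl=ttl)
        return True

    async def delete(self, key: bytes) -> None:
        self._data.pop(key, None)


async def guarded_get_sketch(
    kv: ToyKV,
    cache_key: bytes,
    *,
    compute: Callable[[], Awaitable[bytes]],
    ttl: float,
    lock_ttl: float = 2.0,
    retry_delay: float = 0.05,
) -> bytes:
    cached = await kv.get(cache_key)
    if cached is not None:
        return cached

    lock_key = cache_key + b":lock"  # assumed naming, not taken from the source
    if await kv.set_nx(lock_key, b"1", ttl=lock_ttl):
        # Lock holder: compute once, populate the cache, then release the lock.
        try:
            value = await compute()
            await kv.set(cache_key, value, ttl=ttl)
            return value
        finally:
            await kv.delete(lock_key)

    # Waiter: poll for as long as the lock could still be held (assumed formula).
    retries = max(1, math.ceil(lock_ttl / retry_delay))
    for _ in range(retries):
        await asyncio.sleep(retry_delay)
        cached = await kv.get(cache_key)
        if cached is not None:
            return cached
    return await compute()  # retries exhausted: fall through and compute directly


async def demo():
    kv = ToyKV()
    calls = 0

    async def compute() -> bytes:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate slow work
        return b"expensive"

    results = await asyncio.gather(
        *(guarded_get_sketch(kv, b"report", compute=compute, ttl=60.0) for _ in range(5))
    )
    return calls, results
```

In demo(), five concurrent callers miss the cache at the same time, yet compute runs once because only the first caller wins set_nx() on the lock key.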
- async idempotent_execute(*, key, compute, status_code=200, ttl=86400, key_prefix='derp:idempotency')
Execute idempotently: run compute once per key.
On the first call for a given key, compute is invoked and the result is cached. Subsequent calls return the cached result without re-invoking compute. Uses guarded_get() for stampede protection.
- Parameters:
key (str) – Idempotency key (typically from a client header).
compute (Callable[[], Awaitable[Any]]) – Async callable producing a JSON-serializable result.
status_code (int) – HTTP status code to cache alongside the body.
ttl (float) – Cache TTL in seconds (default 24h).
key_prefix (str) – KV key prefix.
- Returns:
(body, status_code, is_replay): body is the deserialized result, status_code is the cached status, and is_replay is True when the cached value was used.
- Return type:
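The run-once-per-key behavior and the (body, status_code, is_replay) return value can be sketched with a plain dict as the cache. Everything here is illustrative: idempotent_execute_sketch is a hypothetical function, the "prefix:key" layout and the JSON envelope are assumptions about how the result might be stored, and TTL handling and the guarded_get() stampede protection are left out for brevity.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Tuple


async def idempotent_execute_sketch(
    cache: dict,
    *,
    key: str,
    compute: Callable[[], Awaitable[Any]],
    status_code: int = 200,
    key_prefix: str = "derp:idempotency",
) -> Tuple[Any, int, bool]:
    cache_key = f"{key_prefix}:{key}".encode()  # assumed key layout
    raw = cache.get(cache_key)
    is_replay = raw is not None
    if raw is None:
        # First call for this key: run compute and cache body plus status together.
        body = await compute()
        raw = json.dumps({"body": body, "status_code": status_code}).encode()
        cache[cache_key] = raw
    envelope = json.loads(raw)
    return envelope["body"], envelope["status_code"], is_replay


async def demo():
    cache: dict = {}
    charges = 0

    async def charge() -> dict:
        nonlocal charges
        charges += 1
        return {"charge_id": charges}

    first = await idempotent_execute_sketch(cache, key="req-123", compute=charge, status_code=201)
    second = await idempotent_execute_sketch(cache, key="req-123", compute=charge, status_code=201)
    return first, second, charges
```

The second call with the same key returns the stored body and status with is_replay set, and the charge coroutine is not invoked again.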
- async already_processed(*, event_id, ttl=86400, key_prefix='derp:webhook')
Check if an event has already been processed.
Uses set_nx() to atomically mark the event. Returns True if the event was already seen, False on first call.
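A webhook handler might use this check to drop duplicate deliveries. The sketch below assumes a string event_id and a "prefix:event_id" marker key; ToySetNX, already_processed_sketch, and handle_webhook are hypothetical names used only for illustration, and the toy store ignores the stored value.

```python
import asyncio
import time
from typing import Optional


class ToySetNX:
    """Hypothetical in-memory store exposing only set_nx with TTL support."""

    def __init__(self) -> None:
        self._expiry: dict = {}  # key -> monotonic expiry time

    async def set_nx(self, key: bytes, value: bytes, *, ttl: Optional[float] = None) -> bool:
        now = time.monotonic()
        expires_at = self._expiry.get(key)
        if expires_at is not None and now < expires_at:
            return False  # marker still present
        self._expiry[key] = float("inf") if ttl is None else now + ttl
        return True


async def already_processed_sketch(
    kv: ToySetNX, *, event_id: str, ttl: float = 86400, key_prefix: str = "derp:webhook"
) -> bool:
    marker = f"{key_prefix}:{event_id}".encode()  # assumed key layout
    was_set = await kv.set_nx(marker, b"1", ttl=ttl)
    # set_nx succeeds only on the first sighting, so "already seen" is its negation.
    return not was_set


async def handle_webhook(kv: ToySetNX, event_id: str, handled: list) -> str:
    if await already_processed_sketch(kv, event_id=event_id):
        return "duplicate"
    handled.append(event_id)  # side effects happen once per event
    return "processed"


async def demo():
    kv = ToySetNX()
    handled: list = []
    outcomes = [await handle_webhook(kv, eid, handled) for eid in ("evt_1", "evt_2", "evt_1")]
    return outcomes, handled
```

Because the check and the mark are a single set_nx() call, two concurrent deliveries of the same event cannot both see "not yet processed".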
- async rate_limit(key, *, limit, window, key_prefix='derp:ratelimit')
Fixed-window rate limit check.
Increments a counter for the given key. The counter resets after window seconds. Returns a RateLimitResult indicating whether the request is allowed.
- Parameters:
- Return type:
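The fixed-window scheme described above can be sketched with an in-memory counter and an injectable clock. The fields of RateLimitResult are not shown in this reference, so the sketch defines its own hypothetical WindowResult with assumed allowed and remaining fields; FixedWindowLimiter and the "prefix:key" layout are likewise illustrative, not the library's implementation.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Callable


@dataclass
class WindowResult:
    """Hypothetical result type; the real RateLimitResult fields may differ."""

    allowed: bool
    remaining: int


class FixedWindowLimiter:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._counters: dict = {}  # full key -> (count, window end time)

    async def check(
        self, key: str, *, limit: int, window: float, key_prefix: str = "derp:ratelimit"
    ) -> WindowResult:
        full_key = f"{key_prefix}:{key}"  # assumed key layout
        now = self._clock()
        count, window_end = self._counters.get(full_key, (0, 0.0))
        if now >= window_end:
            # The previous window has expired: start a fresh one from this request.
            count, window_end = 0, now + window
        count += 1
        self._counters[full_key] = (count, window_end)
        return WindowResult(allowed=count <= limit, remaining=max(0, limit - count))


async def demo():
    now = [0.0]  # fake clock so the example is deterministic
    limiter = FixedWindowLimiter(clock=lambda: now[0])
    within = [(await limiter.check("user:1", limit=2, window=10)).allowed for _ in range(3)]
    now[0] = 10.0  # advance past the end of the first window
    after_reset = (await limiter.check("user:1", limit=2, window=10)).allowed
    return within, after_reset
```

A fixed window is simple and cheap (one counter per key), at the cost of allowing up to twice the limit across a window boundary.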
- class derp.kv.ValkeyClient
Bases: KVClient
Byte-level KV client backed by Valkey GLIDE.
- __init__(config)
- Parameters:
config (ValkeyConfig)
- property client: GlideClient | GlideClusterClient
- async set_nx(key, value, *, ttl=None)
Set a value only if the key does not exist. Returns True if set.
- class derp.kv.ValkeyConfig
Bases: _StrictModel
Configuration for Valkey GLIDE connections.
- mode: ValkeyMode
- model_config = {'extra': 'forbid'}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.