KV¶
Async key-value client backed by Valkey (Redis-compatible). Access it via derp.kv.
Config¶
# derp.toml
[kv.valkey]
addresses = [["localhost", 6379]]
# username = "$VALKEY_USERNAME"
# password = "$VALKEY_PASSWORD"
# use_tls = false
# mode = "standalone" # or "cluster"
Basic Operations¶
# Set a key with a 1-hour TTL
await derp.kv.set(b"user:123", b'{"name":"Alice"}', ttl=3600)
# Get a key
data = await derp.kv.get(b"user:123")
# Delete a key
await derp.kv.delete(b"user:123")
# Check existence
if await derp.kv.exists(b"user:123"):
...
All keys and values are bytes. TTL is in seconds.
Batch Operations¶
# Set multiple keys
await derp.kv.mset([
(b"user:1", b'{"name":"Alice"}'),
(b"user:2", b'{"name":"Bob"}'),
], ttl=3600)
# Get multiple keys
values = await derp.kv.mget([b"user:1", b"user:2"])
# Delete multiple keys
deleted_count = await derp.kv.delete_many([b"user:1", b"user:2"])
Other Primitives¶
# Set only if key does not exist (returns True if set)
was_set = await derp.kv.set_nx(b"lock:order:99", b"1", ttl=10)
# Check remaining TTL
remaining = await derp.kv.ttl(b"user:123")
# Update TTL on existing key
await derp.kv.expire(b"user:123", ttl=7200)
# Scan keys by prefix
async for key in derp.kv.scan(prefix=b"user:"):
print(key)
Cache with Stampede Protection¶
guarded_get prevents multiple concurrent callers from recomputing the same
expensive value on a cache miss. One caller acquires a lock and computes; the
rest wait for the cache to be populated.
result = await derp.kv.guarded_get(
b"product:42",
compute=lambda: fetch_from_db(42),
ttl=300,
)
compute is an async callable returning bytes. If the lock holder is
slow, waiters retry for up to lock_ttl seconds (default 2s) before falling
through and computing directly.
result = await derp.kv.guarded_get(
b"product:42",
compute=lambda: fetch_from_db(42),
ttl=300,
lock_ttl=5.0,
retry_delay=0.1,
)
Idempotent API Endpoints¶
idempotent_execute runs a computation once per idempotency key and caches
the result (body + status code). Subsequent calls with the same key return the
cached response. Uses guarded_get internally for stampede protection.
idem_key = request.headers.get("Idempotency-Key")
if idem_key:
body, status, is_replay = await derp.kv.idempotent_execute(
key=idem_key,
compute=lambda: create_order(data),
status_code=201,
)
return JSONResponse(body, status_code=status)
Returns a tuple of (body, status_code, is_replay). is_replay is
True when the cached value was returned. Default TTL is 24 hours.
body, status, is_replay = await derp.kv.idempotent_execute(
key=idem_key,
compute=lambda: create_order(data),
status_code=201,
ttl=3600, # cache for 1 hour instead of 24h
key_prefix="myapp:idem", # custom key prefix
)
Rate Limiting¶
rate_limit implements a fixed-window counter. Returns a
RateLimitResult instead of raising, so you decide what
to do when the limit is exceeded.
result = await derp.kv.rate_limit(
f"checkout:{user.id}",
limit=5,
window=60,
)
if not result.allowed:
raise HTTPException(
status_code=429,
headers={"Retry-After": str(result.retry_after)},
)
The result carries everything you need for response headers:
allowed–Trueif the request is within the limitcount– current request count in the windowlimit– the configured limitremaining– requests left in the windowretry_after– seconds until the window resets (only set when denied)
Custom key prefix:
result = await derp.kv.rate_limit(
f"api:{user.id}",
limit=100,
window=3600,
key_prefix="myapp:rl",
)
Webhook Deduplication¶
already_processed atomically marks an event ID and returns whether it was
already seen. Useful for making webhook handlers idempotent.
if await derp.kv.already_processed(event_id=event["id"]):
return {"status": "duplicate"}
# First time seeing this event -- process it
handle_event(event)
Default TTL is 24 hours. Customize with ttl and key_prefix:
if await derp.kv.already_processed(
event_id=event["id"],
ttl=7200,
key_prefix="myapp:webhook",
):
return {"status": "duplicate"}