Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 190 additions & 11 deletions crates/edgezero-core/src/key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use web_time::Instant;

use crate::error::EdgeError;

Expand Down Expand Up @@ -415,7 +416,12 @@ impl KvHandle {
#[inline]
pub async fn delete(&self, key: &str) -> Result<(), KvError> {
Self::validate_key(key)?;
self.store.delete(key).await
let started_at = Self::kv_timing_start();
let result = self.store.delete(key).await;
Self::kv_timing_log(started_at, "delete", &result, || {
format!("key_len={}", key.len())
});
result
}

fn encode_list_cursor(prefix: &str, cursor: Option<String>) -> Result<Option<String>, KvError> {
Expand All @@ -437,7 +443,12 @@ impl KvHandle {
#[inline]
pub async fn exists(&self, key: &str) -> Result<bool, KvError> {
Self::validate_key(key)?;
self.store.exists(key).await
let started_at = Self::kv_timing_start();
let result = self.store.exists(key).await;
Self::kv_timing_log(started_at, "exists", &result, || {
Self::kv_exists_metadata(key.len(), &result)
});
result
}

/// Get a value by key, deserializing from JSON.
Expand All @@ -449,7 +460,13 @@ impl KvHandle {
#[inline]
pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>, KvError> {
Self::validate_key(key)?;
match self.store.get_bytes(key).await? {
let started_at = Self::kv_timing_start();
let result = self.store.get_bytes(key).await;
Self::kv_timing_log(started_at, "get", &result, || {
Self::kv_read_metadata(key.len(), &result)
});

match result? {
Some(bytes) => {
let val = serde_json::from_slice(&bytes)?;
Ok(Some(val))
Expand All @@ -465,7 +482,12 @@ impl KvHandle {
#[inline]
pub async fn get_bytes(&self, key: &str) -> Result<Option<Bytes>, KvError> {
Self::validate_key(key)?;
self.store.get_bytes(key).await
let started_at = Self::kv_timing_start();
let result = self.store.get_bytes(key).await;
Self::kv_timing_log(started_at, "get_bytes", &result, || {
Self::kv_read_metadata(key.len(), &result)
});
result
}

/// Get a value by key, returning `default` if the key does not exist.
Expand All @@ -477,6 +499,78 @@ impl KvHandle {
Ok(self.get(key).await?.unwrap_or(default))
}

fn kv_exists_metadata(key_len: usize, result: &Result<bool, KvError>) -> String {
match result.as_ref() {
Ok(exists) => format!("key_len={key_len} exists={exists}"),
Err(_err) => format!("key_len={key_len}"),
}
}

fn kv_hit_metadata(result: &Result<Option<Bytes>, KvError>) -> String {
match result.as_ref() {
Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()),
Ok(None) => "hit=false bytes=0".to_owned(),
Err(_err) => String::new(),
}
}

fn kv_list_metadata(
prefix_len: usize,
cursor_present: bool,
limit: usize,
result: &Result<KvPage, KvError>,
) -> String {
match result.as_ref() {
Ok(page) => format!(
"prefix_len={prefix_len} cursor_present={cursor_present} limit={limit} count={} next_cursor_present={}",
page.keys.len(),
page.cursor.is_some()
),
Err(_err) => {
format!("prefix_len={prefix_len} cursor_present={cursor_present} limit={limit}")
}
}
}

fn kv_read_metadata(key_len: usize, result: &Result<Option<Bytes>, KvError>) -> String {
match result {
Ok(_value) => format!("key_len={key_len} {}", Self::kv_hit_metadata(result)),
Err(_err) => format!("key_len={key_len}"),
}
}

fn kv_timing_log<ResultValue, Metadata>(
started_at: Option<Instant>,
operation: &str,
result: &Result<ResultValue, KvError>,
metadata: Metadata,
) where
Metadata: FnOnce() -> String,
{
if let Some(start) = started_at {
let status = if result.is_ok() { "ok" } else { "error" };
log::debug!(
"kv operation={operation} elapsed_ms={} status={status} {}",
start.elapsed().as_millis(),
metadata()
);
}
}

fn kv_timing_start() -> Option<Instant> {
log::log_enabled!(log::Level::Debug).then(Instant::now)
}

fn kv_write_metadata(key_len: usize, bytes_len: usize, ttl: Option<Duration>) -> String {
match ttl {
Some(duration) => format!(
"key_len={key_len} bytes={bytes_len} ttl_secs={}",
duration.as_secs()
),
None => format!("key_len={key_len} bytes={bytes_len}"),
}
}

/// List keys in a bounded, paginated fashion.
///
/// The cursor is opaque, prefix-bound, and should be passed back unchanged
Expand All @@ -496,10 +590,15 @@ impl KvHandle {
Self::validate_prefix(prefix)?;
Self::validate_list_limit(limit)?;
let decoded_cursor = Self::decode_list_cursor(prefix, cursor)?;
let page = self
let started_at = Self::kv_timing_start();
let result = self
.store
.list_keys_page(prefix, decoded_cursor.as_deref(), limit)
.await?;
.await;
Self::kv_timing_log(started_at, "list_keys_page", &result, || {
Self::kv_list_metadata(prefix.len(), cursor.is_some(), limit, &result)
});
let page = result?;

Ok(KvPage {
cursor: Self::encode_list_cursor(prefix, page.cursor)?,
Expand All @@ -522,7 +621,13 @@ impl KvHandle {
Self::validate_key(key)?;
let bytes = serde_json::to_vec(value)?;
Self::validate_value(&bytes)?;
self.store.put_bytes(key, Bytes::from(bytes)).await
let bytes_len = bytes.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes(key, Bytes::from(bytes)).await;
Self::kv_timing_log(started_at, "put", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, None)
});
result
}

/// Put raw bytes for a key.
Expand All @@ -533,7 +638,13 @@ impl KvHandle {
pub async fn put_bytes(&self, key: &str, value: Bytes) -> Result<(), KvError> {
Self::validate_key(key)?;
Self::validate_value(&value)?;
self.store.put_bytes(key, value).await
let bytes_len = value.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes(key, value).await;
Self::kv_timing_log(started_at, "put_bytes", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, None)
});
result
}

/// Put raw bytes with a TTL.
Expand All @@ -550,7 +661,13 @@ impl KvHandle {
Self::validate_key(key)?;
Self::validate_ttl(ttl)?;
Self::validate_value(&value)?;
self.store.put_bytes_with_ttl(key, value, ttl).await
let bytes_len = value.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes_with_ttl(key, value, ttl).await;
Self::kv_timing_log(started_at, "put_bytes_with_ttl", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, Some(ttl))
});
result
}

/// Put a value with a TTL, serializing it to JSON.
Expand All @@ -568,9 +685,16 @@ impl KvHandle {
Self::validate_ttl(ttl)?;
let bytes = serde_json::to_vec(value)?;
Self::validate_value(&bytes)?;
self.store
let bytes_len = bytes.len();
let started_at = Self::kv_timing_start();
let result = self
.store
.put_bytes_with_ttl(key, Bytes::from(bytes), ttl)
.await
.await;
Self::kv_timing_log(started_at, "put_with_ttl", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, Some(ttl))
});
result
}

/// Read-modify-write: get the current value (or `default`),
Expand Down Expand Up @@ -995,6 +1119,24 @@ mod tests {
});
}

#[test]
fn error_metadata_omits_unknown_result_fields() {
let read_result = Err(KvError::Unavailable);
assert_eq!(KvHandle::kv_read_metadata(18, &read_result), "key_len=18");

let exists_result = Err(KvError::Unavailable);
assert_eq!(
KvHandle::kv_exists_metadata(18, &exists_result),
"key_len=18"
);

let list_result = Err(KvError::Unavailable);
assert_eq!(
KvHandle::kv_list_metadata(4, true, 100, &list_result),
"prefix_len=4 cursor_present=true limit=100"
);
}

#[test]
fn exists_returns_false_after_delete() {
let kv = handle();
Expand Down Expand Up @@ -1204,6 +1346,43 @@ mod tests {
});
}

#[test]
fn read_metadata_logs_lengths_not_raw_key_or_value() {
let key = "super-secret-token";
let value = Bytes::from_static(b"super-secret-value");
let result = Ok(Some(value));

let metadata = KvHandle::kv_read_metadata(key.len(), &result);

assert_eq!(metadata, "key_len=18 hit=true bytes=18");
assert!(!metadata.contains(key));
assert!(!metadata.contains("super-secret-value"));
}

#[test]
fn success_metadata_keeps_stable_field_types() {
let read_result = Ok(Some(Bytes::from_static(b"abc")));
assert_eq!(
KvHandle::kv_read_metadata(1, &read_result),
"key_len=1 hit=true bytes=3"
);

let exists_result = Ok(false);
assert_eq!(
KvHandle::kv_exists_metadata(1, &exists_result),
"key_len=1 exists=false"
);

let list_result = Ok(KvPage {
cursor: Some("cursor".to_owned()),
keys: vec!["a".to_owned(), "b".to_owned()],
});
assert_eq!(
KvHandle::kv_list_metadata(4, false, 100, &list_result),
"prefix_len=4 cursor_present=false limit=100 count=2 next_cursor_present=true"
);
}

#[test]
fn typed_get_bad_json_returns_serialization_error() {
let kv = handle();
Expand Down
6 changes: 6 additions & 0 deletions docs/guide/kv.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ For strict correctness, use a transactional data store.

Key listing is paginated by design. This avoids buffering an unbounded number of keys in memory and matches the underlying provider APIs. The Spin adapter materialises `Store::get_keys()` and pages client-side; a `max_list_keys` cap (configurable via `EDGEZERO__STORES__KV__<ID>__MAX_LIST_KEYS`, default `1000`) guards against runaway lists and yields `KvError::LimitExceeded` when exceeded.

## Operation Timing / Observability

`KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts.
Comment thread
ChristianPavilonis marked this conversation as resolved.

Timing logs are limited to derived metadata such as lengths, counts, booleans, and TTLs rather than raw keys, prefixes, cursors, or values. On Cloudflare Workers, `elapsed_ms` should be treated as approximate because the runtime uses a reduced-resolution monotonic clock. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs.

## Platform Specifics

### Local Development
Expand Down