From 15c79f6e29fbb0ad31f52e346d3533d4aee53a7d Mon Sep 17 00:00:00 2001 From: Christian Date: Tue, 26 May 2026 13:02:19 -0500 Subject: [PATCH 1/2] Instrument KV operation timing logs --- crates/edgezero-core/src/key_value_store.rs | 133 ++++++++++++++++++-- docs/guide/kv.md | 6 + 2 files changed, 128 insertions(+), 11 deletions(-) diff --git a/crates/edgezero-core/src/key_value_store.rs b/crates/edgezero-core/src/key_value_store.rs index 1e7b535f..fe081faf 100644 --- a/crates/edgezero-core/src/key_value_store.rs +++ b/crates/edgezero-core/src/key_value_store.rs @@ -51,6 +51,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use web_time::Instant; use crate::error::EdgeError; @@ -404,6 +405,48 @@ impl KvHandle { .transpose() } + fn kv_timing_start() -> Option { + log::log_enabled!(log::Level::Debug).then(Instant::now) + } + + fn log_kv_timing( + started_at: Option, + operation: &str, + result: &Result, + metadata: F, + ) where + F: FnOnce() -> String, + { + if let Some(started_at) = started_at { + let status = if result.is_ok() { "ok" } else { "error" }; + log::debug!( + "kv operation={} elapsed_ms={:.3} status={} {}", + operation, + started_at.elapsed().as_secs_f64() * 1000.0, + status, + metadata() + ); + } + } + + fn kv_hit_metadata(result: &Result, KvError>) -> String { + match result.as_ref() { + Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()), + Ok(None) => "hit=false bytes=0".to_string(), + Err(_) => "hit=unknown bytes=unknown".to_string(), + } + } + + fn kv_write_metadata(key_len: usize, bytes_len: usize, ttl: Option) -> String { + match ttl { + Some(ttl) => format!( + "key_len={key_len} bytes={bytes_len} ttl_secs={}", + ttl.as_secs() + ), + None => format!("key_len={key_len} bytes={bytes_len}"), + } + } + // -- Typed helpers (JSON) ----------------------------------------------- /// Get a value by key, deserializing from JSON. @@ -411,7 +454,13 @@ impl KvHandle { /// Returns `Ok(None)` if the key does not exist. pub async fn get(&self, key: &str) -> Result, KvError> { Self::validate_key(key)?; - match self.store.get_bytes(key).await? { + let started_at = Self::kv_timing_start(); + let result = self.store.get_bytes(key).await; + Self::log_kv_timing(started_at, "get", &result, || { + format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + }); + + match result? { Some(bytes) => { let val = serde_json::from_slice(&bytes)?; Ok(Some(val)) @@ -430,7 +479,13 @@ impl KvHandle { Self::validate_key(key)?; let bytes = serde_json::to_vec(value)?; Self::validate_value(&bytes)?; - self.store.put_bytes(key, Bytes::from(bytes)).await + let bytes_len = bytes.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes(key, Bytes::from(bytes)).await; + Self::log_kv_timing(started_at, "put", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, None) + }); + result } /// Put a value with a TTL, serializing it to JSON. @@ -444,9 +499,16 @@ impl KvHandle { Self::validate_ttl(ttl)?; let bytes = serde_json::to_vec(value)?; Self::validate_value(&bytes)?; - self.store + let bytes_len = bytes.len(); + let started_at = Self::kv_timing_start(); + let result = self + .store .put_bytes_with_ttl(key, Bytes::from(bytes), ttl) - .await + .await; + Self::log_kv_timing(started_at, "put_with_ttl", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, Some(ttl)) + }); + result } /// Read-modify-write: get the current value (or `default`), @@ -478,14 +540,25 @@ impl KvHandle { /// Get raw bytes for a key. pub async fn get_bytes(&self, key: &str) -> Result, KvError> { Self::validate_key(key)?; - self.store.get_bytes(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.get_bytes(key).await; + Self::log_kv_timing(started_at, "get_bytes", &result, || { + format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + }); + result } /// Put raw bytes for a key. pub async fn put_bytes(&self, key: &str, value: Bytes) -> Result<(), KvError> { Self::validate_key(key)?; Self::validate_value(&value)?; - self.store.put_bytes(key, value).await + let bytes_len = value.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes(key, value).await; + Self::log_kv_timing(started_at, "put_bytes", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, None) + }); + result } /// Put raw bytes with a TTL. @@ -498,7 +571,13 @@ impl KvHandle { Self::validate_key(key)?; Self::validate_ttl(ttl)?; Self::validate_value(&value)?; - self.store.put_bytes_with_ttl(key, value, ttl).await + let bytes_len = value.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes_with_ttl(key, value, ttl).await; + Self::log_kv_timing(started_at, "put_bytes_with_ttl", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, Some(ttl)) + }); + result } // -- Other operations --------------------------------------------------- @@ -506,13 +585,27 @@ impl KvHandle { /// Check whether a key exists without deserializing its value. pub async fn exists(&self, key: &str) -> Result { Self::validate_key(key)?; - self.store.exists(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.exists(key).await; + Self::log_kv_timing(started_at, "exists", &result, || { + let exists = result + .as_ref() + .map(|exists| exists.to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + format!("key_len={} exists={exists}", key.len()) + }); + result } /// Delete a key. pub async fn delete(&self, key: &str) -> Result<(), KvError> { Self::validate_key(key)?; - self.store.delete(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.delete(key).await; + Self::log_kv_timing(started_at, "delete", &result, || { + format!("key_len={}", key.len()) + }); + result } /// List keys in a bounded, paginated fashion. @@ -530,10 +623,28 @@ impl KvHandle { Self::validate_prefix(prefix)?; Self::validate_list_limit(limit)?; let decoded_cursor = Self::decode_list_cursor(prefix, cursor)?; - let page = self + let started_at = Self::kv_timing_start(); + let result = self .store .list_keys_page(prefix, decoded_cursor.as_deref(), limit) - .await?; + .await; + Self::log_kv_timing(started_at, "list_keys_page", &result, || { + let (count, next_cursor) = result + .as_ref() + .map(|page| { + ( + page.keys.len().to_string(), + page.cursor.is_some().to_string(), + ) + }) + .unwrap_or_else(|_| ("unknown".to_string(), "unknown".to_string())); + format!( + "prefix_len={} cursor_present={} limit={limit} count={count} next_cursor_present={next_cursor}", + prefix.len(), + cursor.is_some() + ) + }); + let page = result?; Ok(KvPage { keys: page.keys, diff --git a/docs/guide/kv.md b/docs/guide/kv.md index 8d7cb329..629aeeb0 100644 --- a/docs/guide/kv.md +++ b/docs/guide/kv.md @@ -88,6 +88,12 @@ For strict correctness, use a transactional data store. Key listing is paginated by design. This avoids buffering an unbounded number of keys in memory and matches the underlying provider APIs. The Spin adapter returns `KvError::Validation` for key listing because Spin's current `Store::get_keys()` API is unbounded. +## Operation Timing / Observability + +`KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts. + +Raw keys, prefixes, cursors, and values are never logged. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs. + ## Platform Specifics ### Local Development From 73a718bdfb555974bde9ea55d0cac4e484c0ca52 Mon Sep 17 00:00:00 2001 From: Christian Date: Thu, 11 Jun 2026 12:45:33 -0500 Subject: [PATCH 2/2] Address KV timing review feedback --- crates/edgezero-core/src/key_value_store.rs | 114 ++++++++++++++++---- docs/guide/kv.md | 2 +- 2 files changed, 93 insertions(+), 23 deletions(-) diff --git a/crates/edgezero-core/src/key_value_store.rs b/crates/edgezero-core/src/key_value_store.rs index fe081faf..ea941cfe 100644 --- a/crates/edgezero-core/src/key_value_store.rs +++ b/crates/edgezero-core/src/key_value_store.rs @@ -433,7 +433,14 @@ impl KvHandle { match result.as_ref() { Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()), Ok(None) => "hit=false bytes=0".to_string(), - Err(_) => "hit=unknown bytes=unknown".to_string(), + Err(_) => String::new(), + } + } + + fn kv_read_metadata(key_len: usize, result: &Result, KvError>) -> String { + match result { + Ok(_) => format!("key_len={key_len} {}", Self::kv_hit_metadata(result)), + Err(_) => format!("key_len={key_len}"), } } @@ -447,6 +454,31 @@ impl KvHandle { } } + fn kv_exists_metadata(key_len: usize, result: &Result) -> String { + match result.as_ref() { + Ok(exists) => format!("key_len={key_len} exists={exists}"), + Err(_) => format!("key_len={key_len}"), + } + } + + fn kv_list_metadata( + prefix_len: usize, + cursor_present: bool, + limit: usize, + result: &Result, + ) -> String { + match result.as_ref() { + Ok(page) => format!( + "prefix_len={prefix_len} cursor_present={cursor_present} limit={limit} count={} next_cursor_present={}", + page.keys.len(), + page.cursor.is_some() + ), + Err(_) => { + format!("prefix_len={prefix_len} cursor_present={cursor_present} limit={limit}") + } + } + } + // -- Typed helpers (JSON) ----------------------------------------------- /// Get a value by key, deserializing from JSON. @@ -457,7 +489,7 @@ impl KvHandle { let started_at = Self::kv_timing_start(); let result = self.store.get_bytes(key).await; Self::log_kv_timing(started_at, "get", &result, || { - format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + Self::kv_read_metadata(key.len(), &result) }); match result? { @@ -543,7 +575,7 @@ impl KvHandle { let started_at = Self::kv_timing_start(); let result = self.store.get_bytes(key).await; Self::log_kv_timing(started_at, "get_bytes", &result, || { - format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + Self::kv_read_metadata(key.len(), &result) }); result } @@ -588,11 +620,7 @@ impl KvHandle { let started_at = Self::kv_timing_start(); let result = self.store.exists(key).await; Self::log_kv_timing(started_at, "exists", &result, || { - let exists = result - .as_ref() - .map(|exists| exists.to_string()) - .unwrap_or_else(|_| "unknown".to_string()); - format!("key_len={} exists={exists}", key.len()) + Self::kv_exists_metadata(key.len(), &result) }); result } @@ -629,20 +657,7 @@ impl KvHandle { .list_keys_page(prefix, decoded_cursor.as_deref(), limit) .await; Self::log_kv_timing(started_at, "list_keys_page", &result, || { - let (count, next_cursor) = result - .as_ref() - .map(|page| { - ( - page.keys.len().to_string(), - page.cursor.is_some().to_string(), - ) - }) - .unwrap_or_else(|_| ("unknown".to_string(), "unknown".to_string())); - format!( - "prefix_len={} cursor_present={} limit={limit} count={count} next_cursor_present={next_cursor}", - prefix.len(), - cursor.is_some() - ) + Self::kv_list_metadata(prefix.len(), cursor.is_some(), limit, &result) }); let page = result?; @@ -975,6 +990,61 @@ mod tests { KvHandle::new(Arc::new(MockStore::new())) } + #[test] + fn read_metadata_logs_lengths_not_raw_key_or_value() { + let key = "super-secret-token"; + let value = Bytes::from_static(b"super-secret-value"); + let result = Ok(Some(value)); + + let metadata = KvHandle::kv_read_metadata(key.len(), &result); + + assert_eq!(metadata, "key_len=18 hit=true bytes=18"); + assert!(!metadata.contains(key)); + assert!(!metadata.contains("super-secret-value")); + } + + #[test] + fn error_metadata_omits_unknown_result_fields() { + let read_result = Err(KvError::Unavailable); + assert_eq!(KvHandle::kv_read_metadata(18, &read_result), "key_len=18"); + + let exists_result = Err(KvError::Unavailable); + assert_eq!( + KvHandle::kv_exists_metadata(18, &exists_result), + "key_len=18" + ); + + let list_result = Err(KvError::Unavailable); + assert_eq!( + KvHandle::kv_list_metadata(4, true, 100, &list_result), + "prefix_len=4 cursor_present=true limit=100" + ); + } + + #[test] + fn success_metadata_keeps_stable_field_types() { + let read_result = Ok(Some(Bytes::from_static(b"abc"))); + assert_eq!( + KvHandle::kv_read_metadata(1, &read_result), + "key_len=1 hit=true bytes=3" + ); + + let exists_result = Ok(false); + assert_eq!( + KvHandle::kv_exists_metadata(1, &exists_result), + "key_len=1 exists=false" + ); + + let list_result = Ok(KvPage { + keys: vec!["a".to_string(), "b".to_string()], + cursor: Some("cursor".to_string()), + }); + assert_eq!( + KvHandle::kv_list_metadata(4, false, 100, &list_result), + "prefix_len=4 cursor_present=false limit=100 count=2 next_cursor_present=true" + ); + } + // -- Raw bytes ---------------------------------------------------------- #[test] diff --git a/docs/guide/kv.md b/docs/guide/kv.md index 629aeeb0..bbf2806d 100644 --- a/docs/guide/kv.md +++ b/docs/guide/kv.md @@ -92,7 +92,7 @@ Key listing is paginated by design. This avoids buffering an unbounded number of `KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts. -Raw keys, prefixes, cursors, and values are never logged. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs. +Timing logs are limited to derived metadata such as lengths, counts, booleans, and TTLs rather than raw keys, prefixes, cursors, or values. On Cloudflare Workers, `elapsed_ms` should be treated as approximate because the runtime uses a reduced-resolution monotonic clock. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs. ## Platform Specifics