diff --git a/src/otel/logs.rs b/src/otel/logs.rs index f6d770fad..ee853871d 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -117,7 +117,8 @@ fn category_from_body(body_str: &str) -> &'static str { /// and returns a `Map` of the flattened json /// this function is called recursively for each log record object in the otel logs pub fn flatten_log_record(log_record: &LogRecord) -> Map { - let mut log_record_json: Map = Map::new(); + let mut log_record_json: Map = + Map::with_capacity(24 + log_record.attributes.len()); log_record_json.insert( "time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -144,16 +145,26 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { log_record_json.insert(key.clone(), value.clone()); // If value is a string that can be parsed as JSON object, extract its fields - if let Value::String(s) = value - && let Ok(parsed) = serde_json::from_str::(s) - && parsed.is_object() - && let Ok(flattened_values) = generic_flattening(&parsed) - { - for flattened_value in flattened_values { - if let Value::Object(flattened_obj) = flattened_value { - for (inner_key, inner_value) in flattened_obj { - let prefixed_key = format!("{key}_{inner_key}"); - log_record_json.insert(prefixed_key, inner_value); + // + if let Value::String(s) = value { + let trimmed = s.trim(); + let looks_like_json = trimmed.len() >= 2 + && matches!( + (trimmed.as_bytes().first(), trimmed.as_bytes().last()), + (Some(b'{'), Some(b'}')) | (Some(b'['), Some(b']')) + ); + // Skip speculative JSON parsing unless the body looks like structured JSON + if looks_like_json + && let Ok(parsed) = serde_json::from_str::(s) + && parsed.is_object() + && let Ok(flattened_values) = generic_flattening(&parsed) + { + for flattened_value in flattened_values { + if let Value::Object(flattened_obj) = flattened_value { + for (inner_key, inner_value) in flattened_obj { + let prefixed_key = format!("{key}_{inner_key}"); + log_record_json.insert(prefixed_key, inner_value); + } } } } @@ -185,7 +196,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { "log_record_dropped_attributes_count".to_string(), Value::Number(log_record.dropped_attributes_count.into()), ); - log_record_json.insert( "flags".to_string(), Value::Number((log_record.flags).into()), @@ -204,9 +214,14 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { /// this function flattens the `ScopeLogs` object /// and returns a `Vec` of `Map` of the flattened json -fn flatten_scope_log(scope_log: &ScopeLogs, tenant_id: &str) -> Vec> { - let mut vec_scope_log_json = Vec::new(); - let mut scope_log_json = Map::new(); +fn flatten_scope_log( + scope_log: &ScopeLogs, + tenant_id: &str, + resource_base_json: &Map, +) -> Vec> { + let mut vec_scope_log_json = Vec::with_capacity(scope_log.log_records.len()); + // resources and scope merged once + let mut scope_log_json = resource_base_json.clone(); if let Some(scope) = &scope_log.scope { scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); scope_log_json.insert( @@ -219,11 +234,11 @@ fn flatten_scope_log(scope_log: &ScopeLogs, tenant_id: &str) -> Vec Vec { diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 7d6a44d63..3f5924ec9 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -25,7 +25,6 @@ use opentelemetry_proto::tonic::metrics::v1::{ use serde_json::{Map, Value}; use rustc_hash::FxHasher; -use std::borrow::Cow; use std::collections::HashSet; use std::hash::Hasher; use tracing::info_span; @@ -87,6 +86,37 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 37] = [ static OTEL_METRICS_KNOWN_FIELDS: Lazy> = Lazy::new(|| OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect()); +/// To avoid heap allocations for non-string attributes (int/float/bool) +/// values are formatted via a zero-alloc [`StackBuf`] backed by a 32-byte +/// stack array. This limit is safely bounded by the maximum string length +/// of OTel `i64` (20 bytes) and `f64` (24 bytes) values +struct StackBuf { + data: [u8; 32], + len: usize, +} + +impl StackBuf { + fn new() -> Self { + Self { + data: [0u8; 32], + len: 0, + } + } + fn as_bytes(&self) -> &[u8] { + &self.data[..self.len] + } +} + +impl std::fmt::Write for StackBuf { + fn write_str(&mut self, s: &str) -> std::fmt::Result { + let bytes = s.as_bytes(); + let end = std::cmp::min(self.len + bytes.len(), 32); + self.data[self.len..end].copy_from_slice(&bytes[..end - self.len]); + self.len = end; + Ok(()) + } +} + /// Compute a stable u64 identifier for the physical series a sample /// belongs to. Hashes `metric_name` plus every attribute key/value pair /// that survived OTel flattening — everything in the flattened data @@ -97,18 +127,13 @@ static OTEL_METRICS_KNOWN_FIELDS: Lazy> = /// non-cryptographic) and feeds keys in sorted order so the hash /// doesn't depend on JSON Map iteration order. fn compute_series_hash(dp: &Map) -> u64 { - let mut label_pairs: Vec<(&str, Cow<'_, str>)> = Vec::with_capacity(dp.len()); - for (key, value) in dp { - if OTEL_METRICS_KNOWN_FIELDS.contains(key.as_str()) { - continue; + let mut keys: Vec<&str> = Vec::with_capacity(dp.len()); + for key in dp.keys() { + if !OTEL_METRICS_KNOWN_FIELDS.contains(key.as_str()) { + keys.push(key); } - let value = match value { - Value::String(s) => Cow::Borrowed(s.as_str()), - other => Cow::Owned(other.to_string()), - }; - label_pairs.push((key.as_str(), value)); } - label_pairs.sort_by(|a, b| a.0.cmp(b.0)); + keys.sort_unstable(); let mut hasher = FxHasher::default(); // Include metric_name in the identity. Without it, two different @@ -117,10 +142,28 @@ fn compute_series_hash(dp: &Map) -> u64 { hasher.write(name.as_bytes()); hasher.write(b"\0"); } - for (k, v) in &label_pairs { - hasher.write(k.as_bytes()); + for key in keys { + hasher.write(key.as_bytes()); hasher.write(b"="); - hasher.write(v.as_bytes()); + if let Some(val) = dp.get(key) { + match val { + Value::String(s) => hasher.write(s.as_bytes()), + Value::Bool(b) => { + if *b { + hasher.write(b"true"); + } else { + hasher.write(b"false"); + } + } + Value::Number(n) => { + // Zero- Alloc number formatting + let mut buf = StackBuf::new(); + let _ = std::fmt::Write::write_fmt(&mut buf, format_args!("{}", n)); + hasher.write(buf.as_bytes()); + } + _ => hasher.write(b"null"), + } + } hasher.write(b"\0"); } hasher.finish() @@ -172,7 +215,9 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec> { +fn flatten_scope_span( + scope_span: &ScopeSpans, + tenant_id: &str, + date: &str, +) -> Vec> { let mut vec_scope_span_json = Vec::new(); let mut scope_span_json = Map::new(); for span in &scope_span.spans { let span_record_json = flatten_span_record(span); vec_scope_span_json.extend(span_record_json); } - - let date = chrono::Utc::now().date_naive().to_string(); - increment_traces_collected_by_date(scope_span.spans.len() as u64, &date, tenant_id); + increment_traces_collected_by_date(scope_span.spans.len() as u64, date, tenant_id); if let Some(scope) = &scope_span.scope { scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); @@ -114,6 +116,7 @@ fn process_resource_spans( get_scope_spans: fn(&T) -> &[ScopeSpans], get_schema_url: fn(&T) -> &str, tenant_id: &str, + date: &str, ) -> Vec where T: std::fmt::Debug, @@ -135,7 +138,7 @@ where // Process scope spans let mut vec_resource_spans_json = Vec::new(); for scope_span in get_scope_spans(resource_span) { - let scope_span_json = flatten_scope_span(scope_span, tenant_id); + let scope_span_json = flatten_scope_span(scope_span, tenant_id, date); vec_resource_spans_json.extend(scope_span_json); } @@ -146,9 +149,9 @@ where ); // Merge resource-level fields into each span record - for resource_spans_json in &mut vec_resource_spans_json { + for mut resource_spans_json in vec_resource_spans_json { resource_spans_json.extend(resource_span_json.clone()); - vec_otel_json.push(Value::Object(resource_spans_json.clone())); + vec_otel_json.push(Value::Object(resource_spans_json)); } } @@ -160,24 +163,29 @@ pub fn flatten_otel_traces_protobuf( message: &ExportTraceServiceRequest, tenant_id: &str, ) -> Vec { + let date = chrono::Utc::now().date_naive().to_string(); + process_resource_spans( &message.resource_spans, |rs| rs.resource.as_ref(), |rs| &rs.scope_spans, |rs| &rs.schema_url, tenant_id, + &date, ) } /// this function performs the custom flattening of the otel traces event /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_traces(message: &TracesData, tenant_id: &str) -> Vec { + let date = chrono::Utc::now().date_naive().to_string(); process_resource_spans( &message.resource_spans, |rs| rs.resource.as_ref(), |rs| &rs.scope_spans, |rs| &rs.schema_url, tenant_id, + &date, ) } @@ -326,7 +334,8 @@ fn flatten_kind(kind: i32) -> Map { /// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each span record object in the otel traces event fn flatten_span_record(span_record: &Span) -> Vec> { - let mut span_records_json = Vec::new(); + let total_records = span_record.events.len() + span_record.links.len(); + let mut span_records_json = Vec::with_capacity(total_records); let mut span_record_json = Map::new(); span_record_json.insert( "span_trace_id".to_string(),