Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions src/otel/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ fn category_from_body(body_str: &str) -> &'static str {
/// and returns a `Map` of the flattened json
/// this function is called recursively for each log record object in the otel logs
pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
let mut log_record_json: Map<String, Value> = Map::new();
let mut log_record_json: Map<String, Value> =
Map::with_capacity(24 + log_record.attributes.len());
log_record_json.insert(
"time_unix_nano".to_string(),
Value::String(convert_epoch_nano_to_timestamp(
Expand All @@ -144,16 +145,26 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
log_record_json.insert(key.clone(), value.clone());

// If value is a string that can be parsed as JSON object, extract its fields
if let Value::String(s) = value
&& let Ok(parsed) = serde_json::from_str::<Value>(s)
&& parsed.is_object()
&& let Ok(flattened_values) = generic_flattening(&parsed)
{
for flattened_value in flattened_values {
if let Value::Object(flattened_obj) = flattened_value {
for (inner_key, inner_value) in flattened_obj {
let prefixed_key = format!("{key}_{inner_key}");
log_record_json.insert(prefixed_key, inner_value);
//
if let Value::String(s) = value {
let trimmed = s.trim();
let looks_like_json = trimmed.len() >= 2
&& matches!(
(trimmed.as_bytes().first(), trimmed.as_bytes().last()),
(Some(b'{'), Some(b'}')) | (Some(b'['), Some(b']'))
);
// Skip speculative JSON parsing unless the body looks like structured JSON
if looks_like_json
&& let Ok(parsed) = serde_json::from_str::<Value>(s)
&& parsed.is_object()
&& let Ok(flattened_values) = generic_flattening(&parsed)
{
for flattened_value in flattened_values {
if let Value::Object(flattened_obj) = flattened_value {
for (inner_key, inner_value) in flattened_obj {
let prefixed_key = format!("{key}_{inner_key}");
log_record_json.insert(prefixed_key, inner_value);
}
}
}
}
Expand Down Expand Up @@ -185,7 +196,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
"log_record_dropped_attributes_count".to_string(),
Value::Number(log_record.dropped_attributes_count.into()),
);

log_record_json.insert(
"flags".to_string(),
Value::Number((log_record.flags).into()),
Expand All @@ -204,9 +214,14 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {

/// this function flattens the `ScopeLogs` object
/// and returns a `Vec` of `Map` of the flattened json
fn flatten_scope_log(scope_log: &ScopeLogs, tenant_id: &str) -> Vec<Map<String, Value>> {
let mut vec_scope_log_json = Vec::new();
let mut scope_log_json = Map::new();
fn flatten_scope_log(
scope_log: &ScopeLogs,
tenant_id: &str,
resource_base_json: &Map<String, Value>,
) -> Vec<Map<String, Value>> {
let mut vec_scope_log_json = Vec::with_capacity(scope_log.log_records.len());
// resources and scope merged once
let mut scope_log_json = resource_base_json.clone();
if let Some(scope) = &scope_log.scope {
scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
scope_log_json.insert(
Expand All @@ -219,11 +234,11 @@ fn flatten_scope_log(scope_log: &ScopeLogs, tenant_id: &str) -> Vec<Map<String,
Value::Number(scope.dropped_attributes_count.into()),
);
}

scope_log_json.insert(
"scope_log_schema_url".to_string(),
Value::String(scope_log.schema_url.clone()),
);

for log_record in &scope_log.log_records {
let log_record_json = flatten_log_record(log_record);
let mut combined_json = scope_log_json.clone();
Expand Down Expand Up @@ -262,20 +277,22 @@ where
);
}

resource_log_json.insert(
"schema_url".to_string(),
Value::String(get_schema_url(resource_log).to_string()),
);
let mut vec_resource_logs_json = Vec::new();
let scope_logs = get_scope_logs(resource_log);

for scope_log in scope_logs {
vec_resource_logs_json.extend(flatten_scope_log(scope_log, tenant_id));
vec_resource_logs_json.extend(flatten_scope_log(
scope_log,
tenant_id,
&resource_log_json,
));
}

resource_log_json.insert(
"schema_url".to_string(),
Value::String(get_schema_url(resource_log).to_string()),
);

for resource_logs_json in &mut vec_resource_logs_json {
resource_logs_json.extend(resource_log_json.clone());
vec_otel_json.push(Value::Object(resource_logs_json.clone()));
Comment thread
ygndotgg marked this conversation as resolved.
}
}
Expand All @@ -292,7 +309,6 @@ pub fn flatten_otel_protobuf(message: &ExportLogsServiceRequest, tenant_id: &str
tenant_id,
)
}

/// this function performs the custom flattening of the otel logs
/// and returns a `Vec` of `Value::Object` of the flattened json
pub fn flatten_otel_logs(message: &LogsData, tenant_id: &str) -> Vec<Value> {
Expand Down
75 changes: 60 additions & 15 deletions src/otel/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use opentelemetry_proto::tonic::metrics::v1::{
use serde_json::{Map, Value};

use rustc_hash::FxHasher;
use std::borrow::Cow;
use std::collections::HashSet;
use std::hash::Hasher;
use tracing::info_span;
Expand Down Expand Up @@ -87,6 +86,37 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 37] = [
static OTEL_METRICS_KNOWN_FIELDS: Lazy<HashSet<&'static str>> =
Lazy::new(|| OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect());

/// Fixed-capacity, stack-resident byte buffer usable as a [`std::fmt::Write`] sink.
///
/// It lets non-string attribute values (int/float/bool) be formatted without a
/// heap allocation. The 32-byte capacity was chosen to cover the longest textual
/// form of an `i64` (20 bytes) and of an `f64` (24 bytes).
struct StackBuf {
    /// Backing storage; only `data[..len]` holds written bytes.
    data: [u8; 32],
    /// Number of bytes written so far; never exceeds `data.len()`.
    len: usize,
}

impl StackBuf {
    /// Creates an empty buffer.
    fn new() -> Self {
        Self {
            data: [0u8; 32],
            len: 0,
        }
    }

    /// Returns the bytes written so far.
    fn as_bytes(&self) -> &[u8] {
        &self.data[..self.len]
    }
}

impl std::fmt::Write for StackBuf {
    /// Appends `s` to the buffer.
    ///
    /// When `s` does not fit in the remaining capacity, the prefix that fits is
    /// still copied (so the buffer contents match what a truncating write would
    /// produce), but `Err(std::fmt::Error)` is returned so the caller can tell
    /// that the value was clipped instead of mistaking it for a complete write.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        let bytes = s.as_bytes();
        let remaining = self.data.len() - self.len;
        let take = bytes.len().min(remaining);
        self.data[self.len..self.len + take].copy_from_slice(&bytes[..take]);
        self.len += take;
        if take == bytes.len() {
            Ok(())
        } else {
            // Part of `s` was dropped: report it rather than pretending success.
            Err(std::fmt::Error)
        }
    }
}

/// Compute a stable u64 identifier for the physical series a sample
/// belongs to. Hashes `metric_name` plus every attribute key/value pair
/// that survived OTel flattening — everything in the flattened data
Expand All @@ -97,18 +127,13 @@ static OTEL_METRICS_KNOWN_FIELDS: Lazy<HashSet<&'static str>> =
/// non-cryptographic) and feeds keys in sorted order so the hash
/// doesn't depend on JSON Map iteration order.
fn compute_series_hash(dp: &Map<String, Value>) -> u64 {
let mut label_pairs: Vec<(&str, Cow<'_, str>)> = Vec::with_capacity(dp.len());
for (key, value) in dp {
if OTEL_METRICS_KNOWN_FIELDS.contains(key.as_str()) {
continue;
let mut keys: Vec<&str> = Vec::with_capacity(dp.len());
for key in dp.keys() {
if !OTEL_METRICS_KNOWN_FIELDS.contains(key.as_str()) {
keys.push(key);
}
let value = match value {
Value::String(s) => Cow::Borrowed(s.as_str()),
other => Cow::Owned(other.to_string()),
};
label_pairs.push((key.as_str(), value));
}
label_pairs.sort_by(|a, b| a.0.cmp(b.0));
keys.sort_unstable();

let mut hasher = FxHasher::default();
// Include metric_name in the identity. Without it, two different
Expand All @@ -117,10 +142,28 @@ fn compute_series_hash(dp: &Map<String, Value>) -> u64 {
hasher.write(name.as_bytes());
hasher.write(b"\0");
}
for (k, v) in &label_pairs {
hasher.write(k.as_bytes());
for key in keys {
hasher.write(key.as_bytes());
hasher.write(b"=");
hasher.write(v.as_bytes());
if let Some(val) = dp.get(key) {
match val {
Value::String(s) => hasher.write(s.as_bytes()),
Value::Bool(b) => {
if *b {
hasher.write(b"true");
} else {
hasher.write(b"false");
}
}
Value::Number(n) => {
// Zero-alloc number formatting
let mut buf = StackBuf::new();
let _ = std::fmt::Write::write_fmt(&mut buf, format_args!("{}", n));
hasher.write(buf.as_bytes());
}
_ => hasher.write(b"null"),
}
}
hasher.write(b"\0");
}
hasher.finish()
Expand Down Expand Up @@ -172,7 +215,9 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec<Map<String
data_points
.iter()
.map(|data_point| {
let mut data_point_json = Map::with_capacity(data_point.attributes.len() + 8);
let mut data_point_json = Map::with_capacity(
data_point.attributes.len() + 8 + (data_point.exemplars.len() * 4),
);
insert_attributes(&mut data_point_json, &data_point.attributes);
data_point_json.insert(
"start_time_unix_nano".to_string(),
Expand Down
25 changes: 17 additions & 8 deletions src/otel/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,18 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 32] = [
];
/// this function flattens the `ScopeSpans` object
/// and returns a `Vec` of `Map` of the flattened json
fn flatten_scope_span(scope_span: &ScopeSpans, tenant_id: &str) -> Vec<Map<String, Value>> {
fn flatten_scope_span(
scope_span: &ScopeSpans,
tenant_id: &str,
date: &str,
) -> Vec<Map<String, Value>> {
let mut vec_scope_span_json = Vec::new();
let mut scope_span_json = Map::new();
for span in &scope_span.spans {
let span_record_json = flatten_span_record(span);
vec_scope_span_json.extend(span_record_json);
}

let date = chrono::Utc::now().date_naive().to_string();
increment_traces_collected_by_date(scope_span.spans.len() as u64, &date, tenant_id);
increment_traces_collected_by_date(scope_span.spans.len() as u64, date, tenant_id);

if let Some(scope) = &scope_span.scope {
scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
Expand Down Expand Up @@ -114,6 +116,7 @@ fn process_resource_spans<T>(
get_scope_spans: fn(&T) -> &[ScopeSpans],
get_schema_url: fn(&T) -> &str,
tenant_id: &str,
date: &str,
) -> Vec<Value>
where
T: std::fmt::Debug,
Expand All @@ -135,7 +138,7 @@ where
// Process scope spans
let mut vec_resource_spans_json = Vec::new();
for scope_span in get_scope_spans(resource_span) {
let scope_span_json = flatten_scope_span(scope_span, tenant_id);
let scope_span_json = flatten_scope_span(scope_span, tenant_id, date);
vec_resource_spans_json.extend(scope_span_json);
}

Expand All @@ -146,9 +149,9 @@ where
);

// Merge resource-level fields into each span record
for resource_spans_json in &mut vec_resource_spans_json {
for mut resource_spans_json in vec_resource_spans_json {
resource_spans_json.extend(resource_span_json.clone());
vec_otel_json.push(Value::Object(resource_spans_json.clone()));
vec_otel_json.push(Value::Object(resource_spans_json));
}
}

Expand All @@ -160,24 +163,29 @@ pub fn flatten_otel_traces_protobuf(
message: &ExportTraceServiceRequest,
tenant_id: &str,
) -> Vec<Value> {
let date = chrono::Utc::now().date_naive().to_string();

process_resource_spans(
&message.resource_spans,
|rs| rs.resource.as_ref(),
|rs| &rs.scope_spans,
|rs| &rs.schema_url,
tenant_id,
&date,
)
}

/// Flattens an OTel `TracesData` message into a `Vec` of `Value::Object` records.
///
/// The current UTC date is computed once per call and handed to
/// `process_resource_spans` together with accessor closures for the resource,
/// the scope spans and the schema URL of each resource-span entry.
pub fn flatten_otel_traces(message: &TracesData, tenant_id: &str) -> Vec<Value> {
    // One date string for the whole message instead of one per scope span.
    let today = chrono::Utc::now().date_naive().to_string();

    process_resource_spans(
        &message.resource_spans,
        |resource_spans| resource_spans.resource.as_ref(),
        |resource_spans| &resource_spans.scope_spans,
        |resource_spans| &resource_spans.schema_url,
        tenant_id,
        &today,
    )
}

Expand Down Expand Up @@ -326,7 +334,8 @@ fn flatten_kind(kind: i32) -> Map<String, Value> {
/// and returns a `Vec` of `Map` of the flattened json
/// this function is called recursively for each span record object in the otel traces event
fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
let mut span_records_json = Vec::new();
let total_records = span_record.events.len() + span_record.links.len();
let mut span_records_json = Vec::with_capacity(total_records);
let mut span_record_json = Map::new();
span_record_json.insert(
"span_trace_id".to_string(),
Expand Down
Loading