From e77ca395a4bb8fa783743a2ad5c3c23abc05708d Mon Sep 17 00:00:00 2001 From: Alessandro Gallo Date: Wed, 17 Jun 2026 17:09:14 +0200 Subject: [PATCH 1/2] [CRE-1299] - support white listed requests --- libs/opsqueue_python/src/common.rs | 7 +++-- opsqueue/src/common/submission.rs | 50 ++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index f19eaa6..c3d6da8 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -386,6 +386,7 @@ pub struct Submission { pub chunks_total: u64, pub chunks_done: u64, pub metadata: Option, + pub strategic_metadata: Option, } impl From for Submission { @@ -395,6 +396,7 @@ impl From for Submission { chunks_total: value.chunks_total.into(), chunks_done: value.chunks_done.into(), metadata: value.metadata, + strategic_metadata: value.strategic_metadata, } } } @@ -403,11 +405,12 @@ impl From for Submission { impl Submission { fn __repr__(&self) -> String { format!( - "Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?})", + "Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, strategic_metadata={4:?})", self.id.__repr__(), self.chunks_total, self.chunks_done, - self.metadata + self.metadata, + self.strategic_metadata ) } } diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 69040f1..2ba8063 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -3,6 +3,7 @@ use std::fmt::Display; use std::time::Duration; use crate::common::StrategicMetadataMap; +#[allow(unused_imports)] use crate::E; use chrono::{DateTime, Utc}; use ux::u63; @@ -151,6 +152,7 @@ pub struct Submission { pub chunks_done: ChunkCount, pub chunk_size: ChunkSize, pub metadata: Option, + pub strategic_metadata: Option, pub otel_trace_carrier: String, } @@ -224,6 +226,7 @@ impl Submission { chunks_done: ChunkCount::zero(), chunk_size: ChunkSize::default(), metadata: None, + strategic_metadata: None, otel_trace_carrier, } } @@ -243,6 +246,7 @@ impl Submission { chunks_done: ChunkCount::zero(), chunk_size, metadata, + strategic_metadata: None, otel_trace_carrier, }; let chunks = chunks @@ -268,7 +272,7 @@ pub mod db { db::{Connection, True, WriterConnection, WriterPool}, }; use chunk::ChunkSize; - use sqlx::{query, query_as, Sqlite}; + use sqlx::{query, Sqlite}; use axum_prometheus::metrics::{counter, histogram}; @@ -421,6 +425,7 @@ pub mod db { chunks_done: ChunkCount::zero(), chunk_size, metadata, + strategic_metadata: None, otel_trace_carrier, }; let iter = chunks_contents @@ -461,15 +466,18 @@ pub mod db { id: SubmissionId, mut conn: impl Connection, ) -> Result> { - let submission = query_as!( - Submission, + let submission_row = query!( r#" SELECT id AS "id: SubmissionId" , prefix , chunks_total AS "chunks_total: ChunkCount" , chunks_done AS "chunks_done: ChunkCount" - , chunk_size AS "chunk_size: ChunkSize" + , chunk_size AS "chunk_size!: ChunkSize" , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions.id + ) AS "strategic_metadata: sqlx::types::Json" , otel_trace_carrier FROM submissions WHERE id = $1 "#, @@ -477,9 +485,18 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - match submission { + match submission_row { None => Err(E::R(SubmissionNotFound(id))), - Some(submission) => Ok(submission), + Some(row) => Ok(Submission { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunks_done: row.chunks_done, + chunk_size: row.chunk_size, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.map(|json| json.0), + otel_trace_carrier: row.otel_trace_carrier, + }), } } @@ -535,16 +552,19 @@ pub mod db { // NOTE: The order is important here; a concurrent writer could move a submission // from InProgress to Completed/Failed in-between the queries. - let submission = query_as!( - Submission, + let submission_row = query!( r#" SELECT id AS "id: SubmissionId" , prefix , chunks_total AS "chunks_total: ChunkCount" , chunks_done AS "chunks_done: ChunkCount" - , chunk_size AS "chunk_size: ChunkSize" + , chunk_size AS "chunk_size!: ChunkSize" , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions.id + ) AS "strategic_metadata: sqlx::types::Json" , otel_trace_carrier FROM submissions WHERE id = $1 "#, @@ -552,7 +572,17 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - if let Some(submission) = submission { + if let Some(row) = submission_row { + let submission = Submission { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunks_done: row.chunks_done, + chunk_size: row.chunk_size, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.map(|json| json.0), + otel_trace_carrier: row.otel_trace_carrier, + }; return Ok(Some(SubmissionStatus::InProgress(submission))); } From b8151e3b7c68bfdfa3f74d7dff0d229cdafdd060 Mon Sep 17 00:00:00 2001 From: Alessandro Gallo Date: Wed, 17 Jun 2026 17:25:47 +0200 Subject: [PATCH 2/2] wip --- opsqueue/src/common/submission.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 2ba8063..d7ac22d 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -278,6 +278,12 @@ pub mod db { use super::*; + fn normalize_strategic_metadata( + strategic_metadata: Option>, + ) -> Option { + strategic_metadata.and_then(|json| (!json.0.is_empty()).then_some(json.0)) + } + impl<'q> sqlx::Encode<'q, Sqlite> for SubmissionId { fn encode_by_ref( &self, @@ -494,7 +500,7 @@ pub mod db { chunks_done: row.chunks_done, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: normalize_strategic_metadata(row.strategic_metadata), otel_trace_carrier: row.otel_trace_carrier, }), } @@ -580,7 +586,7 @@ pub mod db { chunks_done: row.chunks_done, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: normalize_strategic_metadata(row.strategic_metadata), otel_trace_carrier: row.otel_trace_carrier, }; return Ok(Some(SubmissionStatus::InProgress(submission))); @@ -613,7 +619,7 @@ pub mod db { chunks_total: row.chunks_total, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: normalize_strategic_metadata(row.strategic_metadata), completed_at: row.completed_at, otel_trace_carrier: row.otel_trace_carrier, }; @@ -650,7 +656,7 @@ pub mod db { chunks_done: row.chunks_done, chunk_size: row.chunk_size, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: normalize_strategic_metadata(row.strategic_metadata), failed_at: row.failed_at, failed_chunk_id: row.failed_chunk_id, otel_trace_carrier: row.otel_trace_carrier, @@ -689,7 +695,7 @@ pub mod db { chunks_total: row.chunks_total, chunks_done: row.chunks_done, metadata: row.metadata, - strategic_metadata: row.strategic_metadata.map(|json| json.0), + strategic_metadata: normalize_strategic_metadata(row.strategic_metadata), cancelled_at: row.cancelled_at, }; return Ok(Some(SubmissionStatus::Cancelled(cancelled_submission)));