Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions libs/opsqueue_python/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ pub struct Submission {
pub chunks_total: u64,
pub chunks_done: u64,
pub metadata: Option<submission::Metadata>,
pub strategic_metadata: Option<StrategicMetadataMap>,
}

impl From<opsqueue::common::submission::Submission> for Submission {
Expand All @@ -395,6 +396,7 @@ impl From<opsqueue::common::submission::Submission> for Submission {
chunks_total: value.chunks_total.into(),
chunks_done: value.chunks_done.into(),
metadata: value.metadata,
strategic_metadata: value.strategic_metadata,
}
}
}
Expand All @@ -403,11 +405,12 @@ impl From<opsqueue::common::submission::Submission> for Submission {
impl Submission {
fn __repr__(&self) -> String {
format!(
"Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?})",
"Submission(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, strategic_metadata={4:?})",
self.id.__repr__(),
self.chunks_total,
self.chunks_done,
self.metadata
self.metadata,
self.strategic_metadata
)
}
}
Expand Down
62 changes: 49 additions & 13 deletions opsqueue/src/common/submission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt::Display;
use std::time::Duration;

use crate::common::StrategicMetadataMap;
#[allow(unused_imports)]
use crate::E;
use chrono::{DateTime, Utc};
Comment on lines 5 to 8
use ux::u63;
Expand Down Expand Up @@ -151,6 +152,7 @@ pub struct Submission {
pub chunks_done: ChunkCount,
pub chunk_size: ChunkSize,
pub metadata: Option<Metadata>,
pub strategic_metadata: Option<StrategicMetadataMap>,
pub otel_trace_carrier: String,
}

Expand Down Expand Up @@ -224,6 +226,7 @@ impl Submission {
chunks_done: ChunkCount::zero(),
chunk_size: ChunkSize::default(),
metadata: None,
strategic_metadata: None,
otel_trace_carrier,
}
}
Expand All @@ -243,6 +246,7 @@ impl Submission {
chunks_done: ChunkCount::zero(),
chunk_size,
metadata,
strategic_metadata: None,
otel_trace_carrier,
};
let chunks = chunks
Expand All @@ -268,12 +272,18 @@ pub mod db {
db::{Connection, True, WriterConnection, WriterPool},
};
use chunk::ChunkSize;
use sqlx::{query, query_as, Sqlite};
use sqlx::{query, Sqlite};

use axum_prometheus::metrics::{counter, histogram};

use super::*;

fn normalize_strategic_metadata(
strategic_metadata: Option<sqlx::types::Json<StrategicMetadataMap>>,
) -> Option<StrategicMetadataMap> {
strategic_metadata.and_then(|json| (!json.0.is_empty()).then_some(json.0))
}
Comment on lines +281 to +285

impl<'q> sqlx::Encode<'q, Sqlite> for SubmissionId {
fn encode_by_ref(
&self,
Expand Down Expand Up @@ -421,6 +431,7 @@ pub mod db {
chunks_done: ChunkCount::zero(),
chunk_size,
metadata,
strategic_metadata: None,
otel_trace_carrier,
};
let iter = chunks_contents
Expand Down Expand Up @@ -461,25 +472,37 @@ pub mod db {
id: SubmissionId,
mut conn: impl Connection,
) -> Result<Submission, E<DatabaseError, SubmissionNotFound>> {
let submission = query_as!(
Submission,
let submission_row = query!(
r#"
SELECT id AS "id: SubmissionId"
, prefix
, chunks_total AS "chunks_total: ChunkCount"
, chunks_done AS "chunks_done: ChunkCount"
, chunk_size AS "chunk_size: ChunkSize"
, chunk_size AS "chunk_size!: ChunkSize"
, metadata
, ( SELECT json_group_object(metadata_key, metadata_value)
FROM submissions_metadata
WHERE submission_id = submissions.id
) AS "strategic_metadata: sqlx::types::Json<StrategicMetadataMap>"
, otel_trace_carrier
FROM submissions WHERE id = $1
"#,
id
)
.fetch_optional(conn.get_inner())
.await?;
match submission {
match submission_row {
None => Err(E::R(SubmissionNotFound(id))),
Some(submission) => Ok(submission),
Some(row) => Ok(Submission {
id: row.id,
prefix: row.prefix,
chunks_total: row.chunks_total,
chunks_done: row.chunks_done,
chunk_size: row.chunk_size,
metadata: row.metadata,
strategic_metadata: normalize_strategic_metadata(row.strategic_metadata),
otel_trace_carrier: row.otel_trace_carrier,
}),
}
}

Expand Down Expand Up @@ -535,24 +558,37 @@ pub mod db {
// NOTE: The order is important here; a concurrent writer could move a submission
// from InProgress to Completed/Failed in-between the queries.

let submission = query_as!(
Submission,
let submission_row = query!(
r#"
SELECT
id AS "id: SubmissionId"
, prefix
, chunks_total AS "chunks_total: ChunkCount"
, chunks_done AS "chunks_done: ChunkCount"
, chunk_size AS "chunk_size: ChunkSize"
, chunk_size AS "chunk_size!: ChunkSize"
, metadata
, ( SELECT json_group_object(metadata_key, metadata_value)
FROM submissions_metadata
WHERE submission_id = submissions.id
) AS "strategic_metadata: sqlx::types::Json<StrategicMetadataMap>"
, otel_trace_carrier
FROM submissions WHERE id = $1
"#,
id
)
.fetch_optional(conn.get_inner())
.await?;
if let Some(submission) = submission {
if let Some(row) = submission_row {
let submission = Submission {
id: row.id,
prefix: row.prefix,
chunks_total: row.chunks_total,
chunks_done: row.chunks_done,
chunk_size: row.chunk_size,
metadata: row.metadata,
strategic_metadata: normalize_strategic_metadata(row.strategic_metadata),
otel_trace_carrier: row.otel_trace_carrier,
};
return Ok(Some(SubmissionStatus::InProgress(submission)));
}

Expand Down Expand Up @@ -583,7 +619,7 @@ pub mod db {
chunks_total: row.chunks_total,
chunk_size: row.chunk_size,
metadata: row.metadata,
strategic_metadata: row.strategic_metadata.map(|json| json.0),
strategic_metadata: normalize_strategic_metadata(row.strategic_metadata),
completed_at: row.completed_at,
otel_trace_carrier: row.otel_trace_carrier,
};
Expand Down Expand Up @@ -620,7 +656,7 @@ pub mod db {
chunks_done: row.chunks_done,
chunk_size: row.chunk_size,
metadata: row.metadata,
strategic_metadata: row.strategic_metadata.map(|json| json.0),
strategic_metadata: normalize_strategic_metadata(row.strategic_metadata),
failed_at: row.failed_at,
failed_chunk_id: row.failed_chunk_id,
otel_trace_carrier: row.otel_trace_carrier,
Expand Down Expand Up @@ -659,7 +695,7 @@ pub mod db {
chunks_total: row.chunks_total,
chunks_done: row.chunks_done,
metadata: row.metadata,
strategic_metadata: row.strategic_metadata.map(|json| json.0),
strategic_metadata: normalize_strategic_metadata(row.strategic_metadata),
cancelled_at: row.cancelled_at,
};
return Ok(Some(SubmissionStatus::Cancelled(cancelled_submission)));
Expand Down
Loading