From d6acc4b730cd3379976c1bccca4e0f5e50cd8031 Mon Sep 17 00:00:00 2001 From: Michael Johansen Date: Wed, 17 Jun 2026 08:59:14 -0500 Subject: [PATCH] Update batch publishing to fallback to t0/now if timestamp is unset Signed-off-by: Michael Johansen --- src/ni/datastore/data/_data_store_client.py | 8 +- src/ni/datastore/data/_grpc_conversion.py | 46 ++++++++++ tests/unit/data/test_publish_measurement.py | 97 +++++++++++++++++++++ 3 files changed, 147 insertions(+), 4 deletions(-) diff --git a/src/ni/datastore/data/_data_store_client.py b/src/ni/datastore/data/_data_store_client.py index 6cde16b..5f96db2 100644 --- a/src/ni/datastore/data/_data_store_client.py +++ b/src/ni/datastore/data/_data_store_client.py @@ -31,14 +31,12 @@ ReadConditionValueRequest, ReadMeasurementValueRequest, ) -from ni.protobuf.types.precision_timestamp_conversion import ( - hightime_datetime_to_protobuf, -) from ni_grpc_extensions.channelpool import GrpcChannelPool from ni.datastore.data._grpc_conversion import ( convert_read_condition_response_from_protobuf, convert_read_measurement_response_from_protobuf, + get_publish_measurement_batch_timestamps, get_publish_measurement_timestamp, populate_publish_condition_batch_request_values, populate_publish_condition_request_value, @@ -347,7 +345,6 @@ def publish_measurement_batch( publish_request = PublishMeasurementBatchRequest( name=name, step_id=step_id, - timestamps=[hightime_datetime_to_protobuf(ts) for ts in timestamps], outcomes=[outcome.to_protobuf() for outcome in outcomes], error_information=( [ei.to_protobuf() for ei in (error_information or [])] if error_information else [] @@ -358,6 +355,9 @@ def publish_measurement_batch( notes=notes, ) populate_publish_measurement_batch_request_values(publish_request, values) + publish_request.timestamps.extend( + get_publish_measurement_batch_timestamps(publish_request, timestamps) + ) publish_response = self._get_data_store_client().publish_measurement_batch(publish_request) return publish_response.measurement_ids diff --git a/src/ni/datastore/data/_grpc_conversion.py b/src/ni/datastore/data/_grpc_conversion.py index 70bb863..91ae477 100644 --- a/src/ni/datastore/data/_grpc_conversion.py +++ b/src/ni/datastore/data/_grpc_conversion.py @@ -393,3 +393,49 @@ def get_publish_measurement_timestamp( if no_client_timestamp_provided: publish_time = waveform_t0 return publish_time + + +def get_publish_measurement_batch_timestamps( + publish_request: PublishMeasurementBatchRequest, + client_provided_timestamps: Iterable[ht.datetime], +) -> list[PrecisionTimestamp]: + """Determine the correct timestamps to use for publishing a measurement batch.""" + client_timestamps = [hightime_datetime_to_protobuf(ts) for ts in client_provided_timestamps] + + waveform_t0s: list[PrecisionTimestamp] = [] + value_case = publish_request.WhichOneof("values") + if value_case == "double_analog_waveform_values": + waveform_t0s = [w.t0 for w in publish_request.double_analog_waveform_values.waveforms] + elif value_case == "i16_analog_waveform_values": + waveform_t0s = [w.t0 for w in publish_request.i16_analog_waveform_values.waveforms] + elif value_case == "double_complex_waveform_values": + waveform_t0s = [w.t0 for w in publish_request.double_complex_waveform_values.waveforms] + elif value_case == "i16_complex_waveform_values": + waveform_t0s = [w.t0 for w in publish_request.i16_complex_waveform_values.waveforms] + elif value_case == "digital_waveform_values": + waveform_t0s = [w.t0 for w in publish_request.digital_waveform_values.waveforms] + + # Determining count here accounts for the case where the user passes in less + # timestamps than the number of waveforms in the batch. In that case, we will + # backfill the "missing" timestamps with waveform t0 or "now". + # TODO: If the user passes in more timestamps than the number of waveforms, + # we'll end up passing that same larger number of timestamps into the publish + # request. Is this OK? Is the server going to get confused about this? + count = max(len(client_timestamps), len(waveform_t0s)) + now = hightime_datetime_to_protobuf(ht.datetime.now(std_datetime.timezone.utc)) + default_t0 = PrecisionTimestamp() + + publish_times: list[PrecisionTimestamp] = [] + for i in range(count): + if i < len(client_timestamps): + publish_times.append(client_timestamps[i]) + else: + t0 = waveform_t0s[i] if i < len(waveform_t0s) else None + # If an initialized waveform t0 value is present and no client timestamp was + # provided, use the waveform t0 as the measurement start time. + if t0 is not None and t0 != default_t0: + publish_times.append(t0) + else: + publish_times.append(now) + + return publish_times diff --git a/tests/unit/data/test_publish_measurement.py b/tests/unit/data/test_publish_measurement.py index b4c9f7f..bcebef4 100644 --- a/tests/unit/data/test_publish_measurement.py +++ b/tests/unit/data/test_publish_measurement.py @@ -206,6 +206,33 @@ def test___publish_analog_waveform_data_without_timestamp_parameter___uses_wavef assert request.timestamp == hightime_datetime_to_protobuf(timestamp) +def test___batch_publish_analog_waveform_data_without_timestamp_parameter___uses_waveform_t0s( + data_store_client: DataStoreClient, + mocked_data_store_service_client: NonCallableMock, +) -> None: + timestamp = datetime.now(tz=std_datetime.timezone.utc) + waveform_values = [1.0, 2.0, 3.0] + analog_waveforms = [ + AnalogWaveform( + sample_count=len(waveform_values), + raw_data=np.array(waveform_values, dtype=np.float64), + timing=Timing.create_with_regular_interval(timedelta(seconds=1), timestamp), + ), + ] + expected_response = PublishMeasurementBatchResponse(measurement_ids=["response_id"]) + mocked_data_store_service_client.publish_measurement_batch.return_value = expected_response + + measurement_ids = data_store_client.publish_measurement_batch( + "name", analog_waveforms, "step_id" + ) + + args, __ = mocked_data_store_service_client.publish_measurement_batch.call_args + request = cast(PublishMeasurementBatchRequest, args[0]) + assert next(iter(measurement_ids)) == "response_id" + timestamp_proto = hightime_datetime_to_protobuf(timestamp) + assert request.timestamps == [timestamp_proto] + + def test___publish_analog_waveform_data_without_t0___uses_timestamp_parameter( data_store_client: DataStoreClient, mocked_data_store_service_client: NonCallableMock, @@ -225,6 +252,29 @@ def test___publish_analog_waveform_data_without_t0___uses_timestamp_parameter( assert request.timestamp == hightime_datetime_to_protobuf(timestamp) +def test___batch_publish_analog_waveform_data_without_t0___uses_timestamps_parameter( + data_store_client: DataStoreClient, + mocked_data_store_service_client: NonCallableMock, +) -> None: + timestamp = datetime.now(tz=std_datetime.timezone.utc) + analog_waveforms = [AnalogWaveform.from_array_1d([1.0, 2.0, 3.0], dtype=float)] + publish_measurement_batch_response = PublishMeasurementBatchResponse( + measurement_ids=["response_id"] + ) + mocked_data_store_service_client.publish_measurement_batch.return_value = ( + publish_measurement_batch_response + ) + + measurement_ids = data_store_client.publish_measurement_batch( + "name", analog_waveforms, "step_id", [timestamp] + ) + + args, __ = mocked_data_store_service_client.publish_measurement_batch.call_args + request = cast(PublishMeasurementBatchRequest, args[0]) + assert measurement_ids == ["response_id"] + assert request.timestamps == [hightime_datetime_to_protobuf(timestamp)] + + def test___publish_analog_waveform_data_with_mismatched_timestamp_parameter___uses_provided_timestamp( data_store_client: DataStoreClient, mocked_data_store_service_client: NonCallableMock, @@ -251,6 +301,34 @@ def test___publish_analog_waveform_data_with_mismatched_timestamp_parameter___us assert request.timestamp == hightime_datetime_to_protobuf(mismatched_timestamp) +def test___batch_publish_analog_waveform_data_with_mismatched_timestamp_parameter___uses_provided_timestamps( + data_store_client: DataStoreClient, + mocked_data_store_service_client: NonCallableMock, +) -> None: + timestamp = datetime.now(tz=std_datetime.timezone.utc) + waveform_values = [1.0, 2.0, 3.0] + analog_waveforms = [ + AnalogWaveform( + sample_count=len(waveform_values), + raw_data=np.array(waveform_values, dtype=np.float64), + timing=Timing.create_with_regular_interval(timedelta(seconds=1), timestamp), + ) + ] + mismatched_timestamp = timestamp + timedelta(seconds=1) + mocked_data_store_service_client.publish_measurement_batch.return_value = ( + PublishMeasurementBatchResponse(measurement_ids=["response_id"]) + ) + + measurement_ids = data_store_client.publish_measurement_batch( + "name", analog_waveforms, "step_id", [mismatched_timestamp] + ) + + args, __ = mocked_data_store_service_client.publish_measurement_batch.call_args + request = cast(PublishMeasurementBatchRequest, args[0]) + assert measurement_ids == ["response_id"] + assert request.timestamps == [hightime_datetime_to_protobuf(mismatched_timestamp)] + + def test___publish_analog_waveform_data_without_t0_or_timestamp___uses_now( data_store_client: DataStoreClient, mocked_data_store_service_client: NonCallableMock, @@ -270,6 +348,25 @@ def test___publish_analog_waveform_data_without_t0_or_timestamp___uses_now( assert request.timestamp == hightime_datetime_to_protobuf(now) +def test___batch_publish_analog_waveform_data_without_t0_or_timestamp___uses_now( + data_store_client: DataStoreClient, + mocked_data_store_service_client: NonCallableMock, +) -> None: + now = datetime.now(tz=std_datetime.timezone.utc) + analog_waveforms = [AnalogWaveform.from_array_1d([1.0, 2.0, 3.0], dtype=float)] + mocked_data_store_service_client.publish_measurement_batch.return_value = ( + PublishMeasurementBatchResponse(measurement_ids=["response_id"]) + ) + + with unittest.mock.patch("ni.datastore.data._grpc_conversion.ht.datetime") as mock_ht_datetime: + mock_ht_datetime.now.return_value = now + data_store_client.publish_measurement_batch("name", analog_waveforms, "step_id") + + args, __ = mocked_data_store_service_client.publish_measurement_batch.call_args + request = cast(PublishMeasurementBatchRequest, args[0]) + assert request.timestamps == [hightime_datetime_to_protobuf(now)] + + def test___none___publish_measurement___raises_type_error( data_store_client: DataStoreClient, ) -> None: