From 49fafc325bae1e844ee41c356fc2fde41d3407a7 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 8 Jun 2026 08:34:08 +0200 Subject: [PATCH 1/6] feat(ray): Support span streaming --- sentry_sdk/integrations/ray.py | 130 +++++++---- tests/integrations/ray/test_ray.py | 353 ++++++++++++++++++++++------- 2 files changed, 363 insertions(+), 120 deletions(-) diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index 4896f7e946..32140b0f51 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -5,7 +5,9 @@ import sentry_sdk from sentry_sdk.consts import OP, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version +from sentry_sdk.traces import SegmentSource from sentry_sdk.tracing import TransactionSource +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( event_from_exception, logger, @@ -88,25 +90,49 @@ def new_func( ) -> "Any": _check_sentry_initialized() - transaction = sentry_sdk.continue_trace( - _sentry_tracing or {}, - op=OP.QUEUE_TASK_RAY, - name=qualname_from_function(user_f), - origin=RayIntegration.origin, - source=TransactionSource.TASK, + span_streaming = has_span_streaming_enabled( + sentry_sdk.get_client().options ) - - with sentry_sdk.start_transaction(transaction) as transaction: - try: - result = user_f(*f_args, **f_kwargs) - transaction.set_status(SPANSTATUS.OK) - except Exception: - transaction.set_status(SPANSTATUS.INTERNAL_ERROR) - exc_info = sys.exc_info() - _capture_exception(exc_info) - reraise(*exc_info) - - return result + if span_streaming: + sentry_sdk.traces.continue_trace(_sentry_tracing or {}) + + with sentry_sdk.traces.start_span( + name=qualname_from_function(user_f), + attributes={ + "sentry.op": OP.QUEUE_TASK_RAY, + "sentry.origin": RayIntegration.origin, + "sentry.span.source": SegmentSource.TASK, + }, + parent_span=None, + ): + try: + result = user_f(*f_args, **f_kwargs) + except Exception: + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result + else: + transaction = sentry_sdk.continue_trace( + _sentry_tracing or {}, + op=OP.QUEUE_TASK_RAY, + name=qualname_from_function(user_f), + origin=RayIntegration.origin, + source=TransactionSource.TASK, + ) + + with sentry_sdk.start_transaction(transaction) as transaction: + try: + result = user_f(*f_args, **f_kwargs) + transaction.set_status(SPANSTATUS.OK) + except Exception: + transaction.set_status(SPANSTATUS.INTERNAL_ERROR) + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result _insert_sentry_tracing_in_signature(new_func) @@ -122,27 +148,53 @@ def _remote_method_with_header_propagation( """ Ray Client """ - with sentry_sdk.start_span( - op=OP.QUEUE_SUBMIT_RAY, - name=qualname_from_function(user_f), - origin=RayIntegration.origin, - ) as span: - tracing = { - k: v - for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers() - } - try: - result = old_remote_method( - *args, **kwargs, _sentry_tracing=tracing - ) - span.set_status(SPANSTATUS.OK) - except Exception: - span.set_status(SPANSTATUS.INTERNAL_ERROR) - exc_info = sys.exc_info() - _capture_exception(exc_info) - reraise(*exc_info) - - return result + span_streaming = has_span_streaming_enabled( + sentry_sdk.get_client().options + ) + if span_streaming: + with sentry_sdk.traces.start_span( + name=qualname_from_function(user_f), + attributes={ + "sentry.op": OP.QUEUE_SUBMIT_RAY, + "sentry.origin": RayIntegration.origin, + }, + ): + tracing = { + k: v + for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers() + } + try: + result = old_remote_method( + *args, **kwargs, _sentry_tracing=tracing + ) + except Exception: + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result + else: + with sentry_sdk.start_span( + op=OP.QUEUE_SUBMIT_RAY, + name=qualname_from_function(user_f), + origin=RayIntegration.origin, + ) as span: + tracing = { + k: v + for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers() + } + try: + result = old_remote_method( + *args, **kwargs, _sentry_tracing=tracing + ) + span.set_status(SPANSTATUS.OK) + except Exception: + span.set_status(SPANSTATUS.INTERNAL_ERROR) + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result rv.remote = _remote_method_with_header_propagation diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index 4cc413eada..8240b405e1 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -9,6 +9,8 @@ import sentry_sdk from sentry_sdk.envelope import Envelope from sentry_sdk.integrations.ray import RayIntegration +from sentry_sdk.integrations.stdlib import StdlibIntegration +from sentry_sdk.traces import SegmentSource from tests.conftest import TestTransport @@ -36,14 +38,26 @@ def setup_sentry_with_logging_transport(): setup_sentry(transport=RayLoggingTransport()) -def setup_sentry(transport=None): +def setup_sentry_with_logging_transport_and_span_streaming(): + setup_sentry(span_streaming=True, transport=RayLoggingTransport()) + + +def setup_sentry(span_streaming=False, transport=None): + if span_streaming: + sentry_sdk._span_batcher.SpanBatcher.MAX_BEFORE_FLUSH = 1 + sentry_sdk.init( integrations=[RayIntegration()], transport=RayTestTransport() if transport is None else transport, traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) +def setup_sentry_with_span_streaming(): + setup_sentry(span_streaming=True) + + def read_error_from_log(job_id, ray_temp_dir): # Find the actual session directory that Ray created session_dirs = [d for d in os.listdir(ray_temp_dir) if d.startswith("session_")] @@ -62,29 +76,49 @@ def read_error_from_log(job_id, ray_temp_dir): if "worker" in f and job_id in f and f.endswith(".out") ][0] + next_line_is_event = False with open(os.path.join(log_dir, log_file), "r") as file: - lines = file.readlines() + for line in file: + try: + payload = json.loads(line) + except ValueError: + continue - try: - # parse error object from log line - error = json.loads(lines[4][:-1]) - except IndexError: - error = None + if next_line_is_event: + return payload - return error + if isinstance(payload, dict) and payload.get("type") == "event": + next_line_is_event = True + return None -def example_task(): - with sentry_sdk.start_span(op="task", name="example task step"): - ... + +def example_task(span_streaming: bool): + if span_streaming: + with sentry_sdk.traces.start_span( + name="example task step", + attributes={ + "sentry.op": "task", + }, + ): + ... + else: + with sentry_sdk.start_span(op="task", name="example task step"): + ... return sentry_sdk.get_client().transport.envelopes # RayIntegration must leave variadic keyword arguments at the end -def example_task_with_kwargs(**kwargs): - with sentry_sdk.start_span(op="task", name="example task step"): - ... +def example_task_with_kwargs(span_streaming: bool, **kwargs): + if span_streaming: + with sentry_sdk.traces.start_span( + name="example task step", attributes={"sentry.op": "task"} + ): + ... + else: + with sentry_sdk.start_span(op="task", name="example task step"): + ... return sentry_sdk.get_client().transport.envelopes @@ -96,12 +130,21 @@ def example_task_with_kwargs(**kwargs): "task", [example_task, example_task_with_kwargs], ) -def test_tracing_in_ray_tasks(task_options, task): - setup_sentry() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_tracing_in_ray_tasks(task_options, task, span_streaming): + sentry_sdk.init( + integrations=[RayIntegration()], + disabled_integrations=[StdlibIntegration], + transport=RayTestTransport(), + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry, + "worker_process_setup_hook": setup_sentry_with_span_streaming + if span_streaming + else setup_sentry, "working_dir": "./", } ) @@ -119,44 +162,101 @@ def test_tracing_in_ray_tasks(task_options, task): == f"tests.integrations.ray.test_ray.{task.__name__}" ) - with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - worker_envelopes = ray.get(example_task.remote()) + if span_streaming: + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + worker_envelopes = ray.get(example_task.remote(span_streaming)) + + sentry_sdk.flush() + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_spans = [ + span + for item in client_envelope.items + for span in item.payload.json["items"] + ] + assert client_spans[1]["name"] == "ray test parent" + + worker_envelope = worker_envelopes[0] + worker_spans = [ + span + for item in worker_envelope.items + for span in item.payload.json["items"] + ] + assert ( + worker_spans[1]["name"] + == f"tests.integrations.ray.test_ray.{task.__name__}" + ) + assert ( + worker_spans[1]["attributes"]["sentry.span.source"]["value"] + == SegmentSource.TASK + ) - client_envelope = sentry_sdk.get_client().transport.envelopes[0] - client_transaction = client_envelope.get_transaction_event() - assert client_transaction["transaction"] == "ray test transaction" - assert client_transaction["transaction_info"] == {"source": "custom"} + span = client_spans[0] + assert span["attributes"]["sentry.op"]["value"] == "queue.submit.ray" + assert span["attributes"]["sentry.origin"]["value"] == "auto.queue.ray" + assert span["name"] == f"tests.integrations.ray.test_ray.{task.__name__}" + assert span["parent_span_id"] == client_spans[1]["span_id"] + assert span["trace_id"] == client_spans[1]["trace_id"] + + span = worker_spans[0] + assert span["attributes"]["sentry.op"]["value"] == "task" + assert span["attributes"]["sentry.origin"]["value"] == "manual" + assert span["name"] == "example task step" + assert span["parent_span_id"] == worker_spans[1]["span_id"] + assert span["trace_id"] == worker_spans[1]["trace_id"] + + assert client_spans[1]["trace_id"] == worker_spans[1]["trace_id"] + else: + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + worker_envelopes = ray.get(example_task.remote(span_streaming)) - worker_envelope = worker_envelopes[0] - worker_transaction = worker_envelope.get_transaction_event() - assert ( - worker_transaction["transaction"] - == f"tests.integrations.ray.test_ray.{task.__name__}" - ) - assert worker_transaction["transaction_info"] == {"source": "task"} - - (span,) = client_transaction["spans"] - assert span["op"] == "queue.submit.ray" - assert span["origin"] == "auto.queue.ray" - assert span["description"] == f"tests.integrations.ray.test_ray.{task.__name__}" - assert span["parent_span_id"] == client_transaction["contexts"]["trace"]["span_id"] - assert span["trace_id"] == client_transaction["contexts"]["trace"]["trace_id"] - - (span,) = worker_transaction["spans"] - assert span["op"] == "task" - assert span["origin"] == "manual" - assert span["description"] == "example task step" - assert span["parent_span_id"] == worker_transaction["contexts"]["trace"]["span_id"] - assert span["trace_id"] == worker_transaction["contexts"]["trace"]["trace_id"] + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_transaction = client_envelope.get_transaction_event() + assert client_transaction["transaction"] == "ray test transaction" + assert client_transaction["transaction_info"] == {"source": "custom"} - assert ( - client_transaction["contexts"]["trace"]["trace_id"] - == worker_transaction["contexts"]["trace"]["trace_id"] - ) + worker_envelope = worker_envelopes[0] + worker_transaction = worker_envelope.get_transaction_event() + assert ( + worker_transaction["transaction"] + == f"tests.integrations.ray.test_ray.{task.__name__}" + ) + assert worker_transaction["transaction_info"] == {"source": "task"} + (span,) = client_transaction["spans"] + assert span["op"] == "queue.submit.ray" + assert span["origin"] == "auto.queue.ray" + assert span["description"] == f"tests.integrations.ray.test_ray.{task.__name__}" + assert ( + span["parent_span_id"] == client_transaction["contexts"]["trace"]["span_id"] + ) + assert span["trace_id"] == client_transaction["contexts"]["trace"]["trace_id"] -def test_errors_in_ray_tasks(): - setup_sentry_with_logging_transport() + (span,) = worker_transaction["spans"] + assert span["op"] == "task" + assert span["origin"] == "manual" + assert span["description"] == "example task step" + assert ( + span["parent_span_id"] == worker_transaction["contexts"]["trace"]["span_id"] + ) + assert span["trace_id"] == worker_transaction["contexts"]["trace"]["trace_id"] + + assert ( + client_transaction["contexts"]["trace"]["trace_id"] + == worker_transaction["contexts"]["trace"]["trace_id"] + ) + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_errors_in_ray_tasks(span_streaming): + sentry_sdk.init( + integrations=[RayIntegration()], + disabled_integrations=[StdlibIntegration], + transport=RayTestTransport(), + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) ray_temp_dir = os.path.join("/tmp", f"ray_test_{uuid.uuid4().hex[:8]}") os.makedirs(ray_temp_dir, exist_ok=True) @@ -164,7 +264,9 @@ def test_errors_in_ray_tasks(): try: ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry_with_logging_transport, + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming + if span_streaming + else setup_sentry_with_logging_transport, "working_dir": "./", }, _temp_dir=ray_temp_dir, @@ -175,10 +277,18 @@ def test_errors_in_ray_tasks(): def example_task(): 1 / 0 - with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - with pytest.raises(ZeroDivisionError): - future = example_task.remote() - ray.get(future) + if span_streaming: + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + with pytest.raises(ZeroDivisionError): + future = example_task.remote() + ray.get(future) + else: + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + with pytest.raises(ZeroDivisionError): + future = example_task.remote() + ray.get(future) job_id = future.job_id().hex() error = read_error_from_log(job_id, ray_temp_dir) @@ -198,12 +308,21 @@ def example_task(): # Arbitrary keyword argument to test all decorator paths @pytest.mark.parametrize("remote_kwargs", [{}, {"namespace": "actors"}]) -def test_tracing_in_ray_actors(remote_kwargs): - setup_sentry() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_tracing_in_ray_actors(remote_kwargs, span_streaming): + sentry_sdk.init( + integrations=[RayIntegration()], + disabled_integrations=[StdlibIntegration], + transport=RayTestTransport(), + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry, + "worker_process_setup_hook": setup_sentry_with_span_streaming + if span_streaming + else setup_sentry, "working_dir": "./", } ) @@ -217,8 +336,16 @@ def __init__(self): self.n = 0 def increment(self): - with sentry_sdk.start_span(op="task", name="example actor execution"): - self.n += 1 + if span_streaming: + with sentry_sdk.traces.start_span( + name="example actor execution", attributes={"sentry.op": "task"} + ): + self.n += 1 + else: + with sentry_sdk.start_span( + op="task", name="example actor execution" + ): + self.n += 1 return sentry_sdk.get_client().transport.envelopes else: @@ -229,27 +356,72 @@ def __init__(self): self.n = 0 def increment(self): - with sentry_sdk.start_span(op="task", name="example actor execution"): - self.n += 1 + if span_streaming: + with sentry_sdk.traces.start_span( + name="example actor execution", attributes={"sentry.op": "task"} + ): + self.n += 1 + else: + with sentry_sdk.start_span( + op="task", name="example actor execution" + ): + self.n += 1 return sentry_sdk.get_client().transport.envelopes - with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - counter = Counter.remote() - worker_envelopes = ray.get(counter.increment.remote()) + if span_streaming: + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + counter = Counter.remote() + worker_envelopes = ray.get(counter.increment.remote()) + + sentry_sdk.flush() + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_spans = [ + span + for item in client_envelope.items + for span in item.payload.json["items"] + ] + + # Spans for submitting the actor task are not created (actors are not supported yet) + # Only the manual "example actor execution" span is recorded. + assert len(client_spans) == 1 + + worker_envelope = worker_envelopes[0] + worker_spans = [ + span + for item in worker_envelope.items + for span in item.payload.json["items"] + ] + + # Transaction are not yet created when executing ray actors (actors are not supported yet) + # Only the manual "example actor execution" span is recorded. + assert len(worker_spans) == 1 + else: + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + counter = Counter.remote() + worker_envelopes = ray.get(counter.increment.remote()) - client_envelope = sentry_sdk.get_client().transport.envelopes[0] - client_transaction = client_envelope.get_transaction_event() + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_transaction = client_envelope.get_transaction_event() - # Spans for submitting the actor task are not created (actors are not supported yet) - assert client_transaction["spans"] == [] + # Spans for submitting the actor task are not created (actors are not supported yet) + assert client_transaction["spans"] == [] - # Transaction are not yet created when executing ray actors (actors are not supported yet) - assert worker_envelopes == [] + # Transaction are not yet created when executing ray actors (actors are not supported yet) + assert worker_envelopes == [] -def test_errors_in_ray_actors(): - setup_sentry_with_logging_transport() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_errors_in_ray_actors(span_streaming): + sentry_sdk.init( + integrations=[RayIntegration()], + disabled_integrations=[StdlibIntegration], + transport=RayLoggingTransport(), + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) ray_temp_dir = os.path.join("/tmp", f"ray_test_{uuid.uuid4().hex[:8]}") os.makedirs(ray_temp_dir, exist_ok=True) @@ -257,7 +429,9 @@ def test_errors_in_ray_actors(): try: ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry_with_logging_transport, + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming + if span_streaming + else setup_sentry_with_logging_transport, "working_dir": "./", }, _temp_dir=ray_temp_dir, @@ -270,16 +444,33 @@ def __init__(self): self.n = 0 def increment(self): - with sentry_sdk.start_span(op="task", name="example actor execution"): - 1 / 0 + if span_streaming: + with sentry_sdk.traces.start_span( + name="example actor execution", attributes={"sentry.op": "task"} + ): + 1 / 0 + else: + with sentry_sdk.start_span( + op="task", name="example actor execution" + ): + 1 / 0 return sentry_sdk.get_client().transport.envelopes - with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - with pytest.raises(ZeroDivisionError): - counter = Counter.remote() - future = counter.increment.remote() - ray.get(future) + if span_streaming: + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + with pytest.raises(ZeroDivisionError): + counter = Counter.remote() + future = counter.increment.remote() + ray.get(future) + else: + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + with pytest.raises(ZeroDivisionError): + counter = Counter.remote() + future = counter.increment.remote() + ray.get(future) job_id = future.job_id().hex() error = read_error_from_log(job_id, ray_temp_dir) From c752264ccfa25e884e3edb770e99460ca8cd3703 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 8 Jun 2026 08:46:03 +0200 Subject: [PATCH 2/6] . --- sentry_sdk/integrations/ray.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index 32140b0f51..f723a96f3c 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -96,8 +96,11 @@ def new_func( if span_streaming: sentry_sdk.traces.continue_trace(_sentry_tracing or {}) + function_name = qualname_from_function(user_f) with sentry_sdk.traces.start_span( - name=qualname_from_function(user_f), + name="unknown Ray task" + if function_name is None + else function_name, attributes={ "sentry.op": OP.QUEUE_TASK_RAY, "sentry.origin": RayIntegration.origin, @@ -152,8 +155,11 @@ def _remote_method_with_header_propagation( sentry_sdk.get_client().options ) if span_streaming: + function_name = qualname_from_function(user_f) with sentry_sdk.traces.start_span( - name=qualname_from_function(user_f), + name="unknown Ray task" + if function_name is None + else function_name, attributes={ "sentry.op": OP.QUEUE_SUBMIT_RAY, "sentry.origin": RayIntegration.origin, From dbf413f2d010e43c1dcd1e292a23f25c713816be Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 8 Jun 2026 08:54:09 +0200 Subject: [PATCH 3/6] add flush --- tests/integrations/ray/test_ray.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index 8240b405e1..7e694a33cd 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -102,6 +102,7 @@ def example_task(span_streaming: bool): }, ): ... + sentry_sdk.flush() else: with sentry_sdk.start_span(op="task", name="example task step"): ... @@ -116,6 +117,7 @@ def example_task_with_kwargs(span_streaming: bool, **kwargs): name="example task step", attributes={"sentry.op": "task"} ): ... + sentry_sdk.flush() else: with sentry_sdk.start_span(op="task", name="example task step"): ... From 565358e38c915729911797418371e1349286308f Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 8 Jun 2026 09:27:52 +0200 Subject: [PATCH 4/6] . --- tests/integrations/ray/test_ray.py | 156 ++++++++++++++++++++--------- 1 file changed, 109 insertions(+), 47 deletions(-) diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index 7e694a33cd..fccea31174 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -76,7 +76,7 @@ def read_error_from_log(job_id, ray_temp_dir): if "worker" in f and job_id in f and f.endswith(".out") ][0] - next_line_is_event = False + next_line_is_event_payload = False with open(os.path.join(log_dir, log_file), "r") as file: for line in file: try: @@ -84,15 +84,51 @@ def read_error_from_log(job_id, ray_temp_dir): except ValueError: continue - if next_line_is_event: + if next_line_is_event_payload: return payload if isinstance(payload, dict) and payload.get("type") == "event": - next_line_is_event = True + next_line_is_event_payload = True return None +def read_spans_from_log(job_id, ray_temp_dir): + # Find the actual session directory that Ray created + session_dirs = [d for d in os.listdir(ray_temp_dir) if d.startswith("session_")] + if not session_dirs: + raise FileNotFoundError(f"No session directory found in {ray_temp_dir}") + + session_dir = os.path.join(ray_temp_dir, session_dirs[0]) + log_dir = os.path.join(session_dir, "logs") + + if not os.path.exists(log_dir): + raise FileNotFoundError(f"No logs directory found at {log_dir}") + + log_file = [ + f + for f in os.listdir(log_dir) + if "worker" in f and job_id in f and f.endswith(".out") + ][0] + + spans = [] + next_line_is_span_payload = False + with open(os.path.join(log_dir, log_file), "r") as file: + for line in file: + try: + payload = json.loads(line) + except ValueError: + continue + if next_line_is_span_payload: + spans.extend(payload["items"]) + next_line_is_span_payload = False + continue + if isinstance(payload, dict) and payload.get("type") == "span": + next_line_is_span_payload = True + + return spans + + def example_task(span_streaming: bool): if span_streaming: with sentry_sdk.traces.start_span( @@ -102,12 +138,11 @@ def example_task(span_streaming: bool): }, ): ... - sentry_sdk.flush() else: with sentry_sdk.start_span(op="task", name="example task step"): ... - return sentry_sdk.get_client().transport.envelopes + return sentry_sdk.get_client().transport.envelopes # RayIntegration must leave variadic keyword arguments at the end @@ -117,12 +152,11 @@ def example_task_with_kwargs(span_streaming: bool, **kwargs): name="example task step", attributes={"sentry.op": "task"} ): ... - sentry_sdk.flush() else: with sentry_sdk.start_span(op="task", name="example task step"): ... - return sentry_sdk.get_client().transport.envelopes + return sentry_sdk.get_client().transport.envelopes @pytest.mark.parametrize( @@ -142,15 +176,6 @@ def test_tracing_in_ray_tasks(task_options, task, span_streaming): _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - ray.init( - runtime_env={ - "worker_process_setup_hook": setup_sentry_with_span_streaming - if span_streaming - else setup_sentry, - "working_dir": "./", - } - ) - # Setup ray task, calling decorator directly instead of @, # to accommodate for test parametrization if task_options: @@ -165,10 +190,31 @@ def test_tracing_in_ray_tasks(task_options, task, span_streaming): ) if span_streaming: - with sentry_sdk.traces.start_span( - name="ray test parent", attributes={"sentry.op": "task"} - ): - worker_envelopes = ray.get(example_task.remote(span_streaming)) + ray_temp_dir = os.path.join("/tmp", f"ray_test_{uuid.uuid4().hex[:8]}") + os.makedirs(ray_temp_dir, exist_ok=True) + + try: + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming + if span_streaming + else setup_sentry, + "working_dir": "./", + }, + _temp_dir=ray_temp_dir, + ) + + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + future = example_task.remote(span_streaming) + ray.get(future) + + job_id = future.job_id().hex() + worker_spans = read_spans_from_log(job_id, ray_temp_dir) + finally: + if os.path.exists(ray_temp_dir): + shutil.rmtree(ray_temp_dir, ignore_errors=True) sentry_sdk.flush() client_envelope = sentry_sdk.get_client().transport.envelopes[0] @@ -179,12 +225,6 @@ def test_tracing_in_ray_tasks(task_options, task, span_streaming): ] assert client_spans[1]["name"] == "ray test parent" - worker_envelope = worker_envelopes[0] - worker_spans = [ - span - for item in worker_envelope.items - for span in item.payload.json["items"] - ] assert ( worker_spans[1]["name"] == f"tests.integrations.ray.test_ray.{task.__name__}" @@ -210,6 +250,15 @@ def test_tracing_in_ray_tasks(task_options, task, span_streaming): assert client_spans[1]["trace_id"] == worker_spans[1]["trace_id"] else: + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming + if span_streaming + else setup_sentry, + "working_dir": "./", + } + ) + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): worker_envelopes = ray.get(example_task.remote(span_streaming)) @@ -320,15 +369,6 @@ def test_tracing_in_ray_actors(remote_kwargs, span_streaming): _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) - ray.init( - runtime_env={ - "worker_process_setup_hook": setup_sentry_with_span_streaming - if span_streaming - else setup_sentry, - "working_dir": "./", - } - ) - # Setup ray actor if remote_kwargs: @@ -372,11 +412,31 @@ def increment(self): return sentry_sdk.get_client().transport.envelopes if span_streaming: - with sentry_sdk.traces.start_span( - name="ray test parent", attributes={"sentry.op": "task"} - ): - counter = Counter.remote() - worker_envelopes = ray.get(counter.increment.remote()) + ray_temp_dir = os.path.join("/tmp", f"ray_test_{uuid.uuid4().hex[:8]}") + os.makedirs(ray_temp_dir, exist_ok=True) + + try: + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry_with_span_streaming + if span_streaming + else setup_sentry, + "working_dir": "./", + }, + _temp_dir=ray_temp_dir, + ) + + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + future = example_task.remote(span_streaming) + ray.get(future) + + job_id = future.job_id().hex() + worker_spans = read_spans_from_log(job_id, ray_temp_dir) + finally: + if os.path.exists(ray_temp_dir): + shutil.rmtree(ray_temp_dir, ignore_errors=True) sentry_sdk.flush() client_envelope = sentry_sdk.get_client().transport.envelopes[0] @@ -390,17 +450,19 @@ def increment(self): # Only the manual "example actor execution" span is recorded. assert len(client_spans) == 1 - worker_envelope = worker_envelopes[0] - worker_spans = [ - span - for item in worker_envelope.items - for span in item.payload.json["items"] - ] - # Transaction are not yet created when executing ray actors (actors are not supported yet) # Only the manual "example actor execution" span is recorded. assert len(worker_spans) == 1 else: + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry_with_span_streaming + if span_streaming + else setup_sentry, + "working_dir": "./", + } + ) + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): counter = Counter.remote() worker_envelopes = ray.get(counter.increment.remote()) From 278e7a4b4e026ce1b596287925b2e7d10620cf99 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 8 Jun 2026 09:40:57 +0200 Subject: [PATCH 5/6] . --- tests/integrations/ray/test_ray.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index fccea31174..54706b0721 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -196,9 +196,7 @@ def test_tracing_in_ray_tasks(task_options, task, span_streaming): try: ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming - if span_streaming - else setup_sentry, + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming, "working_dir": "./", }, _temp_dir=ray_temp_dir, @@ -252,9 +250,7 @@ def test_tracing_in_ray_tasks(task_options, task, span_streaming): else: ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming - if span_streaming - else setup_sentry, + "worker_process_setup_hook": setup_sentry, "working_dir": "./", } ) @@ -389,7 +385,7 @@ def increment(self): ): self.n += 1 - return sentry_sdk.get_client().transport.envelopes + return sentry_sdk.get_client().transport.envelopes else: @ray.remote @@ -409,7 +405,7 @@ def increment(self): ): self.n += 1 - return sentry_sdk.get_client().transport.envelopes + return sentry_sdk.get_client().transport.envelopes if span_streaming: ray_temp_dir = os.path.join("/tmp", f"ray_test_{uuid.uuid4().hex[:8]}") @@ -418,9 +414,7 @@ def increment(self): try: ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry_with_span_streaming - if span_streaming - else setup_sentry, + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming, "working_dir": "./", }, _temp_dir=ray_temp_dir, @@ -429,7 +423,8 @@ def increment(self): with sentry_sdk.traces.start_span( name="ray test parent", attributes={"sentry.op": "task"} ): - future = example_task.remote(span_streaming) + counter = Counter.remote() + future = counter.increment.remote() ray.get(future) job_id = future.job_id().hex() @@ -456,9 +451,7 @@ def increment(self): else: ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry_with_span_streaming - if span_streaming - else setup_sentry, + "worker_process_setup_hook": setup_sentry, "working_dir": "./", } ) From a53ce3ece55e045e612bb8a218f3f2e0b9274c42 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 8 Jun 2026 09:44:45 +0200 Subject: [PATCH 6/6] . --- tests/integrations/ray/test_ray.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index 54706b0721..9165a63261 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -54,10 +54,6 @@ def setup_sentry(span_streaming=False, transport=None): ) -def setup_sentry_with_span_streaming(): - setup_sentry(span_streaming=True) - - def read_error_from_log(job_id, ray_temp_dir): # Find the actual session directory that Ray created session_dirs = [d for d in os.listdir(ray_temp_dir) if d.startswith("session_")]