diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py index 4896f7e946..f723a96f3c 100644 --- a/sentry_sdk/integrations/ray.py +++ b/sentry_sdk/integrations/ray.py @@ -5,7 +5,9 @@ import sentry_sdk from sentry_sdk.consts import OP, SPANSTATUS from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version +from sentry_sdk.traces import SegmentSource from sentry_sdk.tracing import TransactionSource +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( event_from_exception, logger, @@ -88,25 +90,52 @@ def new_func( ) -> "Any": _check_sentry_initialized() - transaction = sentry_sdk.continue_trace( - _sentry_tracing or {}, - op=OP.QUEUE_TASK_RAY, - name=qualname_from_function(user_f), - origin=RayIntegration.origin, - source=TransactionSource.TASK, + span_streaming = has_span_streaming_enabled( + sentry_sdk.get_client().options ) - - with sentry_sdk.start_transaction(transaction) as transaction: - try: - result = user_f(*f_args, **f_kwargs) - transaction.set_status(SPANSTATUS.OK) - except Exception: - transaction.set_status(SPANSTATUS.INTERNAL_ERROR) - exc_info = sys.exc_info() - _capture_exception(exc_info) - reraise(*exc_info) - - return result + if span_streaming: + sentry_sdk.traces.continue_trace(_sentry_tracing or {}) + + function_name = qualname_from_function(user_f) + with sentry_sdk.traces.start_span( + name="unknown Ray task" + if function_name is None + else function_name, + attributes={ + "sentry.op": OP.QUEUE_TASK_RAY, + "sentry.origin": RayIntegration.origin, + "sentry.span.source": SegmentSource.TASK, + }, + parent_span=None, + ): + try: + result = user_f(*f_args, **f_kwargs) + except Exception: + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result + else: + transaction = sentry_sdk.continue_trace( + _sentry_tracing or {}, + op=OP.QUEUE_TASK_RAY, + name=qualname_from_function(user_f), + origin=RayIntegration.origin, + source=TransactionSource.TASK, + ) + + with sentry_sdk.start_transaction(transaction) as transaction: + try: + result = user_f(*f_args, **f_kwargs) + transaction.set_status(SPANSTATUS.OK) + except Exception: + transaction.set_status(SPANSTATUS.INTERNAL_ERROR) + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result _insert_sentry_tracing_in_signature(new_func) @@ -122,27 +151,56 @@ def _remote_method_with_header_propagation( """ Ray Client """ - with sentry_sdk.start_span( - op=OP.QUEUE_SUBMIT_RAY, - name=qualname_from_function(user_f), - origin=RayIntegration.origin, - ) as span: - tracing = { - k: v - for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers() - } - try: - result = old_remote_method( - *args, **kwargs, _sentry_tracing=tracing - ) - span.set_status(SPANSTATUS.OK) - except Exception: - span.set_status(SPANSTATUS.INTERNAL_ERROR) - exc_info = sys.exc_info() - _capture_exception(exc_info) - reraise(*exc_info) - - return result + span_streaming = has_span_streaming_enabled( + sentry_sdk.get_client().options + ) + if span_streaming: + function_name = qualname_from_function(user_f) + with sentry_sdk.traces.start_span( + name="unknown Ray task" + if function_name is None + else function_name, + attributes={ + "sentry.op": OP.QUEUE_SUBMIT_RAY, + "sentry.origin": RayIntegration.origin, + }, + ): + tracing = { + k: v + for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers() + } + try: + result = old_remote_method( + *args, **kwargs, _sentry_tracing=tracing + ) + except Exception: + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result + else: + with sentry_sdk.start_span( + op=OP.QUEUE_SUBMIT_RAY, + name=qualname_from_function(user_f), + origin=RayIntegration.origin, + ) as span: + tracing = { + k: v + for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers() + } + try: + result = old_remote_method( + *args, **kwargs, _sentry_tracing=tracing + ) + span.set_status(SPANSTATUS.OK) + except Exception: + span.set_status(SPANSTATUS.INTERNAL_ERROR) + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result rv.remote = _remote_method_with_header_propagation diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py index 4cc413eada..9165a63261 100644 --- a/tests/integrations/ray/test_ray.py +++ b/tests/integrations/ray/test_ray.py @@ -9,6 +9,8 @@ import sentry_sdk from sentry_sdk.envelope import Envelope from sentry_sdk.integrations.ray import RayIntegration +from sentry_sdk.integrations.stdlib import StdlibIntegration +from sentry_sdk.traces import SegmentSource from tests.conftest import TestTransport @@ -36,11 +38,19 @@ def setup_sentry_with_logging_transport(): setup_sentry(transport=RayLoggingTransport()) -def setup_sentry(transport=None): +def setup_sentry_with_logging_transport_and_span_streaming(): + setup_sentry(span_streaming=True, transport=RayLoggingTransport()) + + +def setup_sentry(span_streaming=False, transport=None): + if span_streaming: + sentry_sdk._span_batcher.SpanBatcher.MAX_BEFORE_FLUSH = 1 + sentry_sdk.init( integrations=[RayIntegration()], transport=RayTestTransport() if transport is None else transport, traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) @@ -62,31 +72,87 @@ def read_error_from_log(job_id, ray_temp_dir): if "worker" in f and job_id in f and f.endswith(".out") ][0] + next_line_is_event_payload = False with open(os.path.join(log_dir, log_file), "r") as file: - lines = file.readlines() + for line in file: + try: + payload = json.loads(line) + except ValueError: + continue - try: - # parse error object from log line - error = json.loads(lines[4][:-1]) - except IndexError: - error = None + if next_line_is_event_payload: + return payload + + if isinstance(payload, dict) and payload.get("type") == "event": + next_line_is_event_payload = True + + return None - return error +def read_spans_from_log(job_id, ray_temp_dir): + # Find the actual session directory that Ray created + session_dirs = [d for d in os.listdir(ray_temp_dir) if d.startswith("session_")] + if not session_dirs: + raise FileNotFoundError(f"No session directory found in {ray_temp_dir}") -def example_task(): - with sentry_sdk.start_span(op="task", name="example task step"): - ... + session_dir = os.path.join(ray_temp_dir, session_dirs[0]) + log_dir = os.path.join(session_dir, "logs") + + if not os.path.exists(log_dir): + raise FileNotFoundError(f"No logs directory found at {log_dir}") + + log_file = [ + f + for f in os.listdir(log_dir) + if "worker" in f and job_id in f and f.endswith(".out") + ][0] + + spans = [] + next_line_is_span_payload = False + with open(os.path.join(log_dir, log_file), "r") as file: + for line in file: + try: + payload = json.loads(line) + except ValueError: + continue + if next_line_is_span_payload: + spans.extend(payload["items"]) + next_line_is_span_payload = False + continue + if isinstance(payload, dict) and payload.get("type") == "span": + next_line_is_span_payload = True + + return spans + + +def example_task(span_streaming: bool): + if span_streaming: + with sentry_sdk.traces.start_span( + name="example task step", + attributes={ + "sentry.op": "task", + }, + ): + ... + else: + with sentry_sdk.start_span(op="task", name="example task step"): + ... - return sentry_sdk.get_client().transport.envelopes + return sentry_sdk.get_client().transport.envelopes # RayIntegration must leave variadic keyword arguments at the end -def example_task_with_kwargs(**kwargs): - with sentry_sdk.start_span(op="task", name="example task step"): - ... +def example_task_with_kwargs(span_streaming: bool, **kwargs): + if span_streaming: + with sentry_sdk.traces.start_span( + name="example task step", attributes={"sentry.op": "task"} + ): + ... + else: + with sentry_sdk.start_span(op="task", name="example task step"): + ... - return sentry_sdk.get_client().transport.envelopes + return sentry_sdk.get_client().transport.envelopes @pytest.mark.parametrize( @@ -96,14 +162,14 @@ def example_task_with_kwargs(**kwargs): "task", [example_task, example_task_with_kwargs], ) -def test_tracing_in_ray_tasks(task_options, task): - setup_sentry() - - ray.init( - runtime_env={ - "worker_process_setup_hook": setup_sentry, - "working_dir": "./", - } +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_tracing_in_ray_tasks(task_options, task, span_streaming): + sentry_sdk.init( + integrations=[RayIntegration()], + disabled_integrations=[StdlibIntegration], + transport=RayTestTransport(), + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) # Setup ray task, calling decorator directly instead of @, @@ -119,44 +185,121 @@ def test_tracing_in_ray_tasks(task_options, task): == f"tests.integrations.ray.test_ray.{task.__name__}" ) - with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - worker_envelopes = ray.get(example_task.remote()) + if span_streaming: + ray_temp_dir = os.path.join("/tmp", f"ray_test_{uuid.uuid4().hex[:8]}") + os.makedirs(ray_temp_dir, exist_ok=True) - client_envelope = sentry_sdk.get_client().transport.envelopes[0] - client_transaction = client_envelope.get_transaction_event() - assert client_transaction["transaction"] == "ray test transaction" - assert client_transaction["transaction_info"] == {"source": "custom"} + try: + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming, + "working_dir": "./", + }, + _temp_dir=ray_temp_dir, + ) + + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + future = example_task.remote(span_streaming) + ray.get(future) - worker_envelope = worker_envelopes[0] - worker_transaction = worker_envelope.get_transaction_event() - assert ( - worker_transaction["transaction"] - == f"tests.integrations.ray.test_ray.{task.__name__}" - ) - assert worker_transaction["transaction_info"] == {"source": "task"} - - (span,) = client_transaction["spans"] - assert span["op"] == "queue.submit.ray" - assert span["origin"] == "auto.queue.ray" - assert span["description"] == f"tests.integrations.ray.test_ray.{task.__name__}" - assert span["parent_span_id"] == client_transaction["contexts"]["trace"]["span_id"] - assert span["trace_id"] == client_transaction["contexts"]["trace"]["trace_id"] - - (span,) = worker_transaction["spans"] - assert span["op"] == "task" - assert span["origin"] == "manual" - assert span["description"] == "example task step" - assert span["parent_span_id"] == worker_transaction["contexts"]["trace"]["span_id"] - assert span["trace_id"] == worker_transaction["contexts"]["trace"]["trace_id"] + job_id = future.job_id().hex() + worker_spans = read_spans_from_log(job_id, ray_temp_dir) + finally: + if os.path.exists(ray_temp_dir): + shutil.rmtree(ray_temp_dir, ignore_errors=True) + + sentry_sdk.flush() + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_spans = [ + span + for item in client_envelope.items + for span in item.payload.json["items"] + ] + assert client_spans[1]["name"] == "ray test parent" - assert ( - client_transaction["contexts"]["trace"]["trace_id"] - == worker_transaction["contexts"]["trace"]["trace_id"] - ) + assert ( + worker_spans[1]["name"] + == f"tests.integrations.ray.test_ray.{task.__name__}" + ) + assert ( + worker_spans[1]["attributes"]["sentry.span.source"]["value"] + == SegmentSource.TASK + ) + + span = client_spans[0] + assert span["attributes"]["sentry.op"]["value"] == "queue.submit.ray" + assert span["attributes"]["sentry.origin"]["value"] == "auto.queue.ray" + assert span["name"] == f"tests.integrations.ray.test_ray.{task.__name__}" + assert span["parent_span_id"] == client_spans[1]["span_id"] + assert span["trace_id"] == client_spans[1]["trace_id"] + + span = worker_spans[0] + assert span["attributes"]["sentry.op"]["value"] == "task" + assert span["attributes"]["sentry.origin"]["value"] == "manual" + assert span["name"] == "example task step" + assert span["parent_span_id"] == worker_spans[1]["span_id"] + assert span["trace_id"] == worker_spans[1]["trace_id"] + + assert client_spans[1]["trace_id"] == worker_spans[1]["trace_id"] + else: + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry, + "working_dir": "./", + } + ) + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + worker_envelopes = ray.get(example_task.remote(span_streaming)) + + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_transaction = client_envelope.get_transaction_event() + assert client_transaction["transaction"] == "ray test transaction" + assert client_transaction["transaction_info"] == {"source": "custom"} + + worker_envelope = worker_envelopes[0] + worker_transaction = worker_envelope.get_transaction_event() + assert ( + worker_transaction["transaction"] + == f"tests.integrations.ray.test_ray.{task.__name__}" + ) + assert worker_transaction["transaction_info"] == {"source": "task"} + + (span,) = client_transaction["spans"] + assert span["op"] == "queue.submit.ray" + assert span["origin"] == "auto.queue.ray" + assert span["description"] == f"tests.integrations.ray.test_ray.{task.__name__}" + assert ( + span["parent_span_id"] == client_transaction["contexts"]["trace"]["span_id"] + ) + assert span["trace_id"] == client_transaction["contexts"]["trace"]["trace_id"] -def test_errors_in_ray_tasks(): - setup_sentry_with_logging_transport() + (span,) = worker_transaction["spans"] + assert span["op"] == "task" + assert span["origin"] == "manual" + assert span["description"] == "example task step" + assert ( + span["parent_span_id"] == worker_transaction["contexts"]["trace"]["span_id"] + ) + assert span["trace_id"] == worker_transaction["contexts"]["trace"]["trace_id"] + + assert ( + client_transaction["contexts"]["trace"]["trace_id"] + == worker_transaction["contexts"]["trace"]["trace_id"] + ) + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_errors_in_ray_tasks(span_streaming): + sentry_sdk.init( + integrations=[RayIntegration()], + disabled_integrations=[StdlibIntegration], + transport=RayTestTransport(), + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) ray_temp_dir = os.path.join("/tmp", f"ray_test_{uuid.uuid4().hex[:8]}") os.makedirs(ray_temp_dir, exist_ok=True) @@ -164,7 +307,9 @@ def test_errors_in_ray_tasks(): try: ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry_with_logging_transport, + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming + if span_streaming + else setup_sentry_with_logging_transport, "working_dir": "./", }, _temp_dir=ray_temp_dir, @@ -175,10 +320,18 @@ def test_errors_in_ray_tasks(): def example_task(): 1 / 0 - with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - with pytest.raises(ZeroDivisionError): - future = example_task.remote() - ray.get(future) + if span_streaming: + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + with pytest.raises(ZeroDivisionError): + future = example_task.remote() + ray.get(future) + else: + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + with pytest.raises(ZeroDivisionError): + future = example_task.remote() + ray.get(future) job_id = future.job_id().hex() error = read_error_from_log(job_id, ray_temp_dir) @@ -198,14 +351,14 @@ def example_task(): # Arbitrary keyword argument to test all decorator paths @pytest.mark.parametrize("remote_kwargs", [{}, {"namespace": "actors"}]) -def test_tracing_in_ray_actors(remote_kwargs): - setup_sentry() - - ray.init( - runtime_env={ - "worker_process_setup_hook": setup_sentry, - "working_dir": "./", - } +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_tracing_in_ray_actors(remote_kwargs, span_streaming): + sentry_sdk.init( + integrations=[RayIntegration()], + disabled_integrations=[StdlibIntegration], + transport=RayTestTransport(), + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, ) # Setup ray actor @@ -217,10 +370,18 @@ def __init__(self): self.n = 0 def increment(self): - with sentry_sdk.start_span(op="task", name="example actor execution"): - self.n += 1 - - return sentry_sdk.get_client().transport.envelopes + if span_streaming: + with sentry_sdk.traces.start_span( + name="example actor execution", attributes={"sentry.op": "task"} + ): + self.n += 1 + else: + with sentry_sdk.start_span( + op="task", name="example actor execution" + ): + self.n += 1 + + return sentry_sdk.get_client().transport.envelopes else: @ray.remote @@ -229,27 +390,91 @@ def __init__(self): self.n = 0 def increment(self): - with sentry_sdk.start_span(op="task", name="example actor execution"): - self.n += 1 + if span_streaming: + with sentry_sdk.traces.start_span( + name="example actor execution", attributes={"sentry.op": "task"} + ): + self.n += 1 + else: + with sentry_sdk.start_span( + op="task", name="example actor execution" + ): + self.n += 1 + + return sentry_sdk.get_client().transport.envelopes + + if span_streaming: + ray_temp_dir = os.path.join("/tmp", f"ray_test_{uuid.uuid4().hex[:8]}") + os.makedirs(ray_temp_dir, exist_ok=True) - return sentry_sdk.get_client().transport.envelopes + try: + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming, + "working_dir": "./", + }, + _temp_dir=ray_temp_dir, + ) + + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + counter = Counter.remote() + future = counter.increment.remote() + ray.get(future) - with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - counter = Counter.remote() - worker_envelopes = ray.get(counter.increment.remote()) + job_id = future.job_id().hex() + worker_spans = read_spans_from_log(job_id, ray_temp_dir) + finally: + if os.path.exists(ray_temp_dir): + shutil.rmtree(ray_temp_dir, ignore_errors=True) + + sentry_sdk.flush() + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_spans = [ + span + for item in client_envelope.items + for span in item.payload.json["items"] + ] + + # Spans for submitting the actor task are not created (actors are not supported yet) + # Only the manual "example actor execution" span is recorded. + assert len(client_spans) == 1 + + # Transaction are not yet created when executing ray actors (actors are not supported yet) + # Only the manual "example actor execution" span is recorded. + assert len(worker_spans) == 1 + else: + ray.init( + runtime_env={ + "worker_process_setup_hook": setup_sentry, + "working_dir": "./", + } + ) - client_envelope = sentry_sdk.get_client().transport.envelopes[0] - client_transaction = client_envelope.get_transaction_event() + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + counter = Counter.remote() + worker_envelopes = ray.get(counter.increment.remote()) - # Spans for submitting the actor task are not created (actors are not supported yet) - assert client_transaction["spans"] == [] + client_envelope = sentry_sdk.get_client().transport.envelopes[0] + client_transaction = client_envelope.get_transaction_event() - # Transaction are not yet created when executing ray actors (actors are not supported yet) - assert worker_envelopes == [] + # Spans for submitting the actor task are not created (actors are not supported yet) + assert client_transaction["spans"] == [] + # Transaction are not yet created when executing ray actors (actors are not supported yet) + assert worker_envelopes == [] -def test_errors_in_ray_actors(): - setup_sentry_with_logging_transport() + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_errors_in_ray_actors(span_streaming): + sentry_sdk.init( + integrations=[RayIntegration()], + disabled_integrations=[StdlibIntegration], + transport=RayLoggingTransport(), + traces_sample_rate=1.0, + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) ray_temp_dir = os.path.join("/tmp", f"ray_test_{uuid.uuid4().hex[:8]}") os.makedirs(ray_temp_dir, exist_ok=True) @@ -257,7 +482,9 @@ def test_errors_in_ray_actors(): try: ray.init( runtime_env={ - "worker_process_setup_hook": setup_sentry_with_logging_transport, + "worker_process_setup_hook": setup_sentry_with_logging_transport_and_span_streaming + if span_streaming + else setup_sentry_with_logging_transport, "working_dir": "./", }, _temp_dir=ray_temp_dir, @@ -270,16 +497,33 @@ def __init__(self): self.n = 0 def increment(self): - with sentry_sdk.start_span(op="task", name="example actor execution"): - 1 / 0 + if span_streaming: + with sentry_sdk.traces.start_span( + name="example actor execution", attributes={"sentry.op": "task"} + ): + 1 / 0 + else: + with sentry_sdk.start_span( + op="task", name="example actor execution" + ): + 1 / 0 return sentry_sdk.get_client().transport.envelopes - with sentry_sdk.start_transaction(op="task", name="ray test transaction"): - with pytest.raises(ZeroDivisionError): - counter = Counter.remote() - future = counter.increment.remote() - ray.get(future) + if span_streaming: + with sentry_sdk.traces.start_span( + name="ray test parent", attributes={"sentry.op": "task"} + ): + with pytest.raises(ZeroDivisionError): + counter = Counter.remote() + future = counter.increment.remote() + ray.get(future) + else: + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + with pytest.raises(ZeroDivisionError): + counter = Counter.remote() + future = counter.increment.remote() + ray.get(future) job_id = future.job_id().hex() error = read_error_from_log(job_id, ray_temp_dir)