diff --git a/documentation/components/bridges/symfony-telemetry-bundle.md b/documentation/components/bridges/symfony-telemetry-bundle.md index eddb06e859..1bbff8c543 100644 --- a/documentation/components/bridges/symfony-telemetry-bundle.md +++ b/documentation/components/bridges/symfony-telemetry-bundle.md @@ -1024,8 +1024,22 @@ flow_telemetry: messenger: enabled: true context_propagation: true # Propagate context across message boundaries + propagation_style: link # How the consumer span relates to the producer span ``` +When `context_propagation` is enabled, `propagation_style` controls how a consumed message's span +relates to the producing (publishing) span: + +- `link` (default) — the consumer span stays in the worker's own trace (under the `messenger:consume` + console span) and carries a span link back to the producer span. Producer and consumer get separate, + clean traces connected by a link. Recommended for decoupled, batch, or long-delay queues, where + continuing the trace would otherwise absorb the entire queue wait into a single span's duration. +- `continue` — the consumer span adopts the producer's trace and becomes its child, so + publish → queue → consume is one continuous distributed trace. Fine for fast, 1:1 processing. + +`propagation_style` has no effect when `context_propagation` is `false` (there is nothing to relate to). +The producer side is identical in both modes — the telemetry stamp is always written on dispatch. + #### Twig Traces Twig template rendering. @@ -1580,6 +1594,7 @@ flow_telemetry: messenger: enabled: true context_propagation: true + propagation_style: link dbal: enabled: true log_sql: true diff --git a/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/FlowTelemetryBundle.php b/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/FlowTelemetryBundle.php index 61aa680a0c..c2a1256871 100644 --- a/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/FlowTelemetryBundle.php +++ b/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/FlowTelemetryBundle.php @@ -16,6 +16,7 @@ use Flow\Bridge\Symfony\TelemetryBundle\DependencyInjection\Compiler\ProfilerSignalCapturePass; use Flow\Bridge\Symfony\TelemetryBundle\DependencyInjection\Compiler\Psr18ClientTelemetryPass; use Flow\Bridge\Symfony\TelemetryBundle\Exception\RuntimeException; +use Flow\Bridge\Symfony\TelemetryBundle\Instrumentation\Messenger\MessengerTracePropagation; use Flow\Bridge\Symfony\TelemetryBundle\Resource\Detector\SymfonyDeploymentDetector; use Flow\Bridge\Telemetry\OTLP\Exporter\OTLPExporter; use Flow\Bridge\Telemetry\OTLP\Serializer\JsonSerializer; @@ -497,6 +498,13 @@ public function configure(DefinitionConfigurator $definition): void ->info('Enable context propagation across message boundaries (requires propagator)') ->defaultTrue() ->end() + ->enumNode('propagation_style') + ->info('When context propagation is enabled, how the consumer span relates to the producer span: ' + . '"link" (default) keeps the consumer in the worker\'s own trace and links back to the producer; ' + . '"continue" makes the consumer a child in the producer\'s trace.') + ->values(['continue', 'link']) + ->defaultValue('link') + ->end() ->end() ->end() ->arrayNode('twig') @@ -707,7 +715,7 @@ public function configure(DefinitionConfigurator $definition): void } /** - * @param array{resource: array{detectors?: array{enabled?: bool, static?: array{cache?: array{enabled?: bool, path?: null|string}, os?: array{enabled?: bool}, host?: array{enabled?: bool}, service?: array{enabled?: bool}, deployment?: array{enabled?: bool}, environment?: array{enabled?: bool}}, dynamic?: array{process?: array{enabled?: bool}}}, custom?: array}, clock_service_id?: null|string, framework_logger?: null|string, capture_framework_channels?: bool, channel_attribute_target?: 'scope'|'signal'|'both', context_storage?: array{type?: string, service_id?: null|string}, propagator?: array{type?: string, service_id?: null|string}, exporters?: array>, error_handlers?: array>, tracer_provider?: array, meter_provider?: array, logger_provider?: array, instrumentation?: array{http_kernel?: array{enabled?: bool, exclude_paths?: array, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array}, messenger?: array{enabled?: bool, context_propagation?: bool}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array}, http_client?: array{enabled?: bool, exclude_clients?: array}, psr18_client?: array{enabled?: bool, exclude_clients?: array}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array}, cache?: array{enabled?: bool, exclude_pools?: array}}, profiler?: array{enabled?: bool|null, capture_logs?: bool}, tracers?: array, signal?: array}}>, meters?: array, signal?: array}}>, loggers?: array, signal?: array}}>} $config + * @param array{resource: array{detectors?: array{enabled?: bool, static?: array{cache?: array{enabled?: bool, path?: null|string}, os?: array{enabled?: bool}, host?: array{enabled?: bool}, service?: array{enabled?: bool}, deployment?: array{enabled?: bool}, environment?: array{enabled?: bool}}, dynamic?: array{process?: array{enabled?: bool}}}, custom?: array}, clock_service_id?: null|string, framework_logger?: null|string, capture_framework_channels?: bool, channel_attribute_target?: 'scope'|'signal'|'both', context_storage?: array{type?: string, service_id?: null|string}, propagator?: array{type?: string, service_id?: null|string}, exporters?: array>, error_handlers?: array>, tracer_provider?: array, meter_provider?: array, logger_provider?: array, instrumentation?: array{http_kernel?: array{enabled?: bool, exclude_paths?: array, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array}, messenger?: array{enabled?: bool, context_propagation?: bool, propagation_style?: 'continue'|'link'}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array}, http_client?: array{enabled?: bool, exclude_clients?: array}, psr18_client?: array{enabled?: bool, exclude_clients?: array}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array}, cache?: array{enabled?: bool, exclude_pools?: array}}, profiler?: array{enabled?: bool|null, capture_logs?: bool}, tracers?: array, signal?: array}}>, meters?: array, signal?: array}}>, loggers?: array, signal?: array}}>} $config */ #[Override] public function loadExtension(array $config, ContainerConfigurator $container, ContainerBuilder $builder): void @@ -2526,7 +2534,7 @@ private function registerGlobalServices(array $config, ContainerBuilder $builder } /** - * @param array{http_kernel?: array{enabled?: bool, exclude_routes?: array, exclude_paths?: array, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array}, messenger?: array{enabled?: bool, context_propagation?: bool}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array}, http_client?: array{enabled?: bool, exclude_clients?: array}, psr18_client?: array{enabled?: bool, exclude_clients?: array}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array}, cache?: array{enabled?: bool, exclude_pools?: array}} $config + * @param array{http_kernel?: array{enabled?: bool, exclude_routes?: array, exclude_paths?: array, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array}, messenger?: array{enabled?: bool, context_propagation?: bool, propagation_style?: 'continue'|'link'}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array}, http_client?: array{enabled?: bool, exclude_clients?: array}, psr18_client?: array{enabled?: bool, exclude_clients?: array}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array}, cache?: array{enabled?: bool, exclude_pools?: array}} $config */ private function registerInstrumentation(array $config, ContainerConfigurator $container, ContainerBuilder $builder): void { @@ -2569,6 +2577,7 @@ private function registerInstrumentation(array $config, ContainerConfigurator $c $definition = $builder->getDefinition('flow.telemetry.messenger.middleware'); $definition->setArgument(1, new Reference('flow.telemetry.context_storage')); $definition->setArgument(2, new Reference('flow.telemetry.propagator')); + $definition->setArgument(3, MessengerTracePropagation::from($messengerConfig['propagation_style'] ?? 'link')); } } @@ -2884,7 +2893,7 @@ private function registerNamedExporters(array $config, ContainerBuilder $builder } /** - * @param array{http_kernel?: array{enabled?: bool, exclude_routes?: array, exclude_paths?: array, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array}, messenger?: array{enabled?: bool, context_propagation?: bool}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array}, http_client?: array{enabled?: bool, exclude_clients?: array}, psr18_client?: array{enabled?: bool, exclude_clients?: array}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array}, cache?: array{enabled?: bool, exclude_pools?: array}} $config + * @param array{http_kernel?: array{enabled?: bool, exclude_routes?: array, exclude_paths?: array, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array}, messenger?: array{enabled?: bool, context_propagation?: bool, propagation_style?: 'continue'|'link'}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array}, http_client?: array{enabled?: bool, exclude_clients?: array}, psr18_client?: array{enabled?: bool, exclude_clients?: array}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array}, cache?: array{enabled?: bool, exclude_pools?: array}} $config */ private function registerParameterOnlyInstrumentation(array $config, ContainerBuilder $builder): void { diff --git a/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/Instrumentation/Messenger/MessengerTracePropagation.php b/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/Instrumentation/Messenger/MessengerTracePropagation.php new file mode 100644 index 0000000000..815db16637 --- /dev/null +++ b/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/Instrumentation/Messenger/MessengerTracePropagation.php @@ -0,0 +1,25 @@ + queue -> consume is one continuous distributed trace. + */ + case Continuation = 'continue'; + + /** + * The consumer span stays in the worker's own trace and carries a span link + * back to the producer span. Producer and consumer get separate, clean traces + * connected by a link (recommended for decoupled / batch / long-delay queues). + */ + case Link = 'link'; +} diff --git a/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/Instrumentation/Messenger/TracingMiddleware.php b/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/Instrumentation/Messenger/TracingMiddleware.php index cc1214e24c..5fc9b7f3d6 100644 --- a/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/Instrumentation/Messenger/TracingMiddleware.php +++ b/src/bridge/symfony/telemetry-bundle/src/Flow/Bridge/Symfony/TelemetryBundle/Instrumentation/Messenger/TracingMiddleware.php @@ -13,6 +13,7 @@ use Flow\Telemetry\Telemetry; use Flow\Telemetry\Tracer\SpanContext; use Flow\Telemetry\Tracer\SpanKind; +use Flow\Telemetry\Tracer\SpanLink; use Flow\Telemetry\Tracer\SpanStatus; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Middleware\MiddlewareInterface; @@ -31,6 +32,7 @@ public function __construct( private Telemetry $telemetry, private ?ContextStorage $contextStorage = null, private ?Propagator $propagator = null, + private MessengerTracePropagation $propagation = MessengerTracePropagation::Link, ) {} public function handle(Envelope $envelope, StackInterface $stack): Envelope @@ -47,10 +49,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope $isReceived = $receivedStamp !== null; - if ($isReceived) { - $this->extractContext($envelope); - } - $kind = $isReceived ? SpanKind::CONSUMER : SpanKind::PRODUCER; $operation = $isReceived ? 'receive' : 'send'; @@ -73,7 +71,34 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope $attributes['messaging.message.id'] = (string) $transportIdStamp->getId(); } - $span = $tracer->span($spanName, $kind, $attributes); + $remote = $isReceived ? $this->extractRemoteContext($envelope) : null; + $links = []; + + if ($remote !== null && $remote->spanContext !== null) { + $remoteSpanContext = $remote->spanContext; + $remoteBaggage = $remote->baggage; + + if ($this->propagation === MessengerTracePropagation::Continuation) { + $context = Context::withTraceId($remoteSpanContext->traceId)->withActiveSpan($remoteSpanContext->spanId); + + if ($remoteBaggage !== null) { + $context = $context->withBaggage($remoteBaggage); + } + + $this->contextStorage?->attach($context); + } else { + $links[] = SpanLink::create( + SpanContext::createRemote($remoteSpanContext->traceId, $remoteSpanContext->spanId), + ['messaging.operation.type' => 'process'], + ); + + if ($remoteBaggage !== null && $this->contextStorage !== null) { + $this->contextStorage->attach($this->contextStorage->current()->withBaggage($remoteBaggage)); + } + } + } + + $span = $tracer->span($spanName, $kind, $attributes, $links); if (!$isReceived) { $envelope = $this->injectContext($envelope); @@ -94,33 +119,19 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope } } - private function extractContext(Envelope $envelope): void + private function extractRemoteContext(Envelope $envelope): ?PropagationContext { if ($this->contextStorage === null || $this->propagator === null) { - return; + return null; } $stamp = $envelope->last(TelemetryStamp::class); if (!$stamp instanceof TelemetryStamp) { - return; + return null; } - $carrier = new TelemetryStampCarrier($stamp); - $propagationContext = $this->propagator->extract($carrier); - - $spanContext = $propagationContext->spanContext; - - if ($spanContext !== null) { - $context = Context::withTraceId($spanContext->traceId); - $context = $context->withActiveSpan($spanContext->spanId); - - if ($propagationContext->baggage !== null) { - $context = $context->withBaggage($propagationContext->baggage); - } - - $this->contextStorage->attach($context); - } + return $this->propagator->extract(new TelemetryStampCarrier($stamp)); } private function getShortClassName(string $className): string diff --git a/src/bridge/symfony/telemetry-bundle/tests/Flow/Bridge/Symfony/TelemetryBundle/Tests/Integration/FlowTelemetryExtensionTest.php b/src/bridge/symfony/telemetry-bundle/tests/Flow/Bridge/Symfony/TelemetryBundle/Tests/Integration/FlowTelemetryExtensionTest.php index 320b6876cd..f11d8e9b0b 100644 --- a/src/bridge/symfony/telemetry-bundle/tests/Flow/Bridge/Symfony/TelemetryBundle/Tests/Integration/FlowTelemetryExtensionTest.php +++ b/src/bridge/symfony/telemetry-bundle/tests/Flow/Bridge/Symfony/TelemetryBundle/Tests/Integration/FlowTelemetryExtensionTest.php @@ -8,6 +8,7 @@ use Flow\Bridge\Symfony\TelemetryBundle\DependencyInjection\Compiler\OTLPAvailabilityPass; use Flow\Bridge\Symfony\TelemetryBundle\Exception\RuntimeException; use Flow\Bridge\Symfony\TelemetryBundle\FlowTelemetryBundle; +use Flow\Bridge\Symfony\TelemetryBundle\Instrumentation\Messenger\MessengerTracePropagation; use Flow\Bridge\Symfony\TelemetryBundle\Tests\Fixtures\TestKernel; use Flow\Bridge\Telemetry\OTLP\Exporter\OTLPExporter; use Flow\Bridge\Telemetry\OTLP\Transport\CurlTransport; @@ -44,9 +45,11 @@ use Symfony\Component\DependencyInjection\Definition; use Symfony\Component\DependencyInjection\Reference; use Symfony\Component\HttpKernel\Log\Logger as SymfonyDefaultLogger; +use Symfony\Component\Messenger\Middleware\MiddlewareInterface as MessengerMiddlewareInterface; use function bin2hex; use function extension_loaded; +use function interface_exists; use function is_file; use function random_bytes; use function sys_get_temp_dir; @@ -480,6 +483,91 @@ public function test_otlp_exporter_uses_named_error_handler(): void static::assertSame('flow.telemetry.error_handler.silent', (string) $definition->getArgument(1)); } + public function test_messenger_middleware_defaults_to_link_propagation_style(): void + { + if (!interface_exists(MessengerMiddlewareInterface::class)) { + static::markTestSkipped('symfony/messenger is not installed'); + } + + $container = new ContainerBuilder(); + $container->setParameter('kernel.environment', 'test'); + $container->setParameter('kernel.project_dir', sys_get_temp_dir()); + $container->setParameter('kernel.build_dir', sys_get_temp_dir()); + $extension = (new FlowTelemetryBundle())->getContainerExtension(); + assert($extension !== null); + $extension->load([[ + 'resource' => [], + 'instrumentation' => [ + 'messenger' => [ + 'enabled' => true, + ], + ], + ]], $container); + + static::assertSame( + MessengerTracePropagation::Link, + $container->getDefinition('flow.telemetry.messenger.middleware')->getArgument(3), + ); + } + + public function test_messenger_middleware_receives_continue_propagation_style(): void + { + if (!interface_exists(MessengerMiddlewareInterface::class)) { + static::markTestSkipped('symfony/messenger is not installed'); + } + + $container = new ContainerBuilder(); + $container->setParameter('kernel.environment', 'test'); + $container->setParameter('kernel.project_dir', sys_get_temp_dir()); + $container->setParameter('kernel.build_dir', sys_get_temp_dir()); + $extension = (new FlowTelemetryBundle())->getContainerExtension(); + assert($extension !== null); + $extension->load([[ + 'resource' => [], + 'instrumentation' => [ + 'messenger' => [ + 'enabled' => true, + 'context_propagation' => true, + 'propagation_style' => 'continue', + ], + ], + ]], $container); + + static::assertSame( + MessengerTracePropagation::Continuation, + $container->getDefinition('flow.telemetry.messenger.middleware')->getArgument(3), + ); + } + + public function test_messenger_middleware_receives_link_propagation_style(): void + { + if (!interface_exists(MessengerMiddlewareInterface::class)) { + static::markTestSkipped('symfony/messenger is not installed'); + } + + $container = new ContainerBuilder(); + $container->setParameter('kernel.environment', 'test'); + $container->setParameter('kernel.project_dir', sys_get_temp_dir()); + $container->setParameter('kernel.build_dir', sys_get_temp_dir()); + $extension = (new FlowTelemetryBundle())->getContainerExtension(); + assert($extension !== null); + $extension->load([[ + 'resource' => [], + 'instrumentation' => [ + 'messenger' => [ + 'enabled' => true, + 'context_propagation' => true, + 'propagation_style' => 'link', + ], + ], + ]], $container); + + static::assertSame( + MessengerTracePropagation::Link, + $container->getDefinition('flow.telemetry.messenger.middleware')->getArgument(3), + ); + } + public function test_otlp_transport_failover_inline_curl_with_stream_failover(): void { $this->bootKernel([ diff --git a/src/bridge/symfony/telemetry-bundle/tests/Flow/Bridge/Symfony/TelemetryBundle/Tests/Integration/Instrumentation/Messenger/TracingMiddlewareTest.php b/src/bridge/symfony/telemetry-bundle/tests/Flow/Bridge/Symfony/TelemetryBundle/Tests/Integration/Instrumentation/Messenger/TracingMiddlewareTest.php index acc16c5235..40874279e3 100644 --- a/src/bridge/symfony/telemetry-bundle/tests/Flow/Bridge/Symfony/TelemetryBundle/Tests/Integration/Instrumentation/Messenger/TracingMiddlewareTest.php +++ b/src/bridge/symfony/telemetry-bundle/tests/Flow/Bridge/Symfony/TelemetryBundle/Tests/Integration/Instrumentation/Messenger/TracingMiddlewareTest.php @@ -4,6 +4,7 @@ namespace Flow\Bridge\Symfony\TelemetryBundle\Tests\Integration\Instrumentation\Messenger; +use Flow\Bridge\Symfony\TelemetryBundle\Instrumentation\Messenger\MessengerTracePropagation; use Flow\Bridge\Symfony\TelemetryBundle\Instrumentation\Messenger\TelemetryStamp; use Flow\Bridge\Symfony\TelemetryBundle\Instrumentation\Messenger\TracingMiddleware; use Flow\Bridge\Symfony\TelemetryBundle\Tests\Fixtures\Message\TestMessage; @@ -84,7 +85,7 @@ public function test_context_is_extracted_on_consume(): void $handler = new TestMessageHandler(); $bus = new MessageBus([ - new TracingMiddleware($telemetry, $contextStorage, $propagator), + new TracingMiddleware($telemetry, $contextStorage, $propagator, MessengerTracePropagation::Continuation), new HandleMessageMiddleware(new HandlersLocator([ TestMessage::class => [$handler], ])), @@ -110,6 +111,258 @@ public function test_context_is_extracted_on_consume(): void static::assertSame($originalTraceId, $span->context()->traceId->toHex()); } + public function test_consumer_span_has_no_links_in_continue_mode(): void + { + $this->bootKernel([ + 'config' => static function (TestKernel $kernel): void { + $kernel->addTestExtensionConfig('flow_telemetry', [ + 'resource' => [], + 'exporters' => ['memory' => ['memory' => null], 'void' => ['void' => null]], + 'tracer_provider' => [ + 'processor' => [ + 'type' => 'memory', + 'exporter' => 'memory', + ], + ], + 'propagator' => ['type' => 'w3c'], + 'instrumentation' => [ + 'http_kernel' => false, + 'console' => false, + 'messenger' => [ + 'enabled' => true, + 'context_propagation' => true, + ], + ], + ]); + }, + ]); + + $this->getContainer(); + + $telemetry = $this->symfonyContext()->getService(Telemetry::class, Telemetry::class); + $contextStorage = $this->symfonyContext()->getService('flow.telemetry.context_storage', ContextStorage::class); + $propagator = $this->symfonyContext()->getService('flow.telemetry.propagator', Propagator::class); + + $originalTraceId = 'abcdef0123456789abcdef0123456789'; + $originalSpanId = '0123456789abcdef'; + $traceparent = "00-{$originalTraceId}-{$originalSpanId}-01"; + + $bus = new MessageBus([ + new TracingMiddleware($telemetry, $contextStorage, $propagator, MessengerTracePropagation::Continuation), + new HandleMessageMiddleware(new HandlersLocator([ + TestMessage::class => [new TestMessageHandler()], + ])), + ]); + + $bus->dispatch(new Envelope(new TestMessage('test'), [ + new ReceivedStamp('async'), + new TelemetryStamp(['traceparent' => $traceparent]), + ])); + + $processor = $this->symfonyContext()->getService( + 'flow.telemetry.tracer_provider.processor', + MemorySpanProcessor::class, + ); + $spans = $processor->endedSpans(); + + static::assertCount(1, $spans); + $span = $spans[0]; + + static::assertSame(SpanKind::CONSUMER, $span->kind()); + static::assertSame($originalTraceId, $span->context()->traceId->toHex()); + static::assertCount(0, $span->links()); + } + + public function test_consumer_span_links_to_producer_in_link_mode(): void + { + $this->bootKernel([ + 'config' => static function (TestKernel $kernel): void { + $kernel->addTestExtensionConfig('flow_telemetry', [ + 'resource' => [], + 'exporters' => ['memory' => ['memory' => null], 'void' => ['void' => null]], + 'tracer_provider' => [ + 'processor' => [ + 'type' => 'memory', + 'exporter' => 'memory', + ], + ], + 'propagator' => ['type' => 'w3c'], + 'instrumentation' => [ + 'http_kernel' => false, + 'console' => false, + 'messenger' => [ + 'enabled' => true, + 'context_propagation' => true, + ], + ], + ]); + }, + ]); + + $this->getContainer(); + + $telemetry = $this->symfonyContext()->getService(Telemetry::class, Telemetry::class); + $contextStorage = $this->symfonyContext()->getService('flow.telemetry.context_storage', ContextStorage::class); + $propagator = $this->symfonyContext()->getService('flow.telemetry.propagator', Propagator::class); + + $originalTraceId = 'abcdef0123456789abcdef0123456789'; + $originalSpanId = '0123456789abcdef'; + $traceparent = "00-{$originalTraceId}-{$originalSpanId}-01"; + + $bus = new MessageBus([ + new TracingMiddleware($telemetry, $contextStorage, $propagator, MessengerTracePropagation::Link), + new HandleMessageMiddleware(new HandlersLocator([ + TestMessage::class => [new TestMessageHandler()], + ])), + ]); + + $bus->dispatch(new Envelope(new TestMessage('test'), [ + new ReceivedStamp('async'), + new TelemetryStamp(['traceparent' => $traceparent]), + ])); + + $processor = $this->symfonyContext()->getService( + 'flow.telemetry.tracer_provider.processor', + MemorySpanProcessor::class, + ); + $spans = $processor->endedSpans(); + + static::assertCount(1, $spans); + $span = $spans[0]; + + static::assertSame(SpanKind::CONSUMER, $span->kind()); + static::assertNotSame($originalTraceId, $span->context()->traceId->toHex()); + + $links = $span->links(); + static::assertCount(1, $links); + + $linkedContext = $links[0]->context; + static::assertSame($originalTraceId, $linkedContext->traceId->toHex()); + static::assertSame($originalSpanId, $linkedContext->spanId->toHex()); + static::assertTrue($linkedContext->isRemote); + } + + public function test_consumer_attaches_producer_baggage_in_link_mode(): void + { + $this->bootKernel([ + 'config' => static function (TestKernel $kernel): void { + $kernel->addTestExtensionConfig('flow_telemetry', [ + 'resource' => [], + 'exporters' => ['memory' => ['memory' => null], 'void' => ['void' => null]], + 'tracer_provider' => [ + 'processor' => [ + 'type' => 'memory', + 'exporter' => 'memory', + ], + ], + 'propagator' => ['type' => 'w3c'], + 'instrumentation' => [ + 'http_kernel' => false, + 'console' => false, + 'messenger' => [ + 'enabled' => true, + 'context_propagation' => true, + ], + ], + ]); + }, + ]); + + $this->getContainer(); + + $telemetry = $this->symfonyContext()->getService(Telemetry::class, Telemetry::class); + $contextStorage = $this->symfonyContext()->getService('flow.telemetry.context_storage', ContextStorage::class); + $propagator = $this->symfonyContext()->getService('flow.telemetry.propagator', Propagator::class); + + $traceparent = '00-abcdef0123456789abcdef0123456789-0123456789abcdef-01'; + + // Link mode runs the consumer span under the worker's own trace (e.g. the + // messenger:consume console span); establish that active worker trace here. + $workerTracer = $telemetry->tracer('worker'); + $workerSpan = $workerTracer->span('messenger:consume'); + + $bus = new MessageBus([ + new TracingMiddleware($telemetry, $contextStorage, $propagator, MessengerTracePropagation::Link), + new HandleMessageMiddleware(new HandlersLocator([ + TestMessage::class => [new TestMessageHandler()], + ])), + ]); + + $bus->dispatch(new Envelope(new TestMessage('test'), [ + new ReceivedStamp('async'), + new TelemetryStamp(['traceparent' => $traceparent, 'baggage' => 'user.id=42']), + ])); + + $baggage = $contextStorage->current()->baggage; + static::assertFalse($baggage->isEmpty()); + static::assertSame('42', $baggage->get('user.id')); + + $workerTracer->complete($workerSpan); + } + + public function test_producer_injects_stamp_in_link_mode(): void + { + $this->bootKernel([ + 'config' => static function (TestKernel $kernel): void { + $kernel->addTestExtensionConfig('flow_telemetry', [ + 'resource' => [], + 'exporters' => ['memory' => ['memory' => null], 'void' => ['void' => null]], + 'tracer_provider' => [ + 'processor' => [ + 'type' => 'memory', + 'exporter' => 'memory', + ], + ], + 'propagator' => ['type' => 'w3c'], + 'instrumentation' => [ + 'http_kernel' => false, + 'console' => false, + 'messenger' => [ + 'enabled' => true, + 'context_propagation' => true, + ], + ], + ]); + }, + ]); + + $this->getContainer(); + + $telemetry = $this->symfonyContext()->getService(Telemetry::class, Telemetry::class); + $contextStorage = $this->symfonyContext()->getService('flow.telemetry.context_storage', ContextStorage::class); + $propagator = $this->symfonyContext()->getService('flow.telemetry.propagator', Propagator::class); + + $traceId = TraceId::generate(); + $context = Context::withTraceId($traceId); + $contextStorage->attach($context); + + $tracer = $telemetry->tracer('test'); + $span = $tracer->span('parent-span'); + $contextStorage->attach($context->withActiveSpan($span->context()->spanId)); + + $capturingMiddleware = new CapturingMiddleware(); + + $bus = new MessageBus([ + new TracingMiddleware($telemetry, $contextStorage, $propagator, MessengerTracePropagation::Link), + $capturingMiddleware, + new HandleMessageMiddleware(new HandlersLocator([ + TestMessage::class => [new TestMessageHandler()], + ])), + ]); + + $bus->dispatch(new TestMessage('test')); + + $tracer->complete($span); + + static::assertNotNull($capturingMiddleware->captured); + $stamp = $capturingMiddleware->captured->last(TelemetryStamp::class); + static::assertInstanceOf(TelemetryStamp::class, $stamp); + + $traceparent = $stamp->get('traceparent'); + static::assertNotNull($traceparent); + static::assertStringContainsString($traceId->toHex(), $traceparent); + } + public function test_context_is_injected_on_dispatch_when_context_propagation_enabled(): void { $this->bootKernel([