From 7c64403c333633e7aba30a36f02069cdef946c7d Mon Sep 17 00:00:00 2001 From: "iman.khadiv@gmail.com" Date: Mon, 29 Jun 2026 16:06:49 -0700 Subject: [PATCH] fix(client): ignore unknown SSE fields instead of erroring the stream The SSE line parser rejected any line it did not recognize (e.g. the standard `retry:` field), throwing McpTransportException and tearing down an otherwise valid stream. Per the SSE spec, unknown fields must be ignored. Log such lines at debug and continue. Refs #1047 --- .../client/transport/ResponseSubscribers.java | 10 +-- .../transport/ResponseSubscribersTests.java | 79 +++++++++++++++++++ 2 files changed, 84 insertions(+), 5 deletions(-) create mode 100644 mcp-core/src/test/java/io/modelcontextprotocol/client/transport/ResponseSubscribersTests.java diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java index 29dc23c35..b1255c823 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java @@ -15,7 +15,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.modelcontextprotocol.spec.McpTransportException; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.FluxSink; @@ -180,10 +179,11 @@ else if (line.startsWith(":")) { upstream().request(1); } else { - // If the response is not successful, emit an error - this.sink.error(new McpTransportException( - "Invalid SSE response. Status code: " + this.responseInfo.statusCode() + " Line: " + line)); - + // Per the SSE spec, fields the client does not recognize (e.g. + // "retry:") + // must be ignored rather than treated as an invalid response. + logger.debug("Ignoring unrecognized SSE line: {}", line); + upstream().request(1); } } } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/ResponseSubscribersTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/ResponseSubscribersTests.java new file mode 100644 index 000000000..17bed6cbd --- /dev/null +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/ResponseSubscribersTests.java @@ -0,0 +1,79 @@ +/* + * Copyright 2026-2026 the original author or authors. + */ +package io.modelcontextprotocol.client.transport; + +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.List; +import java.util.Map; + +import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent; +import io.modelcontextprotocol.client.transport.ResponseSubscribers.SseResponseEvent; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.type; + +/** + * Tests for {@link ResponseSubscribers} SSE line parsing. + * + * @author Iman Rastkhadiv + */ +class ResponseSubscribersTests { + + private static HttpResponse.ResponseInfo responseInfo(int statusCode) { + return new HttpResponse.ResponseInfo() { + @Override + public int statusCode() { + return statusCode; + } + + @Override + public HttpHeaders headers() { + return HttpHeaders.of(Map.of(), (k, v) -> true); + } + + @Override + public HttpClient.Version version() { + return HttpClient.Version.HTTP_1_1; + } + }; + } + + private List parse(List lines) { + return Flux + .create(sink -> Flux.fromIterable(lines) + .subscribe(new ResponseSubscribers.SseLineSubscriber(responseInfo(200), sink))) + .collectList() + .block(Duration.ofSeconds(5)); + } + + @Test + void retryFieldIsNotRejected() { + // `retry:` is a valid SSE field. The client does not support resumability, but a + // server may still send it; it must be ignored, not error the stream. + List events = parse(List.of("id: event-1", "retry: 500", "data: hello", "")); + + assertThat(events).singleElement().asInstanceOf(type(SseResponseEvent.class)).satisfies(e -> { + assertThat(e.sseEvent().id()).isEqualTo("event-1"); + assertThat(e.sseEvent().data()).isEqualTo("hello"); + }); + } + + @Test + void unknownFieldIsIgnored() { + // Per the SSE spec, fields the client does not recognize must be ignored rather + // than treated as an invalid response. + List events = parse(List.of("id: event-1", "someFutureField: whatever", "data: hello", "")); + + assertThat(events).singleElement().asInstanceOf(type(SseResponseEvent.class)).satisfies(e -> { + assertThat(e.sseEvent().id()).isEqualTo("event-1"); + assertThat(e.sseEvent().data()).isEqualTo("hello"); + }); + } + +}