diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java index ce333675f..1e33d475e 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java @@ -11,6 +11,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import io.modelcontextprotocol.client.transport.McpStdioServerProcessExitException; import io.modelcontextprotocol.spec.McpClientSession; import io.modelcontextprotocol.spec.McpError; import io.modelcontextprotocol.spec.McpSchema; @@ -225,6 +226,16 @@ private void close() { this.mcpSession().close(); } + private void close(Throwable cause) { + McpClientSession mcpClientSession = this.mcpSession(); + if (mcpClientSession != null) { + mcpClientSession.close(cause); + } + else { + this.error(cause); + } + } + private Mono closeGracefully() { return this.mcpSession().closeGracefully(); } @@ -260,6 +271,13 @@ public void handleException(Throwable t) { // the implicit initialization step. this.withInitialization("re-initializing", result -> Mono.empty()).subscribe(); } + else if (t instanceof McpStdioServerProcessExitException) { + DefaultInitialization previous = this.initializationRef.get(); + if (previous != null && previous.initializeResult() == null + && this.initializationRef.compareAndSet(previous, null)) { + previous.close(t); + } + } } /** @@ -356,4 +374,4 @@ public Mono closeGracefully() { }); } -} \ No newline at end of file +} diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpStdioServerProcessExitException.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpStdioServerProcessExitException.java new file mode 100644 index 000000000..1454684a8 --- /dev/null +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpStdioServerProcessExitException.java @@ -0,0 +1,42 @@ +/* + * Copyright 2026-2026 the original author or authors. + */ + +package io.modelcontextprotocol.client.transport; + +import io.modelcontextprotocol.spec.McpTransportException; +import io.modelcontextprotocol.util.Assert; + +/** + * Thrown when an MCP stdio server process exits unexpectedly. + * + * @author DragonFSKY + */ +public class McpStdioServerProcessExitException extends McpTransportException { + + private static final long serialVersionUID = 1L; + + private final int exitCode; + + private final String command; + + public McpStdioServerProcessExitException(int exitCode, String command) { + super(message(exitCode, command)); + this.exitCode = exitCode; + this.command = command; + } + + public int getExitCode() { + return this.exitCode; + } + + public String getCommand() { + return this.command; + } + + private static String message(int exitCode, String command) { + Assert.hasText(command, "The command can not be empty"); + return "MCP server process exited unexpectedly with code " + exitCode + " for command: " + command; + } + +} diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java index 1b4eaca97..70c381ab1 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java @@ -12,6 +12,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -48,6 +49,10 @@ public class StdioClientTransport implements McpClientTransport { /** The server process being communicated with */ private Process process; + private final AtomicReference unexpectedExitException = new AtomicReference<>(); + + private final AtomicReference> exceptionHandler = new AtomicReference<>(); + private McpJsonMapper jsonMapper; /** Scheduler for handling inbound messages from the server process */ @@ -66,6 +71,8 @@ public class StdioClientTransport implements McpClientTransport { private volatile boolean isClosing = false; + private volatile boolean closeRequested = false; + // visible for tests private Consumer stdErrorHandler = error -> logger.info("STDERR Message received: {}", error); @@ -134,6 +141,7 @@ public Mono connect(Function, Mono> h startInboundProcessing(); startOutboundProcessing(); startErrorProcessing(); + startExitMonitoring(); logger.info("MCP server started"); }).subscribeOn(Schedulers.boundedElastic()); } @@ -160,6 +168,11 @@ public void setStdErrorHandler(Consumer errorHandler) { this.stdErrorHandler = errorHandler; } + @Override + public void setExceptionHandler(Consumer handler) { + this.exceptionHandler.set(handler); + } + /** * Waits for the server process to exit. * @throws RuntimeException if the process is interrupted while waiting @@ -227,6 +240,14 @@ private void handleIncomingErrors() { @Override public Mono sendMessage(JSONRPCMessage message) { + McpStdioServerProcessExitException exitException = this.unexpectedExitException.get(); + if (exitException != null) { + return Mono.error(exitException); + } + if (!this.closeRequested && this.process != null && !this.process.isAlive()) { + exitException = signalUnexpectedProcessExit(this.process.exitValue()); + return Mono.error(exitException); + } if (this.outboundSink.tryEmitNext(message).isSuccess()) { // TODO: essentially we could reschedule ourselves in some time and make // another attempt with the already read data but pause reading until @@ -240,6 +261,32 @@ public Mono sendMessage(JSONRPCMessage message) { } } + private void startExitMonitoring() { + this.process.onExit().thenAccept(process -> { + if (!closeRequested) { + signalUnexpectedProcessExit(process.exitValue()); + } + }); + } + + private McpStdioServerProcessExitException signalUnexpectedProcessExit(int exitCode) { + McpStdioServerProcessExitException exception = new McpStdioServerProcessExitException(exitCode, + this.params.getCommand()); + if (this.unexpectedExitException.compareAndSet(null, exception)) { + logger.warn(exception.getMessage()); + isClosing = true; + inboundSink.tryEmitComplete(); + outboundSink.tryEmitComplete(); + errorSink.tryEmitComplete(); + + Consumer handler = this.exceptionHandler.get(); + if (handler != null) { + handler.accept(exception); + } + } + return this.unexpectedExitException.get(); + } + /** * Starts the inbound processing thread that reads JSON-RPC messages from the * process's input stream. Messages are deserialized and emitted to the inbound sink. @@ -335,6 +382,7 @@ protected void handleOutbound(Function, Flux closeGracefully() { return Mono.fromRunnable(() -> { + closeRequested = true; isClosing = true; logger.debug("Initiating graceful shutdown"); }).then(Mono.defer(() -> { diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java index a5a51bff0..5735e9ce7 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java @@ -122,14 +122,18 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport, this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe(); } - private void dismissPendingResponses() { + private void dismissPendingResponses(Throwable cause) { this.pendingResponses.forEach((id, sink) -> { - logger.warn("Abruptly terminating exchange for request {}", id); - sink.error(new RuntimeException("MCP session with server terminated")); + logger.warn("Abruptly terminating exchange for request {}: {}", id, cause.toString()); + sink.error(cause); }); this.pendingResponses.clear(); } + private void dismissPendingResponses() { + dismissPendingResponses(new RuntimeException("MCP session with server terminated")); + } + private void handle(McpSchema.JSONRPCMessage message) { if (message instanceof McpSchema.JSONRPCResponse response) { logger.debug("Received response: {}", response); @@ -300,4 +304,13 @@ public void close() { dismissPendingResponses(); } + /** + * Closes the session immediately, failing pending operations with the given cause. + * @param cause the transport-level cause of the closure + */ + public void close(Throwable cause) { + Assert.notNull(cause, "The cause can not be null"); + dismissPendingResponses(cause); + } + } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/LifecycleInitializerTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/LifecycleInitializerTests.java index be7b7dc53..64f3e37db 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/LifecycleInitializerTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/LifecycleInitializerTests.java @@ -11,8 +11,10 @@ import java.util.function.Function; import io.modelcontextprotocol.client.LifecycleInitializer.Initialization; +import io.modelcontextprotocol.client.transport.McpStdioServerProcessExitException; import io.modelcontextprotocol.spec.McpClientSession; import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpTransportException; import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -302,6 +304,56 @@ void shouldHandleOtherExceptions() { verify(mockSessionSupplier, times(1)).apply(any(ContextView.class)); } + @Test + void shouldCloseInProgressInitializationOnStdioProcessExit() { + var cause = new McpStdioServerProcessExitException(127, "java"); + when(mockClientSession.sendRequest(eq(McpSchema.METHOD_INITIALIZE), any(), any())).thenReturn(Mono.never()) + .thenReturn(Mono.just(MOCK_INIT_RESULT)); + + var subscription = initializer.withInitialization("test", init -> Mono.just(init.initializeResult())) + .subscribe(); + + initializer.handleException(cause); + subscription.dispose(); + + verify(mockClientSession).close(cause); + + StepVerifier.create(initializer.withInitialization("retry", init -> Mono.just(init.initializeResult()))) + .expectNext(MOCK_INIT_RESULT) + .verifyComplete(); + + verify(mockSessionSupplier, times(2)).apply(any(ContextView.class)); + } + + @Test + void shouldIgnoreGenericTransportExceptionDuringInitialization() { + var cause = new McpTransportException("Transport closed"); + when(mockClientSession.sendRequest(eq(McpSchema.METHOD_INITIALIZE), any(), any())).thenReturn(Mono.never()); + + var subscription = initializer.withInitialization("test", init -> Mono.just(init.initializeResult())) + .subscribe(); + + initializer.handleException(cause); + subscription.dispose(); + + verify(mockClientSession, never()).close(cause); + } + + @Test + void shouldKeepInitializedAfterTransportException() { + StepVerifier.create(initializer.withInitialization("test", init -> Mono.just(init.initializeResult()))) + .expectNext(MOCK_INIT_RESULT) + .verifyComplete(); + + var cause = new McpTransportException("Transport closed"); + + initializer.handleException(cause); + + assertThat(initializer.isInitialized()).isTrue(); + verify(mockClientSession, never()).close(cause); + verify(mockSessionSupplier, times(1)).apply(any(ContextView.class)); + } + @Test void shouldCloseGracefully() { StepVerifier.create(initializer.withInitialization("test", init -> Mono.just(init.initializeResult()))) diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java index ae5daf1f4..13cdff1af 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java @@ -107,6 +107,21 @@ void testRequestTimeout() { session.close(); } + @Test + void testPendingRequestFailsWithCloseCause() { + var transport = new MockMcpClientTransport(); + var session = new McpClientSession(TIMEOUT, transport, Map.of(), + Map.of(TEST_NOTIFICATION, params -> Mono.fromRunnable(() -> logger.info("Status update: {}", params))), + Function.identity()); + var cause = new McpTransportException("Transport closed"); + + Mono responseMono = session.sendRequest(TEST_METHOD, "test", responseType); + + StepVerifier.create(responseMono).then(() -> session.close(cause)).expectErrorSatisfies(error -> { + assertThat(error).isSameAs(cause); + }).verify(); + } + @Test void testSendNotification() { var transport = new MockMcpClientTransport(); diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/client/FailingStdioServer.java b/mcp-test/src/test/java/io/modelcontextprotocol/client/FailingStdioServer.java new file mode 100644 index 000000000..63fbaa2f9 --- /dev/null +++ b/mcp-test/src/test/java/io/modelcontextprotocol/client/FailingStdioServer.java @@ -0,0 +1,17 @@ +/* + * Copyright 2024-2026 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +final class FailingStdioServer { + + private FailingStdioServer() { + } + + public static void main(String[] args) { + System.err.println("Exiting before MCP initialization with code 127"); + System.exit(127); + } + +} diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/client/StdioMcpClientInitializationFailureTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/client/StdioMcpClientInitializationFailureTests.java new file mode 100644 index 000000000..870d9fd65 --- /dev/null +++ b/mcp-test/src/test/java/io/modelcontextprotocol/client/StdioMcpClientInitializationFailureTests.java @@ -0,0 +1,72 @@ +/* + * Copyright 2024-2026 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import io.modelcontextprotocol.client.transport.ServerParameters; +import io.modelcontextprotocol.client.transport.StdioClientTransport; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static io.modelcontextprotocol.util.McpJsonMapperUtils.JSON_MAPPER; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +/** + * Tests for initialization failures reported by {@link StdioClientTransport}. + * + * @author DragonFSKY + */ +@Timeout(10) +class StdioMcpClientInitializationFailureTests { + + @Test + void initializeShouldFailWithProcessExitInsteadOfRequestTimeout() { + Duration requestTimeout = Duration.ofSeconds(3); + String classpath = System.getProperty("java.class.path"); + ServerParameters stdioParams = ServerParameters.builder(javaExecutable()) + .args("-cp", classpath, FailingStdioServer.class.getName()) + .build(); + StdioClientTransport transport = new StdioClientTransport(stdioParams, JSON_MAPPER); + McpSyncClient client = McpClient.sync(transport) + .requestTimeout(requestTimeout) + .initializationTimeout(Duration.ofSeconds(5)) + .build(); + + Throwable failure; + long elapsedMillis; + try { + long startNanos = System.nanoTime(); + failure = catchThrowable(client::initialize); + elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + } + finally { + client.closeGracefully(); + } + + assertThat(failure).isNotNull(); + String stackTrace = stackTraceOf(failure); + assertThat(elapsedMillis).isLessThan(requestTimeout.toMillis()); + assertThat(stackTrace).contains("MCP server process exited", "with code 127") + .doesNotContain("TimeoutException"); + } + + private String javaExecutable() { + String executable = System.getProperty("os.name").toLowerCase().contains("win") ? "java.exe" : "java"; + return Path.of(System.getProperty("java.home"), "bin", executable).toString(); + } + + private String stackTraceOf(Throwable failure) { + StringWriter writer = new StringWriter(); + failure.printStackTrace(new PrintWriter(writer)); + return writer.toString(); + } + +}