Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import io.modelcontextprotocol.client.transport.McpStdioServerProcessExitException;
import io.modelcontextprotocol.spec.McpClientSession;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
Expand Down Expand Up @@ -225,6 +226,16 @@ private void close() {
this.mcpSession().close();
}

private void close(Throwable cause) {
McpClientSession mcpClientSession = this.mcpSession();
if (mcpClientSession != null) {
mcpClientSession.close(cause);
}
else {
this.error(cause);
}
}

private Mono<Void> closeGracefully() {
return this.mcpSession().closeGracefully();
}
Expand Down Expand Up @@ -260,6 +271,13 @@ public void handleException(Throwable t) {
// the implicit initialization step.
this.withInitialization("re-initializing", result -> Mono.empty()).subscribe();
}
else if (t instanceof McpStdioServerProcessExitException) {
DefaultInitialization previous = this.initializationRef.get();
if (previous != null && previous.initializeResult() == null
&& this.initializationRef.compareAndSet(previous, null)) {
previous.close(t);
}
}
}

/**
Expand Down Expand Up @@ -356,4 +374,4 @@ public Mono<?> closeGracefully() {
});
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2026-2026 the original author or authors.
*/

package io.modelcontextprotocol.client.transport;

import io.modelcontextprotocol.spec.McpTransportException;
import io.modelcontextprotocol.util.Assert;

/**
* Thrown when an MCP stdio server process exits unexpectedly.
*
* @author DragonFSKY
*/
public class McpStdioServerProcessExitException extends McpTransportException {

private static final long serialVersionUID = 1L;

private final int exitCode;

private final String command;

public McpStdioServerProcessExitException(int exitCode, String command) {
super(message(exitCode, command));
this.exitCode = exitCode;
this.command = command;
}

public int getExitCode() {
return this.exitCode;
}

public String getCommand() {
return this.command;
}

private static String message(int exitCode, String command) {
Assert.hasText(command, "The command can not be empty");
return "MCP server process exited unexpectedly with code " + exitCode + " for command: " + command;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -48,6 +49,10 @@ public class StdioClientTransport implements McpClientTransport {
/** The server process being communicated with */
private Process process;

private final AtomicReference<McpStdioServerProcessExitException> unexpectedExitException = new AtomicReference<>();

private final AtomicReference<Consumer<Throwable>> exceptionHandler = new AtomicReference<>();

private McpJsonMapper jsonMapper;

/** Scheduler for handling inbound messages from the server process */
Expand All @@ -66,6 +71,8 @@ public class StdioClientTransport implements McpClientTransport {

private volatile boolean isClosing = false;

private volatile boolean closeRequested = false;

// visible for tests
private Consumer<String> stdErrorHandler = error -> logger.info("STDERR Message received: {}", error);

Expand Down Expand Up @@ -134,6 +141,7 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
startInboundProcessing();
startOutboundProcessing();
startErrorProcessing();
startExitMonitoring();
logger.info("MCP server started");
}).subscribeOn(Schedulers.boundedElastic());
}
Expand All @@ -160,6 +168,11 @@ public void setStdErrorHandler(Consumer<String> errorHandler) {
this.stdErrorHandler = errorHandler;
}

@Override
public void setExceptionHandler(Consumer<Throwable> handler) {
this.exceptionHandler.set(handler);
}

/**
* Waits for the server process to exit.
* @throws RuntimeException if the process is interrupted while waiting
Expand Down Expand Up @@ -227,6 +240,14 @@ private void handleIncomingErrors() {

@Override
public Mono<Void> sendMessage(JSONRPCMessage message) {
McpStdioServerProcessExitException exitException = this.unexpectedExitException.get();
if (exitException != null) {
return Mono.error(exitException);
}
if (!this.closeRequested && this.process != null && !this.process.isAlive()) {
exitException = signalUnexpectedProcessExit(this.process.exitValue());
return Mono.error(exitException);
}
if (this.outboundSink.tryEmitNext(message).isSuccess()) {
// TODO: essentially we could reschedule ourselves in some time and make
// another attempt with the already read data but pause reading until
Expand All @@ -240,6 +261,32 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
}
}

private void startExitMonitoring() {
this.process.onExit().thenAccept(process -> {
if (!closeRequested) {
signalUnexpectedProcessExit(process.exitValue());
}
});
}

private McpStdioServerProcessExitException signalUnexpectedProcessExit(int exitCode) {
McpStdioServerProcessExitException exception = new McpStdioServerProcessExitException(exitCode,
this.params.getCommand());
if (this.unexpectedExitException.compareAndSet(null, exception)) {
logger.warn(exception.getMessage());
isClosing = true;
inboundSink.tryEmitComplete();
outboundSink.tryEmitComplete();
errorSink.tryEmitComplete();

Consumer<Throwable> handler = this.exceptionHandler.get();
if (handler != null) {
handler.accept(exception);
}
}
return this.unexpectedExitException.get();
}

/**
* Starts the inbound processing thread that reads JSON-RPC messages from the
* process's input stream. Messages are deserialized and emitted to the inbound sink.
Expand Down Expand Up @@ -335,6 +382,7 @@ protected void handleOutbound(Function<Flux<JSONRPCMessage>, Flux<JSONRPCMessage
@Override
public Mono<Void> closeGracefully() {
return Mono.fromRunnable(() -> {
closeRequested = true;
isClosing = true;
logger.debug("Initiating graceful shutdown");
}).then(Mono.<Void>defer(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,18 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe();
}

private void dismissPendingResponses() {
private void dismissPendingResponses(Throwable cause) {
this.pendingResponses.forEach((id, sink) -> {
logger.warn("Abruptly terminating exchange for request {}", id);
sink.error(new RuntimeException("MCP session with server terminated"));
logger.warn("Abruptly terminating exchange for request {}: {}", id, cause.toString());
sink.error(cause);
});
this.pendingResponses.clear();
}

private void dismissPendingResponses() {
dismissPendingResponses(new RuntimeException("MCP session with server terminated"));
}

private void handle(McpSchema.JSONRPCMessage message) {
if (message instanceof McpSchema.JSONRPCResponse response) {
logger.debug("Received response: {}", response);
Expand Down Expand Up @@ -300,4 +304,13 @@ public void close() {
dismissPendingResponses();
}

/**
* Closes the session immediately, failing pending operations with the given cause.
* @param cause the transport-level cause of the closure
*/
public void close(Throwable cause) {
Assert.notNull(cause, "The cause can not be null");
dismissPendingResponses(cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import java.util.function.Function;

import io.modelcontextprotocol.client.LifecycleInitializer.Initialization;
import io.modelcontextprotocol.client.transport.McpStdioServerProcessExitException;
import io.modelcontextprotocol.spec.McpClientSession;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportException;
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -302,6 +304,56 @@ void shouldHandleOtherExceptions() {
verify(mockSessionSupplier, times(1)).apply(any(ContextView.class));
}

@Test
void shouldCloseInProgressInitializationOnStdioProcessExit() {
var cause = new McpStdioServerProcessExitException(127, "java");
when(mockClientSession.sendRequest(eq(McpSchema.METHOD_INITIALIZE), any(), any())).thenReturn(Mono.never())
.thenReturn(Mono.just(MOCK_INIT_RESULT));

var subscription = initializer.withInitialization("test", init -> Mono.just(init.initializeResult()))
.subscribe();

initializer.handleException(cause);
subscription.dispose();

verify(mockClientSession).close(cause);

StepVerifier.create(initializer.withInitialization("retry", init -> Mono.just(init.initializeResult())))
.expectNext(MOCK_INIT_RESULT)
.verifyComplete();

verify(mockSessionSupplier, times(2)).apply(any(ContextView.class));
}

@Test
void shouldIgnoreGenericTransportExceptionDuringInitialization() {
var cause = new McpTransportException("Transport closed");
when(mockClientSession.sendRequest(eq(McpSchema.METHOD_INITIALIZE), any(), any())).thenReturn(Mono.never());

var subscription = initializer.withInitialization("test", init -> Mono.just(init.initializeResult()))
.subscribe();

initializer.handleException(cause);
subscription.dispose();

verify(mockClientSession, never()).close(cause);
}

@Test
void shouldKeepInitializedAfterTransportException() {
StepVerifier.create(initializer.withInitialization("test", init -> Mono.just(init.initializeResult())))
.expectNext(MOCK_INIT_RESULT)
.verifyComplete();

var cause = new McpTransportException("Transport closed");

initializer.handleException(cause);

assertThat(initializer.isInitialized()).isTrue();
verify(mockClientSession, never()).close(cause);
verify(mockSessionSupplier, times(1)).apply(any(ContextView.class));
}

@Test
void shouldCloseGracefully() {
StepVerifier.create(initializer.withInitialization("test", init -> Mono.just(init.initializeResult())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,21 @@ void testRequestTimeout() {
session.close();
}

@Test
void testPendingRequestFailsWithCloseCause() {
var transport = new MockMcpClientTransport();
var session = new McpClientSession(TIMEOUT, transport, Map.of(),
Map.of(TEST_NOTIFICATION, params -> Mono.fromRunnable(() -> logger.info("Status update: {}", params))),
Function.identity());
var cause = new McpTransportException("Transport closed");

Mono<String> responseMono = session.sendRequest(TEST_METHOD, "test", responseType);

StepVerifier.create(responseMono).then(() -> session.close(cause)).expectErrorSatisfies(error -> {
assertThat(error).isSameAs(cause);
}).verify();
}

@Test
void testSendNotification() {
var transport = new MockMcpClientTransport();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2024-2026 the original author or authors.
*/

package io.modelcontextprotocol.client;

final class FailingStdioServer {

private FailingStdioServer() {
}

public static void main(String[] args) {
System.err.println("Exiting before MCP initialization with code 127");
System.exit(127);
}

}
Loading