From 46b8cc31dea9fc9a9765615d26cb69e4c701335e Mon Sep 17 00:00:00 2001 From: Will Droste Date: Tue, 17 Feb 2026 00:46:25 -0600 Subject: [PATCH] Fix Resume preventing error frame on RejectedSetupException (#1121) When Resume was configured, RejectedSetupException errors were queued in the ResumableDuplexConnection's buffer instead of being sent directly to the client. The error frame now bypasses resume buffering during setup rejection, ensuring clients receive the rejection and connections close properly. Co-Authored-By: Claude Opus 4.6 --- .../java/io/rsocket/core/ServerSetup.java | 21 ++++++++++++ .../io/rsocket/core/RSocketServerTest.java | 32 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java index 5aae22e89..7ca400862 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java @@ -29,6 +29,8 @@ import io.rsocket.resume.*; import java.nio.channels.ClosedChannelException; import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import java.util.function.Function; import reactor.core.publisher.Mono; @@ -96,6 +98,8 @@ static class ResumableServerSetup extends ServerSetup { private final Duration resumeStreamTimeout; private final Function resumeStoreFactory; private final boolean cleanupStoreOnKeepAlive; + private final Map rawConnections = + new ConcurrentHashMap<>(); ResumableServerSetup( Duration timeout, @@ -136,6 +140,8 @@ public Mono acceptRSocketSetup( sessionManager.save(serverRSocketSession, resumeToken); + rawConnections.put(resumableDuplexConnection, duplexConnection); + return then.apply( new ResumableKeepAliveHandler( resumableDuplexConnection, serverRSocketSession, serverRSocketSession), @@ -145,6 +151,21 @@ public Mono acceptRSocketSetup( } } + @Override + void sendError(DuplexConnection duplexConnection, RSocketErrorException exception) { + DuplexConnection rawConnection = rawConnections.remove(duplexConnection); + if (rawConnection != null) { + // Send the error directly on the raw connection, bypassing ResumableDuplexConnection + // which would buffer the frame instead of sending it to the client + rawConnection.sendErrorAndClose(exception); + rawConnection.receive().subscribe(); + // Dispose the resumable connection to clean up the session + duplexConnection.dispose(); + } else { + super.sendError(duplexConnection, exception); + } + } + @Override public Mono acceptRSocketResume(ByteBuf frame, DuplexConnection duplexConnection) { ServerRSocketSession session = sessionManager.get(ResumeFrameCodec.token(frame)); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java index a335ac1f3..36cdf12a8 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java @@ -198,4 +198,36 @@ public void ensuresErrorFrameDeliveredPriorConnectionDisposal() { server.dispose(); transport.alloc().assertHasNoLeaks(); } + + @Test + public void ensuresErrorFrameDeliveredWithResumeEnabled() { + TestServerTransport transport = new TestServerTransport(); + Closeable server = + RSocketServer.create() + .resume(new Resume()) + .acceptor( + (setup, sendingSocket) -> Mono.error(new RejectedSetupException("ACCESS_DENIED"))) + .bind(transport) + .block(); + + TestDuplexConnection connection = transport.connect(); + connection.addToReceivedBuffer( + SetupFrameCodec.encode( + ByteBufAllocator.DEFAULT, + false, + 0, + 1, + Unpooled.wrappedBuffer("test-resume-token".getBytes()), + "metadata_type", + "data_type", + EmptyPayload.INSTANCE)); + + StepVerifier.create(connection.onClose()).expectComplete().verify(Duration.ofSeconds(30)); + FrameAssert.assertThat(connection.pollFrame()) + .hasStreamIdZero() + .hasData("ACCESS_DENIED") + .hasNoLeaks(); + server.dispose(); + transport.alloc().assertHasNoLeaks(); + } }