diff --git a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java index 5aae22e89..7ca400862 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java @@ -29,6 +29,8 @@ import io.rsocket.resume.*; import java.nio.channels.ClosedChannelException; import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import java.util.function.Function; import reactor.core.publisher.Mono; @@ -96,6 +98,8 @@ static class ResumableServerSetup extends ServerSetup { private final Duration resumeStreamTimeout; private final Function resumeStoreFactory; private final boolean cleanupStoreOnKeepAlive; + private final Map rawConnections = + new ConcurrentHashMap<>(); ResumableServerSetup( Duration timeout, @@ -136,6 +140,8 @@ public Mono acceptRSocketSetup( sessionManager.save(serverRSocketSession, resumeToken); + rawConnections.put(resumableDuplexConnection, duplexConnection); + return then.apply( new ResumableKeepAliveHandler( resumableDuplexConnection, serverRSocketSession, serverRSocketSession), @@ -145,6 +151,21 @@ public Mono acceptRSocketSetup( } } + @Override + void sendError(DuplexConnection duplexConnection, RSocketErrorException exception) { + DuplexConnection rawConnection = rawConnections.remove(duplexConnection); + if (rawConnection != null) { + // Send the error directly on the raw connection, bypassing ResumableDuplexConnection + // which would buffer the frame instead of sending it to the client + rawConnection.sendErrorAndClose(exception); + rawConnection.receive().subscribe(); + // Dispose the resumable connection to clean up the session + duplexConnection.dispose(); + } else { + super.sendError(duplexConnection, exception); + } + } + @Override public Mono acceptRSocketResume(ByteBuf frame, DuplexConnection duplexConnection) { ServerRSocketSession session = sessionManager.get(ResumeFrameCodec.token(frame)); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java index a335ac1f3..36cdf12a8 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java @@ -198,4 +198,36 @@ public void ensuresErrorFrameDeliveredPriorConnectionDisposal() { server.dispose(); transport.alloc().assertHasNoLeaks(); } + + @Test + public void ensuresErrorFrameDeliveredWithResumeEnabled() { + TestServerTransport transport = new TestServerTransport(); + Closeable server = + RSocketServer.create() + .resume(new Resume()) + .acceptor( + (setup, sendingSocket) -> Mono.error(new RejectedSetupException("ACCESS_DENIED"))) + .bind(transport) + .block(); + + TestDuplexConnection connection = transport.connect(); + connection.addToReceivedBuffer( + SetupFrameCodec.encode( + ByteBufAllocator.DEFAULT, + false, + 0, + 1, + Unpooled.wrappedBuffer("test-resume-token".getBytes()), + "metadata_type", + "data_type", + EmptyPayload.INSTANCE)); + + StepVerifier.create(connection.onClose()).expectComplete().verify(Duration.ofSeconds(30)); + FrameAssert.assertThat(connection.pollFrame()) + .hasStreamIdZero() + .hasData("ACCESS_DENIED") + .hasNoLeaks(); + server.dispose(); + transport.alloc().assertHasNoLeaks(); + } }