@@ -491,10 +491,6 @@ public Mono<Void> onClose() {
491491
492492 onCloseDelayer .tryEmitEmpty ();
493493
494- onCloseSubscriber .assertNotTerminated ();
495-
496- rule .otherClosedSink .tryEmitEmpty ();
497-
498494 onCloseSubscriber .assertTerminated ().assertComplete ();
499495
500496 Assertions .assertThat (rule .socket .isDisposed ()).isTrue ();
@@ -516,10 +512,6 @@ public void shouldResolveOnStartSource() {
516512
517513 rule .client .onClose ().subscribe (assertSubscriber1 );
518514
519- assertSubscriber1 .assertNotTerminated ();
520-
521- rule .otherClosedSink .tryEmitEmpty ();
522-
523515 assertSubscriber1 .assertTerminated ().assertComplete ();
524516
525517 Assertions .assertThat (rule .socket .isDisposed ()).isTrue ();
@@ -579,8 +571,6 @@ public void shouldBeRestartedIfSourceWasClosed() {
579571
580572 rule .client .dispose ();
581573
582- rule .otherClosedSink .tryEmitEmpty ();
583-
584574 terminateSubscriber .assertTerminated ().assertComplete ();
585575
586576 Assertions .assertThat (rule .client .connect ()).isFalse ();
@@ -636,8 +626,6 @@ public void shouldStartOriginalSourceOnceIfRacing() {
636626
637627 rule .client .dispose ();
638628
639- rule .otherClosedSink .tryEmitEmpty ();
640-
641629 Assertions .assertThat (rule .client .isDisposed ()).isTrue ();
642630 Assertions .assertThat (rule .socket .isDisposed ()).isTrue ();
643631
@@ -656,10 +644,8 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
656644 protected Sinks .One <RSocket > producer ;
657645
658646 protected Sinks .Empty <Void > onGracefulShutdownStartedSink ;
659- protected Sinks .Empty <Void > otherGracefulShutdownSink ;
660647 protected Sinks .Empty <Void > thisGracefulShutdownSink ;
661648 protected Sinks .Empty <Void > thisClosedSink ;
662- protected Sinks .Empty <Void > otherClosedSink ;
663649
664650 @ Override
665651 protected void doInit () {
@@ -679,10 +665,8 @@ protected void doInit() {
679665 @ Override
680666 protected RSocket newRSocket () {
681667 this .onGracefulShutdownStartedSink = Sinks .empty ();
682- this .otherGracefulShutdownSink = Sinks .empty ();
683668 this .thisGracefulShutdownSink = Sinks .empty ();
684669 this .thisClosedSink = Sinks .empty ();
685- this .otherClosedSink = Sinks .empty ();
686670 return new RSocketRequester (
687671 connection ,
688672 PayloadDecoder .ZERO_COPY ,
@@ -698,8 +682,8 @@ protected RSocket newRSocket() {
698682 onGracefulShutdownStartedSink ,
699683 thisGracefulShutdownSink ,
700684 thisClosedSink ,
701- otherGracefulShutdownSink . asMono (). and ( thisGracefulShutdownSink .asMono () ),
702- otherClosedSink . asMono (). and ( thisClosedSink .asMono () ));
685+ thisGracefulShutdownSink .asMono (),
686+ thisClosedSink .asMono ());
703687 }
704688 }
705689}
0 commit comments