1+ /*
2+ * Copyright (c) 2019, Salesforce.com, Inc.
3+ * All rights reserved.
4+ * Licensed under the BSD 3-Clause license.
5+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+ */
7+
8+ package com .salesforce .reactorgrpc ;
9+
10+ import io .grpc .testing .GrpcServerRule ;
11+ import org .junit .*;
12+ import reactor .core .publisher .Flux ;
13+ import reactor .core .publisher .Hooks ;
14+ import reactor .core .publisher .Mono ;
15+ import reactor .test .StepVerifier ;
16+
17+ import java .util .stream .Collectors ;
18+
19+ import static org .assertj .core .api .Assertions .assertThat ;
20+
21+ public class ReactorContextPropagationTest {
22+
23+ @ Rule
24+ public GrpcServerRule serverRule = new GrpcServerRule ();
25+
26+ private static class SimpleGreeter extends ReactorGreeterGrpc .GreeterImplBase {
27+ @ Override
28+ public Mono <HelloResponse > sayHello (Mono <HelloRequest > request ) {
29+ return request .map (HelloRequest ::getName )
30+ .map (name -> HelloResponse .newBuilder ().setMessage ("Hello " + name ).build ());
31+ }
32+
33+ @ Override
34+ public Mono <HelloResponse > sayHelloReqStream (Flux <HelloRequest > request ) {
35+ return request .transformDeferredContextual ((f , ctx ) -> f .map (HelloRequest ::getName ))
36+ .collect (Collectors .joining ("and" ))
37+ .map (names -> HelloResponse .newBuilder ().setMessage ("Hello " + names ).build ());
38+ }
39+
40+ @ Override
41+ public Flux <HelloResponse > sayHelloRespStream (Mono <HelloRequest > request ) {
42+ return request .repeat (2 )
43+ .map (HelloRequest ::getName )
44+ .zipWith (Flux .just ("Hello " , "Hi " , "Greetings " ), String ::join )
45+ .map (greeting -> HelloResponse .newBuilder ().setMessage (greeting ).build ());
46+ }
47+
48+ @ Override
49+ public Flux <HelloResponse > sayHelloBothStream (Flux <HelloRequest > request ) {
50+ return request .map (HelloRequest ::getName )
51+ .map (name -> HelloResponse .newBuilder ().setMessage ("Hello " + name ).build ());
52+ }
53+ }
54+
55+ @ BeforeClass
56+ public static void beforeAll (){
57+ Hooks .enableContextLossTracking ();
58+ Hooks .onOperatorDebug ();
59+ }
60+
61+ @ AfterClass
62+ public static void afterAll (){
63+ Hooks .disableContextLossTracking ();
64+ Hooks .resetOnOperatorDebug ();
65+ }
66+
67+ @ Before
68+ public void setup () {
69+ serverRule .getServiceRegistry ().addService (new SimpleGreeter ());
70+ }
71+
72+ @ Test
73+ public void oneToOne () {
74+ ReactorGreeterGrpc .ReactorGreeterStub stub = ReactorGreeterGrpc .newReactorStub (serverRule .getChannel ());
75+ Mono <HelloRequest > req = Mono .just (HelloRequest .newBuilder ().setName ("reactor" ).build ());
76+
77+ Mono <HelloResponse > resp = req
78+ .doOnEach (signal -> assertThat (signal .getContextView ().getOrEmpty ("name" )).isNotEmpty ())
79+ .transform (stub ::sayHello )
80+ .doOnEach (signal -> assertThat (signal .getContextView ().getOrEmpty ("name" )).isNotEmpty ())
81+ .contextWrite (ctx -> ctx .put ("name" , "context" ));
82+
83+ StepVerifier .create (resp )
84+ .expectNextCount (1 )
85+ .verifyComplete ();
86+ }
87+
88+ @ Test
89+ public void oneToMany () {
90+ ReactorGreeterGrpc .ReactorGreeterStub stub = ReactorGreeterGrpc .newReactorStub (serverRule .getChannel ());
91+ Mono <HelloRequest > req = Mono .just (HelloRequest .newBuilder ().setName ("reactor" ).build ());
92+
93+ Flux <HelloResponse > resp = req
94+ .doOnEach (signal -> assertThat (signal .getContextView ().getOrEmpty ("name" )).isNotEmpty ())
95+ .as (stub ::sayHelloRespStream )
96+ .doOnEach (signal -> assertThat (signal .getContextView ().getOrEmpty ("name" )).isNotEmpty ())
97+ .contextWrite (ctx -> ctx .put ("name" , "context" ));
98+
99+ StepVerifier .create (resp )
100+ .expectNextCount (3 )
101+ .verifyComplete ();
102+ }
103+
104+ @ Test
105+ public void manyToOne () {
106+ ReactorGreeterGrpc .ReactorGreeterStub stub = ReactorGreeterGrpc .newReactorStub (serverRule .getChannel ());
107+ Flux <HelloRequest > req = Mono .deferContextual (ctx -> Mono .just (HelloRequest .newBuilder ().setName (ctx .get ("name" )).build ())).repeat (2 );
108+
109+ Mono <HelloResponse > resp = req
110+ .doOnEach (signal -> assertThat (signal .getContextView ().getOrEmpty ("name" )).isNotEmpty ())
111+ .as (stub ::sayHelloReqStream )
112+ .doOnEach (signal -> assertThat (signal .getContextView ().getOrEmpty ("name" )).isNotEmpty ())
113+ .contextWrite (ctx -> ctx .put ("name" , "context" ));
114+
115+ StepVerifier .create (resp )
116+ .expectAccessibleContext ()
117+ .contains ("name" , "context" )
118+ .then ()
119+ .expectNextCount (1 )
120+ .verifyComplete ();
121+ }
122+
123+ @ Test
124+ public void manyToMany () {
125+ ReactorGreeterGrpc .ReactorGreeterStub stub = ReactorGreeterGrpc .newReactorStub (serverRule .getChannel ());
126+ Flux <HelloRequest > req = Mono .just (HelloRequest .newBuilder ().setName ("reactor" ).build ()).repeat (2 ).contextWrite (c -> c .put ("name" , "boom" ));
127+
128+ Flux <HelloResponse > resp = req
129+ .doOnEach (signal -> assertThat (signal .getContextView ().getOrEmpty ("name" )).isNotEmpty ())
130+ .transform (stub ::sayHelloBothStream )
131+ .doOnEach (signal -> assertThat (signal .getContextView ().getOrEmpty ("name" )).isNotEmpty ())
132+ .contextWrite (ctx -> ctx .put ("name" , "context" ));
133+
134+ StepVerifier .create (resp )
135+ .expectNextCount (3 )
136+ .verifyComplete ();
137+ }
138+ }
0 commit comments