44import com .rabbitmq .client .ConnectionFactory ;
55import lombok .AccessLevel ;
66import lombok .NoArgsConstructor ;
7- import lombok .SneakyThrows ;
87import lombok .extern .java .Log ;
98import org .reactivecommons .api .domain .DomainEventBus ;
109import org .reactivecommons .async .commons .DLQDiscardNotifier ;
5049import java .security .cert .CertificateException ;
5150import java .time .Duration ;
5251import java .util .Arrays ;
52+ import java .util .concurrent .ConcurrentHashMap ;
53+ import java .util .concurrent .ConcurrentMap ;
5354import java .util .logging .Level ;
5455
5556@ Log
5657@ NoArgsConstructor (access = AccessLevel .PRIVATE )
5758public final class RabbitMQSetupUtils {
58- private static final String LISTENER_TYPE = "listener" ;
59- private static final String TOPOLOGY_TYPE = "topology" ;
60- private static final String SENDER_TYPE = "sender" ;
59+ private static final String SHARED_TYPE = "shared" ;
6160 private static final String DEFAULT_PROTOCOL ;
6261 public static final int START_INTERVAL = 300 ;
6362 public static final int MAX_BACKOFF_INTERVAL = 3000 ;
6463
64+ private static final ConcurrentMap <RabbitProperties , ConnectionFactory > FACTORY_CACHE = new ConcurrentHashMap <>();
65+ private static final ConcurrentMap <ConnectionFactory , Mono <Connection >> CONNECTION_CACHE = new ConcurrentHashMap <>();
66+
6567 static {
6668 String protocol = "TLSv1.1" ;
6769 try {
@@ -78,17 +80,23 @@ public final class RabbitMQSetupUtils {
7880 DEFAULT_PROTOCOL = protocol ;
7981 }
8082
81- @ SneakyThrows
8283 public static ConnectionFactoryProvider connectionFactoryProvider (RabbitProperties properties ) {
83- final ConnectionFactory factory = new ConnectionFactory ();
84- PropertyMapper map = PropertyMapper .get ();
85- map .from (properties ::determineHost ).whenNonNull ().to (factory ::setHost );
86- map .from (properties ::determinePort ).to (factory ::setPort );
87- map .from (properties ::determineUsername ).whenNonNull ().to (factory ::setUsername );
88- map .from (properties ::determinePassword ).whenNonNull ().to (factory ::setPassword );
89- map .from (properties ::determineVirtualHost ).whenNonNull ().to (factory ::setVirtualHost );
90- factory .useNio ();
91- setUpSSL (factory , properties );
84+ final ConnectionFactory factory = FACTORY_CACHE .computeIfAbsent (properties , props -> {
85+ try {
86+ ConnectionFactory newFactory = new ConnectionFactory ();
87+ PropertyMapper map = PropertyMapper .get ();
88+ map .from (props ::determineHost ).whenNonNull ().to (newFactory ::setHost );
89+ map .from (props ::determinePort ).to (newFactory ::setPort );
90+ map .from (props ::determineUsername ).whenNonNull ().to (newFactory ::setUsername );
91+ map .from (props ::determinePassword ).whenNonNull ().to (newFactory ::setPassword );
92+ map .from (props ::determineVirtualHost ).whenNonNull ().to (newFactory ::setVirtualHost );
93+ newFactory .useNio ();
94+ setUpSSL (newFactory , props );
95+ return newFactory ;
96+ } catch (Exception e ) {
97+ throw new RuntimeException ("Error creating ConnectionFactory: " , e );
98+ }
99+ });
92100 return () -> factory ;
93101 }
94102
@@ -107,7 +115,7 @@ public static ReactiveMessageSender createMessageSender(ConnectionFactoryProvide
107115
108116 public static ReactiveMessageListener createMessageListener (ConnectionFactoryProvider provider , AsyncProps props ) {
109117 final Mono <Connection > connection =
110- createConnectionMono (provider .getConnectionFactory (), props .getAppName (), LISTENER_TYPE );
118+ createConnectionMono (provider .getConnectionFactory (), props .getAppName ());
111119 final Receiver receiver = RabbitFlux .createReceiver (new ReceiverOptions ().connectionMono (connection ));
112120 final Sender sender = RabbitFlux .createSender (new SenderOptions ().connectionMono (connection ));
113121
@@ -119,8 +127,7 @@ public static ReactiveMessageListener createMessageListener(ConnectionFactoryPro
119127
120128 public static TopologyCreator createTopologyCreator (AsyncProps props ) {
121129 ConnectionFactoryProvider provider = connectionFactoryProvider (props .getConnectionProperties ());
122- final Mono <Connection > connection = createConnectionMono (provider .getConnectionFactory (),
123- props .getAppName (), TOPOLOGY_TYPE );
130+ final Mono <Connection > connection = createConnectionMono (provider .getConnectionFactory (), props .getAppName ());
124131 final Sender sender = RabbitFlux .createSender (new SenderOptions ().connectionMono (connection ));
125132 return new TopologyCreator (sender , props .getQueueType ());
126133 }
@@ -134,8 +141,7 @@ public static DiscardNotifier createDiscardNotifier(ReactiveMessageSender sender
134141
135142 private static SenderOptions reactiveCommonsSenderOptions (String appName , ConnectionFactoryProvider provider ,
136143 RabbitProperties rabbitProperties ) {
137- final Mono <Connection > senderConnection = createConnectionMono (provider .getConnectionFactory (), appName ,
138- SENDER_TYPE );
144+ final Mono <Connection > senderConnection = createConnectionMono (provider .getConnectionFactory (), appName );
139145 final ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions ();
140146 final PropertyMapper map = PropertyMapper .get ();
141147
@@ -153,18 +159,20 @@ private static SenderOptions reactiveCommonsSenderOptions(String appName, Connec
153159 .transform (Utils ::cache ));
154160 }
155161
156- private static Mono <Connection > createConnectionMono (ConnectionFactory factory , String connectionPrefix ,
157- String connectionType ) {
158- log .info ("Creating connection mono to RabbitMQ Broker in host '" + factory .getHost () + "' with " +
159- "type: " + connectionType );
160- return Mono .fromCallable (() -> factory .newConnection (connectionPrefix + " " + connectionType ))
161- .doOnError (err ->
162- log .log (Level .SEVERE , "Error creating connection to RabbitMQ Broker in host '" +
163- factory .getHost () + "'. Starting retry process..." , err )
164- )
165- .retryWhen (Retry .backoff (Long .MAX_VALUE , Duration .ofMillis (START_INTERVAL ))
166- .maxBackoff (Duration .ofMillis (MAX_BACKOFF_INTERVAL )))
167- .cache ();
162+ private static Mono <Connection > createConnectionMono (ConnectionFactory factory , String appName ) {
163+ return CONNECTION_CACHE .computeIfAbsent (factory , f -> {
164+ log .info ("Creating connection mono to RabbitMQ Broker in host '" + f .getHost () + "'" );
165+ return Mono .fromCallable (() -> f .newConnection (
166+ appName + "-" + InstanceIdentifier .getInstanceId (SHARED_TYPE , "" )
167+ ))
168+ .doOnError (err ->
169+ log .log (Level .SEVERE , "Error creating connection to RabbitMQ Broker in host '"
170+ + f .getHost () + "'. Starting retry process..." , err )
171+ )
172+ .retryWhen (Retry .backoff (Long .MAX_VALUE , Duration .ofMillis (START_INTERVAL ))
173+ .maxBackoff (Duration .ofMillis (MAX_BACKOFF_INTERVAL )))
174+ .cache ();
175+ });
168176 }
169177
170178 // SSL based on RabbitConnectionFactoryBean
0 commit comments