1616import org .reactivecommons .async .rabbit .config .ConnectionFactoryProvider ;
1717import org .reactivecommons .async .rabbit .config .RabbitProperties ;
1818import org .reactivecommons .async .rabbit .config .props .AsyncProps ;
19+ import org .reactivecommons .async .rabbit .config .spring .RabbitPropertiesBase ;
1920import org .springframework .boot .context .properties .PropertyMapper ;
2021import reactor .core .publisher .Mono ;
2122import reactor .rabbitmq .ChannelPool ;
2930import reactor .rabbitmq .Utils ;
3031import reactor .util .retry .Retry ;
3132
33+ import javax .net .ssl .KeyManager ;
34+ import javax .net .ssl .KeyManagerFactory ;
35+ import javax .net .ssl .SSLContext ;
36+ import javax .net .ssl .TrustManager ;
37+ import javax .net .ssl .TrustManagerFactory ;
38+ import javax .net .ssl .X509TrustManager ;
39+ import java .io .FileInputStream ;
40+ import java .io .IOException ;
41+ import java .io .InputStream ;
42+ import java .security .KeyManagementException ;
43+ import java .security .KeyStore ;
44+ import java .security .KeyStoreException ;
45+ import java .security .NoSuchAlgorithmException ;
46+ import java .security .SecureRandom ;
47+ import java .security .UnrecoverableKeyException ;
48+ import java .security .cert .CertificateException ;
3249import java .time .Duration ;
50+ import java .util .Arrays ;
3351import java .util .logging .Level ;
3452
3553@ Log
3654@ UtilityClass
3755public class RabbitMQSetupUtils {
3856 private static final String LISTENER_TYPE = "listener" ;
3957 private static final String SENDER_TYPE = "sender" ;
58+ private static final String DEFAULT_PROTOCOL ;
59+ public static final int START_INTERVAL = 300 ;
60+ public static final int MAX_BACKOFF_INTERVAL = 3000 ;
61+
62+ static {
63+ String protocol = "TLSv1.1" ;
64+ try {
65+ String [] protocols = SSLContext .getDefault ().getSupportedSSLParameters ().getProtocols ();
66+ for (String prot : protocols ) {
67+ if ("TLSv1.2" .equals (prot )) {
68+ protocol = "TLSv1.2" ;
69+ break ;
70+ }
71+ }
72+ } catch (NoSuchAlgorithmException e ) {
73+ // nothing
74+ }
75+ DEFAULT_PROTOCOL = protocol ;
76+ }
4077
4178 @ SneakyThrows
4279 public static ConnectionFactoryProvider connectionFactoryProvider (RabbitProperties properties ) {
@@ -48,9 +85,7 @@ public static ConnectionFactoryProvider connectionFactoryProvider(RabbitProperti
4885 map .from (properties ::determinePassword ).whenNonNull ().to (factory ::setPassword );
4986 map .from (properties ::determineVirtualHost ).whenNonNull ().to (factory ::setVirtualHost );
5087 factory .useNio ();
51- if (properties .getSsl () != null && properties .getSsl ().isEnabled ()) {
52- factory .useSslProtocol ();
53- }
88+ setUpSSL (factory , properties );
5489 return () -> factory ;
5590 }
5691
@@ -118,9 +153,95 @@ private static Mono<Connection> createConnectionMono(ConnectionFactory factory,
118153 log .log (Level .SEVERE , "Error creating connection to RabbitMQ Broker in host" +
119154 factory .getHost () + ". Starting retry process..." , err )
120155 )
121- .retryWhen (Retry .backoff (Long .MAX_VALUE , Duration .ofMillis (300 ))
122- .maxBackoff (Duration .ofMillis (3000 )))
156+ .retryWhen (Retry .backoff (Long .MAX_VALUE , Duration .ofMillis (START_INTERVAL ))
157+ .maxBackoff (Duration .ofMillis (MAX_BACKOFF_INTERVAL )))
123158 .cache ();
124159 }
125160
161+ // SSL based on RabbitConnectionFactoryBean
162+ // https://github.com/spring-projects/spring-amqp/blob/main/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java
163+
164+ private static void setUpSSL (ConnectionFactory factory , RabbitProperties properties )
165+ throws NoSuchAlgorithmException , KeyManagementException , KeyStoreException , UnrecoverableKeyException ,
166+ CertificateException , IOException {
167+ var ssl = properties .getSsl ();
168+ if (ssl != null && ssl .isEnabled ()) {
169+ var keyManagers = configureKeyManagers (ssl );
170+ var trustManagers = configureTrustManagers (ssl );
171+ var secureRandom = SecureRandom .getInstanceStrong ();
172+
173+ if (log .isLoggable (Level .FINE )) {
174+ log .fine ("Initializing SSLContext with KM: " + Arrays .toString (keyManagers ) +
175+ ", TM: " + Arrays .toString (trustManagers ) + ", random: " + secureRandom );
176+ }
177+ var context = createSSLContext (ssl );
178+ context .init (keyManagers , trustManagers , secureRandom );
179+ factory .useSslProtocol (context );
180+
181+ logDetails (trustManagers );
182+
183+ if (ssl .getVerifyHostname ()) {
184+ factory .enableHostnameVerification ();
185+ }
186+ }
187+ }
188+
189+ private static KeyManager [] configureKeyManagers (RabbitPropertiesBase .Ssl ssl ) throws KeyStoreException ,
190+ IOException ,
191+ NoSuchAlgorithmException ,
192+ CertificateException , UnrecoverableKeyException {
193+ KeyManager [] keyManagers = null ;
194+ if (ssl .getKeyStore () != null ) {
195+ var ks = KeyStore .getInstance (ssl .getKeyStoreType ());
196+ char [] keyPassphrase = null ;
197+ if (ssl .getKeyStorePassword () != null ) {
198+ keyPassphrase = ssl .getKeyStorePassword ().toCharArray ();
199+ }
200+ try (var inputStream = new FileInputStream (ssl .getKeyStore ())) {
201+ ks .load (inputStream , keyPassphrase );
202+ }
203+ var kmf = KeyManagerFactory .getInstance (KeyManagerFactory .getDefaultAlgorithm ());
204+ kmf .init (ks , keyPassphrase );
205+ keyManagers = kmf .getKeyManagers ();
206+ }
207+ return keyManagers ;
208+ }
209+
210+ private static TrustManager [] configureTrustManagers (RabbitPropertiesBase .Ssl ssl )
211+ throws KeyStoreException , IOException , NoSuchAlgorithmException , CertificateException {
212+ KeyStore tks = null ;
213+ if (ssl .getTrustStore () != null ) {
214+ tks = KeyStore .getInstance (ssl .getTrustStoreType ());
215+ char [] trustPassphrase = null ;
216+ if (ssl .getTrustStorePassword () != null ) {
217+ trustPassphrase = ssl .getTrustStorePassword ().toCharArray ();
218+ }
219+ try (InputStream inputStream = new FileInputStream (ssl .getTrustStore ())) {
220+ tks .load (inputStream , trustPassphrase );
221+ }
222+ }
223+
224+ var tmf = TrustManagerFactory .getInstance (TrustManagerFactory .getDefaultAlgorithm ());
225+ tmf .init (tks );
226+ return tmf .getTrustManagers ();
227+ }
228+
229+ private static SSLContext createSSLContext (RabbitPropertiesBase .Ssl ssl ) throws NoSuchAlgorithmException {
230+ return SSLContext .getInstance (ssl .getAlgorithm () != null ? ssl .getAlgorithm () : DEFAULT_PROTOCOL );
231+ }
232+
233+ private static void logDetails (TrustManager [] managers ) {
234+ var found = false ;
235+ for (var trustManager : managers ) {
236+ if (trustManager instanceof X509TrustManager ) {
237+ found = true ;
238+ var x509TrustManager = (X509TrustManager ) trustManager ;
239+ log .info ("Loaded " + x509TrustManager .getAcceptedIssuers ().length + " accepted issuers for rabbitmq" );
240+ }
241+ }
242+ if (!found ) {
243+ log .warning ("No X509TrustManager found in the truststore." );
244+ }
245+ }
246+
126247}
0 commit comments