Skip to content

Commit 44ed5c8

Browse files
authored
Merge pull request #36 from juanpmarin/master
fix: set resourceManagementChannelMono from channelPool
2 parents 605af38 + 929557d commit 44ed5c8

File tree

1 file changed

+3
-5
lines changed
  • async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config

1 file changed

+3
-5
lines changed

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import org.springframework.context.annotation.Configuration;
2020
import org.springframework.context.annotation.Import;
2121
import reactor.core.publisher.Mono;
22-
import reactor.core.scheduler.Scheduler;
23-
import reactor.core.scheduler.Schedulers;
2422
import reactor.rabbitmq.*;
2523

2624
import java.time.Duration;
@@ -59,7 +57,9 @@ public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, M
5957
channelPoolOptions
6058
);
6159

62-
final Sender sender = RabbitFlux.createSender(new SenderOptions().channelPool(channelPool));
60+
final Sender sender = RabbitFlux.createSender(new SenderOptions()
61+
.channelPool(channelPool)
62+
.resourceManagementChannelMono(channelPool.getChannelMono()));
6363

6464
return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(sender));
6565
}
@@ -101,13 +101,11 @@ public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSuppli
101101
}
102102

103103
Mono<Connection> createConnectionMono(ConnectionFactory factory, String connectionPrefix, String connectionType) {
104-
final Scheduler senderScheduler = Schedulers.elastic();
105104
return Mono.fromCallable(() -> factory.newConnection(connectionPrefix + " " + connectionType))
106105
.doOnError(err ->
107106
log.log(Level.SEVERE, "Error creating connection to RabbitMq Broker. Starting retry process...", err)
108107
)
109108
.retryBackoff(Long.MAX_VALUE, Duration.ofMillis(300), Duration.ofMillis(3000))
110-
.subscribeOn(senderScheduler)
111109
.cache();
112110
}
113111

0 commit comments

Comments
 (0)