Skip to content

Commit be6a0d1

Browse files
authored
[automatic-failover] Integrate health checks with probing policies and retry logic (CAE-1685) (#3541)
* initial port Jedis health monitoring * wip integrate health checks * formatting * formatting * add test case plan * Endpoints without health checks configured should return HEALTHY Changes - add connection.getHealthStatus(RedisURI endpoint) - HEALTHY - returned for Databases without health checks configured - add test * Create MultiDbClient with custom health check strategy supplier Changes - add test to ensure health status changes from custom health checks are reflected * Create MultiDbClient with custom health check strategy supplier Changes - add test to ensure health status changes from custom health checks are reflected * faster await timeout * add test - use different health check strategies for different endpoints * wait for initial healthy database * add test - configure health check interval and timeout * add test - trigger failover when health check detects unhealthy endpoint * add test - should not failover to unhealthy endpoints * add test - Should trigger failover via circuit breaker even when health check returns HEALTHY * reduce await poll interval in HealthCheckIntegrationTest * mark unimplemented tests as disabled * add test - Should transition from UNKNOWN to HEALTHY * add test - Should create health check when adding new database * fix - Should stop health check when removing database * add test - Should stop health check when removing database * add test - HealthCheckLifecycleTests - Should start health checks automatically when connection is created - Should stop health checks when connection is closed * fix HealthCheck not stopped on StatefulRedisMultiDbConnection.close() * remove HealthStatusListenerTests stubs, health check events, not exposed publicly * format * add health checks unit test * clean up - rename health check thread names to lettuce-* - clean up warnings - format - javadocs & author updated * address failing tests - Update StatefulMultiDbConnectionIntegrationTests to account for added additional test server in 
MultiDbTestSupport - JUnit4 @After replaced with JUnit5 * address failing tests - Update StatefulMultiDbConnectionIntegrationTests to account for added additional test server in MultiDbTestSupport - JUnit4 @After replaced with JUnit5 * package private StatusTracker * make healthStatusManager required when creating MultiDbStatefulConnection * remove unimplemented probing integration tests - covered with unit tests * introduce isHealthy() to replace getHealthStatus() * register listeners before adding HealthChecks
1 parent 5f3060f commit be6a0d1

29 files changed

+3181
-44
lines changed

src/main/java/io/lettuce/core/failover/CircuitBreaker.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.TimeoutException;
1111
import java.util.concurrent.atomic.AtomicReference;
1212

13+
import io.lettuce.core.internal.LettuceAssert;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
1516

@@ -46,6 +47,8 @@ public class CircuitBreaker implements Closeable {
4647
* Create a circuit breaker instance.
4748
*/
4849
public CircuitBreaker(CircuitBreakerConfig config) {
50+
LettuceAssert.notNull(config, "CircuitBreakerConfig must not be null");
51+
4952
this.config = config;
5053
this.trackedExceptions = new HashSet<>(config.trackedExceptions);
5154
this.stateRef = new AtomicReference<>(

src/main/java/io/lettuce/core/failover/DatabaseConfig.java

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
import io.lettuce.core.ClientOptions;
44
import io.lettuce.core.RedisURI;
55
import io.lettuce.core.failover.CircuitBreaker.CircuitBreakerConfig;
6+
import io.lettuce.core.failover.health.HealthCheckStrategySupplier;
67
import io.lettuce.core.internal.LettuceAssert;
78

89
/**
9-
* Configuration for a database in a multi-database client. Holds the Redis URI, weight for load balancing, and client options.
10+
* Configuration for a database in a multi-database client. Holds the Redis URI, weight for load balancing, client options,
11+
* circuit breaker configuration, and optional health check strategy supplier.
1012
*
1113
* @author Ali Takavci
1214
* @since 7.1
@@ -21,44 +23,61 @@ public class DatabaseConfig {
2123

2224
private final CircuitBreakerConfig circuitBreakerConfig;
2325

26+
private final HealthCheckStrategySupplier healthCheckStrategySupplier;
27+
2428
/**
 * Creates a database configuration from the full set of settings.
 *
 * @param redisURI the Redis URI of the database, must not be {@code null}
 * @param weight the load-balancing weight, must be greater than 0
 * @param clientOptions the client options, may be {@code null} to use defaults
 * @param circuitBreakerConfig the circuit breaker configuration; {@code null} selects
 *        {@link CircuitBreakerConfig#DEFAULT}
 * @param healthCheckStrategySupplier the health check strategy supplier, may be {@code null} to disable health checks
 */
public DatabaseConfig(RedisURI redisURI, float weight, ClientOptions clientOptions,
        CircuitBreakerConfig circuitBreakerConfig, HealthCheckStrategySupplier healthCheckStrategySupplier) {
    // Reject invalid arguments before any field is assigned.
    LettuceAssert.notNull(redisURI, "RedisURI must not be null");
    LettuceAssert.isTrue(weight > 0, "Weight must be greater than 0");

    if (circuitBreakerConfig == null) {
        this.circuitBreakerConfig = CircuitBreakerConfig.DEFAULT;
    } else {
        this.circuitBreakerConfig = circuitBreakerConfig;
    }
    this.healthCheckStrategySupplier = healthCheckStrategySupplier;
    this.clientOptions = clientOptions;
    this.weight = weight;
    this.redisURI = redisURI;
}
48+
49+
/**
 * Creates a database configuration without a health check strategy supplier. The supplier is passed on as
 * {@code null}, which disables health checks for this database.
 *
 * @param redisURI the Redis URI of the database, must not be {@code null}
 * @param weight the load-balancing weight, must be greater than 0
 * @param clientOptions the client options, may be {@code null} to use defaults
 * @param circuitBreakerConfig the circuit breaker configuration, may be {@code null} to use defaults
 */
public DatabaseConfig(RedisURI redisURI, float weight, ClientOptions clientOptions,
        CircuitBreakerConfig circuitBreakerConfig) {
    this(redisURI, weight, clientOptions, circuitBreakerConfig, null);
}
4261

4362
/**
 * Creates a database configuration with the given client options, the default circuit breaker configuration and no
 * health check strategy supplier (both are passed on as {@code null}).
 *
 * @param redisURI the Redis URI of the database, must not be {@code null}
 * @param weight the load-balancing weight, must be greater than 0
 * @param clientOptions the client options, may be {@code null} to use defaults
 */
public DatabaseConfig(RedisURI redisURI, float weight, ClientOptions clientOptions) {
    this(redisURI, weight, clientOptions, null, null);
}
5372

5473
/**
 * Creates a database configuration from a URI and a weight only. Client options, circuit breaker configuration and
 * health check strategy supplier are all passed on as {@code null}, i.e. defaults apply and health checks are disabled.
 *
 * @param redisURI the Redis URI of the database, must not be {@code null}
 * @param weight the load-balancing weight, must be greater than 0
 */
public DatabaseConfig(RedisURI redisURI, float weight) {
    this(redisURI, weight, null, null, null);
}
6382

6483
/**
@@ -97,6 +116,15 @@ public CircuitBreakerConfig getCircuitBreakerConfig() {
97116
return circuitBreakerConfig;
98117
}
99118

119+
/**
120+
* Get the health check strategy supplier.
121+
*
122+
* @return the health check strategy supplier, can be {@code null}
123+
*/
124+
public HealthCheckStrategySupplier getHealthCheckStrategySupplier() {
125+
return healthCheckStrategySupplier;
126+
}
127+
100128
@Override
101129
public boolean equals(Object o) {
102130
if (this == o)
@@ -110,20 +138,30 @@ public boolean equals(Object o) {
110138
return false;
111139
if (!redisURI.equals(that.redisURI))
112140
return false;
113-
return clientOptions != null ? clientOptions.equals(that.clientOptions) : that.clientOptions == null;
141+
if (clientOptions != null ? !clientOptions.equals(that.clientOptions) : that.clientOptions != null)
142+
return false;
143+
if (circuitBreakerConfig != null ? !circuitBreakerConfig.equals(that.circuitBreakerConfig)
144+
: that.circuitBreakerConfig != null)
145+
return false;
146+
return healthCheckStrategySupplier != null ? healthCheckStrategySupplier.equals(that.healthCheckStrategySupplier)
147+
: that.healthCheckStrategySupplier == null;
114148
}
115149

116150
@Override
117151
public int hashCode() {
118152
int result = redisURI.hashCode();
119153
result = 31 * result + (weight != +0.0f ? Float.floatToIntBits(weight) : 0);
120154
result = 31 * result + (clientOptions != null ? clientOptions.hashCode() : 0);
155+
result = 31 * result + (circuitBreakerConfig != null ? circuitBreakerConfig.hashCode() : 0);
156+
result = 31 * result + (healthCheckStrategySupplier != null ? healthCheckStrategySupplier.hashCode() : 0);
121157
return result;
122158
}
123159

124160
@Override
125161
public String toString() {
126-
return "DatabaseConfig{" + "redisURI=" + redisURI + ", weight=" + weight + ", clientOptions=" + clientOptions + '}';
162+
return "DatabaseConfig{" + "redisURI=" + redisURI + ", weight=" + weight + ", clientOptions=" + clientOptions
163+
+ ", circuitBreakerConfig=" + circuitBreakerConfig + ", healthCheckStrategySupplier="
164+
+ healthCheckStrategySupplier + '}';
127165
}
128166

129167
}

src/main/java/io/lettuce/core/failover/DatabaseConnectionFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.lettuce.core.api.StatefulRedisConnection;
44
import io.lettuce.core.codec.RedisCodec;
5+
import io.lettuce.core.failover.health.HealthStatusManager;
56

67
/**
78
* Factory interface for creating database connections in a multi-database client.
@@ -23,6 +24,6 @@ interface DatabaseConnectionFactory<C extends StatefulRedisConnection<K, V>, K,
2324
* @param codec the codec to use for encoding/decoding
2425
* @return a new RedisDatabase instance
2526
*/
26-
RedisDatabase<C> createDatabase(DatabaseConfig config, RedisCodec<K, V> codec);
27+
RedisDatabase<C> createDatabase(DatabaseConfig config, RedisCodec<K, V> codec, HealthStatusManager healthStatusManager);
2728

2829
}

src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java

Lines changed: 108 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,34 @@
11
package io.lettuce.core.failover;
22

33
import java.util.Collection;
4+
import java.util.Comparator;
5+
import java.util.List;
46
import java.util.Map;
57
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.stream.Collectors;
69

710
import io.lettuce.core.Delegating;
811
import io.lettuce.core.RedisChannelWriter;
912
import io.lettuce.core.RedisClient;
13+
import io.lettuce.core.RedisConnectionException;
1014
import io.lettuce.core.RedisURI;
1115
import io.lettuce.core.StatefulRedisConnectionImpl;
1216
import io.lettuce.core.api.StatefulRedisConnection;
1317
import io.lettuce.core.codec.RedisCodec;
1418
import io.lettuce.core.failover.api.StatefulRedisMultiDbConnection;
1519
import io.lettuce.core.failover.api.StatefulRedisMultiDbPubSubConnection;
20+
import io.lettuce.core.failover.health.HealthCheck;
21+
import io.lettuce.core.failover.health.HealthCheckStrategy;
22+
import io.lettuce.core.failover.health.HealthStatus;
23+
import io.lettuce.core.failover.health.HealthStatusManager;
24+
import io.lettuce.core.failover.health.HealthStatusManagerImpl;
1625
import io.lettuce.core.internal.LettuceAssert;
1726
import io.lettuce.core.protocol.DefaultEndpoint;
1827
import io.lettuce.core.pubsub.PubSubEndpoint;
1928
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
2029
import io.lettuce.core.resource.ClientResources;
30+
import io.netty.util.internal.logging.InternalLogger;
31+
import io.netty.util.internal.logging.InternalLoggerFactory;
2132

2233
/**
2334
* Failover-aware client that composes multiple standalone Redis endpoints and returns a single Stateful connection wrapper
@@ -30,6 +41,8 @@
3041
*/
3142
class MultiDbClientImpl extends RedisClient implements MultiDbClient {
3243

44+
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultiDbClientImpl.class);
45+
3346
private static final RedisURI EMPTY_URI = new RedisURI();
3447

3548
private final Map<RedisURI, DatabaseConfig> databaseConfigs;
@@ -73,6 +86,8 @@ public <K, V> StatefulRedisMultiDbConnection<K, V> connect(RedisCodec<K, V> code
7386
throw new IllegalArgumentException("codec must not be null");
7487
}
7588

89+
HealthStatusManager healthStatusManager = createHealthStatusManager();
90+
7691
Map<RedisURI, RedisDatabase<StatefulRedisConnection<K, V>>> databases = new ConcurrentHashMap<>(databaseConfigs.size());
7792
for (Map.Entry<RedisURI, DatabaseConfig> entry : databaseConfigs.entrySet()) {
7893
RedisURI uri = entry.getKey();
@@ -81,22 +96,41 @@ public <K, V> StatefulRedisMultiDbConnection<K, V> connect(RedisCodec<K, V> code
8196
// HACK: looks like repeating the implementation all around 'RedisClient.connect' is an overkill.
8297
// connections.put(uri, connect(codec, uri));
8398
// Instead we will use it from delegate
84-
RedisDatabase<StatefulRedisConnection<K, V>> database = createRedisDatabase(config, codec);
99+
RedisDatabase<StatefulRedisConnection<K, V>> database = createRedisDatabase(config, codec, healthStatusManager);
85100

86101
databases.put(uri, database);
87102
}
88103

104+
StatusTracker statusTracker = new StatusTracker(healthStatusManager);
105+
// Wait for health checks to complete if configured
106+
waitForInitialHealthyDatabase(statusTracker, databases);
107+
89108
// Provide a connection factory for dynamic database addition
90109
return new StatefulRedisMultiDbConnectionImpl<StatefulRedisConnection<K, V>, K, V>(databases, getResources(), codec,
91-
getOptions().getJsonParser(), this::createRedisDatabase);
110+
getOptions().getJsonParser(), this::createRedisDatabase, healthStatusManager);
111+
}
112+
113+
protected HealthStatusManager createHealthStatusManager() {
114+
return new HealthStatusManagerImpl();
92115
}
93116

94117
private <K, V> RedisDatabase<StatefulRedisConnection<K, V>> createRedisDatabase(DatabaseConfig config,
95-
RedisCodec<K, V> codec) {
118+
RedisCodec<K, V> codec, HealthStatusManager healthStatusManager) {
96119
RedisURI uri = config.getRedisURI();
97120
StatefulRedisConnection<K, V> connection = connect(codec, uri);
98121
DatabaseEndpoint databaseEndpoint = extractDatabaseEndpoint(connection);
99-
RedisDatabase<StatefulRedisConnection<K, V>> database = new RedisDatabase<>(config, connection, databaseEndpoint);
122+
123+
HealthCheck healthCheck;
124+
if (config.getHealthCheckStrategySupplier() != null) {
125+
HealthCheckStrategy hcStrategy = config.getHealthCheckStrategySupplier().get(config.getRedisURI(),
126+
connection.getOptions());
127+
healthCheck = healthStatusManager.add(uri, hcStrategy);
128+
} else {
129+
healthCheck = null;
130+
}
131+
132+
RedisDatabase<StatefulRedisConnection<K, V>> database = new RedisDatabase<>(config, connection, databaseEndpoint,
133+
healthCheck);
100134

101135
return database;
102136
}
@@ -116,27 +150,45 @@ public <K, V> StatefulRedisMultiDbPubSubConnection<K, V> connectPubSub(RedisCode
116150
throw new IllegalArgumentException("codec must not be null");
117151
}
118152

153+
HealthStatusManager healthStatusManager = createHealthStatusManager();
154+
119155
Map<RedisURI, RedisDatabase<StatefulRedisPubSubConnection<K, V>>> databases = new ConcurrentHashMap<>(
120156
databaseConfigs.size());
121157
for (Map.Entry<RedisURI, DatabaseConfig> entry : databaseConfigs.entrySet()) {
122158
RedisURI uri = entry.getKey();
123159
DatabaseConfig config = entry.getValue();
124160

125-
RedisDatabase<StatefulRedisPubSubConnection<K, V>> database = createRedisDatabaseWithPubSub(config, codec);
161+
RedisDatabase<StatefulRedisPubSubConnection<K, V>> database = createRedisDatabaseWithPubSub(config, codec,
162+
healthStatusManager);
126163
databases.put(uri, database);
127164
}
128165

166+
StatusTracker statusTracker = new StatusTracker(healthStatusManager);
167+
// Wait for health checks to complete if configured
168+
waitForInitialHealthyDatabase(statusTracker, databases);
169+
129170
// Provide a connection factory for dynamic database addition
130171
return new StatefulRedisMultiDbPubSubConnectionImpl<K, V>(databases, getResources(), codec,
131-
getOptions().getJsonParser(), this::createRedisDatabaseWithPubSub);
172+
getOptions().getJsonParser(), this::createRedisDatabaseWithPubSub, healthStatusManager);
132173
}
133174

134175
private <K, V> RedisDatabase<StatefulRedisPubSubConnection<K, V>> createRedisDatabaseWithPubSub(DatabaseConfig config,
135-
RedisCodec<K, V> codec) {
176+
RedisCodec<K, V> codec, HealthStatusManager healthStatusManager) {
136177
RedisURI uri = config.getRedisURI();
137178
StatefulRedisPubSubConnection<K, V> connection = connectPubSub(codec, uri);
138179
DatabaseEndpoint databaseEndpoint = extractDatabaseEndpoint(connection);
139-
RedisDatabase<StatefulRedisPubSubConnection<K, V>> database = new RedisDatabase<>(config, connection, databaseEndpoint);
180+
181+
HealthCheck healthCheck;
182+
if (config.getHealthCheckStrategySupplier() != null) {
183+
HealthCheckStrategy hcStrategy = config.getHealthCheckStrategySupplier().get(config.getRedisURI(),
184+
connection.getOptions());
185+
healthCheck = healthStatusManager.add(uri, hcStrategy);
186+
} else {
187+
healthCheck = null;
188+
}
189+
190+
RedisDatabase<StatefulRedisPubSubConnection<K, V>> database = new RedisDatabase<>(config, connection, databaseEndpoint,
191+
healthCheck);
140192
return database;
141193
}
142194

@@ -158,4 +210,52 @@ protected <K, V> PubSubEndpoint<K, V> createPubSubEndpoint() {
158210
return new DatabasePubSubEndpointImpl<>(getOptions(), getResources());
159211
}
160212

213+
/**
214+
* Waits for initial health check results and selects the first healthy database based on weight priority. Blocks until at
215+
* least one database becomes healthy or all databases are determined to be unhealthy.
216+
*
217+
* @param statusTracker the status tracker to use for waiting on health check results
218+
* @param databaseMap the map of databases to evaluate
219+
* @throws RedisConnectionException if all databases are unhealthy
220+
*/
221+
private void waitForInitialHealthyDatabase(StatusTracker statusTracker,
222+
Map<RedisURI, ? extends RedisDatabase<?>> databaseMap) {
223+
// Sort databases by weight in descending order
224+
List<? extends Map.Entry<RedisURI, ? extends RedisDatabase<?>>> sortedDatabases = databaseMap.entrySet().stream()
225+
.sorted(Map.Entry.comparingByValue(Comparator.comparing((RedisDatabase<?> db) -> db.getWeight()).reversed()))
226+
.collect(Collectors.toList());
227+
logger.info("Selecting initial database from {} configured databases", sortedDatabases.size());
228+
229+
// Select database in weight order
230+
for (Map.Entry<RedisURI, ? extends RedisDatabase<?>> entry : sortedDatabases) {
231+
RedisURI endpoint = entry.getKey();
232+
RedisDatabase<?> database = entry.getValue();
233+
234+
logger.info("Evaluating database {} (weight: {})", endpoint, database.getWeight());
235+
236+
HealthStatus status;
237+
238+
// Check if health checks are enabled for this database
239+
if (database.getHealthCheck() != null) {
240+
logger.info("Health checks enabled for {}, waiting for result", endpoint);
241+
// Wait for this database's health status to be determined
242+
status = statusTracker.waitForHealthStatus(endpoint);
243+
} else {
244+
// No health check configured - assume healthy
245+
logger.info("No health check configured for database {}, defaulting to HEALTHY", endpoint);
246+
status = HealthStatus.HEALTHY;
247+
}
248+
249+
if (status == HealthStatus.HEALTHY) {
250+
logger.info("Found healthy database: {} (weight: {})", endpoint, database.getWeight());
251+
return;
252+
} else {
253+
logger.info("Database {} is unhealthy, trying next database", endpoint);
254+
}
255+
}
256+
257+
// All databases are unhealthy
258+
throw new RedisConnectionException("All configured databases are unhealthy.");
259+
}
260+
161261
}

0 commit comments

Comments
 (0)