From f10cd313f4aad4ffa0e94a3f7cbe50ef41646be4 Mon Sep 17 00:00:00 2001 From: Ganesan Arunachalams Date: Tue, 4 Aug 2020 15:44:01 +0530 Subject: [PATCH] Enable read from replica It's configurable in case if we have replica then we could use to read from replica, otherwise by default it will use the master configuration from read postgres client too --- .../ConsentManagerConfiguration.java | 26 ++++++--- .../consent/ConsentConfiguration.java | 7 ++- .../consent/ConsentRequestRepository.java | 57 +++++++++---------- .../dataflow/DataFlowConfiguration.java | 2 +- .../link/LinkConfiguration.java | 11 +++- .../consentmanager/properties/DbOptions.java | 19 +++++++ .../user/OtpAttemptRepository.java | 4 -- .../user/OtpAttemptService.java | 5 +- .../user/UserConfiguration.java | 15 ++++- consent/src/main/resources/application.yml | 9 ++- .../in/projecteka/consentmanager/Replica.java | 14 +++++ 11 files changed, 116 insertions(+), 53 deletions(-) create mode 100644 src/main/java/in/projecteka/consentmanager/Replica.java diff --git a/consent/src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java index 18d0085e9..5b4534a39 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java @@ -163,12 +163,12 @@ public ClientRegistryClient clientRegistryClient(@Qualifier("customBuilder") Web } @Bean - public LockedUsersRepository lockedUsersRepository(DbOptions dbOptions) { - return new LockedUsersRepository(pgPool(dbOptions)); + public LockedUsersRepository lockedUsersRepository(@Qualifier("readWriteClient") PgPool pgPool) { + return new LockedUsersRepository(pgPool); } - @Bean - public PgPool pgPool(DbOptions dbOptions) { + @Bean("readWriteClient") + public PgPool readWriteClient(DbOptions dbOptions) { PgConnectOptions connectOptions = new PgConnectOptions() .setPort(dbOptions.getPort()) .setHost(dbOptions.getHost()) @@ -176,8 +176,21 @@ public PgPool pgPool(DbOptions dbOptions) { .setUser(dbOptions.getUser()) .setPassword(dbOptions.getPassword()); - PoolOptions poolOptions = new PoolOptions() - .setMaxSize(dbOptions.getPoolSize()); + PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getPoolSize()); + + return PgPool.pool(connectOptions, poolOptions); + } + + @Bean("readOnlyClient") + public PgPool readOnlyClient(DbOptions dbOptions) { + PgConnectOptions connectOptions = new PgConnectOptions() + .setPort(dbOptions.getReplica().getPort()) + .setHost(dbOptions.getReplica().getHost()) + .setDatabase(dbOptions.getSchema()) + .setUser(dbOptions.getReplica().getUser()) + .setPassword(dbOptions.getReplica().getPassword()); + + PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getReplica().getPoolSize()); return PgPool.pool(connectOptions, poolOptions); } @@ -344,7 +357,6 @@ ReactiveRedisOperations stringReactiveRedisOperations( return new ReactiveRedisTemplate<>(factory, context); } - @ConditionalOnProperty(value = "consentmanager.cacheMethod", havingValue = "guava", matchIfMissing = true) @Bean({"cacheForReplayAttack"}) public CacheAdapter stringLocalDateTimeCacheAdapter() { diff --git a/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java index 4bfa6bf72..56b192250 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java @@ -37,12 +37,13 @@ public class ConsentConfiguration { @Bean - public ConsentRequestRepository consentRequestRepository(PgPool pgPool) { - return new ConsentRequestRepository(pgPool); + public ConsentRequestRepository consentRequestRepository(@Qualifier("readWriteClient") PgPool readWriteClient, + @Qualifier("readOnlyClient") PgPool readOnlyClient) { + return new ConsentRequestRepository(readWriteClient, readOnlyClient); } @Bean - public ConsentArtefactRepository consentArtefactRepository(PgPool pgPool) { + public ConsentArtefactRepository consentArtefactRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new ConsentArtefactRepository(pgPool); } diff --git a/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java b/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java index e9296c954..1547a3bd4 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java +++ b/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java @@ -11,7 +11,7 @@ import io.vertx.pgclient.PgPool; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.Tuple; +import lombok.AllArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -27,7 +27,10 @@ import static in.projecteka.consentmanager.consent.model.ConsentStatus.GRANTED; import static in.projecteka.library.common.Serializer.from; import static in.projecteka.library.common.Serializer.to; +import static io.vertx.sqlclient.Tuple.of; +import static reactor.core.publisher.Mono.create; +@AllArgsConstructor public class ConsentRequestRepository { private static final Logger logger = LoggerFactory.getLogger(ConsentRequestRepository.class); private static final String SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS; @@ -35,16 +38,17 @@ public class ConsentRequestRepository { private static final String SELECT_CONSENT_REQUESTS_BY_STATUS; private static final String SELECT_CONSENT_DETAILS_FOR_PATIENT; private static final String SELECT_CONSENT_REQUEST_COUNT = "SELECT COUNT(*) FROM consent_request " + - "WHERE LOWER(patient_id) = $1 and status != $3 and (status=$2 OR $2 IS NULL)"; + "WHERE LOWER(patient_id) = $1 and status != $3 and (status= $2 OR $2 IS NULL)"; private static final String INSERT_CONSENT_REQUEST_QUERY = "INSERT INTO consent_request " + "(request_id, patient_id, status, details) VALUES ($1, $2, $3, $4)"; - private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status=$1, " + - "date_modified=$2 WHERE request_id=$3"; + private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status = $1, " + + "date_modified= $2 WHERE request_id= $3"; private static final String FAILED_TO_SAVE_CONSENT_REQUEST = "Failed to save consent request"; private static final String UNKNOWN_ERROR_OCCURRED = "Unknown error occurred"; private static final String FAILED_TO_GET_CONSENT_REQUESTS_BY_STATUS = "Failed to get consent requests by status"; - private final PgPool dbClient; + private final PgPool readWriteClient; + private final PgPool readOnlyClient; static { String s = "SELECT request_id, status, details, date_created, date_modified FROM consent_request " + @@ -57,14 +61,10 @@ public class ConsentRequestRepository { SELECT_CONSENT_REQUESTS_BY_STATUS = s + "status=$1"; } - public ConsentRequestRepository(PgPool dbClient) { - this.dbClient = dbClient; - } - public Mono insert(RequestedDetail requestedDetail, UUID requestId) { - return Mono.create(monoSink -> - dbClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY) - .execute(Tuple.of(requestId.toString(), + return create(monoSink -> + readWriteClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY) + .execute(of(requestId.toString(), requestedDetail.getPatient().getId(), ConsentStatus.REQUESTED.name(), new JsonObject(from(requestedDetail))), @@ -82,22 +82,21 @@ public Mono>> requestsForPatient(String pa int limit, int offset, String status) { - return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT) - .execute(Tuple.of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()), + return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT) + .execute(of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()), handler -> { List requestList = getConsentRequestDetails(handler); - dbClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT) - .execute(Tuple.of(patientId, status, GRANTED.toString()), counter -> { + readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT) + .execute(of(patientId, status, GRANTED.toString()), + counter -> { if (handler.failed()) { logger.error(handler.cause().getMessage(), handler.cause()); monoSink.error(new DbOperationError()); return; } - Integer count = counter.result().iterator() - .next().getInteger("count"); + var count = counter.result().iterator().next().getInteger("count"); monoSink.success(new ListResult<>(requestList, count)); - } - ); + }); })); } @@ -115,15 +114,13 @@ private List getConsentRequestDetails(AsyncResult requestOf(String requestId, String status, String patientId) { - return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS) - .execute(Tuple.of(requestId, status, patientId), - consentRequestHandler(monoSink))); + return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS) + .execute(of(requestId, status, patientId), consentRequestHandler(monoSink))); } public Mono requestOf(String requestId) { - return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID) - .execute(Tuple.of(requestId), - consentRequestHandler(monoSink))); + return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID) + .execute(of(requestId), consentRequestHandler(monoSink))); } private Handler>> consentRequestHandler(MonoSink monoSink) { @@ -162,8 +159,8 @@ private ConsentRequestDetail mapToConsentRequestDetail(Row result) { } public Mono updateStatus(String id, ConsentStatus status) { - return Mono.create(monoSink -> dbClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY) - .execute(Tuple.of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id), + return create(monoSink -> readWriteClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY) + .execute(of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id), updateHandler -> { if (updateHandler.failed()) { monoSink.error(new Exception("Failed to update status")); @@ -178,8 +175,8 @@ private ConsentStatus getConsentStatus(String status) { } public Flux getConsentsByStatus(ConsentStatus status) { - return Flux.create(fluxSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS) - .execute(Tuple.of(status.toString()), + return Flux.create(fluxSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS) + .execute(of(status.toString()), handler -> { if (handler.failed()) { logger.error(handler.cause().getMessage(), handler.cause()); diff --git a/consent/src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java index 098b50420..2d966a79f 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java @@ -65,7 +65,7 @@ public DataFlowRequester dataRequest(@Qualifier("customBuilder") WebClient.Build } @Bean - public DataFlowRequestRepository dataRequestRepository(PgPool pgPool) { + public DataFlowRequestRepository dataRequestRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new DataFlowRequestRepository(pgPool); } diff --git a/consent/src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java index 98041f423..ba03e8ec9 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java @@ -6,6 +6,13 @@ import in.projecteka.consentmanager.clients.DiscoveryServiceClient; import in.projecteka.consentmanager.clients.LinkServiceClient; import in.projecteka.consentmanager.clients.UserServiceClient; +import in.projecteka.consentmanager.clients.properties.GatewayServiceProperties; +import in.projecteka.consentmanager.clients.properties.LinkServiceProperties; +import in.projecteka.consentmanager.common.CentralRegistry; +import in.projecteka.consentmanager.common.ServiceAuthentication; +import in.projecteka.consentmanager.common.cache.CacheAdapter; +import in.projecteka.consentmanager.common.cache.LoadingCacheAdapter; +import in.projecteka.consentmanager.common.cache.RedisCacheAdapter; import in.projecteka.consentmanager.link.discovery.Discovery; import in.projecteka.consentmanager.link.discovery.DiscoveryRepository; import in.projecteka.consentmanager.link.hiplink.UserAuthInitAction; @@ -36,12 +43,12 @@ public class LinkConfiguration { @Bean - public DiscoveryRepository discoveryRepository(PgPool pgPool) { + public DiscoveryRepository discoveryRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new DiscoveryRepository(pgPool); } @Bean - public LinkRepository linkRepository(PgPool pgPool) { + public LinkRepository linkRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new LinkRepository(pgPool); } diff --git a/consent/src/main/java/in/projecteka/consentmanager/properties/DbOptions.java b/consent/src/main/java/in/projecteka/consentmanager/properties/DbOptions.java index cf1950327..806dc95b3 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/properties/DbOptions.java +++ b/consent/src/main/java/in/projecteka/consentmanager/properties/DbOptions.java @@ -16,8 +16,27 @@ public class DbOptions { private final String user; private final String password; private final int poolSize; + private final boolean replicaReadEnabled; + private final Replica replica; + + public Replica getReplica() { + return replica != null && replicaReadEnabled + ? replica + : new Replica(host, port, user, password, getReadPoolSize()); + } + + private int getReadPoolSize() { + return poolSize / 2 + poolSize % 2; + } + + public int getPoolSize() { + return replica != null && replicaReadEnabled + ? poolSize + : poolSize / 2; + } public in.projecteka.library.common.DbOptions toHeartBeat() { return new in.projecteka.library.common.DbOptions(host, port); } } + diff --git a/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java b/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java index 982977bea..99f34222d 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java +++ b/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java @@ -8,20 +8,16 @@ import lombok.AllArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Repository; import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.List; - -@Repository @AllArgsConstructor public class OtpAttemptRepository { private final static Logger logger = LoggerFactory.getLogger(OtpAttemptRepository.class); - private static final String INSERT_OTP_ATTEMPT = "INSERT INTO " + "otp_attempt (session_id ,cm_id, identifier_type, identifier_value, status, action) VALUES ($1,$2,$3,$4,$5,$6)"; diff --git a/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java b/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java index fe5bec0bc..92610d4d2 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java +++ b/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java @@ -3,7 +3,6 @@ import in.projecteka.consentmanager.user.model.OtpAttempt; import in.projecteka.library.clients.model.ClientError; import lombok.AllArgsConstructor; -import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import java.time.LocalDateTime; @@ -55,8 +54,8 @@ public Mono removeMatchingAttempts(OtpAttempt otpAttempt) { public Mono handleInvalidOTPError(ClientError error, OtpAttempt attempt) { Mono invalidOTPError = Mono.error(error); - if (error.getErrorCode().equals(OTP_INVALID)) { - return saveOTPAttempt(attempt.toBuilder().attemptStatus(FAILURE).build()).then(invalidOTPError); + if (error.getErrorCode().equals(ErrorCode.OTP_INVALID)) { + return saveOTPAttempt(attempt.toBuilder().attemptStatus(OtpAttempt.AttemptStatus.FAILURE).build()).then(invalidOTPError); } return invalidOTPError; } diff --git a/consent/src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java index 40bff4e59..88a5a2dc6 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java @@ -54,7 +54,7 @@ public UserService userService(UserRepository userRepository, } @Bean - public UserRepository userRepository(PgPool pgPool) { + public UserRepository userRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new UserRepository(pgPool); } @@ -145,7 +145,7 @@ public SessionService sessionService( } @Bean - public TransactionPinRepository transactionPinRepository(PgPool dbClient) { + public TransactionPinRepository transactionPinRepository(@Qualifier("readWriteClient") PgPool dbClient) { return new TransactionPinRepository(dbClient); } @@ -173,4 +173,15 @@ public TransactionPinService transactionPinService(TransactionPinRepository tran public ProfileService profileService(UserService userService, TransactionPinService transactionPinService) { return new ProfileService(userService, transactionPinService); } + + @Bean + public OtpAttemptRepository otpAttemptRepository(@Qualifier("readWriteClient") PgPool readWriteClient) { + return new OtpAttemptRepository(readWriteClient); + } + + @Bean + public OtpAttemptService otpAttemptService(OtpAttemptRepository otpAttemptRepository, + UserServiceProperties userServiceProperties) { + return new OtpAttemptService(otpAttemptRepository, userServiceProperties); + } } diff --git a/consent/src/main/resources/application.yml b/consent/src/main/resources/application.yml index 8dddb6bbf..45feeb9e9 100644 --- a/consent/src/main/resources/application.yml +++ b/consent/src/main/resources/application.yml @@ -62,7 +62,14 @@ consentmanager: schema: ${CONSENT_MANAGER_DB_NAME} user: ${POSTGRES_USER} password: ${POSTGRES_PASSWORD} - poolSize: 5 + poolSize: ${MASTER_POOL_SIZE:5} + replica-read-enabled: {REPLICA_READ_ENABLED:false} + replica: + host: ${POSTGRES_HOST} + port: ${POSTGRES_PORT:5432} + user: ${POSTGRES_USER} + password: ${POSTGRES_PASSWORD} + poolSize: ${REPLICA_POOL_SIZE:3} dataflow: consentmanager: url: ${CONSENT_MANAGER_URL} diff --git a/src/main/java/in/projecteka/consentmanager/Replica.java b/src/main/java/in/projecteka/consentmanager/Replica.java new file mode 100644 index 000000000..90d7724e6 --- /dev/null +++ b/src/main/java/in/projecteka/consentmanager/Replica.java @@ -0,0 +1,14 @@ +package in.projecteka.consentmanager; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +@Getter +public class Replica { + private final String host; + private final int port; + private final String user; + private final String password; + private final int poolSize; +}