diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java index 61a859ce7..f2969435a 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java @@ -43,6 +43,7 @@ public class HaGatewayConfiguration private List statementPaths = ImmutableList.of(V1_STATEMENT_PATH); private boolean includeClusterHostInResponse; private ProxyResponseConfiguration proxyResponseConfiguration = new ProxyResponseConfiguration(); + private WriteBufferConfiguration writeBuffer = new WriteBufferConfiguration(); private RequestAnalyzerConfig requestAnalyzerConfig = new RequestAnalyzerConfig(); @@ -268,6 +269,16 @@ public void setIncludeClusterHostInResponse(boolean includeClusterHostInResponse this.includeClusterHostInResponse = includeClusterHostInResponse; } + public WriteBufferConfiguration getWriteBuffer() + { + return writeBuffer; + } + + public void setWriteBuffer(WriteBufferConfiguration writeBuffer) + { + this.writeBuffer = writeBuffer; + } + public ProxyResponseConfiguration getProxyResponseConfiguration() { return this.proxyResponseConfiguration; diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/WriteBufferConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/WriteBufferConfiguration.java new file mode 100644 index 000000000..bb912f618 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/WriteBufferConfiguration.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.config; + +import io.airlift.units.Duration; + +import java.util.concurrent.TimeUnit; + +public class WriteBufferConfiguration +{ + private boolean enabled; + private int maxCapacity = 10000; + private Duration flushInterval = new Duration(2, TimeUnit.SECONDS); + + public WriteBufferConfiguration() {} + + public WriteBufferConfiguration(boolean enabled, int maxCapacity, Duration flushInterval) + { + this.enabled = enabled; + this.maxCapacity = maxCapacity; + this.flushInterval = flushInterval; + } + + public boolean isEnabled() + { + return enabled; + } + + public void setEnabled(boolean enabled) + { + this.enabled = enabled; + } + + public int getMaxCapacity() + { + return maxCapacity; + } + + public void setMaxCapacity(int maxCapacity) + { + this.maxCapacity = maxCapacity; + } + + public Duration getFlushInterval() + { + return flushInterval; + } + + public void setFlushInterval(Duration flushInterval) + { + this.flushInterval = flushInterval; + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java index 2973655b8..f91bab58b 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java @@ -124,7 +124,7 @@ public HaGatewayProviderModule(HaGatewayConfiguration configuration) JdbcConnectionManager connectionManager = new JdbcConnectionManager(jdbi, configuration.getDataStore()); resourceGroupsManager = new HaResourceGroupsManager(connectionManager); gatewayBackendManager = new HaGatewayManager(jdbi, configuration.getRouting()); - queryHistoryManager = new HaQueryHistoryManager(jdbi, configuration.getDataStore().getJdbcUrl().startsWith("jdbc:oracle")); + queryHistoryManager = new HaQueryHistoryManager(jdbi, configuration.getDataStore().getJdbcUrl().startsWith("jdbc:oracle"), configuration.getWriteBuffer()); } private LbOAuthManager getOAuthManager(HaGatewayConfiguration configuration) diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaQueryHistoryManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaQueryHistoryManager.java index b7ea39001..db8ccf0b2 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaQueryHistoryManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaQueryHistoryManager.java @@ -14,13 +14,19 @@ package io.trino.gateway.ha.router; import com.google.common.base.Strings; +import com.google.inject.Inject; +import io.airlift.log.Logger; +import io.trino.gateway.ha.config.WriteBufferConfiguration; import io.trino.gateway.ha.domain.TableData; import io.trino.gateway.ha.domain.request.QueryHistoryRequest; import io.trino.gateway.ha.domain.response.DistributionResponse; import io.trino.gateway.ha.persistence.dao.QueryHistory; import io.trino.gateway.ha.persistence.dao.QueryHistoryDao; +import jakarta.annotation.PreDestroy; +import org.jdbi.v3.core.ConnectionException; import org.jdbi.v3.core.Jdbi; +import java.sql.SQLException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -29,6 +35,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static java.util.Objects.requireNonNull; @@ -36,14 +45,35 @@ public class HaQueryHistoryManager implements QueryHistoryManager { private static final int FIRST_PAGE_NO = 1; + private static final Logger log = Logger.get(HaQueryHistoryManager.class); private final QueryHistoryDao dao; private final boolean isOracleBackend; + private final WriteBuffer writeBuffer; + private final ScheduledExecutorService scheduledExecutor; - public HaQueryHistoryManager(Jdbi jdbi, boolean isOracleBackend) + @Inject + public HaQueryHistoryManager(Jdbi jdbi, boolean isOracleBackend, WriteBufferConfiguration writeBufferConfig) { dao = requireNonNull(jdbi, "jdbi is null").onDemand(QueryHistoryDao.class); this.isOracleBackend = isOracleBackend; + if (writeBufferConfig != null && writeBufferConfig.isEnabled()) { + this.writeBuffer = new WriteBuffer<>(writeBufferConfig.getMaxCapacity()); + this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "query-history-write-buffer"); + t.setDaemon(true); + return t; + }); + scheduledExecutor.scheduleWithFixedDelay( + this::flushBufferedWrites, + writeBufferConfig.getFlushInterval().toMillis(), + writeBufferConfig.getFlushInterval().toMillis(), + TimeUnit.MILLISECONDS); + } + else { + this.writeBuffer = null; + this.scheduledExecutor = null; + } } @Override @@ -54,15 +84,27 @@ public void submitQueryDetail(QueryDetail queryDetail) return; } - dao.insertHistory( - queryDetail.getQueryId(), - queryDetail.getQueryText(), - queryDetail.getBackendUrl(), - queryDetail.getUser(), - queryDetail.getSource(), - queryDetail.getCaptureTime(), - queryDetail.getRoutingGroup(), - queryDetail.getExternalUrl()); + try { + dao.insertHistory( + queryDetail.getQueryId(), + queryDetail.getQueryText(), + queryDetail.getBackendUrl(), + queryDetail.getUser(), + queryDetail.getSource(), + queryDetail.getCaptureTime(), + queryDetail.getRoutingGroup(), + queryDetail.getExternalUrl()); + } + catch (RuntimeException e) { + if (isConnectionIssue(e) && writeBuffer != null) { + writeBuffer.buffer(queryDetail); + log.warn(e, "DB unavailable; buffered query_history entry. queryId=%s, bufferSize=%s", + queryDetail.getQueryId(), writeBuffer.size()); + } + else { + throw e; + } + } } @Override @@ -166,4 +208,66 @@ private static int getStart(int pageNo, int pageSize) } return (pageNo - FIRST_PAGE_NO) * pageSize; } + + private static boolean isConnectionIssue(Throwable t) + { + // SQL State codes starting with "08" indicate connection exceptions per ANSI/ISO SQL standard. + // See: https://en.wikipedia.org/wiki/SQLSTATE + // Examples: 08000 (connection exception), 08001 (cannot establish connection), + // 08003 (connection does not exist), 08006 (connection failure), etc. + for (Throwable cur = t; cur != null; cur = cur.getCause()) { + if (cur instanceof ConnectionException) { + return true; + } + if (cur instanceof SQLException sql) { + String sqlState = sql.getSQLState(); + if (sqlState != null && sqlState.startsWith("08")) { + return true; + } + } + } + return false; + } + + private void flushBufferedWrites() + { + if (writeBuffer == null) { + return; + } + int before = writeBuffer.size(); + int flushed = writeBuffer.flushAll(r -> { + dao.insertHistory( + r.getQueryId(), + r.getQueryText(), + r.getBackendUrl(), + r.getUser(), + r.getSource(), + r.getCaptureTime(), + r.getRoutingGroup(), + r.getExternalUrl()); + }); + if (flushed > 0) { + log.info("Flushed %s buffered query_history entries", flushed); + } + else if (before > 0 && writeBuffer.size() == before) { + log.warn("Failed to flush buffered query_history entries; will retry. bufferSize=%s", before); + } + } + + @PreDestroy + public void stop() + { + if (scheduledExecutor == null) { + return; + } + try { + flushBufferedWrites(); + } + catch (RuntimeException t) { + log.warn(t, "Error while flushing buffered query_history entries during shutdown"); + } + finally { + scheduledExecutor.shutdownNow(); + } + } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/WriteBuffer.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/WriteBuffer.java new file mode 100644 index 000000000..f902f8fb3 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/WriteBuffer.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.router; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.Consumer; + +public final class WriteBuffer +{ + private final BlockingDeque deque; + + public WriteBuffer(int maxCapacity) + { + this.deque = new LinkedBlockingDeque<>(maxCapacity); + } + + /** Buffer an item for later flush. Drops the oldest if full. */ + public void buffer(T item) + { + if (!deque.offerLast(item)) { + deque.pollFirst(); + deque.offerLast(item); + } + } + + /** + * Flushes items in insertion order by applying the provided flusher. + * Stops immediately if flush can't be performed on item and re-inserts + * the failed item at the head of the buffer. + * + * @param flusher consumer invoked for each buffered item + * @return number of items successfully flushed + */ + public int flushAll(Consumer flusher) + { + int flushed = 0; + for (T next; (next = deque.pollFirst()) != null; ) { + try { + flusher.accept(next); + flushed++; + } + catch (RuntimeException e) { + deque.offerFirst(next); + break; // stop after first failure + } + } + return flushed; + } + + public int size() + { + return deque.size(); + } +} diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/BaseExternalUrlQueryHistoryTest.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/BaseExternalUrlQueryHistoryTest.java index 1d34a40e3..314ec032f 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/BaseExternalUrlQueryHistoryTest.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/BaseExternalUrlQueryHistoryTest.java @@ -14,6 +14,7 @@ package io.trino.gateway.ha.router; import io.trino.gateway.ha.config.DataStoreConfiguration; +import io.trino.gateway.ha.config.WriteBufferConfiguration; import io.trino.gateway.ha.persistence.FlywayMigration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; import org.junit.jupiter.api.AfterAll; @@ -49,7 +50,8 @@ protected BaseExternalUrlQueryHistoryTest(JdbcDatabaseContainer container) true); FlywayMigration.migrate(config); JdbcConnectionManager jdbcConnectionManager = createTestingJdbcConnectionManager(container, config); - queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle")); + WriteBufferConfiguration writeBufferConfiguration = new WriteBufferConfiguration(); + queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle"), writeBufferConfiguration); } @AfterAll diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/BaseTestQueryHistoryManager.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/BaseTestQueryHistoryManager.java index 3f5de4bbc..2c8c9f7eb 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/BaseTestQueryHistoryManager.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/BaseTestQueryHistoryManager.java @@ -13,10 +13,15 @@ */ package io.trino.gateway.ha.router; +import io.airlift.units.Duration; import io.trino.gateway.ha.config.DataStoreConfiguration; +import io.trino.gateway.ha.config.WriteBufferConfiguration; import io.trino.gateway.ha.domain.response.DistributionResponse; import io.trino.gateway.ha.persistence.FlywayMigration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; +import io.trino.gateway.ha.persistence.dao.QueryHistoryDao; +import org.jdbi.v3.core.ConnectionException; +import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -24,16 +29,34 @@ import org.junit.jupiter.api.TestInstance.Lifecycle; import org.testcontainers.containers.JdbcDatabaseContainer; +import java.sql.SQLException; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import static io.trino.gateway.ha.TestingJdbcConnectionManager.createTestingJdbcConnectionManager; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @TestInstance(Lifecycle.PER_CLASS) abstract class BaseTestQueryHistoryManager { protected final JdbcDatabaseContainer container = startContainer(); + private static final String BACKEND_URL = "http://localhost:9999"; + private static final String SQL_WORKBENCH = "sqlWorkbench"; + private static final String USER = "test@ea.com"; + private static final String SELECT_1 = "select 1"; + private static final String OTHER_USER = "other-user"; + private static final String ROUTING_GROUP = "routing-group"; + private static final String EXTERNAL_URL = "https://example.com"; private QueryHistoryManager queryHistoryManager; protected abstract JdbcDatabaseContainer startContainer(); @@ -41,6 +64,7 @@ abstract class BaseTestQueryHistoryManager @BeforeAll void setUp() { + WriteBufferConfiguration writeBufferConfig = new WriteBufferConfiguration(); DataStoreConfiguration config = new DataStoreConfiguration( container.getJdbcUrl(), container.getUsername(), @@ -50,7 +74,7 @@ void setUp() true); FlywayMigration.migrate(config); JdbcConnectionManager jdbcConnectionManager = createTestingJdbcConnectionManager(container, config); - queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle")); + queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle"), writeBufferConfig); } @AfterAll @@ -66,10 +90,10 @@ void testSubmitAndFetchQueryHistory() queryHistoryManager.fetchQueryHistory(Optional.empty()); assertThat(queryDetails).isEmpty(); QueryHistoryManager.QueryDetail queryDetail = new QueryHistoryManager.QueryDetail(); - queryDetail.setBackendUrl("http://localhost:9999"); - queryDetail.setSource("sqlWorkbench"); - queryDetail.setUser("test@ea.com"); - queryDetail.setQueryText("select 1"); + queryDetail.setBackendUrl(BACKEND_URL); + queryDetail.setSource(SQL_WORKBENCH); + queryDetail.setUser(USER); + queryDetail.setQueryText(SELECT_1); for (int i = 0; i < 2; i++) { queryDetail.setQueryId(String.valueOf(System.currentTimeMillis())); queryDetail.setCaptureTime(System.currentTimeMillis()); @@ -77,7 +101,7 @@ void testSubmitAndFetchQueryHistory() } //Add a query from other user - queryDetail.setUser("other-user"); + queryDetail.setUser(OTHER_USER); queryDetail.setQueryId(String.valueOf(System.currentTimeMillis())); queryDetail.setCaptureTime(System.currentTimeMillis()); queryHistoryManager.submitQueryDetail(queryDetail); @@ -87,7 +111,7 @@ void testSubmitAndFetchQueryHistory() // All queries when user is empty assertThat(queryDetails).hasSize(3); - queryDetails = queryHistoryManager.fetchQueryHistory(Optional.of("other-user")); + queryDetails = queryHistoryManager.fetchQueryHistory(Optional.of(OTHER_USER)); // Only 1 query when user is 'other-user' assertThat(queryDetails).hasSize(1); } @@ -100,13 +124,7 @@ void testFindDistribution() // Should return empty list assertThat(resList).isEmpty(); - QueryHistoryManager.QueryDetail queryDetail = new QueryHistoryManager.QueryDetail(); - queryDetail.setBackendUrl("http://localhost:9999"); - queryDetail.setSource("sqlWorkbench"); - queryDetail.setUser("test@ea.com"); - queryDetail.setQueryText("select 1"); - queryDetail.setQueryId(String.valueOf(System.currentTimeMillis())); - queryDetail.setCaptureTime(System.currentTimeMillis()); + QueryHistoryManager.QueryDetail queryDetail = createQueryDetail(); queryHistoryManager.submitQueryDetail(queryDetail); // Should return 1 entry @@ -129,4 +147,78 @@ void testTimestampParsing() long parsedLongTimestamp2 = (long) Float.parseFloat(mysqlTimestamp); assertThat(parsedLongTimestamp2).isEqualTo(result); } + + private static void stubInsertThenSucceed(QueryHistoryDao delegate, RuntimeException first) + { + doThrow(first) + .doNothing() + .when(delegate) + .insertHistory(anyString(), anyString(), anyString(), anyString(), anyString(), anyLong(), anyString(), anyString()); + } + + @Test + void buffersOnConnectionExceptionAndFlushesOnStop() + { + QueryHistoryDao delegate = mock(QueryHistoryDao.class); + stubInsertThenSucceed(delegate, new ConnectionException(new SQLException("network error", "08006"))); + + Jdbi mockJdbi = mock(Jdbi.class); + when(mockJdbi.onDemand(QueryHistoryDao.class)).thenReturn(delegate); + + WriteBufferConfiguration writeBufferConfig = new WriteBufferConfiguration(true, 10000, new Duration(2, TimeUnit.SECONDS)); + HaQueryHistoryManager manager = new HaQueryHistoryManager(mockJdbi, container.getJdbcUrl().startsWith("jdbc:oracle"), writeBufferConfig); + // First call buffers (no throw), then stop() flushes once and succeeds + QueryHistoryManager.QueryDetail queryDetail = createQueryDetail(); + try { + manager.submitQueryDetail(queryDetail); + manager.stop(); + verify(delegate, times(2)).insertHistory(anyString(), eq(SELECT_1), eq(BACKEND_URL), eq(USER), eq(SQL_WORKBENCH), anyLong(), anyString(), anyString()); + } + finally { + // idempotent + manager.stop(); + } + } + + @Test + void rethrowsNonConnectionException() + { + QueryHistoryDao delegate = mock(QueryHistoryDao.class); + doThrow(new IllegalStateException("bad input")) + .when(delegate) + .insertHistory(anyString(), anyString(), anyString(), anyString(), anyString(), anyLong(), anyString(), anyString()); + + Jdbi mockJdbi = mock(Jdbi.class); + when(mockJdbi.onDemand(QueryHistoryDao.class)).thenReturn(delegate); + WriteBufferConfiguration writeBufferConfig = new WriteBufferConfiguration(true, 10000, new Duration(2, TimeUnit.SECONDS)); + HaQueryHistoryManager manager = new HaQueryHistoryManager(mockJdbi, container.getJdbcUrl().startsWith("jdbc:oracle"), writeBufferConfig); + + QueryHistoryManager.QueryDetail queryDetail = createQueryDetail(); + try { + assertThatThrownBy(() -> manager.submitQueryDetail(queryDetail)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("bad input"); + + // Should not have buffered, so no retry on stop + manager.stop(); + verify(delegate, times(1)).insertHistory(anyString(), eq(SELECT_1), eq(BACKEND_URL), eq(USER), eq(SQL_WORKBENCH), anyLong(), anyString(), anyString()); + } + finally { + manager.stop(); + } + } + + private static QueryHistoryManager.QueryDetail createQueryDetail() + { + QueryHistoryManager.QueryDetail queryDetail = new QueryHistoryManager.QueryDetail(); + queryDetail.setQueryId(String.valueOf(System.currentTimeMillis())); + queryDetail.setQueryText(SELECT_1); + queryDetail.setBackendUrl(BACKEND_URL); + queryDetail.setUser(USER); + queryDetail.setSource(SQL_WORKBENCH); + queryDetail.setCaptureTime(System.currentTimeMillis()); + queryDetail.setRoutingGroup(ROUTING_GROUP); + queryDetail.setExternalUrl(EXTERNAL_URL); + return queryDetail; + } } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java index 04c3f8b6e..ed0f489c6 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java @@ -18,6 +18,7 @@ import io.trino.gateway.ha.clustermonitor.TrinoStatus; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.WriteBufferConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -175,7 +176,8 @@ public void init() { JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager(); backendManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration); - historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), false); + WriteBufferConfiguration writeBufferConfiguration = new WriteBufferConfiguration(); + historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), false, writeBufferConfiguration); queryCountBasedRouter = new QueryCountBasedRouter(backendManager, historyManager, routingConfiguration); populateData(); queryCountBasedRouter.updateClusterStats(clusters); diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerNotFound.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerNotFound.java index 8d2b05135..6e5e64a98 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerNotFound.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerNotFound.java @@ -14,6 +14,7 @@ package io.trino.gateway.ha.router; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.WriteBufferConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; import org.junit.jupiter.api.Test; @@ -31,7 +32,8 @@ public TestRoutingManagerNotFound() routingConfiguration.setDefaultRoutingGroup("default"); GatewayBackendManager backendManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration); - QueryHistoryManager historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), false); + WriteBufferConfiguration writeBufferConfiguration = new WriteBufferConfiguration(); + QueryHistoryManager historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), false, writeBufferConfiguration); this.routingManager = new StochasticRoutingManager(backendManager, historyManager, routingConfiguration); } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java index 230689c4a..b78a70285 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java @@ -16,6 +16,7 @@ import io.trino.gateway.ha.clustermonitor.TrinoStatus; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.WriteBufferConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -37,8 +38,9 @@ void setUp() { JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager(); RoutingConfiguration routingConfiguration = new RoutingConfiguration(); + WriteBufferConfiguration writeBufferConfig = new WriteBufferConfiguration(); backendManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration); - historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), false); + historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), false, writeBufferConfig); haRoutingManager = new StochasticRoutingManager(backendManager, historyManager, routingConfiguration); } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestWriteBuffer.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestWriteBuffer.java new file mode 100644 index 000000000..0f16de16a --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestWriteBuffer.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.router; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +class TestWriteBuffer +{ + @Test + void testBufferDropsOldestWhenFull() + { + WriteBuffer buffer = new WriteBuffer<>(2); + buffer.buffer(1); + buffer.buffer(2); + // At capacity now. Next add should drop 1 (oldest) + buffer.buffer(3); + assertThat(buffer.size()).isEqualTo(2); + + List flushed = new ArrayList<>(); + int flushedCount = buffer.flushAll(flushed::add); + assertThat(flushedCount).isEqualTo(2); + assertThat(flushed).containsExactly(2, 3); + } + + @Test + void testFlushAllRequeue() + { + WriteBuffer buffer = new WriteBuffer<>(10); + buffer.buffer(1); + buffer.buffer(2); + buffer.buffer(3); + + List processed = new ArrayList<>(); + int flushedCount = buffer.flushAll(i -> { + if (i == 2) { + throw new RuntimeException("fail on 2"); + } + processed.add(i); + }); + + // Only '1' should be processed, '2' fails and is requeued at head, '3' not attempted + assertThat(flushedCount).isEqualTo(1); + assertThat(processed).containsExactly(1); + assertThat(buffer.size()).isEqualTo(2); + + // Next flush with a no-op flusher should process [2, 3] + List retried = new ArrayList<>(); + int retriedCount = buffer.flushAll(retried::add); + assertThat(retriedCount).isEqualTo(2); + assertThat(retried).containsExactly(2, 3); + assertThat(buffer.size()).isZero(); + } +}