Skip to content

Commit 8c52dd0

Browse files
committed
Introduce query history write buffer for DB resiliency
1 parent 5a36940 commit 8c52dd0

File tree

11 files changed

+443
-29
lines changed

11 files changed

+443
-29
lines changed

gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class HaGatewayConfiguration
4343
private List<String> statementPaths = ImmutableList.of(V1_STATEMENT_PATH);
4444
private boolean includeClusterHostInResponse;
4545
private ProxyResponseConfiguration proxyResponseConfiguration = new ProxyResponseConfiguration();
46+
private WriteBufferConfiguration writeBuffer = new WriteBufferConfiguration();
4647

4748
private RequestAnalyzerConfig requestAnalyzerConfig = new RequestAnalyzerConfig();
4849

@@ -268,6 +269,16 @@ public void setIncludeClusterHostInResponse(boolean includeClusterHostInResponse
268269
this.includeClusterHostInResponse = includeClusterHostInResponse;
269270
}
270271

272+
/**
 * Returns the write-buffer settings held by this configuration.
 *
 * @return the current {@link WriteBufferConfiguration}; a default instance unless replaced via the setter
 */
public WriteBufferConfiguration getWriteBuffer()
273+
{
274+
return writeBuffer;
275+
}
276+
277+
/**
 * Replaces the write-buffer settings held by this configuration.
 *
 * @param writeBuffer the new settings; stored as-is, no validation or copy is performed here
 */
public void setWriteBuffer(WriteBufferConfiguration writeBuffer)
278+
{
279+
this.writeBuffer = writeBuffer;
280+
}
281+
271282
public ProxyResponseConfiguration getProxyResponseConfiguration()
272283
{
273284
return this.proxyResponseConfiguration;
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.config;
15+
16+
import io.airlift.units.Duration;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
public class WriteBufferConfiguration
21+
{
22+
private boolean enabled;
23+
private int maxCapacity = 10000;
24+
private Duration flushInterval = new Duration(2, TimeUnit.SECONDS);
25+
26+
public WriteBufferConfiguration() {}
27+
28+
public WriteBufferConfiguration(boolean enabled, int maxCapacity, Duration flushInterval)
29+
{
30+
this.enabled = enabled;
31+
this.maxCapacity = maxCapacity;
32+
this.flushInterval = flushInterval;
33+
}
34+
35+
public boolean isEnabled()
36+
{
37+
return enabled;
38+
}
39+
40+
public void setEnabled(boolean enabled)
41+
{
42+
this.enabled = enabled;
43+
}
44+
45+
public int getMaxCapacity()
46+
{
47+
return maxCapacity;
48+
}
49+
50+
public void setMaxCapacity(int maxCapacity)
51+
{
52+
this.maxCapacity = maxCapacity;
53+
}
54+
55+
public Duration getFlushInterval()
56+
{
57+
return flushInterval;
58+
}
59+
60+
public void setFlushInterval(Duration flushInterval)
61+
{
62+
this.flushInterval = flushInterval;
63+
}
64+
}

gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public HaGatewayProviderModule(HaGatewayConfiguration configuration)
124124
JdbcConnectionManager connectionManager = new JdbcConnectionManager(jdbi, configuration.getDataStore());
125125
resourceGroupsManager = new HaResourceGroupsManager(connectionManager);
126126
gatewayBackendManager = new HaGatewayManager(jdbi, configuration.getRouting());
127-
queryHistoryManager = new HaQueryHistoryManager(jdbi, configuration.getDataStore().getJdbcUrl().startsWith("jdbc:oracle"));
127+
queryHistoryManager = new HaQueryHistoryManager(jdbi, configuration.getDataStore().getJdbcUrl().startsWith("jdbc:oracle"), configuration.getWriteBuffer());
128128
}
129129

130130
private LbOAuthManager getOAuthManager(HaGatewayConfiguration configuration)

gateway-ha/src/main/java/io/trino/gateway/ha/router/HaQueryHistoryManager.java

Lines changed: 114 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,19 @@
1414
package io.trino.gateway.ha.router;
1515

1616
import com.google.common.base.Strings;
17+
import com.google.inject.Inject;
18+
import io.airlift.log.Logger;
19+
import io.trino.gateway.ha.config.WriteBufferConfiguration;
1720
import io.trino.gateway.ha.domain.TableData;
1821
import io.trino.gateway.ha.domain.request.QueryHistoryRequest;
1922
import io.trino.gateway.ha.domain.response.DistributionResponse;
2023
import io.trino.gateway.ha.persistence.dao.QueryHistory;
2124
import io.trino.gateway.ha.persistence.dao.QueryHistoryDao;
25+
import jakarta.annotation.PreDestroy;
26+
import org.jdbi.v3.core.ConnectionException;
2227
import org.jdbi.v3.core.Jdbi;
2328

29+
import java.sql.SQLException;
2430
import java.time.Instant;
2531
import java.time.LocalDateTime;
2632
import java.time.ZoneId;
@@ -29,21 +35,45 @@
2935
import java.util.List;
3036
import java.util.Map;
3137
import java.util.Optional;
38+
import java.util.concurrent.Executors;
39+
import java.util.concurrent.ScheduledExecutorService;
40+
import java.util.concurrent.TimeUnit;
3241

3342
import static java.util.Objects.requireNonNull;
3443

3544
public class HaQueryHistoryManager
3645
implements QueryHistoryManager
3746
{
3847
private static final int FIRST_PAGE_NO = 1;
48+
private static final Logger log = Logger.get(HaQueryHistoryManager.class);
3949

4050
private final QueryHistoryDao dao;
4151
private final boolean isOracleBackend;
52+
private final WriteBuffer<QueryDetail> writeBuffer;
53+
private final ScheduledExecutorService scheduledExecutor;
4254

43-
public HaQueryHistoryManager(Jdbi jdbi, boolean isOracleBackend)
55+
@Inject
56+
public HaQueryHistoryManager(Jdbi jdbi, boolean isOracleBackend, WriteBufferConfiguration writeBufferConfig)
4457
{
4558
dao = requireNonNull(jdbi, "jdbi is null").onDemand(QueryHistoryDao.class);
4659
this.isOracleBackend = isOracleBackend;
60+
if (writeBufferConfig != null && writeBufferConfig.isEnabled()) {
61+
this.writeBuffer = new WriteBuffer<>(writeBufferConfig.getMaxCapacity());
62+
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
63+
Thread t = new Thread(r, "query-history-write-buffer");
64+
t.setDaemon(true);
65+
return t;
66+
});
67+
scheduledExecutor.scheduleWithFixedDelay(
68+
this::flushBufferedWrites,
69+
writeBufferConfig.getFlushInterval().toMillis(),
70+
writeBufferConfig.getFlushInterval().toMillis(),
71+
TimeUnit.MILLISECONDS);
72+
}
73+
else {
74+
this.writeBuffer = null;
75+
this.scheduledExecutor = null;
76+
}
4777
}
4878

4979
@Override
@@ -54,15 +84,27 @@ public void submitQueryDetail(QueryDetail queryDetail)
5484
return;
5585
}
5686

57-
dao.insertHistory(
58-
queryDetail.getQueryId(),
59-
queryDetail.getQueryText(),
60-
queryDetail.getBackendUrl(),
61-
queryDetail.getUser(),
62-
queryDetail.getSource(),
63-
queryDetail.getCaptureTime(),
64-
queryDetail.getRoutingGroup(),
65-
queryDetail.getExternalUrl());
87+
try {
88+
dao.insertHistory(
89+
queryDetail.getQueryId(),
90+
queryDetail.getQueryText(),
91+
queryDetail.getBackendUrl(),
92+
queryDetail.getUser(),
93+
queryDetail.getSource(),
94+
queryDetail.getCaptureTime(),
95+
queryDetail.getRoutingGroup(),
96+
queryDetail.getExternalUrl());
97+
}
98+
catch (RuntimeException e) {
99+
if (isConnectionIssue(e) && writeBuffer != null) {
100+
writeBuffer.buffer(queryDetail);
101+
log.warn(e, "DB unavailable; buffered query_history entry. queryId=%s, bufferSize=%s",
102+
queryDetail.getQueryId(), writeBuffer.size());
103+
}
104+
else {
105+
throw e;
106+
}
107+
}
66108
}
67109

68110
@Override
@@ -166,4 +208,66 @@ private static int getStart(int pageNo, int pageSize)
166208
}
167209
return (pageNo - FIRST_PAGE_NO) * pageSize;
168210
}
211+
212+
private static boolean isConnectionIssue(Throwable t)
213+
{
214+
// SQL State codes starting with "08" indicate connection exceptions per ANSI/ISO SQL standard.
215+
// See: https://en.wikipedia.org/wiki/SQLSTATE
216+
// Examples: 08000 (connection exception), 08001 (cannot establish connection),
217+
// 08003 (connection does not exist), 08006 (connection failure), etc.
218+
for (Throwable cur = t; cur != null; cur = cur.getCause()) {
219+
if (cur instanceof ConnectionException) {
220+
return true;
221+
}
222+
if (cur instanceof SQLException sql) {
223+
String sqlState = sql.getSQLState();
224+
if (sqlState != null && sqlState.startsWith("08")) {
225+
return true;
226+
}
227+
}
228+
}
229+
return false;
230+
}
231+
232+
private void flushBufferedWrites()
233+
{
234+
if (writeBuffer == null) {
235+
return;
236+
}
237+
int before = writeBuffer.size();
238+
int flushed = writeBuffer.flushAll(r -> {
239+
dao.insertHistory(
240+
r.getQueryId(),
241+
r.getQueryText(),
242+
r.getBackendUrl(),
243+
r.getUser(),
244+
r.getSource(),
245+
r.getCaptureTime(),
246+
r.getRoutingGroup(),
247+
r.getExternalUrl());
248+
});
249+
if (flushed > 0) {
250+
log.info("Flushed %s buffered query_history entries", flushed);
251+
}
252+
else if (before > 0 && writeBuffer.size() == before) {
253+
log.warn("Failed to flush buffered query_history entries; will retry. bufferSize=%s", before);
254+
}
255+
}
256+
257+
@PreDestroy
258+
public void stop()
259+
{
260+
if (scheduledExecutor == null) {
261+
return;
262+
}
263+
try {
264+
flushBufferedWrites();
265+
}
266+
catch (RuntimeException t) {
267+
log.warn(t, "Error while flushing buffered query_history entries during shutdown");
268+
}
269+
finally {
270+
scheduledExecutor.shutdownNow();
271+
}
272+
}
169273
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.router;
15+
16+
import java.util.concurrent.BlockingDeque;
17+
import java.util.concurrent.LinkedBlockingDeque;
18+
import java.util.function.Consumer;
19+
20+
/**
 * Bounded FIFO buffer for items that could not be written immediately.
 *
 * <p>Backed by a {@link LinkedBlockingDeque}, so individual operations may be
 * called from multiple threads. When the buffer is full, the oldest item is
 * evicted to make room for the newest one.
 *
 * @param <T> type of buffered item
 */
public final class WriteBuffer<T>
{
    private final BlockingDeque<T> deque;

    /**
     * @param maxCapacity maximum number of items retained; must be positive
     * @throws IllegalArgumentException if {@code maxCapacity} is less than 1
     */
    public WriteBuffer(int maxCapacity)
    {
        this.deque = new LinkedBlockingDeque<>(maxCapacity);
    }

    /**
     * Buffers an item for a later flush, evicting the oldest item(s) if full.
     *
     * <p>Eviction and insertion are two separate steps, so another thread may
     * take the freed slot in between. The insert is therefore retried until it
     * succeeds; a single evict-then-offer could silently drop the new item.
     *
     * @param item the item to buffer; must not be null
     * @throws NullPointerException if {@code item} is null
     */
    public void buffer(T item)
    {
        while (!deque.offerLast(item)) {
            deque.pollFirst();
        }
    }

    /**
     * Flushes items in insertion order by applying the provided flusher.
     *
     * <p>Stops at the first item for which the flusher throws a
     * {@link RuntimeException} and puts that item back at the head of the
     * buffer so it is retried first next time. If the buffer has been filled
     * by other threads in the meantime, the failed item — being the oldest —
     * is dropped, in line with the drop-oldest policy. The exception itself is
     * not propagated.
     *
     * @param flusher consumer invoked for each buffered item
     * @return number of items successfully flushed
     */
    public int flushAll(Consumer<? super T> flusher)
    {
        int flushed = 0;
        for (T next; (next = deque.pollFirst()) != null; ) {
            try {
                flusher.accept(next);
            }
            catch (RuntimeException e) {
                deque.offerFirst(next);
                break; // stop after first failure
            }
            flushed++;
        }
        return flushed;
    }

    /** @return number of items currently buffered */
    public int size()
    {
        return deque.size();
    }
}

gateway-ha/src/test/java/io/trino/gateway/ha/router/BaseExternalUrlQueryHistoryTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.gateway.ha.router;
1515

1616
import io.trino.gateway.ha.config.DataStoreConfiguration;
17+
import io.trino.gateway.ha.config.WriteBufferConfiguration;
1718
import io.trino.gateway.ha.persistence.FlywayMigration;
1819
import io.trino.gateway.ha.persistence.JdbcConnectionManager;
1920
import org.junit.jupiter.api.AfterAll;
@@ -49,7 +50,8 @@ protected BaseExternalUrlQueryHistoryTest(JdbcDatabaseContainer<?> container)
4950
true);
5051
FlywayMigration.migrate(config);
5152
JdbcConnectionManager jdbcConnectionManager = createTestingJdbcConnectionManager(container, config);
52-
queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle"));
53+
WriteBufferConfiguration writeBufferConfiguration = new WriteBufferConfiguration();
54+
queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle"), writeBufferConfiguration);
5355
}
5456

5557
@AfterAll

0 commit comments

Comments
 (0)