Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class HaGatewayConfiguration
private List<String> statementPaths = ImmutableList.of(V1_STATEMENT_PATH);
private boolean includeClusterHostInResponse;
private ProxyResponseConfiguration proxyResponseConfiguration = new ProxyResponseConfiguration();
private WriteBufferConfiguration writeBuffer = new WriteBufferConfiguration();

private RequestAnalyzerConfig requestAnalyzerConfig = new RequestAnalyzerConfig();

Expand Down Expand Up @@ -268,6 +269,16 @@ public void setIncludeClusterHostInResponse(boolean includeClusterHostInResponse
this.includeClusterHostInResponse = includeClusterHostInResponse;
}

public WriteBufferConfiguration getWriteBuffer()
{
return writeBuffer;
}

public void setWriteBuffer(WriteBufferConfiguration writeBuffer)
{
this.writeBuffer = writeBuffer;
}

public ProxyResponseConfiguration getProxyResponseConfiguration()
{
return this.proxyResponseConfiguration;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.config;

import io.airlift.units.Duration;

import java.util.concurrent.TimeUnit;

/**
 * Configuration for the optional query-history write buffer. When enabled,
 * history rows that cannot be persisted (database unreachable) are held in
 * memory — at most {@code maxCapacity} entries — and retried on a fixed
 * {@code flushInterval} schedule.
 */
public class WriteBufferConfiguration
{
    // Buffering is opt-in; off by default.
    private boolean enabled;
    // Upper bound on in-memory entries before the oldest are evicted.
    private int maxCapacity = 10000;
    // Delay between retry attempts against the backing store.
    private Duration flushInterval = new Duration(2, TimeUnit.SECONDS);

    public WriteBufferConfiguration() {}

    public WriteBufferConfiguration(boolean enabled, int maxCapacity, Duration flushInterval)
    {
        this.enabled = enabled;
        this.maxCapacity = maxCapacity;
        this.flushInterval = flushInterval;
    }

    public boolean isEnabled()
    {
        return this.enabled;
    }

    public void setEnabled(boolean enabled)
    {
        this.enabled = enabled;
    }

    public int getMaxCapacity()
    {
        return this.maxCapacity;
    }

    public void setMaxCapacity(int maxCapacity)
    {
        this.maxCapacity = maxCapacity;
    }

    public Duration getFlushInterval()
    {
        return this.flushInterval;
    }

    public void setFlushInterval(Duration flushInterval)
    {
        this.flushInterval = flushInterval;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public HaGatewayProviderModule(HaGatewayConfiguration configuration)
JdbcConnectionManager connectionManager = new JdbcConnectionManager(jdbi, configuration.getDataStore());
resourceGroupsManager = new HaResourceGroupsManager(connectionManager);
gatewayBackendManager = new HaGatewayManager(jdbi, configuration.getRouting());
queryHistoryManager = new HaQueryHistoryManager(jdbi, configuration.getDataStore().getJdbcUrl().startsWith("jdbc:oracle"));
queryHistoryManager = new HaQueryHistoryManager(jdbi, configuration.getDataStore().getJdbcUrl().startsWith("jdbc:oracle"), configuration.getWriteBuffer());
}

private LbOAuthManager getOAuthManager(HaGatewayConfiguration configuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
package io.trino.gateway.ha.router;

import com.google.common.base.Strings;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.trino.gateway.ha.config.WriteBufferConfiguration;
import io.trino.gateway.ha.domain.TableData;
import io.trino.gateway.ha.domain.request.QueryHistoryRequest;
import io.trino.gateway.ha.domain.response.DistributionResponse;
import io.trino.gateway.ha.persistence.dao.QueryHistory;
import io.trino.gateway.ha.persistence.dao.QueryHistoryDao;
import jakarta.annotation.PreDestroy;
import org.jdbi.v3.core.ConnectionException;
import org.jdbi.v3.core.Jdbi;

import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -29,21 +35,45 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

public class HaQueryHistoryManager
implements QueryHistoryManager
{
private static final int FIRST_PAGE_NO = 1;
private static final Logger log = Logger.get(HaQueryHistoryManager.class);

private final QueryHistoryDao dao;
private final boolean isOracleBackend;
private final WriteBuffer<QueryDetail> writeBuffer;
private final ScheduledExecutorService scheduledExecutor;

public HaQueryHistoryManager(Jdbi jdbi, boolean isOracleBackend)
@Inject
public HaQueryHistoryManager(Jdbi jdbi, boolean isOracleBackend, WriteBufferConfiguration writeBufferConfig)
{
dao = requireNonNull(jdbi, "jdbi is null").onDemand(QueryHistoryDao.class);
this.isOracleBackend = isOracleBackend;
if (writeBufferConfig != null && writeBufferConfig.isEnabled()) {
this.writeBuffer = new WriteBuffer<>(writeBufferConfig.getMaxCapacity());
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "query-history-write-buffer");
t.setDaemon(true);
return t;
});
scheduledExecutor.scheduleWithFixedDelay(
this::flushBufferedWrites,
writeBufferConfig.getFlushInterval().toMillis(),
writeBufferConfig.getFlushInterval().toMillis(),
TimeUnit.MILLISECONDS);
}
else {
this.writeBuffer = null;
this.scheduledExecutor = null;
}
}

@Override
Expand All @@ -54,15 +84,27 @@ public void submitQueryDetail(QueryDetail queryDetail)
return;
}

dao.insertHistory(
queryDetail.getQueryId(),
queryDetail.getQueryText(),
queryDetail.getBackendUrl(),
queryDetail.getUser(),
queryDetail.getSource(),
queryDetail.getCaptureTime(),
queryDetail.getRoutingGroup(),
queryDetail.getExternalUrl());
try {
dao.insertHistory(
queryDetail.getQueryId(),
queryDetail.getQueryText(),
queryDetail.getBackendUrl(),
queryDetail.getUser(),
queryDetail.getSource(),
queryDetail.getCaptureTime(),
queryDetail.getRoutingGroup(),
queryDetail.getExternalUrl());
}
catch (RuntimeException e) {
if (isConnectionIssue(e) && writeBuffer != null) {
writeBuffer.buffer(queryDetail);
log.warn(e, "DB unavailable; buffered query_history entry. queryId=%s, bufferSize=%s",
queryDetail.getQueryId(), writeBuffer.size());
}
else {
throw e;
}
}
}

@Override
Expand Down Expand Up @@ -166,4 +208,66 @@ private static int getStart(int pageNo, int pageSize)
}
return (pageNo - FIRST_PAGE_NO) * pageSize;
}

private static boolean isConnectionIssue(Throwable t)
{
// SQL State codes starting with "08" indicate connection exceptions per ANSI/ISO SQL standard.
// See: https://en.wikipedia.org/wiki/SQLSTATE
// Examples: 08000 (connection exception), 08001 (cannot establish connection),
// 08003 (connection does not exist), 08006 (connection failure), etc.
for (Throwable cur = t; cur != null; cur = cur.getCause()) {
if (cur instanceof ConnectionException) {
return true;
}
if (cur instanceof SQLException sql) {
String sqlState = sql.getSQLState();
if (sqlState != null && sqlState.startsWith("08")) {
return true;
}
}
}
return false;
}

private void flushBufferedWrites()
{
if (writeBuffer == null) {
return;
}
int before = writeBuffer.size();
int flushed = writeBuffer.flushAll(r -> {
dao.insertHistory(
r.getQueryId(),
r.getQueryText(),
r.getBackendUrl(),
r.getUser(),
r.getSource(),
r.getCaptureTime(),
r.getRoutingGroup(),
r.getExternalUrl());
});
if (flushed > 0) {
log.info("Flushed %s buffered query_history entries", flushed);
}
else if (before > 0 && writeBuffer.size() == before) {
log.warn("Failed to flush buffered query_history entries; will retry. bufferSize=%s", before);
}
}

@PreDestroy
public void stop()
{
if (scheduledExecutor == null) {
return;
}
try {
flushBufferedWrites();
}
catch (RuntimeException t) {
log.warn(t, "Error while flushing buffered query_history entries during shutdown");
}
finally {
scheduledExecutor.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.router;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;

/**
 * A bounded, thread-safe FIFO buffer with drop-oldest overflow semantics,
 * used to hold query-history rows while the database is unreachable.
 *
 * @param <T> the buffered item type
 */
public final class WriteBuffer<T>
{
    private final BlockingDeque<T> deque;

    public WriteBuffer(int maxCapacity)
    {
        this.deque = new LinkedBlockingDeque<>(maxCapacity);
    }

    /**
     * Buffers an item for a later flush, evicting the oldest entries while the
     * buffer is full. The insert is retried in a loop: the previous
     * poll-then-offer sequence could silently drop the new item if a concurrent
     * writer refilled the deque between the two calls.
     */
    public void buffer(T item)
    {
        while (!deque.offerLast(item)) {
            // Full: evict the oldest entry and try again.
            deque.pollFirst();
        }
    }

    /**
     * Flushes items in insertion order by applying the provided flusher.
     * Stops immediately when the flusher fails on an item and re-inserts
     * that item at the head of the buffer so ordering is preserved.
     *
     * @param flusher consumer invoked for each buffered item
     * @return number of items successfully flushed
     */
    public int flushAll(Consumer<T> flusher)
    {
        int flushed = 0;
        for (T next; (next = deque.pollFirst()) != null; ) {
            try {
                flusher.accept(next);
                flushed++;
            }
            catch (RuntimeException ignored) {
                // Backing store most likely still unavailable: put the failed
                // item back at the head and stop; the next flush will retry.
                deque.offerFirst(next);
                break;
            }
        }
        return flushed;
    }

    public int size()
    {
        return deque.size();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.gateway.ha.router;

import io.trino.gateway.ha.config.DataStoreConfiguration;
import io.trino.gateway.ha.config.WriteBufferConfiguration;
import io.trino.gateway.ha.persistence.FlywayMigration;
import io.trino.gateway.ha.persistence.JdbcConnectionManager;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -49,7 +50,8 @@ protected BaseExternalUrlQueryHistoryTest(JdbcDatabaseContainer<?> container)
true);
FlywayMigration.migrate(config);
JdbcConnectionManager jdbcConnectionManager = createTestingJdbcConnectionManager(container, config);
queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle"));
WriteBufferConfiguration writeBufferConfiguration = new WriteBufferConfiguration();
queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle"), writeBufferConfiguration);
}

@AfterAll
Expand Down
Loading