Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,16 @@ protected void registerMetric(MetricRegistry registry, MetricDefine define, Metr

public void register(MetricRegistry registry) {
if (globalRegistry == null) {
registerMetrics(registry);
try {
registerMetrics(registry);
} catch (Throwable t) {
// Roll back any metrics that were partially registered to prevent orphaned metrics
// in the global MetricRegistry. Without this, unregister() would be a no-op because
// globalRegistry is still null, leaving metrics permanently leaked.
registeredMetricKeys.forEach(registry::unregister);
registeredMetricKeys.clear();
throw t;
}
globalRegistry = registry;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,32 @@ private void syncTable(ExternalCatalog externalCatalog, TableIdentity tableIdent

private boolean triggerTableAdded(
ServerCatalog catalog, ServerTableIdentifier serverTableIdentifier) {
// Clean up any pre-existing runtime to prevent metric registration conflicts.
// This can happen when a table is deleted from DB (e.g., by dispose after filter change)
// but its runtime and metrics remain in memory. We search by table name rather than by ID
// because re-syncing a table creates a new DB row with a different ID.
tableRuntimeMap
.values()
.removeIf(
existing -> {
ServerTableIdentifier existingId = existing.getTableIdentifier();
if (existingId.getCatalog().equals(serverTableIdentifier.getCatalog())
&& existingId.getDatabase().equals(serverTableIdentifier.getDatabase())
&& existingId.getTableName().equals(serverTableIdentifier.getTableName())) {
LOG.warn(
"Found existing table runtime for {}, disposing before re-adding.",
serverTableIdentifier);
try {
existing.dispose();
} catch (Exception e) {
LOG.warn(
"Error disposing existing table runtime for {}", serverTableIdentifier, e);
}
return true;
}
return false;
});

AmoroTable<?> table =
catalog.loadTable(
serverTableIdentifier.getDatabase(), serverTableIdentifier.getTableName());
Expand Down Expand Up @@ -538,7 +564,14 @@ private void revertTableRuntimeAdded(
externalCatalog.getServerTableIdentifier(
tableIdentity.getDatabase(), tableIdentity.getTableName());
if (tableIdentifier != null) {
tableRuntimeMap.remove(tableIdentifier.getId());
TableRuntime runtime = tableRuntimeMap.remove(tableIdentifier.getId());
if (runtime != null) {
try {
runtime.dispose();
} catch (Exception e) {
LOG.warn("Error disposing runtime during revert for {}", tableIdentifier, e);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.metrics.Counter;
import org.apache.amoro.metrics.MetricDefine;
import org.apache.amoro.metrics.MetricRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
* Test that AbstractTableMetrics.register properly rolls back partially registered metrics when
* registerMetrics throws an exception partway through.
*/
public class TestAbstractTableMetricsPartialRegistration {

private static final MetricDefine METRIC_A =
MetricDefine.defineCounter("test_metric_a").withTags("catalog", "database", "table").build();

private static final MetricDefine METRIC_B =
MetricDefine.defineCounter("test_metric_b").withTags("catalog", "database", "table").build();

private static final MetricDefine METRIC_C =
MetricDefine.defineCounter("test_metric_c").withTags("catalog", "database", "table").build();

/**
* A test subclass of AbstractTableMetrics that registers 2 metrics successfully, then throws an
* exception on the 3rd metric.
*/
private static class PartialFailureMetrics extends AbstractTableMetrics {

PartialFailureMetrics(ServerTableIdentifier identifier) {
super(identifier);
}

@Override
protected void registerMetrics(MetricRegistry registry) {
registerMetric(registry, METRIC_A, new Counter());
registerMetric(registry, METRIC_B, new Counter());
// Simulate failure on the 3rd metric
throw new RuntimeException("Simulated registration failure");
}
}

/** A normal test subclass that registers all 3 metrics successfully. */
private static class NormalMetrics extends AbstractTableMetrics {

NormalMetrics(ServerTableIdentifier identifier) {
super(identifier);
}

@Override
protected void registerMetrics(MetricRegistry registry) {
registerMetric(registry, METRIC_A, new Counter());
registerMetric(registry, METRIC_B, new Counter());
registerMetric(registry, METRIC_C, new Counter());
}
}

@Test
public void testPartialRegistrationRollback() {
MetricRegistry registry = new MetricRegistry();
ServerTableIdentifier identifier =
ServerTableIdentifier.of("test_catalog", "test_db", "test_table", TableFormat.ICEBERG);

PartialFailureMetrics metrics = new PartialFailureMetrics(identifier);

// register should throw, and the 2 partially registered metrics should be rolled back
Assertions.assertThrows(RuntimeException.class, () -> metrics.register(registry));

// Verify: no metrics remain in the registry (all rolled back)
Assertions.assertEquals(0, registry.getMetrics().size());

// Verify: globalRegistry is still null (register didn't complete)
// This means unregister() would be a no-op, which is fine since we already cleaned up
metrics.unregister(); // should be safe no-op

// Verify: a new metrics instance can register the same metrics without conflict
NormalMetrics newMetrics = new NormalMetrics(identifier);
newMetrics.register(registry);
Assertions.assertEquals(3, registry.getMetrics().size());

// Clean up
newMetrics.unregister();
Assertions.assertEquals(0, registry.getMetrics().size());
}

@Test
public void testSuccessfulRegistrationAndUnregistration() {
MetricRegistry registry = new MetricRegistry();
ServerTableIdentifier identifier =
ServerTableIdentifier.of("test_catalog", "test_db", "test_table", TableFormat.ICEBERG);

NormalMetrics metrics = new NormalMetrics(identifier);

// Normal registration should succeed
metrics.register(registry);
Assertions.assertEquals(3, registry.getMetrics().size());

// Unregister should clean up all metrics
metrics.unregister();
Assertions.assertEquals(0, registry.getMetrics().size());

// Re-registration with a new instance should succeed
NormalMetrics newMetrics = new NormalMetrics(identifier);
newMetrics.register(registry);
Assertions.assertEquals(3, registry.getMetrics().size());

newMetrics.unregister();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,84 @@ private UnifiedCatalog createNewCatalogTable(String catalogName, String dbName,
return externalCatalog;
}

/**
* Test that a table can be re-added after disposal without metric conflicts. This verifies the
* fix for the "Metric is already been registered" error that occurs when database-filter changes
* cause tables to be disposed and then re-synced.
*/
@Test
public void testTableReAddAfterDispose_MetricsClean() {
createTable();
ServerTableIdentifier tableIdentifier = serverTableIdentifier();
MetricRegistry globalRegistry = MetricManager.getInstance().getGlobalRegistry();

// Verify table runtime and metrics are registered
List<TableRuntimeMeta> runtimesAfterAdd = persistency.getTableRuntimeMetas();
Assert.assertEquals(1, runtimesAfterAdd.size());
Assert.assertFalse(globalRegistry.getMetrics().isEmpty());

// Dispose the table (simulates filter change excluding all databases)
tableService().disposeTable(tableIdentifier);
List<TableRuntimeMeta> runtimesAfterDispose = persistency.getTableRuntimeMetas();
Assert.assertEquals(0, runtimesAfterDispose.size());

// Re-sync via exploreTableRuntimes (simulates filter being removed, table re-discovered)
// The external iceberg table still exists, so it will be re-synced.
// This should NOT throw "Metric is already been registered"
tableService().exploreTableRuntimes();
List<TableRuntimeMeta> runtimesAfterReAdd = persistency.getTableRuntimeMetas();
Assert.assertEquals(1, runtimesAfterReAdd.size());
Assert.assertFalse(globalRegistry.getMetrics().isEmpty());

dropTable();
dropDatabase();
}

/**
* Test that triggerTableAdded is idempotent: if a table runtime already exists in the map (e.g.,
* due to incomplete disposal), re-adding the table should dispose the old runtime first and
* succeed without metric conflicts.
*/
@Test
public void testTriggerTableAddedIdempotent() {
ExternalCatalog externalCatalog = initExternalCatalog();
createTable();
ServerTableIdentifier tableIdentifier = serverTableIdentifier();
MetricRegistry globalRegistry = MetricManager.getInstance().getGlobalRegistry();

// Verify initial state: 1 runtime, metrics registered
Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
int metricCountBefore = globalRegistry.getMetrics().size();
Assert.assertTrue(metricCountBefore > 0);

// Simulate inconsistent state: delete table records from DB but leave runtime in map.
// This mimics the scenario where disposeTable's DB operation fails and the table is
// later removed from DB by another path, leaving orphaned metrics in the registry.
persistency.deleteTableRuntime(tableIdentifier.getId());
persistency.deleteTableIdentifier(tableIdentifier.getIdentifier().buildTableIdentifier());

// Verify DB is empty but runtime (with metrics) is still in memory
Assert.assertEquals(0, persistency.getTableRuntimeMetas().size());
Assert.assertEquals(0, tableManager().listManagedTables().size());
Assert.assertTrue(tableService().contains(tableIdentifier.getId()));

// exploreExternalCatalog should detect the table in external catalog but not in DB,
// and call syncTable -> triggerTableAdded. With the fix, it should dispose the old
// runtime (by table name match) first, then register the new one without conflict.
tableService().exploreExternalCatalog(externalCatalog);

// Verify: table is re-registered successfully with new ID, metrics present
List<TableRuntimeMeta> runtimesAfterReSync = persistency.getTableRuntimeMetas();
Assert.assertEquals(1, runtimesAfterReSync.size());
Assert.assertFalse(globalRegistry.getMetrics().isEmpty());

// The old runtime should be gone from the map (old ID)
Assert.assertFalse(tableService().contains(tableIdentifier.getId()));

dropTable();
dropDatabase();
}

@Mock private DefaultTableRuntime tableRuntimeWithException;
@Mock private ServerTableIdentifier tableIdentifierWithException;

Expand Down
Loading