diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java
index 74e52f0b7e..f666771b68 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java
@@ -54,7 +54,26 @@ protected void registerMetric(MetricRegistry registry, MetricDefine define, Metr
 
   public void register(MetricRegistry registry) {
     if (globalRegistry == null) {
-      registerMetrics(registry);
+      try {
+        registerMetrics(registry);
+      } catch (Throwable t) {
+        // Roll back any metrics that were partially registered to prevent orphaned metrics
+        // in the global MetricRegistry. Without this, unregister() would be a no-op because
+        // globalRegistry is still null, leaving metrics permanently leaked.
+        // Each key is rolled back on its own: a failure while unregistering one metric must
+        // neither skip the remaining keys nor replace the original failure, so it is attached
+        // to the original throwable as a suppressed exception.
+        registeredMetricKeys.forEach(
+            key -> {
+              try {
+                registry.unregister(key);
+              } catch (RuntimeException rollbackFailure) {
+                t.addSuppressed(rollbackFailure);
+              }
+            });
+        registeredMetricKeys.clear();
+        throw t;
+      }
       globalRegistry = registry;
     }
   }
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index fa0ee873c4..7edb998283 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -478,6 +478,37 @@ private void syncTable(ExternalCatalog externalCatalog, TableIdentity tableIdent
 
   private boolean triggerTableAdded(
       ServerCatalog catalog, ServerTableIdentifier serverTableIdentifier) {
+    // Clean up any pre-existing runtime to prevent metric registration conflicts.
+    // This can happen when a table is deleted from DB (e.g., by dispose after filter change)
+    // but its runtime and metrics remain in memory. We search by table name rather than by ID
+    // because re-syncing a table creates a new DB row with a different ID.
+    // NOTE(review): this only drops the stale runtime from tableRuntimeMap and disposes it;
+    // confirm that no other component still holds a reference to the stale runtime.
+    tableRuntimeMap
+        .values()
+        .removeIf(
+            existing -> {
+              ServerTableIdentifier existingId = existing.getTableIdentifier();
+              boolean sameTable =
+                  existingId.getCatalog().equals(serverTableIdentifier.getCatalog())
+                      && existingId.getDatabase().equals(serverTableIdentifier.getDatabase())
+                      && existingId.getTableName().equals(serverTableIdentifier.getTableName());
+              if (!sameTable) {
+                return false;
+              }
+              // Log the stale identifier (not the new one) so the orphaned runtime's ID is
+              // visible when diagnosing why it was left behind.
+              LOG.warn(
+                  "Found existing table runtime for {}, disposing before re-adding.", existingId);
+              try {
+                existing.dispose();
+              } catch (Exception e) {
+                // Best effort: a failed dispose must not block re-adding the table.
+                LOG.warn("Error disposing existing table runtime for {}", existingId, e);
+              }
+              return true;
+            });
+
     AmoroTable table =
         catalog.loadTable(
             serverTableIdentifier.getDatabase(), serverTableIdentifier.getTableName());
@@ -538,7 +569,16 @@ private void revertTableRuntimeAdded(
         externalCatalog.getServerTableIdentifier(
             tableIdentity.getDatabase(), tableIdentity.getTableName());
     if (tableIdentifier != null) {
-      tableRuntimeMap.remove(tableIdentifier.getId());
+      // Removing the runtime from the map alone would leave its in-memory state
+      // (e.g. registered metrics) behind, so dispose it as well.
+      TableRuntime runtime = tableRuntimeMap.remove(tableIdentifier.getId());
+      if (runtime != null) {
+        try {
+          runtime.dispose();
+        } catch (Exception e) {
+          LOG.warn("Error disposing runtime during revert for {}", tableIdentifier, e);
+        }
+      }
     }
   }
 
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestAbstractTableMetricsPartialRegistration.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestAbstractTableMetricsPartialRegistration.java
new file mode 100644
index 0000000000..6866be9ad0
--- /dev/null
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestAbstractTableMetricsPartialRegistration.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.table;
+
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.metrics.Counter;
+import org.apache.amoro.metrics.MetricDefine;
+import org.apache.amoro.metrics.MetricRegistry;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test that AbstractTableMetrics.register properly rolls back partially registered metrics when
+ * registerMetrics throws an exception partway through.
+ */
+public class TestAbstractTableMetricsPartialRegistration {
+
+  private static final MetricDefine METRIC_A =
+      MetricDefine.defineCounter("test_metric_a").withTags("catalog", "database", "table").build();
+
+  private static final MetricDefine METRIC_B =
+      MetricDefine.defineCounter("test_metric_b").withTags("catalog", "database", "table").build();
+
+  private static final MetricDefine METRIC_C =
+      MetricDefine.defineCounter("test_metric_c").withTags("catalog", "database", "table").build();
+
+  /**
+   * A test subclass of AbstractTableMetrics that registers 2 metrics successfully, then throws an
+   * exception on the 3rd metric.
+   */
+  private static class PartialFailureMetrics extends AbstractTableMetrics {
+
+    PartialFailureMetrics(ServerTableIdentifier identifier) {
+      super(identifier);
+    }
+
+    @Override
+    protected void registerMetrics(MetricRegistry registry) {
+      registerMetric(registry, METRIC_A, new Counter());
+      registerMetric(registry, METRIC_B, new Counter());
+      // Simulate failure on the 3rd metric
+      throw new RuntimeException("Simulated registration failure");
+    }
+  }
+
+  /** A normal test subclass that registers all 3 metrics successfully. */
+  private static class NormalMetrics extends AbstractTableMetrics {
+
+    NormalMetrics(ServerTableIdentifier identifier) {
+      super(identifier);
+    }
+
+    @Override
+    protected void registerMetrics(MetricRegistry registry) {
+      registerMetric(registry, METRIC_A, new Counter());
+      registerMetric(registry, METRIC_B, new Counter());
+      registerMetric(registry, METRIC_C, new Counter());
+    }
+  }
+
+  @Test
+  public void testPartialRegistrationRollback() {
+    MetricRegistry registry = new MetricRegistry();
+    ServerTableIdentifier identifier =
+        ServerTableIdentifier.of("test_catalog", "test_db", "test_table", TableFormat.ICEBERG);
+
+    PartialFailureMetrics metrics = new PartialFailureMetrics(identifier);
+
+    // register should throw, and the 2 partially registered metrics should be rolled back
+    RuntimeException failure =
+        Assertions.assertThrows(RuntimeException.class, () -> metrics.register(registry));
+    // The rollback must not mask the original failure
+    Assertions.assertEquals("Simulated registration failure", failure.getMessage());
+
+    // Verify: no metrics remain in the registry (all rolled back)
+    Assertions.assertEquals(0, registry.getMetrics().size());
+
+    // Verify: globalRegistry is still null (register didn't complete)
+    // This means unregister() would be a no-op, which is fine since we already cleaned up
+    metrics.unregister(); // should be safe no-op
+
+    // Verify: a new metrics instance can register the same metrics without conflict
+    NormalMetrics newMetrics = new NormalMetrics(identifier);
+    newMetrics.register(registry);
+    Assertions.assertEquals(3, registry.getMetrics().size());
+
+    // Clean up
+    newMetrics.unregister();
+    Assertions.assertEquals(0, registry.getMetrics().size());
+  }
+
+  @Test
+  public void testSuccessfulRegistrationAndUnregistration() {
+    MetricRegistry registry = new MetricRegistry();
+    ServerTableIdentifier identifier =
+        ServerTableIdentifier.of("test_catalog", "test_db", "test_table", TableFormat.ICEBERG);
+
+    NormalMetrics metrics = new NormalMetrics(identifier);
+
+    // Normal registration should succeed
+    metrics.register(registry);
+    Assertions.assertEquals(3, registry.getMetrics().size());
+
+    // Unregister should clean up all metrics
+    metrics.unregister();
+    Assertions.assertEquals(0, registry.getMetrics().size());
+
+    // Re-registration with a new instance should succeed
+    NormalMetrics newMetrics = new NormalMetrics(identifier);
+    newMetrics.register(registry);
+    Assertions.assertEquals(3, registry.getMetrics().size());
+
+    newMetrics.unregister();
+  }
+}
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
index 9c0ff56237..e6ab3ac361 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
@@ -455,6 +455,79 @@ private UnifiedCatalog createNewCatalogTable(String catalogName, String dbName,
     return externalCatalog;
   }
 
+  /**
+   * Test that a table can be re-added after disposal without metric conflicts. This verifies the
+   * fix for the "Metric is already been registered" error that occurs when database-filter changes
+   * cause tables to be disposed and then re-synced.
+   */
+  @Test
+  public void testTableReAddAfterDispose_MetricsClean() {
+    createTable();
+    ServerTableIdentifier tableIdentifier = serverTableIdentifier();
+    MetricRegistry globalRegistry = MetricManager.getInstance().getGlobalRegistry();
+
+    // Verify table runtime and metrics are registered
+    Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
+    Assert.assertFalse(globalRegistry.getMetrics().isEmpty());
+
+    // Dispose the table (simulates filter change excluding all databases)
+    tableService().disposeTable(tableIdentifier);
+    Assert.assertEquals(0, persistency.getTableRuntimeMetas().size());
+
+    // Re-sync via exploreTableRuntimes (simulates filter being removed, table re-discovered)
+    // The external iceberg table still exists, so it will be re-synced.
+    // This should NOT throw "Metric is already been registered"
+    tableService().exploreTableRuntimes();
+    Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
+    Assert.assertFalse(globalRegistry.getMetrics().isEmpty());
+
+    dropTable();
+    dropDatabase();
+  }
+
+  /**
+   * Test that triggerTableAdded is idempotent: if a table runtime already exists in the map (e.g.,
+   * due to incomplete disposal), re-adding the table should dispose the old runtime first and
+   * succeed without metric conflicts.
+   */
+  @Test
+  public void testTriggerTableAddedIdempotent() {
+    ExternalCatalog externalCatalog = initExternalCatalog();
+    createTable();
+    ServerTableIdentifier tableIdentifier = serverTableIdentifier();
+    MetricRegistry globalRegistry = MetricManager.getInstance().getGlobalRegistry();
+
+    // Verify initial state: 1 runtime, metrics registered
+    Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
+    Assert.assertFalse(globalRegistry.getMetrics().isEmpty());
+
+    // Simulate inconsistent state: delete table records from DB but leave runtime in map.
+    // This mimics the scenario where disposeTable's DB operation fails and the table is
+    // later removed from DB by another path, leaving orphaned metrics in the registry.
+    persistency.deleteTableRuntime(tableIdentifier.getId());
+    persistency.deleteTableIdentifier(tableIdentifier.getIdentifier().buildTableIdentifier());
+
+    // Verify DB is empty but runtime (with metrics) is still in memory
+    Assert.assertEquals(0, persistency.getTableRuntimeMetas().size());
+    Assert.assertEquals(0, tableManager().listManagedTables().size());
+    Assert.assertTrue(tableService().contains(tableIdentifier.getId()));
+
+    // exploreExternalCatalog should detect the table in external catalog but not in DB,
+    // and call syncTable -> triggerTableAdded. With the fix, it should dispose the old
+    // runtime (by table name match) first, then register the new one without conflict.
+    tableService().exploreExternalCatalog(externalCatalog);
+
+    // Verify: table is re-registered successfully with new ID, metrics present
+    Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
+    Assert.assertFalse(globalRegistry.getMetrics().isEmpty());
+
+    // The old runtime should be gone from the map (old ID)
+    Assert.assertFalse(tableService().contains(tableIdentifier.getId()));
+
+    dropTable();
+    dropDatabase();
+  }
+
   @Mock private DefaultTableRuntime tableRuntimeWithException;
   @Mock private ServerTableIdentifier tableIdentifierWithException;
 