Skip to content

Commit 0cfa9a4

Browse files
committed
[AMORO-4034][Improvement]: MaintainerMetrics Unified interface and Report (#4034)
1 parent 869860e commit 0cfa9a4

14 files changed

Lines changed: 1603 additions & 208 deletions

File tree

amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.amoro.maintainer.OptimizingInfo;
2424
import org.apache.amoro.maintainer.TableMaintainerContext;
2525
import org.apache.amoro.server.table.DefaultTableRuntime;
26-
import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
2726
import org.apache.amoro.server.utils.HiveLocationUtil;
2827
import org.apache.amoro.table.MixedTable;
2928

@@ -56,18 +55,11 @@ public TableConfiguration getTableConfiguration() {
5655

5756
@Override
5857
public MaintainerMetrics getMetrics() {
59-
TableOrphanFilesCleaningMetrics metrics = tableRuntime.getOrphanFilesCleaningMetrics();
60-
return new MaintainerMetrics() {
61-
@Override
62-
public void recordOrphanDataFilesCleaned(int expected, int cleaned) {
63-
metrics.completeOrphanDataFiles(expected, cleaned);
64-
}
65-
66-
@Override
67-
public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) {
68-
metrics.completeOrphanMetadataFiles(expected, cleaned);
69-
}
70-
};
58+
// Return the full TableMaintainerMetricsImpl directly
59+
// This provides access to all maintainer metrics including orphan files cleaning,
60+
// dangling delete files cleaning, snapshot expiration, data expiration, tag creation,
61+
// and partition expiration.
62+
return tableRuntime.getMaintainerMetrics();
7163
}
7264

7365
@Override
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.server.table;
20+
21+
import static org.apache.amoro.metrics.MetricDefine.defineCounter;
22+
import static org.apache.amoro.metrics.MetricDefine.defineGauge;
23+
24+
import org.apache.amoro.ServerTableIdentifier;
25+
import org.apache.amoro.maintainer.MaintainerMetrics;
26+
import org.apache.amoro.metrics.MetricDefine;
27+
28+
/**
29+
* Abstract base class for table maintenance operation metrics.
30+
*
31+
* <p>Responsibilities:
32+
*
33+
* <ul>
34+
* <li>Define all MetricDefine constants
35+
* <li>Provide template methods for metrics registration
36+
* <li>Handle tags (catalog, database, table, table_format, operation_type)
37+
* </ul>
38+
*
39+
* <p>Note: All metrics include the table_format tag to distinguish table formats (Iceberg, Mixed-Iceberg, Hive, Paimon).
40+
*/
41+
public abstract class AbstractTableMaintainerMetrics extends AbstractTableMetrics
42+
implements MaintainerMetrics {
43+
44+
/** Table format constant: Iceberg native table */
45+
protected static final String TABLE_FORMAT_ICEBERG = "iceberg";
46+
47+
/** Table format constant: Paimon native table */
48+
protected static final String TABLE_FORMAT_PAIMON = "paimon";
49+
50+
/** Table format constant: Mixed table (based on Iceberg) */
51+
protected static final String TABLE_FORMAT_MIXED_ICEBERG = "mixed_iceberg";
52+
53+
/** Table format constant: Hive table */
54+
protected static final String TABLE_FORMAT_HIVE = "hive";
55+
56+
// ========== Orphan Files Related MetricDefine ==========
57+
58+
/** Count of orphan data files cleaned */
59+
public static final MetricDefine TABLE_ORPHAN_DATA_FILES_CLEANED_COUNT =
60+
defineCounter("table_orphan_data_files_cleaned_count")
61+
.withDescription("Count of orphan data files cleaned")
62+
.withTags("catalog", "database", "table", "table_format")
63+
.build();
64+
65+
/** Expected count of orphan data files to clean */
66+
public static final MetricDefine TABLE_ORPHAN_DATA_FILES_CLEANED_EXPECTED_COUNT =
67+
defineCounter("table_orphan_data_files_cleaned_expected_count")
68+
.withDescription("Expected count of orphan data files to clean")
69+
.withTags("catalog", "database", "table", "table_format")
70+
.build();
71+
72+
/** Count of orphan metadata files cleaned */
73+
public static final MetricDefine TABLE_ORPHAN_METADATA_FILES_CLEANED_COUNT =
74+
defineCounter("table_orphan_metadata_files_cleaned_count")
75+
.withDescription("Count of orphan metadata files cleaned")
76+
.withTags("catalog", "database", "table", "table_format")
77+
.build();
78+
79+
/** Expected count of orphan metadata files to clean */
80+
public static final MetricDefine TABLE_ORPHAN_METADATA_FILES_CLEANED_EXPECTED_COUNT =
81+
defineCounter("table_orphan_metadata_files_cleaned_expected_count")
82+
.withDescription("Expected count of orphan metadata files to clean")
83+
.withTags("catalog", "database", "table", "table_format")
84+
.build();
85+
86+
/** Duration of orphan files cleaning operation (milliseconds) */
87+
public static final MetricDefine TABLE_ORPHAN_FILES_CLEANING_DURATION =
88+
defineGauge("table_orphan_files_cleaning_duration_millis")
89+
.withDescription("Duration of orphan files cleaning operation in milliseconds")
90+
.withTags("catalog", "database", "table", "table_format")
91+
.build();
92+
93+
// ========== Dangling Delete Files Related MetricDefine (Iceberg) ==========
94+
95+
/** Count of dangling delete files cleaned */
96+
public static final MetricDefine TABLE_DANGLING_DELETE_FILES_CLEANED_COUNT =
97+
defineCounter("table_dangling_delete_files_cleaned_count")
98+
.withDescription("Count of dangling delete files cleaned")
99+
.withTags("catalog", "database", "table", "table_format")
100+
.build();
101+
102+
/** Duration of dangling delete files cleaning operation (milliseconds) */
103+
public static final MetricDefine TABLE_DANGLING_DELETE_FILES_CLEANING_DURATION =
104+
defineGauge("table_dangling_delete_files_cleaning_duration_millis")
105+
.withDescription("Duration of dangling delete files cleaning operation in milliseconds")
106+
.withTags("catalog", "database", "table", "table_format")
107+
.build();
108+
109+
// ========== Snapshot Expiration Related MetricDefine ==========
110+
111+
/** Count of snapshots expired */
112+
public static final MetricDefine TABLE_SNAPSHOTS_EXPIRED_COUNT =
113+
defineCounter("table_snapshots_expired_count")
114+
.withDescription("Count of snapshots expired")
115+
.withTags("catalog", "database", "table", "table_format")
116+
.build();
117+
118+
/** Count of data files deleted during snapshot expiration */
119+
public static final MetricDefine TABLE_SNAPSHOTS_EXPIRED_DATA_FILES_DELETED =
120+
defineCounter("table_snapshots_expired_data_files_deleted")
121+
.withDescription("Count of data files deleted during snapshot expiration")
122+
.withTags("catalog", "database", "table", "table_format")
123+
.build();
124+
125+
/** Duration of snapshot expiration operation (milliseconds) */
126+
public static final MetricDefine TABLE_SNAPSHOTS_EXPIRATION_DURATION =
127+
defineGauge("table_snapshots_expiration_duration_millis")
128+
.withDescription("Duration of snapshot expiration operation in milliseconds")
129+
.withTags("catalog", "database", "table", "table_format")
130+
.build();
131+
132+
// ========== Data Expiration Related MetricDefine (Iceberg) ==========
133+
134+
/** Count of data files expired */
135+
public static final MetricDefine TABLE_DATA_EXPIRED_DATA_FILES_COUNT =
136+
defineCounter("table_data_expired_data_files_count")
137+
.withDescription("Count of data files expired")
138+
.withTags("catalog", "database", "table", "table_format")
139+
.build();
140+
141+
/** Count of delete files expired */
142+
public static final MetricDefine TABLE_DATA_EXPIRED_DELETE_FILES_COUNT =
143+
defineCounter("table_data_expired_delete_files_count")
144+
.withDescription("Count of delete files expired")
145+
.withTags("catalog", "database", "table", "table_format")
146+
.build();
147+
148+
/** Duration of data expiration operation (milliseconds) */
149+
public static final MetricDefine TABLE_DATA_EXPIRATION_DURATION =
150+
defineGauge("table_data_expiration_duration_millis")
151+
.withDescription("Duration of data expiration operation in milliseconds")
152+
.withTags("catalog", "database", "table", "table_format")
153+
.build();
154+
155+
// ========== Tag Creation Related MetricDefine (Iceberg) ==========
156+
157+
/** Count of tags created */
158+
public static final MetricDefine TABLE_TAGS_CREATED_COUNT =
159+
defineCounter("table_tags_created_count")
160+
.withDescription("Count of tags created")
161+
.withTags("catalog", "database", "table", "table_format")
162+
.build();
163+
164+
/** Duration of tag creation operation (milliseconds) */
165+
public static final MetricDefine TABLE_TAG_CREATION_DURATION =
166+
defineGauge("table_tag_creation_duration_millis")
167+
.withDescription("Duration of tag creation operation in milliseconds")
168+
.withTags("catalog", "database", "table", "table_format")
169+
.build();
170+
171+
// ========== Partition Expiration Related MetricDefine (Paimon) ==========
172+
173+
/** Count of partitions expired */
174+
public static final MetricDefine TABLE_PARTITIONS_EXPIRED_COUNT =
175+
defineCounter("table_partitions_expired_count")
176+
.withDescription("Count of partitions expired")
177+
.withTags("catalog", "database", "table", "table_format")
178+
.build();
179+
180+
/** Count of files expired during partition expiration */
181+
public static final MetricDefine TABLE_PARTITIONS_EXPIRED_FILES_COUNT =
182+
defineCounter("table_partitions_expired_files_count")
183+
.withDescription("Count of files expired during partition expiration")
184+
.withTags("catalog", "database", "table", "table_format")
185+
.build();
186+
187+
/** Duration of partition expiration operation (milliseconds) */
188+
public static final MetricDefine TABLE_PARTITION_EXPIRATION_DURATION =
189+
defineGauge("table_partition_expiration_duration_millis")
190+
.withDescription("Duration of partition expiration operation in milliseconds")
191+
.withTags("catalog", "database", "table", "table_format")
192+
.build();
193+
194+
// ========== General Operation Status Related MetricDefine ==========
195+
196+
/** Count of successful maintainer operations */
197+
public static final MetricDefine TABLE_MAINTAINER_OPERATION_SUCCESS_COUNT =
198+
defineCounter("table_maintainer_operation_success_count")
199+
.withDescription("Count of successful maintainer operations")
200+
.withTags("catalog", "database", "table", "table_format", "operation_type")
201+
.build();
202+
203+
/** Count of failed maintainer operations */
204+
public static final MetricDefine TABLE_MAINTAINER_OPERATION_FAILURE_COUNT =
205+
defineCounter("table_maintainer_operation_failure_count")
206+
.withDescription("Count of failed maintainer operations")
207+
.withTags("catalog", "database", "table", "table_format", "operation_type")
208+
.build();
209+
210+
/** Duration of maintainer operation (milliseconds) */
211+
public static final MetricDefine TABLE_MAINTAINER_OPERATION_DURATION =
212+
defineGauge("table_maintainer_operation_duration_millis")
213+
.withDescription("Duration of maintainer operation in milliseconds")
214+
.withTags("catalog", "database", "table", "table_format", "operation_type")
215+
.build();
216+
217+
/** Table format type */
218+
protected final String tableFormat;
219+
220+
/**
 * Creates the metrics base for one table and records its table format as a lower-case string.
 *
 * <p>The format string is the lower-cased enum name returned by {@code identifier.getFormat()}.
 * Lower-casing uses {@link java.util.Locale#ROOT} so the result does not depend on the JVM default
 * locale (with a Turkish default locale, {@code "ICEBERG".toLowerCase()} produces a dotless
 * {@code ı} and would no longer equal {@code "iceberg"}).
 *
 * @param identifier Table identifier (contains format information via getFormat())
 */
protected AbstractTableMaintainerMetrics(ServerTableIdentifier identifier) {
  super(identifier);
  // Fully qualified so this change does not require touching the file's import list.
  this.tableFormat = identifier.getFormat().name().toLowerCase(java.util.Locale.ROOT);
}
229+
}

amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ protected AbstractTableMetrics(ServerTableIdentifier identifier) {
3737
this.identifier = identifier;
3838
}
3939

40+
/**
41+
* Get the table identifier.
42+
*
43+
* @return ServerTableIdentifier
44+
*/
45+
public ServerTableIdentifier getIdentifier() {
46+
return identifier;
47+
}
48+
4049
protected void registerMetric(MetricRegistry registry, MetricDefine define, Metric metric) {
4150
MetricKey key =
4251
registry.register(

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime
9797

9898
private final Map<Action, TableProcessContainer> processContainerMap = Maps.newConcurrentMap();
9999
private final TableOptimizingMetrics optimizingMetrics;
100-
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
100+
private final TableMaintainerMetricsImpl maintainerMetrics;
101101
private final TableSummaryMetrics tableSummaryMetrics;
102102
private volatile long lastPlanTime;
103103
private volatile OptimizingProcess optimizingProcess;
@@ -107,8 +107,7 @@ public DefaultTableRuntime(TableRuntimeStore store) {
107107
super(store);
108108
this.optimizingMetrics =
109109
new TableOptimizingMetrics(store.getTableIdentifier(), store.getGroupName());
110-
this.orphanFilesCleaningMetrics =
111-
new TableOrphanFilesCleaningMetrics(store.getTableIdentifier());
110+
this.maintainerMetrics = new TableMaintainerMetricsImpl(store.getTableIdentifier());
112111
this.tableSummaryMetrics = new TableSummaryMetrics(store.getTableIdentifier());
113112
}
114113

@@ -124,7 +123,7 @@ public void recover(OptimizingProcess optimizingProcess) {
124123
public void registerMetric(MetricRegistry metricRegistry) {
125124
// TODO: extract method to interface.
126125
this.optimizingMetrics.register(metricRegistry);
127-
this.orphanFilesCleaningMetrics.register(metricRegistry);
126+
this.maintainerMetrics.registerMetrics(metricRegistry);
128127
this.tableSummaryMetrics.register(metricRegistry);
129128
}
130129

@@ -161,8 +160,13 @@ public List<TableProcessStore> getProcessStates(Action action) {
161160
return processContainerMap.get(action).getProcessStates();
162161
}
163162

164-
public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() {
165-
return orphanFilesCleaningMetrics;
163+
/**
164+
* Get the maintainer metrics implementation.
165+
*
166+
* @return TableMaintainerMetricsImpl instance
167+
*/
168+
public TableMaintainerMetricsImpl getMaintainerMetrics() {
169+
return maintainerMetrics;
166170
}
167171

168172
public long getCurrentSnapshotId() {
@@ -472,7 +476,7 @@ public void beginCommitting() {
472476
@Override
473477
public void unregisterMetric() {
474478
tableSummaryMetrics.unregister();
475-
orphanFilesCleaningMetrics.unregister();
479+
maintainerMetrics.unregister();
476480
optimizingMetrics.unregister();
477481
}
478482

0 commit comments

Comments
 (0)