Skip to content

Commit a4468f3

Browse files
czy006chenzhengyu
authored andcommitted
[AMORO-4034][Improvement]: MaintainerMetrics Unified interface and Report (#4034)
1 parent 869860e commit a4468f3

14 files changed

Lines changed: 1604 additions & 208 deletions

File tree

amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/DefaultTableMaintainerContext.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.amoro.maintainer.OptimizingInfo;
2424
import org.apache.amoro.maintainer.TableMaintainerContext;
2525
import org.apache.amoro.server.table.DefaultTableRuntime;
26-
import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
2726
import org.apache.amoro.server.utils.HiveLocationUtil;
2827
import org.apache.amoro.table.MixedTable;
2928

@@ -56,18 +55,11 @@ public TableConfiguration getTableConfiguration() {
5655

5756
@Override
5857
public MaintainerMetrics getMetrics() {
59-
TableOrphanFilesCleaningMetrics metrics = tableRuntime.getOrphanFilesCleaningMetrics();
60-
return new MaintainerMetrics() {
61-
@Override
62-
public void recordOrphanDataFilesCleaned(int expected, int cleaned) {
63-
metrics.completeOrphanDataFiles(expected, cleaned);
64-
}
65-
66-
@Override
67-
public void recordOrphanMetadataFilesCleaned(int expected, int cleaned) {
68-
metrics.completeOrphanMetadataFiles(expected, cleaned);
69-
}
70-
};
58+
// Return the full TableMaintainerMetricsImpl directly
59+
// This provides access to all maintainer metrics including orphan files cleaning,
60+
// dangling delete files cleaning, snapshot expiration, data expiration, tag creation,
61+
// and partition expiration.
62+
return tableRuntime.getMaintainerMetrics();
7163
}
7264

7365
@Override
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.server.table;
20+
21+
import static org.apache.amoro.metrics.MetricDefine.defineCounter;
22+
import static org.apache.amoro.metrics.MetricDefine.defineGauge;
23+
24+
import org.apache.amoro.ServerTableIdentifier;
25+
import org.apache.amoro.maintainer.MaintainerMetrics;
26+
import org.apache.amoro.metrics.MetricDefine;
27+
28+
/**
29+
* Abstract base class for table maintenance operation metrics
30+
*
31+
* <p>Responsibilities:
32+
*
33+
* <ul>
34+
* <li>Define all MetricDefine constants
35+
* <li>Provide template methods for metrics registration
36+
* <li>Handle tags (catalog, database, table, table_format, operation_type)
37+
* </ul>
38+
*
39+
* <p>Note: All metrics include the table_format tag to distinguish Iceberg and Paimon tables
40+
*/
41+
public abstract class AbstractTableMaintainerMetrics extends AbstractTableMetrics
42+
implements MaintainerMetrics {
43+
44+
/** Table format constant: Iceberg native table */
45+
protected static final String TABLE_FORMAT_ICEBERG = "iceberg";
46+
47+
/** Table format constant: Paimon native table */
48+
protected static final String TABLE_FORMAT_PAIMON = "paimon";
49+
50+
/** Table format constant: Mixed table (based on Iceberg) */
51+
protected static final String TABLE_FORMAT_MIXED_ICEBERG = "mixed_iceberg";
52+
53+
/** Table format constant: Hive table */
54+
protected static final String TABLE_FORMAT_HIVE = "hive";
55+
56+
// ========== Orphan Files Related MetricDefine ==========
57+
58+
/** Count of orphan data files cleaned */
59+
public static final MetricDefine TABLE_ORPHAN_DATA_FILES_CLEANED_COUNT =
60+
defineCounter("table_orphan_data_files_cleaned_count")
61+
.withDescription("Count of orphan data files cleaned")
62+
.withTags("catalog", "database", "table", "table_format")
63+
.build();
64+
65+
/** Expected count of orphan data files to clean */
66+
public static final MetricDefine TABLE_ORPHAN_DATA_FILES_CLEANED_EXPECTED_COUNT =
67+
defineCounter("table_orphan_data_files_cleaned_expected_count")
68+
.withDescription("Expected count of orphan data files to clean")
69+
.withTags("catalog", "database", "table", "table_format")
70+
.build();
71+
72+
/** Count of orphan metadata files cleaned */
73+
public static final MetricDefine TABLE_ORPHAN_METADATA_FILES_CLEANED_COUNT =
74+
defineCounter("table_orphan_metadata_files_cleaned_count")
75+
.withDescription("Count of orphan metadata files cleaned")
76+
.withTags("catalog", "database", "table", "table_format")
77+
.build();
78+
79+
/** Expected count of orphan metadata files to clean */
80+
public static final MetricDefine TABLE_ORPHAN_METADATA_FILES_CLEANED_EXPECTED_COUNT =
81+
defineCounter("table_orphan_metadata_files_cleaned_expected_count")
82+
.withDescription("Expected count of orphan metadata files to clean")
83+
.withTags("catalog", "database", "table", "table_format")
84+
.build();
85+
86+
/** Duration of orphan files cleaning operation (milliseconds) */
87+
public static final MetricDefine TABLE_ORPHAN_FILES_CLEANING_DURATION =
88+
defineGauge("table_orphan_files_cleaning_duration_millis")
89+
.withDescription("Duration of orphan files cleaning operation in milliseconds")
90+
.withTags("catalog", "database", "table", "table_format")
91+
.build();
92+
93+
// ========== Dangling Delete Files Related MetricDefine (Iceberg) ==========
94+
95+
/** Count of dangling delete files cleaned */
96+
public static final MetricDefine TABLE_DANGLING_DELETE_FILES_CLEANED_COUNT =
97+
defineCounter("table_dangling_delete_files_cleaned_count")
98+
.withDescription("Count of dangling delete files cleaned")
99+
.withTags("catalog", "database", "table", "table_format")
100+
.build();
101+
102+
/** Duration of dangling delete files cleaning operation (milliseconds) */
103+
public static final MetricDefine TABLE_DANGLING_DELETE_FILES_CLEANING_DURATION =
104+
defineGauge("table_dangling_delete_files_cleaning_duration_millis")
105+
.withDescription("Duration of dangling delete files cleaning operation in milliseconds")
106+
.withTags("catalog", "database", "table", "table_format")
107+
.build();
108+
109+
// ========== Snapshot Expiration Related MetricDefine ==========
110+
111+
/** Count of snapshots expired */
112+
public static final MetricDefine TABLE_SNAPSHOTS_EXPIRED_COUNT =
113+
defineCounter("table_snapshots_expired_count")
114+
.withDescription("Count of snapshots expired")
115+
.withTags("catalog", "database", "table", "table_format")
116+
.build();
117+
118+
/** Count of data files deleted during snapshot expiration */
119+
public static final MetricDefine TABLE_SNAPSHOTS_EXPIRED_DATA_FILES_DELETED =
120+
defineCounter("table_snapshots_expired_data_files_deleted")
121+
.withDescription("Count of data files deleted during snapshot expiration")
122+
.withTags("catalog", "database", "table", "table_format")
123+
.build();
124+
125+
/** Duration of snapshot expiration operation (milliseconds) */
126+
public static final MetricDefine TABLE_SNAPSHOTS_EXPIRATION_DURATION =
127+
defineGauge("table_snapshots_expiration_duration_millis")
128+
.withDescription("Duration of snapshot expiration operation in milliseconds")
129+
.withTags("catalog", "database", "table", "table_format")
130+
.build();
131+
132+
// ========== Data Expiration Related MetricDefine (Iceberg) ==========
133+
134+
/** Count of data files expired */
135+
public static final MetricDefine TABLE_DATA_EXPIRED_DATA_FILES_COUNT =
136+
defineCounter("table_data_expired_data_files_count")
137+
.withDescription("Count of data files expired")
138+
.withTags("catalog", "database", "table", "table_format")
139+
.build();
140+
141+
/** Count of delete files expired */
142+
public static final MetricDefine TABLE_DATA_EXPIRED_DELETE_FILES_COUNT =
143+
defineCounter("table_data_expired_delete_files_count")
144+
.withDescription("Count of delete files expired")
145+
.withTags("catalog", "database", "table", "table_format")
146+
.build();
147+
148+
/** Duration of data expiration operation (milliseconds) */
149+
public static final MetricDefine TABLE_DATA_EXPIRATION_DURATION =
150+
defineGauge("table_data_expiration_duration_millis")
151+
.withDescription("Duration of data expiration operation in milliseconds")
152+
.withTags("catalog", "database", "table", "table_format")
153+
.build();
154+
155+
// ========== Tag Creation Related MetricDefine (Iceberg) ==========
156+
157+
/** Count of tags created */
158+
public static final MetricDefine TABLE_TAGS_CREATED_COUNT =
159+
defineCounter("table_tags_created_count")
160+
.withDescription("Count of tags created")
161+
.withTags("catalog", "database", "table", "table_format")
162+
.build();
163+
164+
/** Duration of tag creation operation (milliseconds) */
165+
public static final MetricDefine TABLE_TAG_CREATION_DURATION =
166+
defineGauge("table_tag_creation_duration_millis")
167+
.withDescription("Duration of tag creation operation in milliseconds")
168+
.withTags("catalog", "database", "table", "table_format")
169+
.build();
170+
171+
// ========== Partition Expiration Related MetricDefine (Paimon) ==========
172+
173+
/** Count of partitions expired */
174+
public static final MetricDefine TABLE_PARTITIONS_EXPIRED_COUNT =
175+
defineCounter("table_partitions_expired_count")
176+
.withDescription("Count of partitions expired")
177+
.withTags("catalog", "database", "table", "table_format")
178+
.build();
179+
180+
/** Count of files expired during partition expiration */
181+
public static final MetricDefine TABLE_PARTITIONS_EXPIRED_FILES_COUNT =
182+
defineCounter("table_partitions_expired_files_count")
183+
.withDescription("Count of files expired during partition expiration")
184+
.withTags("catalog", "database", "table", "table_format")
185+
.build();
186+
187+
/** Duration of partition expiration operation (milliseconds) */
188+
public static final MetricDefine TABLE_PARTITION_EXPIRATION_DURATION =
189+
defineGauge("table_partition_expiration_duration_millis")
190+
.withDescription("Duration of partition expiration operation in milliseconds")
191+
.withTags("catalog", "database", "table", "table_format")
192+
.build();
193+
194+
// ========== General Operation Status Related MetricDefine ==========
195+
196+
/** Count of successful maintainer operations */
197+
public static final MetricDefine TABLE_MAINTAINER_OPERATION_SUCCESS_COUNT =
198+
defineCounter("table_maintainer_operation_success_count")
199+
.withDescription("Count of successful maintainer operations")
200+
.withTags("catalog", "database", "table", "table_format", "operation_type")
201+
.build();
202+
203+
/** Count of failed maintainer operations */
204+
public static final MetricDefine TABLE_MAINTAINER_OPERATION_FAILURE_COUNT =
205+
defineCounter("table_maintainer_operation_failure_count")
206+
.withDescription("Count of failed maintainer operations")
207+
.withTags("catalog", "database", "table", "table_format", "operation_type")
208+
.build();
209+
210+
/** Duration of maintainer operation (milliseconds) */
211+
public static final MetricDefine TABLE_MAINTAINER_OPERATION_DURATION =
212+
defineGauge("table_maintainer_operation_duration_millis")
213+
.withDescription("Duration of maintainer operation in milliseconds")
214+
.withTags("catalog", "database", "table", "table_format", "operation_type")
215+
.build();
216+
217+
/** Table format type */
218+
protected final String tableFormat;
219+
220+
/**
221+
* Constructor
222+
*
223+
* @param identifier Table identifier (contains format information via getFormat())
224+
*/
225+
protected AbstractTableMaintainerMetrics(ServerTableIdentifier identifier) {
226+
super(identifier);
227+
this.tableFormat = identifier.getFormat().name().toLowerCase();
228+
}
229+
}

amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableMetrics.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ protected AbstractTableMetrics(ServerTableIdentifier identifier) {
3737
this.identifier = identifier;
3838
}
3939

40+
/**
41+
* Get the table identifier.
42+
*
43+
* @return ServerTableIdentifier
44+
*/
45+
public ServerTableIdentifier getIdentifier() {
46+
return identifier;
47+
}
48+
4049
protected void registerMetric(MetricRegistry registry, MetricDefine define, Metric metric) {
4150
MetricKey key =
4251
registry.register(

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.amoro.Action;
2222
import org.apache.amoro.AmoroTable;
2323
import org.apache.amoro.SupportsProcessPlugins;
24+
import org.apache.amoro.TableFormat;
2425
import org.apache.amoro.TableRuntime;
2526
import org.apache.amoro.api.BlockableOperation;
2627
import org.apache.amoro.config.OptimizingConfig;
@@ -97,7 +98,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime
9798

9899
private final Map<Action, TableProcessContainer> processContainerMap = Maps.newConcurrentMap();
99100
private final TableOptimizingMetrics optimizingMetrics;
100-
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
101+
private final TableMaintainerMetricsImpl maintainerMetrics;
101102
private final TableSummaryMetrics tableSummaryMetrics;
102103
private volatile long lastPlanTime;
103104
private volatile OptimizingProcess optimizingProcess;
@@ -107,8 +108,7 @@ public DefaultTableRuntime(TableRuntimeStore store) {
107108
super(store);
108109
this.optimizingMetrics =
109110
new TableOptimizingMetrics(store.getTableIdentifier(), store.getGroupName());
110-
this.orphanFilesCleaningMetrics =
111-
new TableOrphanFilesCleaningMetrics(store.getTableIdentifier());
111+
this.maintainerMetrics = new TableMaintainerMetricsImpl(store.getTableIdentifier());
112112
this.tableSummaryMetrics = new TableSummaryMetrics(store.getTableIdentifier());
113113
}
114114

@@ -124,7 +124,7 @@ public void recover(OptimizingProcess optimizingProcess) {
124124
public void registerMetric(MetricRegistry metricRegistry) {
125125
// TODO: extract method to interface.
126126
this.optimizingMetrics.register(metricRegistry);
127-
this.orphanFilesCleaningMetrics.register(metricRegistry);
127+
this.maintainerMetrics.registerMetrics(metricRegistry);
128128
this.tableSummaryMetrics.register(metricRegistry);
129129
}
130130

@@ -161,8 +161,13 @@ public List<TableProcessStore> getProcessStates(Action action) {
161161
return processContainerMap.get(action).getProcessStates();
162162
}
163163

164-
public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() {
165-
return orphanFilesCleaningMetrics;
164+
/**
165+
* Get the maintainer metrics implementation.
166+
*
167+
* @return TableMaintainerMetricsImpl instance
168+
*/
169+
public TableMaintainerMetricsImpl getMaintainerMetrics() {
170+
return maintainerMetrics;
166171
}
167172

168173
public long getCurrentSnapshotId() {
@@ -472,7 +477,7 @@ public void beginCommitting() {
472477
@Override
473478
public void unregisterMetric() {
474479
tableSummaryMetrics.unregister();
475-
orphanFilesCleaningMetrics.unregister();
480+
maintainerMetrics.unregister();
476481
optimizingMetrics.unregister();
477482
}
478483

0 commit comments

Comments
 (0)