[AMORO-4048] Saving cleanup opeartion process info in table_process#4077
[AMORO-4048] Saving cleanup opeartion process info in table_process#4077zhangwl9 wants to merge 3 commits intoapache:masterfrom
Conversation
84c1f56 to
0c8f0d7
Compare
f6788dc to
5779284
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #4077 +/- ##
============================================
+ Coverage 22.39% 28.84% +6.44%
- Complexity 2552 3958 +1406
============================================
Files 458 656 +198
Lines 42116 52462 +10346
Branches 5917 6644 +727
============================================
+ Hits 9433 15133 +5700
- Misses 31871 36229 +4358
- Partials 812 1100 +288
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR addresses [AMORO-4048] by persisting cleanup executor run information into the table_process table, similar to how optimizing executions are tracked.
Changes:
- Add persistence logic in
PeriodicTableSchedulerto insert/updatetable_processrecords for cleanup operations (RUNNING → SUCCESS/FAILED). - Update inline scheduler tests to validate cleanup process persistence and expose testing hooks.
- Add a new test covering cleanup process status transitions and stored failure messages.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java |
Adds creation and completion persistence for cleanup processes in table_process. |
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java |
Exposes test-only wrappers to call the new persistence methods. |
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java |
Adds a persistence-focused test validating cleanup process lifecycle in table_process. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator(); |
There was a problem hiding this comment.
SnowflakeIdGenerator is instantiated per PeriodicTableScheduler instance. Because the generator’s uniqueness relies on per-instance sequence state (and defaults to machineId=0), multiple schedulers generating IDs within the same time slice can produce duplicate processIds and violate the table_process.process_id PK. Use a shared/static generator (or a centralized ID service) so all schedulers share sequence state, or pass distinct machine IDs per instance.
| private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator(); | |
| private static final SnowflakeIdGenerator ID_GENERATOR = new SnowflakeIdGenerator(); |
| } catch (Exception e) { | ||
| logger.error("exception when schedule for table: {}", tableRuntime.getTableIdentifier(), e); | ||
| } catch (Throwable t) { | ||
| executionError = t; |
There was a problem hiding this comment.
The catch (Throwable t) block records the error for persistence but never logs it. This will silently swallow failures when execute() or the persistence calls throw, making cleanup issues hard to diagnose in production. Please add an error log (including table identifier and cleanup operation) and consider rethrowing Errors if you don’t want to mask fatal JVM problems.
| executionError = t; | |
| executionError = t; | |
| logger.error( | |
| "Failed to execute cleanup operation {} for table {}", | |
| cleanupOperation, | |
| tableRuntime.getTableIdentifier(), | |
| t); | |
| if (t instanceof Error) { | |
| throw (Error) t; | |
| } |
| if (executionError != null) { | ||
| cleanProcessMeta.setStatus(ProcessStatus.FAILED); | ||
| String message = executionError.getMessage(); | ||
| if (message == null) { | ||
| message = executionError.getClass().getName(); | ||
| } | ||
|
|
||
| logger.debug( | ||
| "Update lastCleanTime for table {} with cleanup operation {}", | ||
| tableRuntime.getTableIdentifier().getTableName(), | ||
| cleanupOperation); | ||
| } catch (Exception e) { | ||
| logger.error( | ||
| "Failed to update lastCleanTime for table {}", | ||
| tableRuntime.getTableIdentifier().getTableName(), | ||
| e); | ||
| cleanProcessMeta.setFailMessage(message); | ||
| } else { |
There was a problem hiding this comment.
fail_message in table_process is constrained (e.g., Postgres has length(fail_message) <= 4096). Persisting Throwable.getMessage() without truncation can violate this constraint (and also loses stack trace detail). Consider using the existing org.apache.amoro.utils.ExceptionUtil.getErrorMessage(t, 4000) pattern used by optimizing to both capture stack traces and bound the stored length.
| @VisibleForTesting | ||
| public TableProcessMeta createCleanupProcessInfo( | ||
| TableRuntime tableRuntime, CleanupOperation cleanupOperation) { | ||
|
|
||
| if (shouldSkipOperation(tableRuntime, cleanupOperation)) { | ||
| return null; | ||
| } | ||
|
|
||
| TableProcessMeta cleanProcessMeta = buildProcessMeta(tableRuntime, cleanupOperation); | ||
| persistencyHelper.beginAndPersistCleanupProcess(cleanProcessMeta); | ||
|
|
||
| logger.debug( | ||
| "Successfully persist cleanup process [processId={}, tableId={}, processType={}]", | ||
| cleanProcessMeta.getProcessId(), | ||
| cleanProcessMeta.getTableId(), | ||
| cleanProcessMeta.getProcessType()); | ||
|
|
||
| return cleanProcessMeta; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public void persistCleanupResult( | ||
| TableRuntime tableRuntime, | ||
| CleanupOperation cleanupOperation, | ||
| TableProcessMeta cleanProcessMeta, | ||
| Throwable executionError) { | ||
|
|
There was a problem hiding this comment.
createCleanupProcessInfo / persistCleanupResult are marked @VisibleForTesting but exposed as public methods on a production scheduler base class. This widens the API surface unnecessarily; making them protected (or package-private) would still allow test subclasses to access them while reducing accidental external use.
| private static class PersistencyHelper extends PersistentBase { | ||
|
|
||
| public PersistencyHelper() {} | ||
|
|
||
| private void beginAndPersistCleanupProcess(TableProcessMeta meta) { |
There was a problem hiding this comment.
The inner helper is named PersistencyHelper, while the surrounding package/classes use persistence terminology (e.g., PersistentBase, server.persistence). Renaming to PersistenceHelper would improve consistency and avoid confusion when searching for persistence-related utilities.
| executor.persistCleanupResultForTest(tableRuntime, operation, processMeta.copy(), null); | ||
| TableProcessMeta persistedSuccess = getProcessMeta(processMeta.getProcessId()); | ||
| Assert.assertEquals(ProcessStatus.SUCCESS, persistedSuccess.getStatus()); | ||
| Assert.assertTrue(persistedSuccess.getCreateTime() < persistedSuccess.getFinishTime()); |
There was a problem hiding this comment.
This assertion can be flaky: createTime and finishTime are both derived from System.currentTimeMillis() and may be equal when the persistence call happens within the same millisecond (or due to DB timestamp precision). Consider asserting <= instead, or introducing a small deterministic delay to guarantee finishTime is after createTime.
| Assert.assertTrue(persistedSuccess.getCreateTime() < persistedSuccess.getFinishTime()); | |
| Assert.assertTrue(persistedSuccess.getCreateTime() <= persistedSuccess.getFinishTime()); |
Why are the changes needed?
Close #4048.
Brief change log
How was this patch tested?
Add some test cases that check the changes thoroughly including negative and positive cases if possible
Add screenshots for manual tests if appropriate
Run test locally before making a pull request
Documentation