Migrate Data Catalog to Dataplex Knowledge Catalog for CDC templates#3927
stankiewicz wants to merge 22 commits into
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed.
This pull request addresses a critical failure in the CDC pipeline caused by the deprecation of the legacy Google Cloud Data Catalog API. By migrating the schema publishing and retrieval logic to the Dataplex Knowledge Catalog API, the pipeline maintains compatibility with current Google Cloud infrastructure standards. The changes ensure that metadata, including system types and labels, is correctly preserved and mapped to the new Dataplex entry structures.
Code Review
This pull request migrates the CDC parent and connector modules from Google Cloud Data Catalog to Google Cloud Dataplex Catalog, updating dependencies, client initializations, and schema mapping utilities to use Dataplex's aspect-based schema model. The review feedback identifies several critical issues, including resource leaks from unclosed CatalogServiceClient instances, an N+1 query performance bottleneck when retrieving entry schemas, improper handling of InterruptedException, and potential NullPointerException risks when parsing protobuf Struct fields without defensive checks.
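The resource-leak point can be illustrated with try-with-resources. This is a minimal sketch, not code from the PR: `FakeCatalogClient` and `ClientLifecycleSketch` are hypothetical stand-ins, and it assumes the real client is `AutoCloseable`, as generated Google Cloud Java clients typically are.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a closeable API client; not the Dataplex client itself. */
final class FakeCatalogClient implements AutoCloseable {
  // Records calls so the order of lookup and close can be inspected.
  static final List<String> EVENTS = new ArrayList<>();

  String lookupEntry(String name) {
    EVENTS.add("lookup:" + name);
    return "entry-for-" + name;
  }

  @Override
  public void close() {
    EVENTS.add("close");
  }
}

final class ClientLifecycleSketch {
  /** Opens a client, performs one lookup, and always closes the client. */
  static String fetchEntry(String name) {
    // try-with-resources calls close() on both normal return and exception.
    try (FakeCatalogClient client = new FakeCatalogClient()) {
      return client.lookupEntry(name);
    }
  }
}
```

For a long-lived pipeline, the alternative is to create one client, share it, and close it on shutdown. The thread does not say which option the PR takes.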
/gemini review
Code Review
This pull request migrates the CDC common library from Google Cloud Data Catalog to Google Cloud Dataplex Catalog, updating dependencies, clients, and schema mapping utilities to use Dataplex aspects. Feedback on these changes suggests propagating exceptions during asynchronous entry group creation to avoid invalid states, filtering search results in lookupPubSubEntry to verify they belong to the @pubsub entry group, and normalizing data types to lowercase in SchemaUtils to ensure robust, case-insensitive type mapping.
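The case-insensitive type mapping suggested above might look like the sketch below. The class name, method name, and mapping table are hypothetical; the actual `SchemaUtils` mapping is not shown in this thread.

```java
import java.util.Locale;
import java.util.Map;

final class TypeNormalizationSketch {
  // Hypothetical mapping for illustration only; keys are stored in lowercase.
  private static final Map<String, String> TYPE_MAP = Map.of(
      "string", "STRING",
      "int64", "INT64",
      "bool", "BOOLEAN");

  /** Maps a raw catalog type name to a target type, ignoring case and surrounding whitespace. */
  static String mapType(String rawType) {
    if (rawType == null) {
      throw new IllegalArgumentException("type must not be null");
    }
    // Locale.ROOT avoids locale-specific case rules (for example the Turkish dotless i).
    String key = rawType.trim().toLowerCase(Locale.ROOT);
    String mapped = TYPE_MAP.get(key);
    if (mapped == null) {
      throw new IllegalArgumentException("Unsupported type: " + rawType);
    }
    return mapped;
  }
}
```

Normalizing once at the lookup boundary keeps the table small, because it needs only one key per type instead of every casing variant.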
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.error("Interrupted while creating EntryGroup", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof AlreadyExistsException) {
        this.entryGroupCreated = true;
      } else {
        LOG.error("Failed to create EntryGroup", e);
      }
    }
If createEntryGroupAsync fails with an exception other than AlreadyExistsException (or if the thread is interrupted), the error is logged but the exception is swallowed. This allows the constructor of SingleTopicSchemaManager to complete successfully in an invalid state, which will cause subsequent schema updates to fail.
To ensure fail-fast behavior and make debugging easier, we should propagate these failures by throwing a RuntimeException.
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.error("Interrupted while creating EntryGroup", e);
      throw new RuntimeException("Interrupted while creating EntryGroup", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof AlreadyExistsException) {
        this.entryGroupCreated = true;
      } else {
        LOG.error("Failed to create EntryGroup", e);
        throw new RuntimeException("Failed to create EntryGroup", e);
      }
    }
References
- When throwing exceptions in environments where the caller or global exception handler might not log them (such as certain Dataflow templates), log the error message before throwing the exception to ensure the failure is recorded.
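The fail-fast behavior proposed in this comment can be tried in isolation with a plain `CompletableFuture`. This is a sketch under stated assumptions: `FailFastSketch`, `awaitCreation`, and the `AlreadyExists` exception are hypothetical stand-ins for the PR's classes, and logging is omitted to keep the example dependency-free.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class FailFastSketch {
  /** Hypothetical stand-in for the client library's AlreadyExistsException. */
  static final class AlreadyExists extends RuntimeException {}

  /** Returns true when the resource exists after the call; throws on any other failure. */
  static boolean awaitCreation(CompletableFuture<Void> creation) {
    try {
      creation.get();
      return true;
    } catch (InterruptedException e) {
      // Restore the interrupt flag before propagating, so callers can still observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while creating EntryGroup", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof AlreadyExists) {
        // The resource already exists, which is the state we wanted.
        return true;
      }
      throw new RuntimeException("Failed to create EntryGroup", e);
    }
  }
}
```

With this shape, a constructor that calls `awaitCreation` either finishes with the entry group known to exist or fails immediately, instead of failing later on the first schema update.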
Codecov Report
❌ Your patch check has failed because the patch coverage (0.00%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.
Additional details and impacted files
@@ Coverage Diff @@
## main #3927 +/- ##
============================================
- Coverage 55.59% 55.45% -0.15%
+ Complexity 7052 6601 -451
============================================
Files 1103 1103
Lines 67681 67866 +185
Branches 7603 7627 +24
============================================
+ Hits 37625 37632 +7
- Misses 27636 27808 +172
- Partials 2420 2426 +6
Fixes: #3921
This PR resolves the INVALID_ARGUMENT write operation failure (Project is not allowed to perform write operations due to Data Catalog deprecation) caused by the deprecation of the legacy Google Cloud Data Catalog API.
It migrates the Debezium-to-PubSub CDC pipeline's schema publishing and schema retrieval logic to use the new Dataplex Knowledge Catalog API (com.google.cloud.dataplex.v1.CatalogServiceClient).
Key Changes