-
Notifications
You must be signed in to change notification settings - Fork 52
CASSSIDECAR-243: Implementation of CDCPublisher #294
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
jyothsnakonisa
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall, leaving some comments.
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcConfigRefresherNotifierTask.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| @Test | ||
| void testConfigChanged() throws Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you remove this test? I think this is valuable to have a test for checking callback and config updating when config is changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This became a periodic task, and the callback was moved to a vertx signal instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I think the test is sill valid right? may be we should have the test to check if the vertx signal is made when config is changed
|
Thanks @bbotella for addressing my comments. There is this comment #294 (comment) which is not resolved. We need to look into that codepath as it is critical for leader election and mutation publishing. Also, can you please check if there is a way to add an end to end test that checks if the mutations are published(in-memory) might not need to have kafka for the test. Other than those two everything else looks good to me. |
jyothsnakonisa
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Bernardo for the patch!
73efd40 to
b594d40
Compare
| */ | ||
| public BigInteger decodeFromWire(int pos, Buffer buf) | ||
| { | ||
| return decodeFromWire(new MutableInt(pos), buf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the MutableInt is only required if we need to track the change in position in a wrapper serializer, this would be marginally faster as:
| return decodeFromWire(new MutableInt(pos), buf); | |
| return new BigInteger(CommonCodecs.BYTE_ARRAY.decodeFromWire(pos, buf)); |
| private volatile Map<String, String> kafkaConfigMappings = Map.of(); | ||
| private volatile Map<String, String> cdcConfigMappings = Map.of(); | ||
|
|
||
| private Map<String, String> kafkaConfigMappings = Map.of(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These look like they should be volatile
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There not accessed with synchronized so better to make them volatile
| } | ||
| }); | ||
| })) | ||
| .collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Result of .collect(Collectors.toList()); is ignored/not used.
| } | ||
|
|
||
| // NEW: Deduplicate by (instanceId, tokenRange) to prevent duplicate consumers | ||
| Map<String, SidecarCdc> uniqueConsumers = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Map<String, SidecarCdc> uniqueConsumers = new HashMap<>(); | |
| Map<String, SidecarCdc> uniqueConsumers = new HashMap<>(ownedRanges.values().stream().mapToInt(Set::size).sum()); |
| return -1; | ||
| } | ||
|
|
||
| private boolean resolveToSameAddress(String host1, String host2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this only expects ip addresses to be input, and so will not perform DNS resolution. Can we make that explicit? We can also make the method static and add some unit tests here.
| { | ||
| for (InstanceMetadata instance : instanceFetcher.allLocalInstances()) | ||
| { | ||
| String configuredHost = instance.ipAddress(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an ip address, no?
| String configuredHost = instance.ipAddress(); | |
| String configuredIpAddress = instance.ipAddress(); |
| private final ICdcStats cdcStats; | ||
| private final SidecarConfiguration sidecarConfiguration; | ||
| private CdcManager cdcManager; | ||
| private Serializer<CdcEvent> avroSerializer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be final:
| private Serializer<CdcEvent> avroSerializer; | |
| private final Serializer<CdcEvent> avroSerializer; | |
| private final Provider<RangeManager> rangeManagerProvider; |
jberragan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly nits, +1 (nb)
87d98d5 to
363a547
Compare
yifan-c
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When CDC is disabled,
| loadPublisher(); | ||
| publishSchemas(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those 2 methods are called in every iteration. Looks like they are included in the for-loop by mistake.
| private void loadPublisher() | ||
| { | ||
| KafkaOptions kafkaOptions = () -> cdcConfig.kafkaConfigs(); | ||
| this.publisher = SchemaStorePublisherFactory.DEFAULT.buildPublisher(kafkaOptions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
publisher is a Closeable object. We should close the previous object when assigning a new one.
| } | ||
| }); | ||
| })) | ||
| .collect(Collectors.toList());; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
double ;
| ON_CDC_CONFIGURATION_CHANGED, | ||
| ON_CDC_CACHE_WARMED_UP, | ||
| ON_CDC_CONFIG_MAPPINGS_CHANGED, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add javadoc
| KafkaConfigAccessor kafkaConfigAccessor, | ||
| CdcConfigAccessor cdcConfigAccessor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation
| SidecarSchema sidecarSchema, | ||
| CqlToAvroSchemaConverter cqlToAvroSchemaConverter) | ||
| { | ||
| super(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super() is redundant, because it is just implementing the interface SchemaStore
| this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables())); | ||
| AvroSchemas.registerLogicalTypes(); | ||
| cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged); | ||
| this.vertx = vertx; | ||
| this.cdcConfig = cdcConfig; | ||
| this.sidecarCdcStats = sidecarCdcStats; | ||
|
|
||
| configureSidecarServerEventListeners(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When CDC is disabled, we should skip those all together. It does not make sense to register listeners, etc.
Ideally, we should not mount the CdcModule entirely, if the feature is not enabled. It is outside of the scope of this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will create a Jira
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Just to clarify that conditionally register the listeners are still wanted in this patch.
| vertx.eventBus().localConsumer(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address(), this); | ||
| vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address(), this); | ||
| vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address(), this); | ||
| vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), this); | ||
| vertx.eventBus().localConsumer(ON_CDC_CACHE_WARMED_UP.address(), this); | ||
| vertx.eventBus().localConsumer(ON_CDC_CONFIGURATION_CHANGED.address(), new ConfigChangedHandler()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skip those when CDC is disabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be part of the Jira mentioned in previous comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those should be behind the condition whether cdc is enabled.
| KafkaProducer<String, byte[]> producer = new KafkaProducer<>(conf.kafkaConfigs()); | ||
| KafkaPublisher kafkaPublisher = new KafkaPublisher(TopicSupplier.staticTopicSupplier(conf.kafkaTopic()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both producer and kafkaPublisher are Closeable. I do not see them get closed anywhere. We are create a new eventConsumer on every publisher restart, meaning leaks.
CASSSIDECAR-242: Implementation of CachingSchemaStore House cleanup WIP commit Working CDC on Sidecar Latest changes Working CDC Working Working
5825499 to
8c25504
Compare
Patch by James Berragan, Jyothsna Konisa, Bernardo Botella; reviewed by for CASSSIDECAR-243