11package com .devshawn .kafka .gitops ;
22
3- import ch .qos .logback .classic .Level ;
4- import ch .qos .logback .classic .Logger ;
3+ import java .util .ArrayList ;
4+ import java .util .List ;
5+ import java .util .Map ;
6+ import java .util .NoSuchElementException ;
7+ import java .util .Optional ;
8+ import java .util .concurrent .atomic .AtomicInteger ;
9+ import java .util .concurrent .atomic .AtomicReference ;
10+ import org .slf4j .LoggerFactory ;
511import com .devshawn .kafka .gitops .config .KafkaGitopsConfigLoader ;
612import com .devshawn .kafka .gitops .config .ManagerConfig ;
13+ import com .devshawn .kafka .gitops .config .SchemaRegistryConfigLoader ;
714import com .devshawn .kafka .gitops .domain .confluent .ServiceAccount ;
815import com .devshawn .kafka .gitops .domain .options .GetAclOptions ;
916import com .devshawn .kafka .gitops .domain .plan .DesiredPlan ;
1017import com .devshawn .kafka .gitops .domain .state .AclDetails ;
1118import com .devshawn .kafka .gitops .domain .state .CustomAclDetails ;
1219import com .devshawn .kafka .gitops .domain .state .DesiredState ;
1320import com .devshawn .kafka .gitops .domain .state .DesiredStateFile ;
21+ import com .devshawn .kafka .gitops .domain .state .SchemaDetails ;
1422import com .devshawn .kafka .gitops .domain .state .TopicDetails ;
1523import com .devshawn .kafka .gitops .domain .state .service .KafkaStreamsService ;
24+ import com .devshawn .kafka .gitops .enums .SchemaCompatibility ;
1625import com .devshawn .kafka .gitops .exception .ConfluentCloudException ;
1726import com .devshawn .kafka .gitops .exception .InvalidAclDefinitionException ;
1827import com .devshawn .kafka .gitops .exception .MissingConfigurationException ;
2433import com .devshawn .kafka .gitops .service .KafkaService ;
2534import com .devshawn .kafka .gitops .service .ParserService ;
2635import com .devshawn .kafka .gitops .service .RoleService ;
36+ import com .devshawn .kafka .gitops .service .SchemaRegistryService ;
2737import com .devshawn .kafka .gitops .util .LogUtil ;
2838import com .devshawn .kafka .gitops .util .StateUtil ;
2939import com .fasterxml .jackson .core .JsonParser ;
40+ import com .fasterxml .jackson .core .util .DefaultIndenter ;
41+ import com .fasterxml .jackson .core .util .DefaultPrettyPrinter ;
3042import com .fasterxml .jackson .databind .DeserializationFeature ;
3143import com .fasterxml .jackson .databind .ObjectMapper ;
44+ import com .fasterxml .jackson .databind .SerializationFeature ;
3245import com .fasterxml .jackson .datatype .jdk8 .Jdk8Module ;
33- import org .slf4j .LoggerFactory ;
34-
35- import java .util .ArrayList ;
36- import java .util .List ;
37- import java .util .Map ;
38- import java .util .NoSuchElementException ;
39- import java .util .Optional ;
40- import java .util .concurrent .atomic .AtomicInteger ;
41- import java .util .concurrent .atomic .AtomicReference ;
46+ import ch .qos .logback .classic .Level ;
47+ import ch .qos .logback .classic .Logger ;
4248
4349public class StateManager {
4450
45- private static org .slf4j .Logger log = LoggerFactory .getLogger (StateManager .class );
46-
4751 private final ManagerConfig managerConfig ;
4852 private final ObjectMapper objectMapper ;
4953 private final ParserService parserService ;
5054 private final KafkaService kafkaService ;
55+ private final SchemaRegistryService schemaRegistryService ;
5156 private final RoleService roleService ;
5257 private final ConfluentCloudService confluentCloudService ;
5358
@@ -163,7 +168,7 @@ private DesiredState getDesiredState() {
163168 }
164169
165170 private void generateTopicsState (DesiredState .Builder desiredState , DesiredStateFile desiredStateFile ) {
166- Optional <Integer > defaultReplication = StateUtil .fetchReplication (desiredStateFile );
171+ Optional <Integer > defaultReplication = StateUtil .fetchDefaultTopicsReplication (desiredStateFile );
167172 if (defaultReplication .isPresent ()) {
168173 desiredStateFile .getTopics ().forEach ((name , details ) -> {
169174 Integer replication = details .getReplication ().isPresent () ? details .getReplication ().get () : defaultReplication .get ();
@@ -175,7 +180,15 @@ private void generateTopicsState(DesiredState.Builder desiredState, DesiredState
175180 }
176181
177182 private void generateSchemasState (DesiredState .Builder desiredState , DesiredStateFile desiredStateFile ) {
178- desiredState .putAllSchemas (desiredStateFile .getSchemas ());
183+ Optional <SchemaCompatibility > defaultSchemaCompatibility = StateUtil .fetchDefaultSchemasCompatibility (desiredStateFile );
184+ if (defaultSchemaCompatibility .isPresent ()) {
185+ desiredStateFile .getSchemas ().forEach ((s , details ) -> {
186+ SchemaCompatibility compatibility = details .getCompatibility ().isPresent () ? details .getCompatibility ().get () : defaultSchemaCompatibility .get ();
187+ desiredState .putSchemas (s , new SchemaDetails .Builder ().mergeFrom (details ).setCompatibility (compatibility ).build ());
188+ });
189+ } else {
190+ desiredState .putAllSchemas (desiredStateFile .getSchemas ());
191+ }
179192 }
180193
181194 private void generateConfluentCloudServiceAcls (DesiredState .Builder desiredState , DesiredStateFile desiredStateFile ) {
@@ -316,7 +329,7 @@ private void validateCustomAcls(DesiredStateFile desiredStateFile) {
316329 }
317330
318331 private void validateTopics (DesiredStateFile desiredStateFile ) {
319- Optional <Integer > defaultReplication = StateUtil .fetchReplication (desiredStateFile );
332+ Optional <Integer > defaultReplication = StateUtil .fetchDefaultTopicsReplication (desiredStateFile );
320333 if (!defaultReplication .isPresent ()) {
321334 desiredStateFile .getTopics ().forEach ((name , details ) -> {
322335 if (!details .getReplication ().isPresent ()) {
@@ -331,42 +344,13 @@ private void validateTopics(DesiredStateFile desiredStateFile) {
331344 }
332345
333346 private void validateSchemas (DesiredStateFile desiredStateFile ) {
334- if (!desiredStateFile .getSchemas ().isEmpty ()) {
335- SchemaRegistryConfig schemaRegistryConfig = SchemaRegistryConfigLoader .load ();
336- desiredStateFile .getSchemas ().forEach ((s , schemaDetails ) -> {
337- if (!schemaDetails .getType ().equalsIgnoreCase ("Avro" )) {
338- throw new ValidationException (String .format ("Schema type %s is currently not supported." , schemaDetails .getType ()));
339- }
340- if (!Files .exists (Paths .get (schemaRegistryConfig .getConfig ().get ("SCHEMA_DIRECTORY" ) + "/" + schemaDetails .getFile ()))) {
341- throw new ValidationException (String .format ("Schema file %s not found in schema directory at %s" , schemaDetails .getFile (), schemaRegistryConfig .getConfig ().get ("SCHEMA_DIRECTORY" )));
342- }
343- if (schemaDetails .getType ().equalsIgnoreCase ("Avro" )) {
344- AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider ();
345- if (schemaDetails .getReferences ().isEmpty () && schemaDetails .getType ().equalsIgnoreCase ("Avro" )) {
346- Optional <ParsedSchema > parsedSchema = avroSchemaProvider .parseSchema (schemaRegistryService .loadSchemaFromDisk (schemaDetails .getFile ()), Collections .emptyList ());
347- if (!parsedSchema .isPresent ()) {
348- throw new ValidationException (String .format ("Avro schema %s could not be parsed." , schemaDetails .getFile ()));
349- }
350- } else {
351- List <SchemaReference > schemaReferences = new ArrayList <>();
352- schemaDetails .getReferences ().forEach (referenceDetails -> {
353- SchemaReference schemaReference = new SchemaReference (referenceDetails .getName (), referenceDetails .getSubject (), referenceDetails .getVersion ());
354- schemaReferences .add (schemaReference );
355- });
356- // we need to pass a schema registry client as a config because the underlying code validates against the current state
357- avroSchemaProvider .configure (Collections .singletonMap (SchemaProvider .SCHEMA_VERSION_FETCHER_CONFIG , schemaRegistryService .createSchemaRegistryClient ()));
358- try {
359- Optional <ParsedSchema > parsedSchema = avroSchemaProvider .parseSchema (schemaRegistryService .loadSchemaFromDisk (schemaDetails .getFile ()), schemaReferences );
360- if (!parsedSchema .isPresent ()) {
361- throw new ValidationException (String .format ("Avro schema %s could not be parsed." , schemaDetails .getFile ()));
362- }
363- } catch (IllegalStateException ex ) {
364- throw new ValidationException (String .format ("Reference validation error: %s" , ex .getMessage ()));
365- } catch (RuntimeException ex ) {
366- throw new ValidationException (String .format ("Error thrown when attempting to validate schema with reference" , ex .getMessage ()));
367- }
368- }
347+ Optional <SchemaCompatibility > defaultSchemaCompatibility = StateUtil .fetchDefaultSchemasCompatibility (desiredStateFile );
348+ if (!defaultSchemaCompatibility .isPresent ()) {
349+ desiredStateFile .getSchemas ().forEach ((subject , details ) -> {
350+ if (!details .getCompatibility ().isPresent ()) {
351+ throw new ValidationException (String .format ("Not set: [compatibility] in state file definition: schema -> %s" , subject ));
369352 }
353+ schemaRegistryService .validateSchema (subject , details );
370354 });
371355 }
372356 }
@@ -379,11 +363,17 @@ private boolean isConfluentCloudEnabled(DesiredStateFile desiredStateFile) {
379363 }
380364
381365 private ObjectMapper initializeObjectMapper () {
382- ObjectMapper objectMapper = new ObjectMapper ();
383- objectMapper .enable (JsonParser .Feature .ALLOW_UNQUOTED_FIELD_NAMES );
384- objectMapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
385- objectMapper .registerModule (new Jdk8Module ());
386- return objectMapper ;
366+ ObjectMapper gitopsObjectMapper = new ObjectMapper ();
367+ gitopsObjectMapper .enable (SerializationFeature .INDENT_OUTPUT );
368+ gitopsObjectMapper .enable (JsonParser .Feature .ALLOW_UNQUOTED_FIELD_NAMES );
369+ gitopsObjectMapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
370+ gitopsObjectMapper .registerModule (new Jdk8Module ());
371+ DefaultIndenter defaultIndenter = new DefaultIndenter (" " , DefaultIndenter .SYS_LF );
372+ DefaultPrettyPrinter printer = new DefaultPrettyPrinter ()
373+ .withObjectIndenter (defaultIndenter )
374+ .withArrayIndenter (defaultIndenter );
375+ gitopsObjectMapper .setDefaultPrettyPrinter (printer );
376+ return gitopsObjectMapper ;
387377 }
388378
389379 private void initializeLogger (boolean verbose ) {
0 commit comments