Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,79 @@
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
</differences>
<!-- Ignore addition of messagingBackend() method to PublisherSettings -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/PublisherSettings</className>
<method>java.util.Optional kafkaProperties()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/PublisherSettings</className>
<method>com.google.cloud.pubsublite.cloudpubsub.MessagingBackend messagingBackend()</method>
</difference>
<!-- Ignore addition of new methods to AdminClientSettings -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
<method>int kafkaDefaultPartitions()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
<method>short kafkaDefaultReplicationFactor()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
<method>java.util.Optional kafkaProperties()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
<method>com.google.cloud.pubsublite.cloudpubsub.MessagingBackend messagingBackend()</method>
</difference>
<!-- Ignore addition of new methods to AdminClientSettings.Builder -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setKafkaDefaultPartitions(int)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setKafkaDefaultReplicationFactor(short)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setKafkaProperties(java.util.Map)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setMessagingBackend(com.google.cloud.pubsublite.cloudpubsub.MessagingBackend)</method>
</difference>
<!-- Ignore addition of new methods to SubscriberSettings -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings</className>
<method>java.util.Optional kafkaProperties()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings</className>
<method>com.google.cloud.pubsublite.cloudpubsub.MessagingBackend messagingBackend()</method>
</difference>
<!-- Ignore addition of new methods to SubscriberSettings.Builder -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings$Builder</className>
<method>com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings$Builder setKafkaProperties(java.util.Map)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings$Builder</className>
<method>com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings$Builder setMessagingBackend(com.google.cloud.pubsublite.cloudpubsub.MessagingBackend)</method>
</difference>
</differences>
60 changes: 53 additions & 7 deletions google-cloud-pubsublite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>1.15.19</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
<version>1.15.15-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>1.15.19</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
<version>1.15.15-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
<packaging>jar</packaging>
<name>Google Cloud Pub/Sub Lite</name>
<url>https://github.com/googleapis/java-pubsublite</url>
Expand Down Expand Up @@ -38,7 +38,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.143.1</version>
<version>1.141.3</version>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
Expand All @@ -48,12 +48,12 @@
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
<version>1.15.19</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
<version>1.15.15-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-pubsublite-v1</artifactId>
<version>1.15.19</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
<version>1.15.15-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
</dependency>
<dependency>
<groupId>com.google.flogger</groupId>
Expand Down Expand Up @@ -119,6 +119,48 @@
<version>0.8</version>
</dependency>

<!-- Apache Kafka Client (optional, for Managed Kafka backend) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
<optional>true</optional>
</dependency>

<!-- Google Managed Kafka authentication handler -->
<dependency>
<groupId>com.google.cloud.hosted.kafka</groupId>
<artifactId>managed-kafka-auth-login-handler</artifactId>
<version>1.0.6</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>io.confluent</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Test dependencies for Kafka integration tests -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>

<!--test dependencies-->
<dependency>
<groupId>com.google.truth</groupId>
Expand Down Expand Up @@ -164,8 +206,12 @@
<ignoredUnusedDeclaredDependency>org.hamcrest:hamcrest</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>com.google.flogger:flogger-system-backend</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>com.google.cloud.hosted.kafka:managed-kafka-auth-login-handler</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
<ignoredUsedUndeclaredDependencies>org.hamcrest:hamcrest-core</ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependency>org.hamcrest:hamcrest-core</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>org.testcontainers:testcontainers</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
</configuration>
</plugin>
</plugins>
Expand Down Expand Up @@ -211,4 +257,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.cloudpubsub.MessagingBackend;
import com.google.cloud.pubsublite.internal.AdminClientImpl;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.KafkaAdminClient;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import java.util.Map;
import java.util.Optional;

/** Settings for construction a Pub/Sub Lite AdminClient. */
Expand All @@ -44,9 +47,24 @@ public abstract class AdminClientSettings {
/** A stub to use to connect. */
abstract Optional<AdminServiceClient> serviceClient();

/** The backend messaging system to use (e.g., PUBSUB_LITE or MANAGED_KAFKA). */
public abstract MessagingBackend messagingBackend();

/** Kafka-specific properties for when using MANAGED_KAFKA backend. */
public abstract Optional<Map<String, Object>> kafkaProperties();

/** Default number of partitions for new Kafka topics. */
public abstract int kafkaDefaultPartitions();

/** Default replication factor for new Kafka topics. */
public abstract short kafkaDefaultReplicationFactor();

public static Builder newBuilder() {
return new AutoValue_AdminClientSettings.Builder()
.setRetrySettings(Constants.DEFAULT_RETRY_SETTINGS);
.setRetrySettings(Constants.DEFAULT_RETRY_SETTINGS)
.setMessagingBackend(MessagingBackend.PUBSUB_LITE)
.setKafkaDefaultPartitions(3)
.setKafkaDefaultReplicationFactor((short) 1);
}

@AutoValue.Builder
Expand All @@ -64,11 +82,37 @@ public abstract static class Builder {
/** A service client to use to connect. */
public abstract Builder setServiceClient(AdminServiceClient serviceClient);

/** Set the backend messaging system to use (e.g., PUBSUB_LITE or MANAGED_KAFKA). */
public abstract Builder setMessagingBackend(MessagingBackend backend);

/** Set Kafka-specific properties for when using MANAGED_KAFKA backend. */
public abstract Builder setKafkaProperties(Map<String, Object> kafkaProperties);

/** Set default number of partitions for new Kafka topics. */
public abstract Builder setKafkaDefaultPartitions(int partitions);

/** Set default replication factor for new Kafka topics. */
public abstract Builder setKafkaDefaultReplicationFactor(short replicationFactor);

/** Build the settings object. */
public abstract AdminClientSettings build();
}

AdminClient instantiate() throws ApiException {
// For Kafka backend, use KafkaAdminClient
if (messagingBackend() == MessagingBackend.MANAGED_KAFKA) {
if (!kafkaProperties().isPresent()) {
throw new IllegalStateException(
"kafkaProperties must be set when using MANAGED_KAFKA backend");
}
return new KafkaAdminClient(
region(),
kafkaProperties().get(),
kafkaDefaultPartitions(),
kafkaDefaultReplicationFactor());
}

// For Pub/Sub Lite backend, use AdminClientImpl
AdminServiceClient serviceClient;
if (serviceClient().isPresent()) {
serviceClient = serviceClient().get();
Expand Down
Loading
Loading