Skip to content

Commit 750fb37

Browse files
authored
chores: add examples to illustrate pubsub in Main.java class (#181)
1 parent 6c8b9dc commit 750fb37

File tree

3 files changed

+263
-0
lines changed

3 files changed

+263
-0
lines changed

examples/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,25 @@
3434
<groupId>com.salesforce.multicloudj</groupId>
3535
<artifactId>docstore-client</artifactId>
3636
</dependency>
37+
<dependency>
38+
<groupId>com.salesforce.multicloudj</groupId>
39+
<artifactId>pubsub-client</artifactId>
40+
</dependency>
3741
<dependency>
3842
<groupId>com.salesforce.multicloudj</groupId>
3943
<artifactId>docstore-gcp-firestore</artifactId>
4044
<version>${project.version}</version>
4145
</dependency>
46+
<dependency>
47+
<groupId>com.salesforce.multicloudj</groupId>
48+
<artifactId>pubsub-gcp</artifactId>
49+
<version>${project.version}</version>
50+
</dependency>
51+
<dependency>
52+
<groupId>com.salesforce.multicloudj</groupId>
53+
<artifactId>pubsub-aws</artifactId>
54+
<version>${project.version}</version>
55+
</dependency>
4256
<dependency>
4357
<groupId>com.salesforce.multicloudj</groupId>
4458
<artifactId>sts-gcp</artifactId>
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package com.salesforce.multicloudj.pubsub;
2+
3+
import com.salesforce.multicloudj.pubsub.client.GetAttributeResult;
4+
import com.salesforce.multicloudj.pubsub.client.SubscriptionClient;
5+
import com.salesforce.multicloudj.pubsub.client.TopicClient;
6+
import com.salesforce.multicloudj.pubsub.driver.AckID;
7+
import com.salesforce.multicloudj.pubsub.driver.Message;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.concurrent.CompletableFuture;
14+
15+
public class Main {
16+
17+
static String provider = "gcp";
18+
19+
public static void main(String[] args) {
20+
publishMessage();
21+
publishMessageWithMetadata();
22+
publishBatchMessages();
23+
receiveMessage();
24+
acknowledgeMessage();
25+
acknowledgeMessagesBatch();
26+
negativeAcknowledgeMessage();
27+
getSubscriptionAttributes();
28+
}
29+
30+
/**
31+
* Publishes a simple message to a topic.
32+
*/
33+
public static void publishMessage() {
34+
// Create a TopicClient instance based on the provider
35+
TopicClient topicClient = getTopicClient(provider);
36+
37+
// Create a message with body content
38+
Message message = Message.builder()
39+
.withBody("Hello from MultiCloudJ PubSub!")
40+
.build();
41+
42+
// Send the message to the topic
43+
topicClient.send(message);
44+
45+
// Log the success
46+
getLogger().info("Message published successfully");
47+
}
48+
49+
/**
50+
* Publishes a message with metadata to a topic.
51+
*/
52+
public static void publishMessageWithMetadata() {
53+
// Get the TopicClient instance
54+
TopicClient topicClient = getTopicClient(provider);
55+
56+
// Create a message with body and metadata
57+
Message message = Message.builder()
58+
.withBody("Message with metadata")
59+
.withMetadata("source", "demo")
60+
.withMetadata("timestamp", String.valueOf(System.currentTimeMillis()))
61+
.withMetadata("messageId", "msg-123")
62+
.build();
63+
64+
// Send the message
65+
topicClient.send(message);
66+
67+
getLogger().info("Message with metadata published successfully");
68+
}
69+
70+
/**
71+
* Publishes multiple messages for batch operations.
72+
*/
73+
public static void publishBatchMessages() {
74+
// Get the TopicClient instance
75+
TopicClient topicClient = getTopicClient(provider);
76+
77+
// Publish 5 messages for batch acknowledgment
78+
for (int i = 1; i <= 5; i++) {
79+
Message message = Message.builder()
80+
.withBody("Batch message #" + i)
81+
.withMetadata("batchId", "batch-1")
82+
.withMetadata("messageNumber", String.valueOf(i))
83+
.build();
84+
85+
topicClient.send(message);
86+
getLogger().info("Published batch message #{}", i);
87+
}
88+
89+
getLogger().info("Published 5 messages for batch acknowledgment");
90+
}
91+
92+
/**
93+
* Receives a message from a subscription.
94+
*/
95+
public static void receiveMessage() {
96+
// Get the SubscriptionClient instance
97+
SubscriptionClient subscriptionClient = getSubscriptionClient(provider);
98+
99+
// Receive a message from the subscription
100+
Message message = subscriptionClient.receive();
101+
102+
// Process the received message
103+
if (message != null) {
104+
String body = new String(message.getBody());
105+
getLogger().info("Received message: {}", body);
106+
107+
if (message.getMetadata() != null) {
108+
getLogger().info("Message metadata: {}", message.getMetadata());
109+
}
110+
111+
if (message.getAckID() != null) {
112+
getLogger().info("Message AckID: {}", message.getAckID());
113+
}
114+
}
115+
}
116+
117+
/**
118+
* Acknowledges a single message.
119+
*/
120+
public static void acknowledgeMessage() {
121+
// Get the SubscriptionClient instance
122+
SubscriptionClient subscriptionClient = getSubscriptionClient(provider);
123+
124+
// Receive a message
125+
Message message = subscriptionClient.receive();
126+
127+
if (message != null && message.getAckID() != null) {
128+
// Acknowledge the message
129+
subscriptionClient.sendAck(message.getAckID());
130+
getLogger().info("Message acknowledged successfully");
131+
}
132+
}
133+
134+
/**
135+
* Acknowledges multiple messages in a batch.
136+
*/
137+
public static void acknowledgeMessagesBatch() {
138+
// Get the SubscriptionClient instance
139+
SubscriptionClient subscriptionClient = getSubscriptionClient(provider);
140+
141+
// Receive multiple messages and collect their AckIDs
142+
List<AckID> ackIDs = new ArrayList<>();
143+
int messageCount = 0;
144+
int maxMessages = 5;
145+
146+
while (messageCount < maxMessages) {
147+
Message message = subscriptionClient.receive();
148+
if (message != null && message.getAckID() != null) {
149+
ackIDs.add(message.getAckID());
150+
messageCount++;
151+
}
152+
}
153+
154+
if (!ackIDs.isEmpty()) {
155+
// Acknowledge all messages in batch
156+
CompletableFuture<Void> ackFuture = subscriptionClient.sendAcks(ackIDs);
157+
158+
// Wait for batch acknowledgment to complete
159+
ackFuture.join();
160+
161+
getLogger().info("Acknowledged {} messages in batch", ackIDs.size());
162+
}
163+
}
164+
165+
/**
166+
* Negatively acknowledges a message (nack).
167+
*/
168+
public static void negativeAcknowledgeMessage() {
169+
// Get the SubscriptionClient instance
170+
SubscriptionClient subscriptionClient = getSubscriptionClient(provider);
171+
172+
// Check if nacking is supported
173+
if (!subscriptionClient.canNack()) {
174+
getLogger().info("Negative acknowledgment is not supported by this provider");
175+
return;
176+
}
177+
178+
// First, publish a message to nack
179+
TopicClient topicClient = getTopicClient(provider);
180+
Message messageToNack = Message.builder()
181+
.withBody("Message to be nacked and redelivered")
182+
.withMetadata("purpose", "nack-test")
183+
.build();
184+
topicClient.send(messageToNack);
185+
getLogger().info("Published message for nack test");
186+
187+
// Wait a bit for message to be available
188+
try {
189+
Thread.sleep(500);
190+
} catch (InterruptedException e) {
191+
Thread.currentThread().interrupt();
192+
}
193+
194+
// Receive the message
195+
Message message = subscriptionClient.receive();
196+
197+
if (message != null && message.getAckID() != null) {
198+
String messageBody = new String(message.getBody());
199+
getLogger().info("Received message: {}", messageBody);
200+
201+
// Negatively acknowledge the message
202+
subscriptionClient.sendNack(message.getAckID());
203+
getLogger().info("Message negatively acknowledged");
204+
}
205+
}
206+
207+
/**
208+
* Gets subscription attributes.
209+
*/
210+
public static void getSubscriptionAttributes() {
211+
// Get the SubscriptionClient instance
212+
SubscriptionClient subscriptionClient = getSubscriptionClient(provider);
213+
214+
// Get subscription attributes
215+
GetAttributeResult attributes = subscriptionClient.getAttributes();
216+
217+
getLogger().info("Subscription name: {}", attributes.getName());
218+
getLogger().info("Topic: {}", attributes.getTopic());
219+
}
220+
221+
private static TopicClient getTopicClient(String provider) {
222+
return TopicClient.builder(provider)
223+
.withTopicName("projects/substrate-sdk-gcp-poc1/topics/test-topic")
224+
// .withRegion("us-west-2")
225+
.build();
226+
}
227+
228+
private static SubscriptionClient getSubscriptionClient(String provider) {
229+
return SubscriptionClient.builder(provider)
230+
.withSubscriptionName("projects/substrate-sdk-gcp-poc1/subscriptions/test-subscription")
231+
// .withRegion("us-west-2")
232+
.build();
233+
}
234+
235+
private static Logger getLogger() {
236+
return LoggerFactory.getLogger("Main");
237+
}
238+
}

pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,17 @@
175175
<version>${project.version}</version>
176176
<type>test-jar</type>
177177
</dependency>
178+
<dependency>
179+
<groupId>com.salesforce.multicloudj</groupId>
180+
<artifactId>pubsub-client</artifactId>
181+
<version>${project.version}</version>
182+
</dependency>
183+
<dependency>
184+
<groupId>com.salesforce.multicloudj</groupId>
185+
<artifactId>pubsub-client</artifactId>
186+
<version>${project.version}</version>
187+
<type>test-jar</type>
188+
</dependency>
178189
</dependencies>
179190
</dependencyManagement>
180191
<build>

0 commit comments

Comments
 (0)