Skip to content

Commit f7dea58

Browse files
magnel4mby
authored andcommitted
feat: add new super stream consumer handle
1 parent 6263d1a commit f7dea58

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

src/super_stream_consumer.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import { Client } from "./client"
2-
import { Consumer, ConsumerFunc } from "./consumer"
2+
import { Consumer } from "./consumer"
33
import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_policy"
4+
import { Message } from "./publisher"
45
import { Offset } from "./requests/subscribe_request"
56

7+
type SuperStreamConsumerFunc = (msg: Message, consumer?: Consumer) => Promise<void> | void
8+
69
export class SuperStreamConsumer {
710
private consumers: Map<string, Consumer> = new Map<string, Consumer>()
811
public consumerRef: string
@@ -13,7 +16,7 @@ export class SuperStreamConsumer {
1316
private creditPolicy: ConsumerCreditPolicy
1417

1518
private constructor(
16-
readonly handle: ConsumerFunc,
19+
readonly handle: SuperStreamConsumerFunc,
1720
params: {
1821
superStream: string
1922
locator: Client
@@ -42,7 +45,7 @@ export class SuperStreamConsumer {
4245
singleActive: true,
4346
creditPolicy: this.creditPolicy,
4447
},
45-
this.handle,
48+
(msg) => this.handle(msg, this.consumers.get(p)),
4649
this
4750
)
4851
this.consumers.set(p, partitionConsumer)
@@ -52,7 +55,7 @@ export class SuperStreamConsumer {
5255
}
5356

5457
static async create(
55-
handle: ConsumerFunc,
58+
handle: SuperStreamConsumerFunc,
5659
params: {
5760
superStream: string
5861
locator: Client

0 commit comments

Comments
 (0)