diff --git a/docs/stream-filtering.md b/docs/stream-filtering.md index 77cd959b1..c27a15a58 100644 --- a/docs/stream-filtering.md +++ b/docs/stream-filtering.md @@ -188,6 +188,16 @@ Msg = amqp10_msg:set_message_annotations( amqp10_client:send_msg(Sender, Msg), ``` + + +```javascript +const message = createAmqpMessage({ + body: "Hello World!", + annotations: { "x-stream-filter-value": "invoices" }, // set Bloom filter value + }) +await publisher.publish(message) +``` + A receiver must use a filter with descriptor `rabbitmq:stream-filter`. @@ -317,6 +327,30 @@ after 5000 -> exit(missing_msg) end, ``` + + +```javascript +const consumer = await connection.createConsumer({ + stream: { + name: "some-stream", + offset: Offset.first(), + matchUnfiltered: true, + filterValues: ["invoices", "orders"], // This Bloom filter will be evaluated server-side per chunk (Stage 1). + }, + messageHandler: (context, message) => { + // This filter will be evaluated client-side per message (Stage 3). + if ( + message.message_annotations && + ["invoices", "orders"].includes(message.message_annotations["x-stream-filter-value"]) + ) { + // message processing + } + context.accept() + }, + }) +consumer.start() +``` + @@ -542,6 +576,28 @@ Filter = #{<<"filter-name-1">> => ``` + +```javascript +const consumer = await connection.createConsumer({ + stream: { + name: "my-queue", + offset: Offset.first(), + messagePropertiesFilter: { + subject: "&p:Order", + user_id: "John" + }, + applicationPropertiesFilter: { + region: "emea", + }, + }, + messageHandler: (context, message) => { + // process the messages + }, + }) +consumer.start() +``` + + ### SQL Filter Expressions @@ -838,6 +894,23 @@ Filter = #{<<"sql-filter">> => #filter{descriptor = <<"amqp:sql-filter">>, ``` + +```javascript +const consumer = await connection.createConsumer({ + stream: { + name: "my-queue", + offset: Offset.first(), + sqlFilter: "properties.user_id = 'John' AND" + + "properties.subject LIKE 'Order%' AND region = 'emea'" + }, + messageHandler: (context, message) => { + // process the messages + }, + }) +consumer.start() +``` + + ### Error Handling @@ -1013,6 +1086,27 @@ Filter = #{%% This Bloom filter will be evaluated server-side per chunk at stage ``` + +```javascript +const consumer = await connection.createConsumer({ + stream: { + name: "my-queue", + offset: Offset.first(), + filterValues: ["order.created"], // This Bloom filter will be evaluated server-side per chunk (Stage 1). + sqlFilter: "p.subject = 'order.created' AND " + + "p.creation_time > UTC() - 3600000 AND " + + "region IN ('AMER', 'EMEA', 'APJ') AND " + + "(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)", // This complex SQL filter expression will be evaluted server-side + // per message at stage 2. + }, + messageHandler: (context, message) => { + // message processing + }, + }) +consumer.start() +``` + + If `order.created` events represent only a small percentage of all events, RabbitMQ can filter the stream efficiently because only a small fraction of messages need to be parsed and evaluated in memory.