Skip to content

Using the async iterator is significantly slower compared to using callbacks in a subscription #541

@DeltaEpsilon7787

Description

@DeltaEpsilon7787

Observed behavior

The way async iterators are implemented, they have an unacceptable drop in throughput compared to callback-based subscription. Checking the code of how they're implemented, they appear to be doing a lot of unnecessary actions for each response in __anext__.

Using code in Steps to Reproduce, on my PC I get 1.29 seconds for callbacks and 4.6 seconds for async iter, which translates to 310k msg/sec for callbacks and 87k msg/sec for built-in async its. This is a 3.5x drop in throughput.

Expected behavior

This should either be remedied or explicitly mentioned somewhere as this is relatively unexpected behavior. One would expect the async iterator to, well, be a thin wrapper around the pending queue.

In the code provided in Steps to Reproduce, I implement a very basic custom async iterator that uses callbacks to fill in an internal queue that the async it then gets results from. Given that callback impl and async it impl are mutually exclusive in current API, this does not break any API preconditions to my knowledge.
This implementation only takes 1.4 seconds which is only ~5% slower than just callbacks, although it is missing various boilerplate for stopping the iteration.

Server and client version

nats-server: v2.10.5
nats-py: v2.7.0

Host environment

No response

Steps to reproduce

from time import perf_counter

import asyncio as aio
import multiprocessing as mp

import nats

TEST_AMOUNT = 400000


class CbMessageConsumer:
    def __init__(self, target):
        self.counter = 0

        self._target = target
        self._completed = aio.Event()

    async def cb(self, msg):
        self.counter += 1
        if self.counter >= self._target:
            self._completed.set()

    async def completion_observer(self):
        await self._completed.wait()


class IterMessageConsumer:
    def __init__(self, target):
        self.counter = 0

        self._target = target
        self._completed = aio.Event()

    async def do_work(self, source):
        async for msg in source:
            self.counter += 1
            if self.counter >= self._target:
                self._completed.set()
                break

    async def completion_observer(self):
        await self._completed.wait()


# Custom `async for` using callbacks
class FasterAsyncIt:
    def __init__(self):
        self._queue = aio.Queue()

    async def _queue_cb(self, msg):
        self._queue.put_nowait(msg)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Ommitted checking for closed channel and such
        return await self._queue.get()


def cb_receiver():
    async def _():
        nc = await nats.connect("127.0.0.1:4222")

        cb_message_consumer = CbMessageConsumer(TEST_AMOUNT)
        cb_observer_task = aio.create_task(cb_message_consumer.completion_observer())
        cb_observer_task.add_done_callback(
            lambda _: print(f"CB: {perf_counter() - start_time}")
        )

        start_time = perf_counter()
        await nc.subscribe("test.cb", cb=cb_message_consumer.cb)

        await cb_observer_task
        await nc.close()

    aio.run(_())


def iter_receiver():
    async def _():
        nc = await nats.connect("127.0.0.1:4222")

        it_message_consumer = IterMessageConsumer(TEST_AMOUNT)
        it_observer_task = aio.create_task(it_message_consumer.completion_observer())
        it_observer_task.add_done_callback(
            lambda _: print(f"IT: {perf_counter() - start_time}")
        )

        it_based_sub = await nc.subscribe("test.iter")
        start_time = perf_counter()
        aio.create_task(it_message_consumer.do_work(it_based_sub.messages))

        await it_observer_task
        await nc.close()

    aio.run(_())

def custom_iter_receiver():
    async def _():
        nc = await nats.connect("127.0.0.1:4222")

        custom_it = FasterAsyncIt()

        custom_it_message_consumer = IterMessageConsumer(TEST_AMOUNT)
        custom_it_observer_task = aio.create_task(
            custom_it_message_consumer.completion_observer()
        )
        custom_it_observer_task.add_done_callback(
            lambda _: print(f"CUSTOM IT: {perf_counter() - start_time}")
        )

        await nc.subscribe("test.custom_iter", cb=custom_it._queue_cb)
        start_time = perf_counter()
        aio.create_task(custom_it_message_consumer.do_work(custom_it))
        await custom_it_observer_task
        await nc.close()

    aio.run(_())


def sender_side():
    async def _():
        nc = await nats.connect("127.0.0.1:4222")

        source = iter(range(TEST_AMOUNT))

        try:
            while True:
                for _ in range(30000):
                    next(source)
                    await nc.publish("test.cb", b"")
                    await nc.publish("test.iter", b"")
                    await nc.publish("test.custom_iter", b"")
                # Throttle to avoid slow consumers
                await aio.sleep(0.01)

        except StopIteration:
            pass

        await nc.close()

    aio.run(_())


if __name__ == "__main__":
    sender = mp.Process(target=sender_side)
    receiver_1 = mp.Process(target=cb_receiver)
    receiver_2 = mp.Process(target=iter_receiver)
    receiver_3 = mp.Process(target=custom_iter_receiver)

    receiver_1.start()
    receiver_2.start()
    receiver_3.start()
    sender.start()

    sender.join()
    receiver_1.join()
    receiver_2.join()
    receiver_3.join()

Metadata

Metadata

Assignees

No one assigned

    Labels

    defectSuspected defect such as a bug or regression

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions