Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Queue

An `asyncio.Queue` equivalence for asyncgui.
An `asyncio.Queue` equivalent for asyncgui.

```python
import asyncgui as ag
Expand Down
54 changes: 54 additions & 0 deletions examples/fix_quirk_in_tkinter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import asyncgui as ag
from asyncgui_ext.queue import Queue


async def fn1(q, received):
await q.put('A')
await q.put('B')
item = await q.get()
received.append(item)
await q.put('C')
item = await q.get()
received.append(item)


async def fn2(q, received):
item = await q.get()
received.append(item)


def fix_quirk(q: Queue, after_method, *, delay=100):
is_triggered = False
real_transfer_items = q.transfer_items

def transfer_items():
nonlocal is_triggered
real_transfer_items()
is_triggered = False

def trigger_transfer_items():
nonlocal is_triggered
if is_triggered:
return
is_triggered = True
after_method(delay, transfer_items)

q.transfer_items = trigger_transfer_items


def main():
import tkinter as tk
root = tk.Tk()
root.geometry('320x240')

received = []
q = Queue(capacity=1, order='fifo')
fix_quirk(q, root.after)
task = ag.start(ag.wait_all(fn1(q, received), fn2(q, received)))
root.mainloop()
task.cancel()
print(received)


if __name__ == "__main__":
main()
16 changes: 16 additions & 0 deletions sphinx/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,19 @@
# https://sphinx-tabs.readthedocs.io/en/latest/
sphinx_tabs_disable_tab_closing = True


def modify_signature(app, what: str, name: str, obj, options, signature, return_annotation: str,
prefix="asyncgui_ext.queue.",
len_prefix=len("asyncgui_ext.queue."),
):
if not name.startswith(prefix):
return (signature, return_annotation, )
name = name[len_prefix:]
if name == "QueueState":
print(f"Hide the signature of {name!r}")
return ('', None)
return (signature, return_annotation, )


def setup(app):
app.connect('autodoc-process-signature', modify_signature)
61 changes: 17 additions & 44 deletions sphinx/index.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Queue
=====

An :class:`asyncio.Queue` equivalence for asyncgui.
An :class:`asyncio.Queue` equivalent for asyncgui.

Usage
-----
Expand Down Expand Up @@ -65,7 +65,7 @@ The output of the following code may surprise you.
received.append(item)

received = []
q = Queue(capacity=1)
q = Queue(capacity=1, order='fifo')
ag.start(fn1(q, received))
ag.start(fn2(q, received))
print(received)
Expand All @@ -74,45 +74,18 @@ The output of the following code may surprise you.

['B', 'C', 'A']

.. Why it doesn't print ``['A', 'B', 'C']`` when it clearly puts ``A``, ``B``, and ``C`` in that order?
.. This is because :meth:`asyncgui_ext.queue.Queue.get` not only retrieves an item from the queue,
.. but also fills the resulting vacancy with an item if there is a Task waiting to put one into the queue.
.. Then if there is a Task waiting to get an item from the queue, it will be woken up and an item will be passed to it.
.. And this goes forever until either of the following conditions are met:

.. 1. The queue is empty and there is no Task waiting to put an item into the queue.
.. 2. The queue is full and there is no Task waiting to get an item from the queue.

.. 何故 ``A``, ``B``, ``C`` の順でキューに入れているのにその順で出力されないのか?
.. それは :meth:`asyncgui_ext.queue.Queue.get` が只キューから取り出すだけでなく取り出してできた空きを埋めもするからです。
.. そしてそれを終えた時にもしキューから受け取る為に停まっているタスクが居ればそれを再開させもします。
.. そういった転送処理をその必要が無くなるまでやり続け、それが終わってようやく ``await queue.get()`` が完了します。
.. なので上のコードの進行を追うと

..
.. async def fn1(q, received):
.. await q.put('A') # B
.. await q.put('B') # C
.. item = await q.get()
.. received.append(item)
.. await q.put('C')
.. item = await q.get()
.. received.append(item)

.. async def fn2(q, received):
.. item = await q.get() # E
.. received.append(item)

.. received = []
.. q = Queue(capacity=1)
.. ag.start(fn1(q, received)) # A
.. ag.start(fn2(q, received)) # D
.. print(received)

.. 1. ``fn1`` が始まる。 (A行)
.. 2. ``fn1`` がキューに ``A`` を入れる事でキューが満たされる。 (B行)
.. 3. ``fn1`` がキューに ``B`` を入れようとするが空きがないので空くまで待つ。 (C行)
.. 4. ``fn1`` の進行が停まりA行が完遂される。
.. 5. ``fn2`` が始まる。 (D行)
.. 6. ``fn2`` がキューから ``A`` を取り出すがそれで終わりではない。 (E行)
.. 7. 6によりキューに空きができたため ``fn1`` を再開する。
As you can see, even though ``fn1`` enqueues items in the order A, B, C, the ``received`` list ends up with the order B, C, A,
which is probably not what you'd expect.
In this particular case, you can work around the issue by increasing the queue's capacity so that ``fn1`` does not block (e.g. to 2).
However, to avoid this behavior in all situations, you must rely on a timer to defer the execution of :meth:`~asyncgui_ext.queue.Queue.transfer_items`.
For example, if you are using ``Kivy``, you want to do:

.. code::

from asyncgui_ext.queue import Queue
from kivy.clock import Clock

q = Queue(...)
q.transfer_items = Clock.create_trigger(q.transfer_items)

As for :mod:`tkinter`, refer to the example ``examples/fix_quirk_in_tkinter.py``.
7 changes: 3 additions & 4 deletions src/asyncgui_ext/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Closed(QueueException):
'''
* ``'fifo'``: First In First Out
* ``'lifo'``: Last In First Out
* ``'small-first'``: Smallest item first
* ``'small-first'``: Smallest One First Out
'''


Expand Down Expand Up @@ -115,7 +115,7 @@ def __len__(self) -> int:

@property
def capacity(self) -> int | None:
'''Number of items allowed in the queue. None if unbounded.'''
'''Number of items allowed in the queue. None if unlimited.'''
return self._capacity

@property
Expand Down Expand Up @@ -228,8 +228,7 @@ def half_close(self):
def close(self):
'''
Fully closes the queue.
Putting or getting an item is no longer allowed,
All items it holds will be discarded.
Putting or getting items are no longer allowed, and any items currently held will be discarded.
'''
if self._state is QueueState.CLOSED:
return
Expand Down