-
-
Notifications
You must be signed in to change notification settings - Fork 117
Expand file tree
/
Copy pathtest_retry_task.py
More file actions
43 lines (36 loc) · 1.15 KB
/
test_retry_task.py
File metadata and controls
43 lines (36 loc) · 1.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import pytest
from taskiq import (
Context,
InMemoryBroker,
SmartRetryMiddleware,
TaskiqDepends,
TaskiqScheduler,
)
from taskiq.schedule_sources import LabelScheduleSource
@pytest.mark.parametrize(
    "retry_count",
    range(5),
)
@pytest.mark.anyio
async def test_save_task_id_for_retry(retry_count: int) -> None:
    """Check that the original task handle yields the result of the final retry.

    A task that raises until its ``_retries`` label reaches ``retry_count`` is
    kicked once. The result awaited on the handle returned by that single
    ``kiq`` call must carry the attempt number on which the task succeeded.

    :param retry_count: number of failing attempts before the task succeeds;
        also the value the task is expected to return.
    """
    broker = InMemoryBroker().with_middlewares(
        SmartRetryMiddleware(
            # The retry budget is tied to the parametrized value so it scales
            # with the number of failures the task below produces.
            default_retry_count=retry_count + 1,
            default_delay=0.1,
        ),
    )
    scheduler = TaskiqScheduler(broker, [LabelScheduleSource(broker)])
    check_interval = 0.5

    @broker.task("exc_task", retry_on_error=True)
    async def exc_task(count: int = 0, context: "Context" = TaskiqDepends()) -> int:
        """Fail until the ``_retries`` label reaches ``count``, then return it."""
        # A missing label is treated as attempt number zero.
        retry = int(context.message.labels.get("_retries", 0))
        if retry < count:
            raise Exception("test")
        return retry

    await broker.startup()
    try:
        await scheduler.startup()
        task_with_retry = await exc_task.kiq(retry_count)
        task_with_retry_result = await task_with_retry.wait_result(
            check_interval=check_interval,
        )
        assert task_with_retry_result.return_value == retry_count
    finally:
        # Always release the broker, even when the wait or the assertion
        # fails, so parametrized runs do not leak resources into each other.
        # NOTE(review): the scheduler is presumably torn down through its
        # broker; confirm whether an explicit scheduler.shutdown() is wanted.
        await broker.shutdown()