問題の原因
asyncio はイベントループを持ち、
await のタイミングで処理を切り替え、他のタスクを進めます。
問題のコードでは、
for data in data_list: の data_list が無限ループの為、for 分を抜けられず
await asyncio.gather(*tasks) まで実行は到達しません。⇒ 他のタスクは実行されない
解決策A;
同期 <=> 非同期のやりとりとして、まず思いつくのは、
janus というライブラリの同期<=>非同期キューを用いると
例えば別スレッドで無限ループを処理し、非同期側で値を受け取ることが可能です。(追記: 標準ライブラリの asyncio.Queue でも工夫すれば可能)
実質、同期ジェネレーターで生成する値を、
コルーチン側で非同期ジェネレーターとして扱う事が可能になります。
別スレッドでの実行は to_thread, run_in_executor 等。
追記2: 要点が埋もれてしまったので、ブロッキング処理を含むジェネレータを別スレッドで実行し非同期読み出し。for文は使わずに
python
1# ジェネレーターからの非同期読み出し (別スレッドでジェネレーター内部の処理は進む ⇒ asyncio のイベントループはブロックされない)
2value = await asyncio.to_thread(next, gen)
解決策B
タスク生成スケジュールの見直し、
現状のコードではタスクの内容次第では、処理速度が追いつかないことがあるかもしれません。
(タスクを溢れさせないための sleep 10 だとは思いますが)
例えば 20個のタスクを生成し、10個のタスクが完了した時点でもう10個次のタスクを追加する等、
無限ループのコードを変更することなく、
next() でジェネレーターから値を順次呼び出すことは可能です。
(time.sleep(10) は不要になると思うので見直し候補)
追記: Semaphore 等で数量制限
python
1# 何か重い処理
2await asyncio.sleep(10)
何か重たい処理として仮置きしてるのかもしれませんが、
コルーチン内での重たい処理は別スレッドで行いましょう。
実際のコード次第では問題になる場合があります。
追記: 動作サンプルコード
python
1import time
2import asyncio
3from concurrent.futures import ThreadPoolExecutor
4import logging
5
6logger = logging.getLogger(__name__)
7
8TASK_LIMIT = 5
9QUEUE_SIZE = 10
10INTERVAL = 10
11DATA_LIST = [
12 {"key": "a", "value": 1},
13 {"key": "b", "value": 2},
14 {"key": "c", "value": 3},
15]
16
17def producer(data=DATA_LIST, interval=INTERVAL):
18 # sync generator
19 while True:
20 yield from data
21 logger.info(f"waiting {interval} sec")
22 time.sleep(interval)
23
24async def producer_task(queue, sync_gen):
25 # 非同期: ジェネレーターから値を取り出す
26 logger.info("producer start")
27 while True:
28 item = await asyncio.to_thread(next, sync_gen)
29 logger.debug(f"P: {item=}")
30 await queue.put(item) # キューの空を待つ (maxsize)
31
32async def consumer(queue):
33 # 非同期 generator: async for で扱う為の wrapper
34 while True:
35 item = await queue.get() # キューに値が入るのを待つ
36 yield item
37
38async def consumer_task(queue):
39 logger.info("consumer start")
40 semaphore = asyncio.Semaphore(TASK_LIMIT) # 同時実行数
41 loop = asyncio.get_event_loop()
42
43 def done(future):
44 # 同期タスクの実行完了時
45 result = future.result()
46 logger.debug(f"task done {result=}")
47 queue.task_done()
48 semaphore.release()
49
50 with ThreadPoolExecutor(thread_name_prefix="worker") as executor:
51 async for item in consumer(queue):
52 logger.debug(f"C: {item=}")
53 await semaphore.acquire() # 複数実行中のタスク完了を待つ
54 future = loop.run_in_executor(executor, heavy_task, item)
55 future.add_done_callback(done)
56
57def debug(func):
58 # タスクの実行タイム計測
59 from datetime import datetime
60 from functools import wraps
61
62 @wraps(func)
63 def _func(*args, **kw):
64 result = None
65 start_time = datetime.now()
66 logger.debug(f"start {func.__name__}")
67 try:
68 result = func(*args, **kw)
69 except Exception as exc:
70 logger.error(f"error ", exc_info=exc)
71 end_time = datetime.now()
72 logger.debug(f"{result=} (time: {end_time - start_time})")
73 return result
74 return _func
75
76@debug
77def heavy_task(arg):
78 # 時間のかかるタスク。run_in_executor で実行する
79 time.sleep(5)
80 return arg
81
82async def main():
83 # producer->consumer へ値を渡す、非同期キュー
84 queue = asyncio.Queue(maxsize=QUEUE_SIZE)
85
86 async with asyncio.TaskGroup() as tg:
87 task1 = tg.create_task(producer_task(queue, producer()))
88 task2 = tg.create_task(consumer_task(queue))
89
90if __name__ == '__main__':
91 logging.basicConfig(
92 format="[%(asctime)s][%(threadName)-10s][%(levelname)-8s] %(message)s",
93 level=logging.DEBUG,
94 )
95 try:
96 asyncio.run(main())
97 except (KeyboardInterrupt, SystemExit):
98 pass
99 finally:
100 logger.info("Done.")
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
2024/12/11 23:22