前提・実現したいこと
数万行あるデータの加工を行おうと思い、処理を書いたところ加工処理がソコソコ時間がかかることがわかりました。
逐次処理ですと、データ件数*加工時間となり非常に遅いため重たい処理を非同期で動かして高速化しようと思い調べたところ
asyncio.Queue
を利用しることで実現できそうということがわかりました。
下記リファレンスを読み最下部にあるサンプルコードを動かしてみたところ求めている動作をしているようなのですが、コードと動作のつながりが理解できずモヤっとしています。
- 参照したリファレンス
https://docs.python.org/ja/3/library/asyncio-queue.html
モヤっとしてるのは以下の点
asyncio.create_task
で作成されたTaskはリストに詰めているだけのように見えるのに実行される点task.cancel()
の記述があるが、これはTaskを停止する処理と考えています。await queue.join()
の行をコメントアウトしたらTaskは実行されなくなった、リファレンス読む限りキューがなくなるまでブロックして取り出すとあるので、Taskを実行しているようには見えない。
該当のソースコード
python
1import asyncio 2import random 3import time 4 5 6async def worker(name, queue): 7 while True: 8 # Get a "work item" out of the queue. 9 sleep_for = await queue.get() 10 11 # Sleep for the "sleep_for" seconds. 12 await asyncio.sleep(sleep_for) 13 14 # Notify the queue that the "work item" has been processed. 15 queue.task_done() 16 17 print(f'{name} has slept for {sleep_for:.2f} seconds') 18 19 20async def main(): 21 # Create a queue that we will use to store our "workload". 22 queue = asyncio.Queue() 23 24 # Generate random timings and put them into the queue. 25 total_sleep_time = 0 26 for _ in range(20): 27 sleep_for = random.uniform(0.05, 1.0) 28 total_sleep_time += sleep_for 29 queue.put_nowait(sleep_for) 30 31 # Create three worker tasks to process the queue concurrently. 32 tasks = [] 33 for i in range(3): 34 task = asyncio.create_task(worker(f'worker-{i}', queue)) 35 tasks.append(task) 36 37 # Wait until the queue is fully processed. 38 started_at = time.monotonic() 39 await queue.join() 40 total_slept_for = time.monotonic() - started_at 41 42 # Cancel our worker tasks. 43 for task in tasks: 44 task.cancel() 45 # Wait until all worker tasks are cancelled. 46 await asyncio.gather(*tasks, return_exceptions=True) 47 48 print('====') 49 print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') 50 print(f'total expected sleep time: {total_sleep_time:.2f} seconds') 51 52 53asyncio.run(main())
試したこと
- コメントアウトやprint文を仕込んでどういう動きをするか試した。
- qiitaなどの解説などを読んだ
補足情報(FW/ツールのバージョンなど)
- 自分のマシンにはpython3.6しか入っていなかったので、一部3.7以上でしか動作しないコードは3.6以下でも動作するよう修正しました。
create_task
をensure_future
に置き換えたasyncio.run(main())
をasyncio.get_event_loop().run_until_complete(main())
とした

バッドをするには、ログインかつ
こちらの条件を満たす必要があります。