質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

2回答

2925閲覧

asyncio.Queueを利用した非同期処理のコードを理解できない

退会済みユーザー

退会済みユーザー

総合スコア0

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

2クリップ

投稿2019/01/28 14:55

前提・実現したいこと

数万行あるデータの加工を行おうと思い、処理を書いたところ加工処理がソコソコ時間がかかることがわかりました。
逐次処理ですと、データ件数*加工時間となり非常に遅いため重たい処理を非同期で動かして高速化しようと思い調べたところ
asyncio.Queue を利用しることで実現できそうということがわかりました。

下記リファレンスを読み最下部にあるサンプルコードを動かしてみたところ求めている動作をしているようなのですが、コードと動作のつながりが理解できずモヤっとしています。

  • 参照したリファレンス

https://docs.python.org/ja/3/library/asyncio-queue.html

モヤっとしてるのは以下の点

  • asyncio.create_task で作成されたTaskはリストに詰めているだけのように見えるのに実行される点
    • task.cancel() の記述があるが、これはTaskを停止する処理と考えています。
    • await queue.join() の行をコメントアウトしたらTaskは実行されなくなった、リファレンス読む限りキューがなくなるまでブロックして取り出すとあるので、Taskを実行しているようには見えない。

該当のソースコード

python

1import asyncio 2import random 3import time 4 5 6async def worker(name, queue): 7 while True: 8 # Get a "work item" out of the queue. 9 sleep_for = await queue.get() 10 11 # Sleep for the "sleep_for" seconds. 12 await asyncio.sleep(sleep_for) 13 14 # Notify the queue that the "work item" has been processed. 15 queue.task_done() 16 17 print(f'{name} has slept for {sleep_for:.2f} seconds') 18 19 20async def main(): 21 # Create a queue that we will use to store our "workload". 22 queue = asyncio.Queue() 23 24 # Generate random timings and put them into the queue. 25 total_sleep_time = 0 26 for _ in range(20): 27 sleep_for = random.uniform(0.05, 1.0) 28 total_sleep_time += sleep_for 29 queue.put_nowait(sleep_for) 30 31 # Create three worker tasks to process the queue concurrently. 32 tasks = [] 33 for i in range(3): 34 task = asyncio.create_task(worker(f'worker-{i}', queue)) 35 tasks.append(task) 36 37 # Wait until the queue is fully processed. 38 started_at = time.monotonic() 39 await queue.join() 40 total_slept_for = time.monotonic() - started_at 41 42 # Cancel our worker tasks. 43 for task in tasks: 44 task.cancel() 45 # Wait until all worker tasks are cancelled. 46 await asyncio.gather(*tasks, return_exceptions=True) 47 48 print('====') 49 print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') 50 print(f'total expected sleep time: {total_sleep_time:.2f} seconds') 51 52 53asyncio.run(main())

試したこと

  • コメントアウトやprint文を仕込んでどういう動きをするか試した。
  • qiitaなどの解説などを読んだ

補足情報(FW/ツールのバージョンなど)

  • 自分のマシンにはpython3.6しか入っていなかったので、一部3.7以上でしか動作しないコードは3.6以下でも動作するよう修正しました。
  • create_taskensure_future に置き換えた
  • asyncio.run(main())asyncio.get_event_loop().run_until_complete(main())とした

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答2

0

そのモヤっとする気持ち、すごくよくわかります。私はその気持は、asyncioパッケージで提供されている関数等が「名が体を表していない」「なんだか他の標準ライブラリと毛色が違う」という状態だから起こるんだと、解釈するようにしています。

1)asyncio.create_task

このasyncio.create_taskですがdocstringには、

Schedule the execution of a coroutine object in a spawn task.

と書かれています。「じゃあasyncio.create_taskじゃなくてasyncio.schedule_taskだろ!!」と私は思います。なので以下のコードは「ただ単純にタスクを作成している」のではなく「タスクをスケジュールしてそのタスクへの参照を得ている」と解釈すべきなのです。

task = asyncio.create_task(worker(f'worker-{i}', queue))

2)task.cancel()

このtask.cancel()ですがdocstringには、

Request that this task cancel itself.

と書かれています。ので、厳密には「Taskの停止(or 取りやめ or 中断)をリクエストする処理」と考えると良いと思います。

3)await queue.join()

awaitは待つ処理であり、その待ちに入るところではじめてPythonは他のタスクの処理に移ります。

もしこの行がなくなると、次のawaitはタスクをキャンセルした後なので、タスクは実行される機会がなくキャンセルされるという動作になります。

投稿2019/01/29 01:31

YouheiSakurai

総合スコア6142

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

0

直接的な回答ではないのですが・・・

逐次処理ですと、データ件数*加工時間となり非常に遅いため重たい処理を非同期で動かして高速化しようと思い調べたところasyncio.Queue を利用しることで実現できそうということがわかりました。

もし質問者さんがデータの加工を単一のPythonのプロセス上で行うことを想定しているなら、それをasyncioで並列化することはできない気がしました。それは以下のリファレンスの先頭に
https://docs.python.org/ja/3.7/library/asyncio.html

asyncio is often a perfect fit for IO-bound and high-level structured network code.

とあるからです。自分はasyncioは文字通り「I/Oネックの非同期処理を効率よく実行するためのもの」でありちょうどNodeJSの非同期I/Oのように単一スレッド前提で以下のような考え方で動作するものだと思ったのです。

  • I/Oのような時間のかかる処理の起動は指示するが、完了を待たない関数(非同期関数)が前提
  • 処理の起動をした後はすぐに次の別の処理(別のタスクのコルーチンへの切り替え)を行う。

そこで実験のために次のようなコードを動かしてみました。

Python

1import asyncio 2import time 3import threading 4 5start_time = time.perf_counter_ns() 6 7def log(*m): 8 cur_time = (time.perf_counter_ns() - start_time) * 1E-9 9 print('{:.9f}:'.format(cur_time), *m) 10 11async def sub(name): 12 log(f'{name}: thread {threading.current_thread().getName()}') 13 for i in range(2): 14 log(f'{name}: before sleep') 15 time.sleep(0.1) 16 log(f'{name}: after sleep') 17 18async def main(): 19 name = threading.current_thread().getName() 20 q = asyncio.Queue() 21 log(f'main: thread {name}') 22 log('main: before create_task') 23 tasks = [asyncio.create_task(sub(f'sub{i}')) for i in range(2)] 24 log('main: after create_task') 25 26 log('main: before sleep') 27 time.sleep(0.1) 28 log('main: after sleep') 29 30 log('main: before await gather') 31 await asyncio.gather(*tasks, return_exceptions=True) 32 log('main: after await gather') 33 34 35asyncio.run(main())

結果は次の通り、全て同じメインスレッドで実行されてます。

text

10.001005765: main: thread MainThread <==注目 20.001037492: main: before create_task 30.001064477: main: after create_task 40.001078699: main: before sleep 50.096260583: main: after sleep 60.096434532: main: before await gather 70.096769665: sub0: thread MainThread <==注目 80.096850987: sub0: before sleep 90.205639503: sub0: after sleep 100.205755469: sub0: before sleep 110.314956065: sub0: after sleep 120.315034469: sub1: thread MainThread <==注目 130.315053432: sub1: before sleep 140.424338996: sub1: after sleep 150.424473560: sub1: before sleep 160.533754019: sub1: after sleep 170.533960788: main: after await gather

テストのためにsub関数の中でtime.sleepを用いましたが、実際のアプリではここをインタープリタの任意の処理に置き換えることになります。しかし結果は上記と同じであり、sub同士が並列して動作することはなさそうです。同一のスレッドで実行しているので当たり前ですね・・・


ではasyncioではなくthreading.Threadだとどうかというと・・・PythonのインタープリタはGIL(Global Interpreter Lock)方式を採用していまして、インタープリタの実行は例えマルチスレッドにしてもあまり並列化の効果がないと思います。

https://ja.wikipedia.org/wiki/グローバルインタプリタロック

複数のプロセッサによる並列データ処理を狙うならmultiprocessingパッケージを用いて複数のプロセスでの並列処理を検討するとよいと思います。

https://docs.python.org/ja/3.7/library/multiprocessing.html


上記回答は質問者さんが単一のインタープリタプロセス上でnumpyやpandasなどを用いてデータの加工処理を狙っていると想定しています。もし複数のプロセスで加工処理をすることを想定していらしてその結果の制御を行う部分をasyncioを用いて行おうとしているのでしたら本回答は大外れになってしまいます。もしそうだったらゴメンナサイ。

投稿2019/01/28 18:36

編集2019/01/28 18:39
KSwordOfHaste

総合スコア18394

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

まだベストアンサーが選ばれていません

会員登録して回答してみよう

アカウントをお持ちの方は

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問