質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.35%
並列処理

複数の計算が同時に実行される手法

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

1回答

1315閲覧

Pythonのasyncioで並行処理(ダウンロードと処理)させたいです

teraterakoya

総合スコア8

並列処理

複数の計算が同時に実行される手法

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

0クリップ

投稿2020/08/06 11:48

Pythonで複数の外部リソースへのアクセスとその処理を1つのまとまりとして、並行処理させたいです
そこで、以下のようなコードを書いています(実際の処理などは抽象化しますが、tweepyrequestsなどのライブラリを使っています)
しかし、実際はasyncio.create_task によって作られたタスクが順々に実行されてしまいます

python

1def download(url): 2 ... 3 return sentence 4 5def text_preprocess(sentence): 6 ... 7 return 8 9async def download_and_preprocess(url): 10 sentence = download(url) 11 return preprocess(sentence) 12 13 14def main(): 15 urls = [...] 16 loop = asyncio.get_event_loop() 17 tasks = [asyncio.create_task(download_and_preprocess(url) for url in urls] 18 loop.run_until_complete(asyncio.gather(*tasks)) 19 20main()

私の理解では

  • download_and_preprocess を呼び出すとcoroutineオブジェクトが返ってくる
  • asyncio.create_task に coroutineオブジェクトを渡すと、Task になる
  • Taskオブジェクトは並行して実行される

と理解しています
期待しているのは

  • download_and_preprocess が並行に実行され、
  • loop.run_until_completeなどでその配列を受取ることです

そこで別の実験として以下の様に書いて実験しました

python

1async def r(i): 2 print(f"start {i}") 3 time.sleep(10-i) 4 return i 5async def main(): 6 tasks = [asyncio.create_task(r(x)) for x in range(5)] 7 for task in tasks: 8 print(await task) 9 return 10asyncio.run(main())

実行してみると

console

1start 0 2start 1 3start 2 4start 3 5start 4 60 71 82 93 104

と、出力されます
特にstart {i}を出力するのに10-iかかってしまいます(ここは並行して出て欲しい)
time.sleepawait asyncio.sleepに変更すると期待したような動きをするのは確認できました


結局、最初のdownload_and_preprocess を期待通り実行させるにはどうしたら良いのでしょうか
どなたかご存じの方いらっしゃればご教示願いたいです

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

requestsモジュールの代わりに非同期通信に対応したライブラリであるaiohttpを使う方法があります。

import asyncio import time import requests import aiohttp import random async def download(url, session): print("download start") s = await session.get(url) # await asyncio.sleep(0) # この行はrequestsバージョンでは必要 sentence = await s.text() # requestsでは.textだが、aiohtttpは.text() print(f"download {url} completed!") return sentence async def text_preprocess(sentence): # 下記は時間がかかる処理を想定 a = random.randint(0,6) print(f"processing for {a} sec") # await asyncio.sleep(a) # ダウンロード処理時間だけ見たい場合、ここはコメントにする return a async def download_and_preprocess(url, session): sentence = await download(url, session) return await text_preprocess(sentence) async def do_tasks(urls): async with aiohttp.ClientSession() as session: tasks = [asyncio.create_task(download_and_preprocess(url, session)) for url in urls] results = await asyncio.gather(*tasks) return results def main(): urls = ["https://www.example.com/domains/reserved", "https://www.example.org/domains/reserved", "https://www.example.edu/domains/reserved"] loop = asyncio.get_event_loop() results = loop.run_until_complete(do_tasks(urls)) for r in results: print(f"result:{r}") start = time.time() main() process_time = time.time() - start print(process_time)

 比較として別途aiohttpをrequestsに単純に置き換えた場合のコードを用意しましたので、これを実行した場合の時間と比べてみてください。

 ネットワーク状態にもよるかもしれませんが、おおむね、aiohttpを使用している上記のコードの方が早く完了すると思います。(requestsの方はダウンロードごとにブロックされており並行処理になっていないため時間がかかる)

[補足1]
上記では、ダウンロードの並行性だけを見るため、 async def text_preprocess(sentence):内のsleepをコメント化しています。

 text_preprocess処理の並行性も見る場合は、async def text_preprocess(sentence): 内のコメント部分行(sleep)をコメントアウトして実行してみてください。

3つのランダムスリープ時間の「合計」ではなく、3つのうちの最大時間までしかウェイトされないことがわかると思います。

 ただし、text_preprocess内のasyncio.sleep()部分を実際の処理に置き換える場合、その処理自体がノンブロッキングでなければasyncioのメリットは享受できません。

ネットワーク通信やデータベース処理、ファイル処理等のI/Oバウンドではなく、たとえばBeautifulSoupによるタグ検索等単純にCPUの計算負荷がかかるような処理については、multiprocessを使った方が効果が出ると思います。

[補足2]
requestsとrun_in_executorを使う方法もありますが、イベントループやマルチスレッド寄りの話になるので割愛しました。

参考:
asyncio --- 非同期 I/O
aiohttp documentation

投稿2020/08/06 15:02

編集2020/08/06 22:28
patapi

総合スコア820

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

teamikl

2020/08/07 00:11

# コメント欄開こうとしたら操作誤って、変な通知がいってしまったかもしれません。 結果の出力が、全てのタスクの完了を迄待つことになっている為 終了したタスクから順に結果を出力するには、 download_and_preprocess側に処理を書く、もしくは as_completed や task.add_done_callbackが使えます。 # 質問文の2つ目のコードに適応すると、 # time.sleep -> asyncio.sleep にして、for 文のところ。 for task asyncio.as_completed(tasks):   print(await task) とすると start 0 start 1 start 2 start 3 start 4 4 3 2 1 0 gather/run_until_complete では、処理順序は期待通りですが、 結果の出力は 0, 1, 2, 3, 4 と tasks の順序になります。 ---- > requestsとrun_in_executorを使う方法… 私も同意見です。asyncio を使うメリットが薄まるので、 何か事情がない限りは、aiohttp を使うに +1
teraterakoya

2020/08/07 02:27

回答ありがとうございます もしかして、`async def` したらなんでも並行処理の対象になるというわけではないのでしょうか? 例えば`async def time_sleep(delay): time.sleep(delay)`は `asyncio.sleep` と同じ処理ををしませんでした 同様に`requests` の代わりに `iohttp` を使えばいいという話はそれに関係していますか? (全くわからないのですが、ブロッキングという話が絡んでくるのでしょうか?)
patapi

2020/08/07 04:02

>teamikl 様 補足ありがとうございました。 >terate様 >もしかして、`async def` したらなんでも並行処理の対象になるというわけではないのでしょうか yesという回答になります。 並行動作させるには、async defの中で、ブロッキングしない処理を用いる必要があります。 requestsはブロッキングしてしまうため、代わりにaiohttpを使用しています。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.35%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問