質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

87.37%

asyncio.Queueを利用した非同期処理のコードを理解できない

受付中

回答 2

投稿

  • 評価
  • クリップ 2
  • VIEW 5,442
退会済みユーザー

退会済みユーザー

前提・実現したいこと

  • https://docs.python.org/ja/3/library/asyncio-queue.html

数万行あるデータの加工を行おうと思い、処理を書いたところ加工処理がソコソコ時間がかかることがわかりました。
逐次処理ですと、データ件数*加工時間となり非常に遅いため重たい処理を非同期で動かして高速化しようと思い調べたところ
asyncio.Queue を利用しることで実現できそうということがわかりました。

下記リファレンスを読み最下部にあるサンプルコードを動かしてみたところ求めている動作をしているようなのですが、コードと動作のつながりが理解できずモヤっとしています。

  • 参照したリファレンス
    https://docs.python.org/ja/3/library/asyncio-queue.html

モヤっとしてるのは以下の点

  • asyncio.create_task で作成されたTaskはリストに詰めているだけのように見えるのに実行される点
  • task.cancel() の記述があるが、これはTaskを停止する処理と考えています。
  • await queue.join() の行をコメントアウトしたらTaskは実行されなくなった、リファレンス読む限りキューがなくなるまでブロックして取り出すとあるので、Taskを実行しているようには見えない。

該当のソースコード

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

試したこと

  • コメントアウトやprint文を仕込んでどういう動きをするか試した。
  • qiitaなどの解説などを読んだ

補足情報(FW/ツールのバージョンなど)

  • 自分のマシンにはpython3.6しか入っていなかったので、一部3.7以上でしか動作しないコードは3.6以下でも動作するよう修正しました。
  • create_task を ensure_future に置き換えた
  • asyncio.run(main()) を asyncio.get_event_loop().run_until_complete(main())とした
  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 過去に投稿した質問と同じ内容の質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

回答 2

+1

直接的な回答ではないのですが・・・

逐次処理ですと、データ件数*加工時間となり非常に遅いため重たい処理を非同期で動かして高速化しようと思い調べたところasyncio.Queue を利用しることで実現できそうということがわかりました。

もし質問者さんがデータの加工を単一のPythonのプロセス上で行うことを想定しているなら、それをasyncioで並列化することはできない気がしました。それは以下のリファレンスの先頭に
https://docs.python.org/ja/3.7/library/asyncio.html

asyncio is often a perfect fit for IO-bound and high-level structured network code.

とあるからです。自分はasyncioは文字通り「I/Oネックの非同期処理を効率よく実行するためのもの」でありちょうどNodeJSの非同期I/Oのように単一スレッド前提で以下のような考え方で動作するものだと思ったのです。

  • I/Oのような時間のかかる処理の起動は指示するが、完了を待たない関数(非同期関数)が前提
  • 処理の起動をした後はすぐに次の別の処理(別のタスクのコルーチンへの切り替え)を行う。

そこで実験のために次のようなコードを動かしてみました。

import asyncio
import time
import threading

start_time = time.perf_counter_ns()

def log(*m):
    cur_time = (time.perf_counter_ns() - start_time) * 1E-9
    print('{:.9f}:'.format(cur_time), *m)

async def sub(name):
    log(f'{name}: thread {threading.current_thread().getName()}')
    for i in range(2):
        log(f'{name}: before sleep')
        time.sleep(0.1)
        log(f'{name}: after  sleep')

async def main():
    name = threading.current_thread().getName()
    q = asyncio.Queue()
    log(f'main: thread {name}')
    log('main: before create_task')
    tasks = [asyncio.create_task(sub(f'sub{i}')) for i in range(2)]
    log('main: after  create_task')

    log('main: before sleep')
    time.sleep(0.1)
    log('main: after  sleep')

    log('main: before await gather')
    await asyncio.gather(*tasks, return_exceptions=True)
    log('main: after  await gather')


asyncio.run(main())


結果は次の通り、全て同じメインスレッドで実行されてます。

0.001005765: main: thread MainThread <==注目
0.001037492: main: before create_task
0.001064477: main: after  create_task
0.001078699: main: before sleep
0.096260583: main: after  sleep
0.096434532: main: before await gather
0.096769665: sub0: thread MainThread <==注目
0.096850987: sub0: before sleep
0.205639503: sub0: after  sleep
0.205755469: sub0: before sleep
0.314956065: sub0: after  sleep
0.315034469: sub1: thread MainThread <==注目
0.315053432: sub1: before sleep
0.424338996: sub1: after  sleep
0.424473560: sub1: before sleep
0.533754019: sub1: after  sleep
0.533960788: main: after  await gather


テストのためにsub関数の中でtime.sleepを用いましたが、実際のアプリではここをインタープリタの任意の処理に置き換えることになります。しかし結果は上記と同じであり、sub同士が並列して動作することはなさそうです。同一のスレッドで実行しているので当たり前ですね・・・


ではasyncioではなくthreading.Threadだとどうかというと・・・PythonのインタープリタはGIL(Global Interpreter Lock)方式を採用していまして、インタープリタの実行は例えマルチスレッドにしてもあまり並列化の効果がないと思います。

https://ja.wikipedia.org/wiki/グローバルインタプリタロック

複数のプロセッサによる並列データ処理を狙うならmultiprocessingパッケージを用いて複数のプロセスでの並列処理を検討するとよいと思います。

https://docs.python.org/ja/3.7/library/multiprocessing.html


上記回答は質問者さんが単一のインタープリタプロセス上でnumpyやpandasなどを用いてデータの加工処理を狙っていると想定しています。もし複数のプロセスで加工処理をすることを想定していらしてその結果の制御を行う部分をasyncioを用いて行おうとしているのでしたら本回答は大外れになってしまいます。もしそうだったらゴメンナサイ。

投稿

編集

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

+1

そのモヤっとする気持ち、すごくよくわかります。私はその気持は、asyncioパッケージで提供されている関数等が「名が体を表していない」「なんだか他の標準ライブラリと毛色が違う」という状態だから起こるんだと、解釈するようにしています。

1)asyncio.create_task

このasyncio.create_taskですがdocstringには、

Schedule the execution of a coroutine object in a spawn task.

と書かれています。「じゃあasyncio.create_taskじゃなくてasyncio.schedule_taskだろ!!」と私は思います。なので以下のコードは「ただ単純にタスクを作成している」のではなく「タスクをスケジュールしてそのタスクへの参照を得ている」と解釈すべきなのです。

task = asyncio.create_task(worker(f'worker-{i}', queue))

2)task.cancel()

このtask.cancel()ですがdocstringには、

Request that this task cancel itself.

と書かれています。ので、厳密には「Taskの停止(or 取りやめ or 中断)をリクエストする処理」と考えると良いと思います。

3)await queue.join()

awaitは待つ処理であり、その待ちに入るところではじめてPythonは他のタスクの処理に移ります。

もしこの行がなくなると、次のawaitはタスクをキャンセルした後なので、タスクは実行される機会がなくキャンセルされるという動作になります。

投稿

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 87.37%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる

関連した質問

同じタグがついた質問を見る