質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.35%
Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

1回答

180閲覧

Python 非同期ジェネレータでないジェネレータを利用しての非同期処理

keruuuu

総合スコア17

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

0クリップ

投稿2024/12/11 03:12

編集2024/12/11 03:18

実現したいこと

非同期ジェネレータでないジェネレータの戻り値を利用して非同期処理を行う
*generate_date()は非同期ジェネレータに変更できないことを前提としています。

発生している問題・分からないこと

heavy_func()が呼び出されない

該当のソースコード

python

1import asyncio 2import datetime as dt 3import time 4 5 6def main(): 7 exec_time = dt.datetime.now() 8 print(f'{exec_time}, start') 9 10 data_list = generate_data() 11 asyncio.run(calc_from_data(data_list)) 12 13 14def generate_data(): 15 data_list = [ 16 {'key': 's1', 'val': 1.0}, 17 {'key': 's2', 'val': 2.0}, 18 {'key': 's3', 'val': 3.0}, 19 ] 20 21 while True: 22 yield data_list[0] 23 yield data_list[1] 24 yield data_list[2] 25 time.sleep(10) 26 27 28async def calc_from_data(data_list): 29 tasks = [] 30 for data in data_list: 31 key = data['key'] 32 val = data['val'] 33 exec_time = dt.datetime.now() 34 print(f'{exec_time}, {key}: {val}') 35 36 loop = asyncio.get_event_loop() 37 task = loop.create_task(heavy_func(key, val)) 38 tasks.append(task) 39 40 await asyncio.gather(*tasks) 41 42 43async def heavy_func(key, val): 44 exec_time = dt.datetime.now() 45 print(f'{exec_time}, called, {key}: {val}') 46 47 # 何か重い処理 48 await asyncio.sleep(10) 49 50 exec_time = dt.datetime.now() 51 print(f'{exec_time}, end, {key}: {val}') 52 53 54if __name__ == '__main__': 55 main()

console

12024-12-11 03:10:33.140207, start 22024-12-11 03:10:33.140740, s1: 1.0 32024-12-11 03:10:33.140784, s2: 2.0 42024-12-11 03:10:33.140808, s3: 3.0 52024-12-11 03:10:43.150936, s1: 1.0 62024-12-11 03:10:43.151100, s2: 2.0 72024-12-11 03:10:43.151130, s3: 3.0

試したこと・調べたこと

  • teratailやGoogle等で検索した
  • ソースコードを自分なりに変更した
  • 知人に聞いた
  • その他
上記の詳細・結果

同様の問題が見つかりませんでした。

補足

特になし

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

問題の原因
asyncio はイベントループを持ち、
await のタイミングで処理を切り替え、他のタスクを進めます。

問題のコードでは、
for data in data_list: の data_list が無限ループの為、for 分を抜けられず
await asyncio.gather(*tasks) まで実行は到達しません。⇒ 他のタスクは実行されない

解決策A;
同期 <=> 非同期のやりとりとして、まず思いつくのは、
janus というライブラリの同期<=>非同期キューを用いると
例えば別スレッドで無限ループを処理し、非同期側で値を受け取ることが可能です。(追記: 標準ライブラリの asyncio.Queue でも工夫すれば可能)

実質、同期ジェネレーターで生成する値を、
コルーチン側で非同期ジェネレーターとして扱う事が可能になります。

別スレッドでの実行は to_thread, run_in_executor 等。

追記2: 要点が埋もれてしまったので、ブロッキング処理を含むジェネレータを別スレッドで実行し非同期読み出し。for文は使わずに

python

1# ジェネレーターからの非同期読み出し (別スレッドでジェネレーター内部の処理は進む ⇒ asyncio のイベントループはブロックされない) 2value = await asyncio.to_thread(next, gen)

解決策B
タスク生成スケジュールの見直し、
現状のコードではタスクの内容次第では、処理速度が追いつかないことがあるかもしれません。
(タスクを溢れさせないための sleep 10 だとは思いますが)
例えば 20個のタスクを生成し、10個のタスクが完了した時点でもう10個次のタスクを追加する等、

無限ループのコードを変更することなく、
next() でジェネレーターから値を順次呼び出すことは可能です。
(time.sleep(10) は不要になると思うので見直し候補)

追記: Semaphore 等で数量制限


python

1# 何か重い処理 2await asyncio.sleep(10)

何か重たい処理として仮置きしてるのかもしれませんが、
コルーチン内での重たい処理は別スレッドで行いましょう。
実際のコード次第では問題になる場合があります。

追記: 動作サンプルコード

python

1import time 2import asyncio 3from concurrent.futures import ThreadPoolExecutor 4import logging 5 6logger = logging.getLogger(__name__) 7 8TASK_LIMIT = 5 9QUEUE_SIZE = 10 10INTERVAL = 10 11DATA_LIST = [ 12 {"key": "a", "value": 1}, 13 {"key": "b", "value": 2}, 14 {"key": "c", "value": 3}, 15] 16 17def producer(data=DATA_LIST, interval=INTERVAL): 18 # sync generator 19 while True: 20 yield from data 21 logger.info(f"waiting {interval} sec") 22 time.sleep(interval) 23 24async def producer_task(queue, sync_gen): 25 # 非同期: ジェネレーターから値を取り出す 26 logger.info("producer start") 27 while True: 28 item = await asyncio.to_thread(next, sync_gen) 29 logger.debug(f"P: {item=}") 30 await queue.put(item) # キューの空を待つ (maxsize) 31 32async def consumer(queue): 33 # 非同期 generator: async for で扱う為の wrapper 34 while True: 35 item = await queue.get() # キューに値が入るのを待つ 36 yield item 37 38async def consumer_task(queue): 39 logger.info("consumer start") 40 semaphore = asyncio.Semaphore(TASK_LIMIT) # 同時実行数 41 loop = asyncio.get_event_loop() 42 43 def done(future): 44 # 同期タスクの実行完了時 45 result = future.result() 46 logger.debug(f"task done {result=}") 47 queue.task_done() 48 semaphore.release() 49 50 with ThreadPoolExecutor(thread_name_prefix="worker") as executor: 51 async for item in consumer(queue): 52 logger.debug(f"C: {item=}") 53 await semaphore.acquire() # 複数実行中のタスク完了を待つ 54 future = loop.run_in_executor(executor, heavy_task, item) 55 future.add_done_callback(done) 56 57def debug(func): 58 # タスクの実行タイム計測 59 from datetime import datetime 60 from functools import wraps 61 62 @wraps(func) 63 def _func(*args, **kw): 64 result = None 65 start_time = datetime.now() 66 logger.debug(f"start {func.__name__}") 67 try: 68 result = func(*args, **kw) 69 except Exception as exc: 70 logger.error(f"error ", exc_info=exc) 71 end_time = datetime.now() 72 logger.debug(f"{result=} (time: {end_time - start_time})") 73 return result 74 return _func 75 76@debug 77def heavy_task(arg): 78 # 時間のかかるタスク。run_in_executor で実行する 79 time.sleep(5) 80 return arg 81 82async def main(): 83 # producer->consumer へ値を渡す、非同期キュー 84 queue = asyncio.Queue(maxsize=QUEUE_SIZE) 85 86 async with asyncio.TaskGroup() as tg: 87 task1 = tg.create_task(producer_task(queue, producer())) 88 task2 = tg.create_task(consumer_task(queue)) 89 90if __name__ == '__main__': 91 logging.basicConfig( 92 format="[%(asctime)s][%(threadName)-10s][%(levelname)-8s] %(message)s", 93 level=logging.DEBUG, 94 ) 95 try: 96 asyncio.run(main()) 97 except (KeyboardInterrupt, SystemExit): 98 pass 99 finally: 100 logger.info("Done.")

投稿2024/12/11 16:40

編集2024/12/11 23:43
teamikl

総合スコア8760

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

keruuuu

2024/12/11 23:22

説明とサンプルコードありがとうございます。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.35%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問