質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.35%
Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

1回答

10150閲覧

asyncio ThreadPoolExecutor メソッド内からキューに値を入れる方法

taro_yamada

総合スコア55

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

0クリップ

投稿2021/01/02 15:09

編集2021/01/03 00:08

websocketのデータを受け取った後eventを起こすようなプログラムを書いています。
<現状>
streamでキューに入れたboardを元にprocess及びeventが動いている。

<実現したいこと>
event内で一定条件の時に別のキューにタスクを入れて別のprocess2及びevent2(数分バックグラウンドで待機した後処理を実行するようなメソッド)を実行したい

<問題点>
if文の中にawaitがあると'event' was never awaitedとなってしまう? 通常のdefでThreadPoolExecutorのqueueに方法があるのでしょうか?

client

1import asyncio 2import json 3import websockets 4import datetime 5import logging 6 7 8import time 9from concurrent.futures import ThreadPoolExecutor 10 11from asyncio import Queue 12 13# タスクを先入先出処理するためのキュー  14que = Queue() 15que2 = Queue() 16 17# タスク処理屋 18executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread") 19executor2 = ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread") 20 21# 実行を止めるためのフラグ 22working = {'state':True} 23 24# --- 25formatter = '%(levelname)s : %(asctime)s : %(message)s' 26logging.basicConfig(level=logging.INFO, format=formatter) 27logger = logging.getLogger(__name__) 28 29# 処理 30async def process(): 31 while working['state']: 32 board = await que.get() 33 logger.info(f"[TASKID:{board.get('id')}] 処理を開始します。") 34 executor.submit(event, board) 35 36# 処理 37async def process2(): 38 while working['state']: 39 board = await que2.get() 40 logger.info(f"[TASKID:{board.get('id')}] process2処理を開始します。") 41 executor2.submit(event2, board) 42 43 44# 受信部分 45async def stream(): 46 uri = 'ws://127.0.0.1:5679/' 47 async with websockets.connect(uri, ping_timeout=None) as ws: 48 while not ws.closed: 49 response = await ws.recv() 50 board = json.loads(response) 51 await que.put(board) 52 logger.info(f"[TASKID:{board.get('id')}] タスクをキューに投入しました。") 53 54async def event(board): 55 logger.info(f"[TASKID:{board.get('id')}] これから5秒待つ" + str(board)) 56 if board.get('id') == 1: 57 await que2.put(board) 58 time.sleep(5) 59 logger.info(f"[TASKID:{board.get('id')}] 処理が完了しました [wait終わり sleepなし]") 60 logger.info(datetime.now()) 61 62def event2(board): 63 logger.info(f"[TASKID:{board.get('id')}] event2これから10秒待つ" + str(board)) 64 time.sleep(10) 65 logger.info(f"[TASKID:{board.get('id')}] event2処理が完了しました [wait終わり sleepなし]") 66 logger.info(datetime.now()) 67 68 69loop = asyncio.get_event_loop() 70loop.create_task(stream()) 71loop.create_task(process()) 72loop.create_task(process2()) 73 74try: 75 loop.run_forever() 76except KeyboardInterrupt: 77 working['state'] = False 78 exit() 79 80

server

1import asyncio 2import datetime 3import random 4import websockets 5import json 6 7 8async def time(websocket, path): 9 count = 0 10 while True: 11 now = datetime.datetime.utcnow().isoformat() + "Z" 12 cid = f"{count:>6}" 13 char = 'longlongstr' 14 content = {'id':cid, 'message': char, 'time': str(now)} 15 j_content = json.dumps(content) 16 count += 1 17 await websocket.send(j_content) 18 # await asyncio.sleep(random.random() * 3) 19 await asyncio.sleep(1) 20 21start_server = websockets.serve(time, "127.0.0.1", 5679) 22 23asyncio.get_event_loop().run_until_complete(start_server) 24asyncio.get_event_loop().run_forever() 25

<結果>
INFO : 2021-01-02 23:59:50,151 : [TASKID: 0] タスクをキューに投入しました。
INFO : 2021-01-02 23:59:50,151 : [TASKID: 0] 処理を開始します。
C:/Users/Owner/PycharmProjects/pythonProject/async_client2.py:35: RuntimeWarning: coroutine 'event' was never awaited
executor.submit(event, board)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
INFO : 2021-01-02 23:59:51,170 : [TASKID: 1] タスクをキューに投入しました。
INFO : 2021-01-02 23:59:51,170 : [TASKID: 1] 処理を開始します。
C:\Users\Owner\anaconda3\lib\concurrent\futures\thread.py:82: RuntimeWarning: coroutine 'event' was never awaited
del work_item
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
INFO : 2021-01-02 23:59:52,172 : [TASKID: 2] タスクをキューに投入しました。
INFO : 2021-01-02 23:59:52,172 : [TASKID: 2] 処理を開始します。
INFO : 2021-01-02 23:59:53,176 : [TASKID: 3] タスクをキューに投入しました。
INFO : 2021-01-02 23:59:53,176 : [TASKID: 3] 処理を開始します。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

(以下コルーチン等の用語は、https://docs.python.org/ja/3/library/asyncio-task.html 内の使い方に準じています)

「 RuntimeWarning: coroutine 'event' was never awaited」が出る原因ですが、
process()内で、eventコルーチンをThreadpoolExecutor.submit()関数に渡しているためです。

エラーをなくすには、submit()関数に渡すのではなく、awaitをつけて呼び出します。

またeventコルーチン内で、time.sleep(5)としていますが、これだとメインスレッドを止めてしまうため、serverから送られてくるデータの受信処理もsleepの間止まってしまいます。
したがって、想定する動作にするためには、await asyncio.sleep(5)とする必要があります。

また、boardのidは桁を揃えた文字列となっているので、仮にserver側を変更しないならば
条件判定の部分は、

int(board.get('id').lstrip()) == 1

のように、空白を取り除いて数値に変換してやらないといけません。

以上をまとめたのが下記になります。

なお、id==1の場合、event()での処理(5秒)が終わってから、その約5秒後(que2に入ってから約10秒後)にevemt2()での処理が終わる形になっています。

つまりid==1のboardは、queに入った直後にque2にも入り、最初の5秒間は同時にevent()とevent2()での処理が走っていることになります。

client.py

python

1import asyncio 2import json 3import websockets 4import datetime 5import logging 6from datetime import datetime # これがないとエラーになる。 7 8import time 9from concurrent.futures import ThreadPoolExecutor 10 11from asyncio import Queue 12 13# タスクを先入先出処理するためのキュー  14que = Queue() 15que2 = Queue() 16 17# タスク処理屋 18executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread") 19executor2 = ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread2") 20 21# 実行を止めるためのフラグ 22working = {'state':True} 23 24# --- 25formatter = '%(levelname)s : %(asctime)s : %(message)s' 26logging.basicConfig(level=logging.INFO, format=formatter) 27logger = logging.getLogger(__name__) 28 29# 処理 30async def process(): 31 while working['state']: 32 board = await que.get() 33 logger.info(f"[TASKID:{board.get('id')}] 処理を開始します。") 34 # executor.submit(event, board) # 削除 35 await event(board) # 代わりにこうする。 36 37# 処理 38async def process2(): 39 while working['state']: 40 board = await que2.get() 41 logger.info(f"[TASKID:{board.get('id')}] process2処理を開始します。") 42 executor2.submit(event2, board) 43 44 45# 受信部分 46async def stream(): 47 uri = 'ws://127.0.0.1:5679/' 48 async with websockets.connect(uri, ping_timeout=None) as ws: 49 while not ws.closed: 50 response = await ws.recv() 51 board = json.loads(response) 52 await que.put(board) 53 logger.info(f"[TASKID:{board.get('id')}] タスクをキューに投入しました。") 54 55async def event(board): 56 if int(board.get('id').lstrip()) == 1: # 修正 57 await que2.put(board) 58 logger.info(f"[TASKID:{board.get('id')}] これから5秒待つ" + str(board)) # 実行順番を修正(修正前だと処理時間の表示が想定通りにならない) 59 await asyncio.sleep(5) # 修正 60 logger.info(f"[TASKID:{board.get('id')}] 処理が完了しました [wait終わり sleepなし] {board}") 61 logger.info(datetime.now()) 62 63def event2(board): 64 logger.info(f"[TASKID:{board.get('id')}] event2これから10秒待つ" + str(board)) 65 time.sleep(10) 66 logger.info(f"[TASKID:{board.get('id')}] event2処理が完了しました [wait終わり sleepなし] {board}") 67 logger.info(datetime.now()) 68 69 70loop = asyncio.get_event_loop() 71loop.create_task(stream()) 72loop.create_task(process()) 73loop.create_task(process2()) 74 75try: 76 loop.run_forever() 77except KeyboardInterrupt: 78 working['state'] = False 79 exit()

投稿2021/01/03 02:48

編集2021/01/03 02:59
退会済みユーザー

退会済みユーザー

総合スコア0

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

taro_yamada

2021/01/03 05:06

ありがとうございいました。 無事動きました。 ただ、一つ心配なことがあるのですが、元々eventの処理をsidの順番で行いたかったのですが、asyncにしてしまうことによって、順番どおりに処理されなくなることはないでしょうか?
退会済みユーザー

退会済みユーザー

2021/01/03 05:46

event()での処理自体は、process()内で、queから順番に取り出して処理されるので、順番通りに処理されなくなることはありません。
taro_yamada

2021/01/03 07:25

ありがとうございました! 本当に勉強になります。 実態のソースコードがかなり複雑化していて、しっかりと反映できるか少し不安ではありますが・・・、先が見えてきました! 引き続きよろしくお願いします。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.35%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問