websocketのデータを受け取った後eventを起こすようなプログラムを書いています。
<現状>
streamでキューに入れたboardを元にprocess及びeventが動いている。
<実現したいこと>
event内で一定条件の時に別のキューにタスクを入れて別のprocess2及びevent2(数分バックグラウンドで待機した後処理を実行するようなメソッド)を実行したい
<問題点>
if文の中にawaitがあると'event' was never awaitedとなってしまう? 通常のdefでThreadPoolExecutorのqueueに方法があるのでしょうか?
client
1import asyncio 2import json 3import websockets 4import datetime 5import logging 6 7 8import time 9from concurrent.futures import ThreadPoolExecutor 10 11from asyncio import Queue 12 13# タスクを先入先出処理するためのキュー 14que = Queue() 15que2 = Queue() 16 17# タスク処理屋 18executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread") 19executor2 = ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread") 20 21# 実行を止めるためのフラグ 22working = {'state':True} 23 24# --- 25formatter = '%(levelname)s : %(asctime)s : %(message)s' 26logging.basicConfig(level=logging.INFO, format=formatter) 27logger = logging.getLogger(__name__) 28 29# 処理 30async def process(): 31 while working['state']: 32 board = await que.get() 33 logger.info(f"[TASKID:{board.get('id')}] 処理を開始します。") 34 executor.submit(event, board) 35 36# 処理 37async def process2(): 38 while working['state']: 39 board = await que2.get() 40 logger.info(f"[TASKID:{board.get('id')}] process2処理を開始します。") 41 executor2.submit(event2, board) 42 43 44# 受信部分 45async def stream(): 46 uri = 'ws://127.0.0.1:5679/' 47 async with websockets.connect(uri, ping_timeout=None) as ws: 48 while not ws.closed: 49 response = await ws.recv() 50 board = json.loads(response) 51 await que.put(board) 52 logger.info(f"[TASKID:{board.get('id')}] タスクをキューに投入しました。") 53 54async def event(board): 55 logger.info(f"[TASKID:{board.get('id')}] これから5秒待つ" + str(board)) 56 if board.get('id') == 1: 57 await que2.put(board) 58 time.sleep(5) 59 logger.info(f"[TASKID:{board.get('id')}] 処理が完了しました [wait終わり sleepなし]") 60 logger.info(datetime.now()) 61 62def event2(board): 63 logger.info(f"[TASKID:{board.get('id')}] event2これから10秒待つ" + str(board)) 64 time.sleep(10) 65 logger.info(f"[TASKID:{board.get('id')}] event2処理が完了しました [wait終わり sleepなし]") 66 logger.info(datetime.now()) 67 68 69loop = asyncio.get_event_loop() 70loop.create_task(stream()) 71loop.create_task(process()) 72loop.create_task(process2()) 73 74try: 75 loop.run_forever() 76except KeyboardInterrupt: 77 working['state'] = False 78 exit() 79 80
server
1import asyncio 2import datetime 3import random 4import websockets 5import json 6 7 8async def time(websocket, path): 9 count = 0 10 while True: 11 now = datetime.datetime.utcnow().isoformat() + "Z" 12 cid = f"{count:>6}" 13 char = 'longlongstr' 14 content = {'id':cid, 'message': char, 'time': str(now)} 15 j_content = json.dumps(content) 16 count += 1 17 await websocket.send(j_content) 18 # await asyncio.sleep(random.random() * 3) 19 await asyncio.sleep(1) 20 21start_server = websockets.serve(time, "127.0.0.1", 5679) 22 23asyncio.get_event_loop().run_until_complete(start_server) 24asyncio.get_event_loop().run_forever() 25
<結果>
INFO : 2021-01-02 23:59:50,151 : [TASKID: 0] タスクをキューに投入しました。
INFO : 2021-01-02 23:59:50,151 : [TASKID: 0] 処理を開始します。
C:/Users/Owner/PycharmProjects/pythonProject/async_client2.py:35: RuntimeWarning: coroutine 'event' was never awaited
executor.submit(event, board)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
INFO : 2021-01-02 23:59:51,170 : [TASKID: 1] タスクをキューに投入しました。
INFO : 2021-01-02 23:59:51,170 : [TASKID: 1] 処理を開始します。
C:\Users\Owner\anaconda3\lib\concurrent\futures\thread.py:82: RuntimeWarning: coroutine 'event' was never awaited
del work_item
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
INFO : 2021-01-02 23:59:52,172 : [TASKID: 2] タスクをキューに投入しました。
INFO : 2021-01-02 23:59:52,172 : [TASKID: 2] 処理を開始します。
INFO : 2021-01-02 23:59:53,176 : [TASKID: 3] タスクをキューに投入しました。
INFO : 2021-01-02 23:59:53,176 : [TASKID: 3] 処理を開始します。
回答1件
あなたの回答
tips
プレビュー
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
2021/01/03 05:06
退会済みユーザー
2021/01/03 05:46
2021/01/03 07:25