asyncioとwebsocketを使ってデータを受信→加工しています。
加工処理が重いため、データ受信とは別に加工の部分はバックグラウンドで動くようにしたいです。更に加工はデータを受け取った順に実行したいです。
以下のとおりコードを書いてみたのですが、maxworkerの数だけ実行され、さらには、次のメッセージ受信も5秒待ってから実行されます。
5秒待たずして次のresponseを受けたいのですが、どのように表現すればいいでしょうか?
client
1async def stream(): 2 uri = 'ws://127.0.0.1:5679/' 3 4 async with websockets.connect(uri, ping_timeout=None) as ws: 5 while not ws.closed: 6 7 8 with ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread") as executor: 9 response = await ws.recv()# ここを滞りなく実行したいです。 10 board = json.loads(response) 11 print(board) # スレッドのバックグラウンド処理とは別に、このプリントだけはリアルタイムで表示したい。 12 13 executor.map(event, board.get('id')) 14 logger.info("submit end") 15 logger.info("main end") 16 17 18def event(board): 19 print('これから5秒待つ' + str(board)) 20 time.sleep(5) 21 print('wait終わり sleepなし') 22 print(datetime.datetime.now()) 23 24 25loop = asyncio.get_event_loop() 26loop.create_task(stream()) 27try: 28 loop.run_forever() 29except KeyboardInterrupt: 30 exit() 31
これから5秒待つ
これから5秒待つ
これから5秒待つ
これから5秒待つ
wait終わり sleepなし
wait終わり sleepなし
wait終わり sleepなし
wait終わり sleepなし
2020-12-30 10:56:02.123801
2020-12-30 10:56:02.123801
2020-12-30 10:56:02.123801
2020-12-30 10:56:02.123801
これから5秒待つ
これから5秒待つ0
wait終わり sleepなし
wait終わり sleepなし
2020-12-30 10:56:07.135456
2020-12-30 10:56:07.135456
これから5秒待つ
これから5秒待つ
これから5秒待つ
これから5秒待つ
server
1async def time(websocket, path): 2 count = 0 3 while True: 4 now = datetime.datetime.utcnow().isoformat() + "Z" 5 cid = f"{count:>6}" 6 char = 'longlongstr' 7 content = {'id':cid, 'message': char, 'time': str(now)} 8 j_content = json.dumps(content) 9 count += 1 10 await websocket.send(j_content) 11 await asyncio.sleep(random.random() * 3) 12 13 14start_server = websockets.serve(time, "127.0.0.1", 5679) 15 16asyncio.get_event_loop().run_until_complete(start_server) 17asyncio.get_event_loop().run_forever()
回答2件
あなたの回答
tips
プレビュー