質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.46%
Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

2回答

1588閲覧

threadPoolExecutorの使い方について

taro_yamada

総合スコア55

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

0クリップ

投稿2020/12/30 02:06

編集2020/12/30 02:46

asyncioとwebsocketを使ってデータを受信→加工しています。

 加工処理が重いため、データ受信とは別に加工の部分はバックグラウンドで動くようにしたいです。更に加工はデータを受け取った順に実行したいです。

以下のとおりコードを書いてみたのですが、maxworkerの数だけ実行され、さらには、次のメッセージ受信も5秒待ってから実行されます。

 5秒待たずして次のresponseを受けたいのですが、どのように表現すればいいでしょうか?

client

1async def stream(): 2 uri = 'ws://127.0.0.1:5679/' 3 4 async with websockets.connect(uri, ping_timeout=None) as ws: 5 while not ws.closed: 6 7 8 with ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread") as executor: 9 response = await ws.recv()# ここを滞りなく実行したいです。 10 board = json.loads(response) 11        print(board) # スレッドのバックグラウンド処理とは別に、このプリントだけはリアルタイムで表示したい。 12 13 executor.map(event, board.get('id')) 14 logger.info("submit end") 15 logger.info("main end") 16 17 18def event(board): 19 print('これから5秒待つ' + str(board)) 20 time.sleep(5) 21 print('wait終わり sleepなし') 22 print(datetime.datetime.now()) 23 24 25loop = asyncio.get_event_loop() 26loop.create_task(stream()) 27try: 28 loop.run_forever() 29except KeyboardInterrupt: 30 exit() 31

これから5秒待つ
これから5秒待つ
これから5秒待つ
これから5秒待つ
wait終わり sleepなし
wait終わり sleepなし
wait終わり sleepなし
wait終わり sleepなし
2020-12-30 10:56:02.123801
2020-12-30 10:56:02.123801
2020-12-30 10:56:02.123801
2020-12-30 10:56:02.123801
これから5秒待つ
これから5秒待つ0
wait終わり sleepなし
wait終わり sleepなし
2020-12-30 10:56:07.135456
2020-12-30 10:56:07.135456
これから5秒待つ
これから5秒待つ
これから5秒待つ
これから5秒待つ

server

1async def time(websocket, path): 2 count = 0 3 while True: 4 now = datetime.datetime.utcnow().isoformat() + "Z" 5 cid = f"{count:>6}" 6 char = 'longlongstr' 7 content = {'id':cid, 'message': char, 'time': str(now)} 8 j_content = json.dumps(content) 9 count += 1 10 await websocket.send(j_content) 11 await asyncio.sleep(random.random() * 3) 12 13 14start_server = websockets.serve(time, "127.0.0.1", 5679) 15 16asyncio.get_event_loop().run_until_complete(start_server) 17asyncio.get_event_loop().run_forever()

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

退会済みユーザー

退会済みユーザー

2020/12/30 02:41

加工する処理は、バックグラウンドで受け取った順に1つ1つ処理すればよく、多重に処理する必要はないという理解でよろしいでしょうか? たとえば、1,2,3,4・・・という加工処理が発生した場合、この1,2,3,4・・・を同時に行うまでは必要なく、順番に処理していけばよいという理解でいいでしょうか
taro_yamada

2020/12/30 02:48

お見込みの通りです。 やりたいことをソースコードに追記しました。 <今回の課題は以下の点です> ・データ受信が遅れてしまってwebsocketがtimeoutエラーになってしまうこと ・前回qnoir様にアドバイスいただいたawaitで進めていたのですが、中のasyinc def関数にawaitを使うものが一つもなかったのでうまく動かなかったこと。(私の調査ミスです。申し訳ありません。)
taro_yamada

2020/12/30 02:49

イメージとしては受信したresponseをキューに入れて、バックグラウンドで先入先出でメソッドで処理していくようなイメージだと思っています。
guest

回答2

0

ざっとThreadPoolExecutorのドキュメントを読んでみました。
ThreadPoolExecutorは、パラメーターを変えながら同じ処理を順序を無視して複数実行するための機能です。実行順序の保証はありません。

taro_yamadaさんのやろうとしていることは、データ受け取り(receiver)、データ処理(analyzer)についてそれぞれ一個のスレッドがあれば十分ですし、それぞれのスレッドが実行するのは決まった処理ですから、途中で新しいタスクを依頼する必要はありません。

やるべきことは、receiverスレッドを立ち上げて5秒眠っては受信するのを無限ループさせ、親スレッド側はanalyzer処理を行うことです。両スレッドは、排他制御した先入れ先出しキューで受け取ったデータを受け渡しし、キューが空ならanalyzer処理は何秒かスリープさせればよいです。
このためには、threadingモジュールで十分でしょう。

投稿2020/12/30 02:53

ppaul

総合スコア24666

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

taro_yamada

2020/12/30 04:25

threadingのスレッドとasyncioでのスレッドとどちらが効率的なのか判断が難しいですが、とりあえず、現状、動いているasyncioのスレッドで処理してみようと思います。 ありがとうございました。
guest

0

ベストアンサー

実行を止めるためのフラグのところはお行儀が悪いかもしれませんがとりあえずの実装です。

現状、clientでの処理(5秒間隔で1つずつ)に対して、serverからの送出が早いため、どんどんタスクはたまっていきます。

client.py

python

1(loggerとかの定義は質問でも省略されているので略) 2 3# タスク処理屋 4executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread") 5 6# 処理を止めるためのフラグ 7working = {'state':True} 8 9 10 11# 受信部分 12async def stream(): 13 uri = 'ws://127.0.0.1:5679/' 14 async with websockets.connect(uri, ping_timeout=None) as ws: 15 while not ws.closed: 16 response = await ws.recv() 17 board = json.loads(response) 18 logger.info(f"[TASKID:{board.get('id')}] 受信しました。") 19 executor.submit(event, board) 20 logger.info(f"[TASKID:{board.get('id')}] タスクをキューに投入しました。") 21 22 23 24def event(board): 25 logger.info(f"[TASKID:{board.get('id')}] これから5秒待つ" + str(board)) 26 time.sleep(5) 27 logger.info(f"[TASKID:{board.get('id')}] 処理が完了しました [wait終わり sleepなし]") 28 logger.info(datetime.now()) 29 30 31loop = asyncio.get_event_loop() 32loop.create_task(stream()) 33 34try: 35 loop.run_forever() 36except KeyboardInterrupt: 37 working['state'] = False 38 exit()

投稿2020/12/30 03:15

編集2020/12/30 03:24
退会済みユーザー

退会済みユーザー

総合スコア0

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

taro_yamada

2020/12/30 04:21

ありがとうございました。無事にうまく動き出しました。 logを見ているのが楽しいです。 ずっとこの問題で苦労していたので本当に助かりました。 今後ともよろしくお願いします。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.46%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問