質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
並列処理

複数の計算が同時に実行される手法

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

WebSocket

WebSocketとは双方向・全二重コミュニケーションのためのAPIでありプロトコルのことを指します。WebSocketはHTML5に密接に結びついており、多くのウェブブラウザの最新版に導入されています。

Q&A

解決済

1回答

826閲覧

Pythonでwebsocketのから逐次送ったデータをマルチプロセスで処理したいです

fu_3823

総合スコア81

並列処理

複数の計算が同時に実行される手法

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

WebSocket

WebSocketとは双方向・全二重コミュニケーションのためのAPIでありプロトコルのことを指します。WebSocketはHTML5に密接に結びついており、多くのウェブブラウザの最新版に導入されています。

0グッド

0クリップ

投稿2023/02/27 21:40

編集2023/02/28 06:26

実現したいこと

① websocketを使ってデータをリアルタイムで取得
② サブプロセスをマルチプロセスで実行
③ ①で取得したデータをpipeで②のサブプロセスに送って処理する

前提

PythonでBybitのAPIを叩いています。websocketを用いてデータを逐次取得しながら、サブプロセスで分析するようなコードを書こうとしています。
構成としては、websocketを動かすプロセスはデータを取得することだけをさせて、サブプロセスでは一定時間ごとにそのデータを加工して分析するというように、作業を二つのプロセスに分けようとしています。
とりあえず、全体の動きを見るために、websocketを動かすメインプロセスからpipeで 送られてきたデータを、サブプロセスでプリントすると言うだけのプログラムを書きました。
ここでは、サブプロセスでのデータのプリントを実現したいです。

プラットフォームはmacOSです。
本番環境はLinax(ubuntu)です。mac上でうまくいけば、Linax上でも大丈夫かなと考えていますが、安易でしょうか?

発生している問題・エラーメッセージ

特にエラーは発生していませんが、別プロセスからのデータの表示も全くされません。ps でプロセスIDを確認すると、プログラムが停止しているわけではなさそうでした。

該当のソースコード

Python

1import json 2import websocket 3from pprint import pprint 4import multiprocessing as mp 5 6class MyWS(): 7 def __init__(self): 8 self.endpoint_public = 'wss://stream.bybit.com/v5/public/linear' 9 self.resv_conn, self.send_conn = mp.Pipe() 10 self.connect(self.endpoint_public) 11 self.child = mp.Process(target=self.create_df, args=(self.resv_conn, )) 12 self.child.start() 13 self.child.join() 14 15 def connect(self, endpoint): 16 self.ws = websocket.WebSocketApp(url = endpoint, 17 on_open = self.on_open, 18 on_message = self.on_message, 19 on_error = self.on_error, 20 on_close = self.on_close,) 21 self.ws.run_forever() 22 23 def on_message(self, ws, message): 24 try: 25 message = json.loads(message) 26 if message.get('data'): 27 for d in message.get('data'): 28 self.send_conn.send(d) ### ここでデータをサブプロセスに送る。 29 # pprint(message) ### A ### 30 except Exception as e: 31 self.on_error(ws, e) 32 33 def on_open(self, ws): 34 self.ws.send(json.dumps({"op":"subscribe", "args":["publicTrade.BTCUSDT"]})) 35 36 def exit(self): 37 self.ws.close() 38 39 def on_error(self, ws, error): 40 print(error) 41 self.exit() 42 43 def on_close(self, ws, close_status_code, close_msg): 44 print("### closed ###") 45 46 # データ処理用のサブプロセス 47 def create_df(self, connection): 48 while True: 49 received_data = connection.recv() 50 if received_data: 51 print(received_data) 52 53 54if __name__ == '__main__': 55 myws = MyWS() 56 57 58

試したこと

関数on_messageの中の「### A ###」の箇所を有効にすると、データは取得できています。
ですので、pipeでうまく送信できていないか、そもそもサブプロセスが動いていないのか、そのような問題だと思います。しかし、問題の切り分けができません。どなたか、websocketとマルチプロセスに詳しい方、教えていただけないでしょうか。

追記
コンストラクタ__init__()のコードの順序を変えました。

Python

1 def __init__(self): 2 self.endpoint_public = 'wss://stream.bybit.com/v5/public/linear' 3 self.resv_conn, self.send_conn = mp.Pipe() 4 # 以下のようにコードの順序を変更 5 self.child = mp.Process(target=self.create_df, args=(self.resv_conn, )) 6 self.child.start() 7 self.child.join() 8 self.connect(self.endpoint_public)

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

TakaiY

2023/02/28 01:49 編集

プラットフォームはWindowsですか? Linuxですか? multiprocessの挙動が違うのでこの情報は重要です。 create_df(self, connection) メソッドのprintに flush=trueを指定してみるとどうなりますか? また、whileループの前に 何かprintしてみたらそれは表示されますか? https://docs.python.org/ja/3.7/library/functions.html#print 回答はここではなく、質問を編集して追記するといいでしょう。
fu_3823

2023/02/28 02:43

コメントありがとうございます。 本文の方も加筆いたしましたが、こちらにも転記いたします。 プラットフォームはmacOSです。 本番環境はLinax(ubuntu)ですが、mac上でうまくいけば、Linax上でも大丈夫かなと考えていますが、安易でしょうか?
fu_3823

2023/02/28 02:44

printに flush=trueを指定 については後ほど試して返信いたします。
bsdfan

2023/02/28 02:46

self.connect(...) の中で、self.ws.run_forever() しているので、connect() からはもどってこないのではないでしょうか。だとすると、それより後ろのコードは実行されないのでサブプロセスは作られていないと思います。
TakaiY

2023/02/28 05:33

ああ、たしかにそうですね。 子プロセス起動してから、connectですかね。
fu_3823

2023/02/28 06:22 編集

ありがとうございます。 コンストラクタのコードの順序をbsdfanのご指摘とおり変えました。 質問欄に追記した通りです。 また、TakaiY さんのおっしゃる通り、whileループの前に print("test") を置いたところ、コンストラクタのコードの順序変更後はサブプロセスが作成されたようで、うまく"test" と表示されました。 しかし、メインプロセスのwebsocket からのデータは表示されません。pipe経由で送れていないか、受け取れていないかどちらかのようです。 あと、printの引数、flush=True では目に見える変化はありませんでした。
TakaiY

2023/02/28 07:25

join()は、生成した子プロセスの終了を待つ命令なので、joinまで入れてしまってはだめですね。
fu_3823

2023/02/28 09:44

何度もご意見ありがとうございます。joinのドキュメントを再読して、もう一度改善にトライしてみます。
fu_3823

2023/03/01 00:57 編集

コードの順序とjoinの使い方を見直すことで解決しました。 ありがとうございました。 マルチプロセスの書き方のセオリーのようなものを知らないので、下手なロジックだったと思いますが、ご丁寧にありがとうございまいた。 BAしたいですが、コメント欄からの助けていただいた場合は自己解決で良いのでしょうか。内容を回答欄にコピーしていただいたら、BAにさせていただきたいです。
TakaiY

2023/03/01 00:59

キーとなるコメントは bsdfanさんのものなので僕の出る幕ではないのですが、夕方くらいまで待って応答なければ、簡単な修正ポイントのまとめを書いて自己解決にしてしまっていいと思います。
fu_3823

2023/03/01 01:34

ありがとうございます。 おっしゃる通りにさせて頂きたいと思います。
bsdfan

2023/03/01 01:36

やったことをまとめて、自己解決にしていただくほうが、いい回答になると思いますので、お願いします。
fu_3823

2023/03/01 02:00

わかりました。おふたりともありがとうございました。
guest

回答1

0

自己解決

コンストラクタを以下のように書き換えることで、うまく動作するようになりました。

Python

1 def __init__(self): 2 self.endpoint_public = 'wss://stream.bybit.com/v5/public/linear' 3 self.resv_conn, self.send_conn = mp.Pipe() 4# 削除 - self.connect(self.endpoint_public) 5 self.child = mp.Process(target=self.create_df, args=(self.resv_conn, )) 6 self.child.start() 7# 削除 - self.child.join() 8# 追加 + self.connect(self.endpoint_public)

問題点は、.run_forever()が実行されるタイミングがサブプロセスの作成・実行後にすることと、join()が不要だったことです。
サブプロセスは終了させず、メインプロセスからのデータを受け取り続けるように動かすことを意図していたので、サブロセスの終了を待つjoin()は不要でした。

投稿2023/03/13 20:00

fu_3823

総合スコア81

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問