実現したいこと
① websocketを使ってデータをリアルタイムで取得
② サブプロセスをマルチプロセスで実行
③ ①で取得したデータをpipeで②のサブプロセスに送って処理する
前提
PythonでBybitのAPIを叩いています。websocketを用いてデータを逐次取得しながら、サブプロセスで分析するようなコードを書こうとしています。
構成としては、websocketを動かすプロセスはデータを取得することだけをさせて、サブプロセスでは一定時間ごとにそのデータを加工して分析するというように、作業を二つのプロセスに分けようとしています。
とりあえず、全体の動きを見るために、websocketを動かすメインプロセスからpipeで 送られてきたデータを、サブプロセスでプリントすると言うだけのプログラムを書きました。
ここでは、サブプロセスでのデータのプリントを実現したいです。
プラットフォームはmacOSです。
本番環境はLinax(ubuntu)です。mac上でうまくいけば、Linax上でも大丈夫かなと考えていますが、安易でしょうか?
発生している問題・エラーメッセージ
特にエラーは発生していませんが、別プロセスからのデータの表示も全くされません。ps でプロセスIDを確認すると、プログラムが停止しているわけではなさそうでした。
該当のソースコード
Python
1import json 2import websocket 3from pprint import pprint 4import multiprocessing as mp 5 6class MyWS(): 7 def __init__(self): 8 self.endpoint_public = 'wss://stream.bybit.com/v5/public/linear' 9 self.resv_conn, self.send_conn = mp.Pipe() 10 self.connect(self.endpoint_public) 11 self.child = mp.Process(target=self.create_df, args=(self.resv_conn, )) 12 self.child.start() 13 self.child.join() 14 15 def connect(self, endpoint): 16 self.ws = websocket.WebSocketApp(url = endpoint, 17 on_open = self.on_open, 18 on_message = self.on_message, 19 on_error = self.on_error, 20 on_close = self.on_close,) 21 self.ws.run_forever() 22 23 def on_message(self, ws, message): 24 try: 25 message = json.loads(message) 26 if message.get('data'): 27 for d in message.get('data'): 28 self.send_conn.send(d) ### ここでデータをサブプロセスに送る。 29 # pprint(message) ### A ### 30 except Exception as e: 31 self.on_error(ws, e) 32 33 def on_open(self, ws): 34 self.ws.send(json.dumps({"op":"subscribe", "args":["publicTrade.BTCUSDT"]})) 35 36 def exit(self): 37 self.ws.close() 38 39 def on_error(self, ws, error): 40 print(error) 41 self.exit() 42 43 def on_close(self, ws, close_status_code, close_msg): 44 print("### closed ###") 45 46 # データ処理用のサブプロセス 47 def create_df(self, connection): 48 while True: 49 received_data = connection.recv() 50 if received_data: 51 print(received_data) 52 53 54if __name__ == '__main__': 55 myws = MyWS() 56 57 58
試したこと
関数on_messageの中の「### A ###」の箇所を有効にすると、データは取得できています。
ですので、pipeでうまく送信できていないか、そもそもサブプロセスが動いていないのか、そのような問題だと思います。しかし、問題の切り分けができません。どなたか、websocketとマルチプロセスに詳しい方、教えていただけないでしょうか。
追記
コンストラクタ__init__()のコードの順序を変えました。
Python
1 def __init__(self): 2 self.endpoint_public = 'wss://stream.bybit.com/v5/public/linear' 3 self.resv_conn, self.send_conn = mp.Pipe() 4 # 以下のようにコードの順序を変更 5 self.child = mp.Process(target=self.create_df, args=(self.resv_conn, )) 6 self.child.start() 7 self.child.join() 8 self.connect(self.endpoint_public)
回答1件
あなたの回答
tips
プレビュー