🎄teratailクリスマスプレゼントキャンペーン2024🎄』開催中!

\teratail特別グッズやAmazonギフトカード最大2,000円分が当たる!/

詳細はこちら
Django

DjangoはPythonで書かれた、オープンソースウェブアプリケーションのフレームワークです。複雑なデータベースを扱うウェブサイトを開発する際に必要な労力を減らす為にデザインされました。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

1回答

2325閲覧

asyncioを使ったwebsocketの処理

taro_yamada

総合スコア55

Django

DjangoはPythonで書かれた、オープンソースウェブアプリケーションのフレームワークです。複雑なデータベースを扱うウェブサイトを開発する際に必要な労力を減らす為にデザインされました。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

0クリップ

投稿2020/12/29 00:27

編集2020/12/29 14:21

以下のとおり、djangoのカスタムコマンドでasyncioを使ってwebsocketのデータを加工しています。
なお、元のソースコードはものすごく長いので、省略しています。

デバッグで追ったところ、method3のreturnまではいけたのですが、method2のreturn out_layerで止まってしまいます。
どなたかアドバイスをいただけないでしょうか?

(asyncioを使っているとエラーメッセージが表示できず、なぜ止まっているか判断が難しいです。)

実行したいことは、save_dataが終わった後に、method2と新たなメッセージの受信を同時に始めたいです。

python

1class Command(BaseCommand): 2 3 def handle(self, *args, **kwargs): 4 5 """ループの開始""" 6 loop = asyncio.get_event_loop() 7 loop.create_task(self.stream()) 8 print('start receiving') 9 try: 10 loop.run_forever() 11 except KeyboardInterrupt: 12 exit() 13 14 async def stream(self): 15 uri = 'ws://localhost:1234/' 16 17 async with websockets.connect(uri, ping_interval=None) as ws: 18 while not ws.closed: 19 response = await ws.recv() 20 content = json.loads(response) 21 task = loop.create_task(self.save_pushdata(content, sid)) 22 task.add_done_callback(self.finish) 23 24 async def save_pushdata(self, content, sid): 25 loop = asyncio.get_event_loop() 26 newData = list( 27 symbol=content['****'], 28 ) 29 newData.save() 30 task = loop.create_task(self.method2(content, sid)) 31 task.add_done_callback(self.finish) 32 save_pushdata_sid = str(sid) + 'save_pushdata' 33 return save_pushdata_sid 34 35 async def method2(self, content): 36 37 args1 = self.method4(content) 38 args2 = self.method5(content) 39 self.method3(args1, args2) 40 method2_sid = str(sid) + 'method2' 41 return method2_sid 42 43 def method3(self, content): 44 print('123') 45 return content 46 47 def method4(self, content): 48 newData = list( 49 symbol=content['****'], 50 ) 51 newData.save() 52 return content['****'] 53 54 def method5(self, content): 55 newData = list( 56 symbol=content['****'], 57 ) 58 newData.save() 59 return content['****'] 60 61 def finish(self, text): 62 logging.info('%s', text.result()) 63

python

1 2 def handle(self, *args, **kwargs): 3 4 """ループの開始""" 5 loop = asyncio.get_event_loop() 6 loop.create_task(self.stream()) 7 print('start receiving') 8 try: 9 loop.run_forever() 10 except KeyboardInterrupt: 11 exit() 12 13 async def stream(self): 14 uri = 'ws://localhost:1234/' 15 16 async with websockets.connect(uri, ping_interval=None) as ws: 17 while not ws.closed: 18 response = await ws.recv() 19 content = json.loads(response) 20 21 executor = concurrent.futures.ProcessPoolExecutor() 22 queue = asyncio.Queue() 23 24 for i in range(10): 25 queue.put_nowait(i) 26 27 async def proc(q): 28 while not q.empty(): 29 i = await q.get() 30 future = loop.run_in_executor(executor, save_pushdata(content, sid), i) 31 await future 32 33 tasks = [proc(queue) for i in range(4)] # 4 = number of cpu core 34 return await asyncio.wait(tasks) 35 36def save_pushdata(self, content, sid): 37 loop = asyncio.get_event_loop() 38 newData = list( 39 symbol=content['****'], 40 ) 41 newData.save() 42 self.method2(content, sid)

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

asyncioを使っているとエラーメッセージが表示できず、なぜ止まっているか判断が難しいです。

asyncio自身はエラーメッセージに干渉しません。
django 側の設定や実行環境の問題ではないでしょうか?

djangoのカスタムコマンドについては知りませんが、コードには多数の問題があるので、
エラーメッセージを得られる環境設定を先に目指した方が良いと思います。

追記: カスタムコマンドではなく、djangoに依存しない通常のスクリプトとして
同様のコードを実装・実行しエラーメッセージは得られますか?

(モデル関連のコードは省き、asyncio/websockets の部分だけ動作確認する)


未定義のメソッド: save_data -> save_pushdata ?

python

1 await self.save_pushdata(content)

組み込みのリストはキーワード引数やsave()メソッドを持ちません。

python

1newData = list( 2 symbol=content['****'], 3 ) 4newData.save()

model の save() を呼び出したい?


python

1async def method2(self, content): 2 ... 3 return out_layer

コルーチンは最低限一つ以上の await が必要です。
内容次第では、コルーチンにする必要があるかどうかから検討された方が良いと思います。

  • method4, method5 が未定義
  • method3 の引数が一致しない、等

投稿2020/12/29 01:02

編集2020/12/29 01:22
teamikl

総合スコア8731

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

taro_yamada

2020/12/29 01:40

ご回答ありがとうございます。 とにかく元のソースコードが長く、djangoの別のクラスで様々なメソッドも定義してしまっていて、djangoでないスクリプトで実行するのが困難な状況に陥ってしまっています涙 <エラーメッセージについて> djangoだけの場合エラーメッセージを出すので、asyncioの方が原因ではないかと思っています。 <メソッドの定義エラー> ちなみに、メソッドの定義エラーは、省略したときのミスで、直します。 <save()> djangoのORMのメソッドでDBにinsertしているので、一応問題なく動いています。こちらも省略したときの表現が悪かったです。 <コルーチン化> 問題は、コルーチンにする必要があるかどうかという点です。 目的として、メッセージを受信してからIO処理を含むいろいろなことをやっている間に次々とメッセージが溜まってtimeoutになってしまうという課題感から、並列処理を目指しています。 データ受信とその後の処理を切り分けて並列処理をするためにコルーチン化以外で適切な方法はありますでしょうか? 初心者で、どこにawaitを付けることができるかも今一つ理解していません。すみません。 ちなみに、method4とmethod5では、省略していますが、save()と同様にdjangoのORMを使ってDBへの書き込みをしています。
teamikl

2020/12/29 03:02

エラーメッセージの表示に関しては、 asyncio単体で利用する場合も特に問題ないので、 django/asyncio で発生する問題なのかな? どこかで例外が適切に伝搬されてないのだと思います。 djangoのORMに依存するモデル関連のコードは省き、 asyncio/websockets の部分だけで動作確認してみてください。 > どこにawaitを付けることができるか これに関しては、method3~5 がもう少し具体的でないと、わかりませんが、 await を付けることができるのは、コールーチン、future, task 等 __await__ を実装するオブジェクトです。 通常は、何かの完了待ちをする処理を実装したオブジェクトです。 while not ws.closed:   ...   await self.save_pushdata(content) では、save_pushdata が完了するまで待ってから次の読み出しとなるので、 完了を待つ必要がないなら、ここは await 不要で task にするべきだと思います。 例えば、save_pushdata はコルーチンにはせずに、task を返す関数にする、等。 ==== ちなみに、「並行」処理です。 await 迄処理 → asyncio のイベントループに戻る →他のコルーチンを処理 → await の次から処理を再開~ と順番に実行されるので、 何処かで時間の掛かる処理があると全体の遅延に繋がります。 コードの見かけは複数の関数を同時に実行してるように見えますが、 内部ではawait の時点で切り替えて実行されます。 完了待ちが終了したもの次第再開される為、 どの関数が次に再開されるか決まってないという点が非同期です。 最初は「非同期」と「並列」「平行」の区別が解りにくいかもしれません。
taro_yamada

2020/12/29 03:24

save_pushdata(content)をtaskを返すように変更したところ回り始めました。 ただ、contentに次の値が入ってきているのですが、なぜか2回目以降はtask = loop.create_task(self.save_pushdata(content))のループが始まりません。
teamikl

2020/12/29 03:37

> save_pushdata(content)をtaskを返すように変更した だと > loop.create_task(self.save_pushdata(content)) は、task が2重になってませんか? save_pushdata() が返すのは task オブジェクト create_task() に必要なのはコルーチンです。(※ ここでは厳密に task != コルーチン) 恐らくエラーが出るはずだけど、 エラーメッセージが見れない原因を先に調べたほうが良さそうですね。
teamikl

2020/12/29 04:24

django 3.1.4/asyncio 利用でエラーメッセージの表示確認出来ました。 例外がどこかで補足されてエラーが表示されてない、もしくは コードが期待通りに実行されてない可能性もありますね。 ---- 他の問題点。 エラーメッセージ表示の問題が解消すれば簡単に修正できると思います。 > self.method3(args1, args2) メソッドの定義と引き数が一致しない > return out_layer 変数 out_layter は未定義 > task.add_done_callback(self.finish('test')) コールバック登録になっていない。 finishを呼び出してその戻り値をcallbackに登録している。
taro_yamada

2020/12/29 04:28

task.add_done_callback(self.finish)のfinishの引数を外したらなぜか動き始めました・・・ しかも、ほとんどすべて自分が望んでいた通りの動きをしています。 (DB処理等が終わる前に次のpushdataの受け取り) かなり、申し訳ないソースコードでご対応いただいて本当にありがとうございました。 ちなみに、taskが二重のところですが、save_pushdataのコルーチンの中でmethod2のコルーチンがループしていて二重という意味でしょうか? 現状、問題なく動いていますが、やり方に問題がありますでしょうか?
teamikl

2020/12/29 04:50

> 現状、問題なく動いていますが、やり方に問題がありますでしょうか? どの様に修正されたのか把握してないので不明な部分です。 >DB処理等が終わる前に次のpushdataの受け取り を解決したのは、await self.save_pushdata() が処理をブロックしていた点ですね。 動いているが、冗長になってないか不安であれば、 質問のコードを修正して頂ければ確認できます。 想定した修正は、 - awaitself. save_pushdata(content) -> self.save_data(content) - save_data 関数は async を外し、最後に return task 「2重に」というのは save_data 関数内で task を作るので、 その戻り値のtaskに再び loop.create_task を使う必要はないという点です。
teamikl

2020/12/29 04:58 編集

> task.add_done_callback(self.finish)のfinishの引数を外したらなぜか動き始めました・・・ future オブジェクトがコールバックの引数に渡されて future.result() で結果を取り出しみたいな感じになってると思います。 add_done_callbackの時点で引数を渡したい場合は、 functools.partial も使えます https://docs.python.org/3/library/asyncio-future.html#asyncio.Future.add_done_callback とりあえずは、期待通りに動いたようなのは良かったですが、 エラーメッセージが表示されない点は気になりますね。
taro_yamada

2020/12/29 06:40

最終のバージョンに直しました。 ついでに処理の管理をするためにsidを付けてみました。 ちなみに、save_data()はsave_pushdata()の誤りです。失礼しました。 さて、save_pushdata()もmethod2()もstrを戻していますが動いてしまっています・・・。add_done_callbackでfutureを戻しているからこれでOKということでしょうか? >「2重に」というのは save_data 関数内で task を作るので、 >その戻り値のtaskに再び loop.create_task を使う必要はないという点です。 この文が少し難しくてわかりにくかったですが、 save_data内でmethod2のtaskを作る。 すなわち、save_dataの戻り値はsave_data内で宣言したtaskにするといけない ということでいいですか? ほんの少しずつですがasyncioの動きがわかってきたような気がします。とにかく難しいですね!
taro_yamada

2020/12/29 06:42

ちなみに、エラーメッセージが表示されないのは、djangoでasyincioを使い始めたときからずっと表示されていないので、やはり、djangoとasyncioの関係性にあるのかもしれません。 かれこれ1か月くらいエラーメッセージなしにデバッグしています笑 改めて調べてみます。
teamikl

2020/12/29 10:35

>さて、save_pushdata()もmethod2()もstrを戻していますが動いてしまっています・・・。 エラーメッセージを得られないと、 編集してるコードと実行してるコードが違う可能性もあるので、 前提から見直した方が良さそうです。 >新しい実行環境、新しいプロジェクトで djangoとasyncioを使った最小限のコードで試してみる、等。 私の視点からすると method3 の呼び出しも引数が違いエラーが出るはずなのに、 動いてしまうのも疑問なので、質問に掲載されたコードと実際のコードに大きな隔たりがありそうで、現状の提示されたコードでは正確な事が何も解りません。 2重に~というのは、 - stream 関数内で create_task - save_pushdata 関数内でも create_task という部分を指します。
teamikl

2020/12/29 11:04 編集

method3 の呼び出しに関しては > TypeError: method3() takes 2 positional arguments but 3 were given というエラーが出るはずです。 ---- method2~method5 のコードに await を使う箇所がないのであれば、 非同期になってないので、タスクにする必要もないかもしれません。 コルーチン(method2) →create_task(save_pushdata内) → create_task(stream内) となっていますが、 タスク等の入れ物を余分に作ってるだけで、 実際のコードは同期的に実行されてます。 (await が時間の掛かる処理の待機から、他のコードの実行に処理を切り替えるタイミングなので、 await を使ってないコードでは、非同期の恩恵を付けられません) 追記: もしバックグラウンドで処理したいなら、 asyncioのタスクよりもconcurrent.future の方が適切です。 asyncioから使うなら、run_in_eecutor等。 DBへの保存部分もasyncioで処理したい場合は、 django の ORM が非同期処理に対応してるかどうかを調べると良いと思います。 (この辺りのdjangoの最近の情報には詳しくないので、言及のみ) ---- > かれこれ1か月くらいエラーメッセージなしにデバッグしています笑 > 改めて調べてみます。 ロギングの設定でエラー出力を無効にしてないか、等も調べてみてはどうでしょう。 https://docs.djangoproject.com/en/3.1/topics/logging/ 一応、新規プロジェクトで何も特別な設定してない状態でも、 django command で asyncio を使ったコードでエラー表示は確認出来ました。
taro_yamada

2020/12/29 12:58

methodの中で使っているのはkerasのpredict()とdjangoのORMとpandasとnumpyです。 どれもawaitを使えないので、concurrent.futureに切り替えたほうがいいのかもしれません。 ちなみに、これまでは、ORM→numpy、pandas→predict→ORMの処理をしている間にpushがtimeoutしてしまうといった状態だったのが、今回のawaitなしのasyncioで、logでsidの進行状況を見ると、save_dataやmethodのsidよりも先に次のデータ受信ができているように見えました。 これは、awaitで作ったsave_dataやmethodは、その下に非同期処理に対応したmethodがなくても一度上のネストに戻っていくということでしょうか? もし、awaitの時点で順番を変えて受信だけ優先的にしてもらえらばいいという観点でいけば、最低限はクリアできていそうな気がしますが、concurrent.futureの方が並列処理の効率がいいということでよろしいでしょうか? なお、method3の引数はソースを省略した際の転記ミスです。 色々と教えていただいてありがとうございます。助かっています。
teamikl

2020/12/29 13:35

> concurrent.futureの方が並列処理の効率がいいということでよろしいでしょうか? await のないコードの場合は、期待する並列処理にもなってません。 質問に提示されたコード内で非同期なのは await でデータ受信の箇所のみですが、 await のないコードを実行してる間は、コルーチンであったとしても データ受信処理は止まってます。 現在のコードでは 1. データ受信 await ws.recv() 2. タスク生成 3. 2回目のデータ受信待機 await ws.recv() 4. タスク実行開始 (この間のデータ受信は中断されて出来ない) なので、実質タスクを使わないで書いた場合と処理時間は変わらない でコードのみ複雑になってる状態です。 優先的に処理という点は、タスクの実行時間が短い場合はそのように思うかもしれませんが、保証された動作ではありません。 コルーチンのコードは自動でバックグラウンド動作しているわけではなくて、 待ち時間が掛かる処理を await ~ に記述し、 await により適切に実行待ちの処理を切り替える必要があります。 恐らくここで望まれる動作は、データ受信後に即次のデータ受信を待機し、 バックグラウンドでDBへの保存だと思いますが、 await を使うコードがない場合は、 選択肢としては concurrent.future (asyncioから使う場合は run_in_executor) になると思います。
taro_yamada

2020/12/29 14:22

即席でrun_in_executorのバージョンを作ってみました。 大体、こんなイメージでいいのでしょうか?
teamikl

2020/12/29 18:38 編集

>大体、こんなイメージでいいのでしょうか? 恐らく期待する挙動になってないと思います。 (単体で実行可能なコードでないと、こちらでは動作確認できません) エラーメッセージが読めれば気付ける事ばかりなので、 手探りでコードを書くよりも、先にそちらの問題を解決した方がよさそうです。 どうしても解決できない場合は、 django を使わずにコードを書いてテストして、 django command からはそのモジュールを呼び出すだけにするような工夫をして見て下さい。 (django に依存する部分を切り分けてモジュールを設計します) ---- >loop.run_in_executor(executor, save_pushdata(content, sid), i) このコードでは、save_pushdata を呼び出してから、 その戻り値を run_in_executor に渡してます。 run_in_executor で save_pushdata を実行したいはずなので、 関数と引数を別々にrun_in_executor に渡すようにしてください。 run_in_executor(executor, func(arg)) # 間違い run_in_executor(executor, func, arg) # OK https://docs.python.org/ja/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor ---- - queue や proc はwhileループの外側に出した方が良いです。 - asyncio.wait(tasks) は終了待機する為、次の await ws.recv() が実行されません。 タスク完了まで次のデータを読めないので、処理時間次第では一番最初にあった問題が再発すると思います。
teamikl

2020/12/29 17:56 編集

以前のコメントに誤解があるようなので、説明しておきます。 >save_pushdata()もmethod2()もstrを戻していますが動いてしまっています・・・。 関数定義に async が付いてる場合 save_pushdata() の戻り値は str ではなくコルーチン オブジェクトです。 create_task に str を渡してるわけではありません。
taro_yamada

2020/12/30 01:42

ありがとうございました。 次のスレッドを立ててみたいと思います。 またよろしくお願いします。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.36%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問