質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.35%
Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Q&A

解決済

1回答

2812閲覧

(結果を待たない)非同期処理 × multithread は可能でしょうか

giseta

総合スコア2

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

0グッド

4クリップ

投稿2020/12/24 22:18

編集2020/12/24 23:11

websocket からのPUSHを受け取り次第、DB(SQLite3)に格納する場面でのご相談です。

0 現状
PUSHを受け取り次第、「DB格納する処理※を発火・結果を待たずに次のPUSHに対応する」ようにしているのですが、ピークタイムで処理が集中するとどうしても処理が溜まってしまいます。
※1処理0.01~0.03secほどですがピークタイムでは90sec待ちになる場面もあります。

1 実現したいこと
処理が溜まっている間もCPUリソース※は十分に空きがあり、かつその処理順序を問わない・結果を待たないことからmultithread(?)でできる限り早く処理を終わらせたいです。
※ Ryzen3900x環境、12コア24スレッドです

そこまで突飛な話ではないと思うのですが、なかなか解決策が見つかりません。
ご教示いただけると助かります。サンプルコードもいただけると涙が出ます。

2 備考
・現状は見よう見まねでtwisted × crochetでfire and forgetを実現しています。
・実現できればtwistedにこだわりません。asyncio × multiprocessingでサクっとできるよ!等があれば非常に助かります。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

y_waiwai

2020/12/24 23:15

その実現しているというコードを提示しましょう それがないとはなしがすすめません
t_obara

2020/12/25 03:14

> ピークタイムで処理が集中するとどうしても処理が溜まって この場合のボトルネックが何なのかよく確認して進めた方が良いと思いますが、明確ですか?
giseta

2020/12/27 16:33 編集

--
guest

回答1

0

ベストアンサー

Twisted + crochet はかなり渋いチョイスですね。Websocket を扱うなら websockets が良さそうです。Python の multiprocessing はしんどいのでまずは threading を使うと良いでしょう(それでもしんどいのだが)。ちなみに非同期処理とマルチスレッドは直接関係ないのでご注意を。イメージとしては Websocket サーバのスレッドと DB に書き込むスレッドを立てて間にキューをもたせて Websocket スレッドからどんどんキューにデータ投げ入れて DB サーバはひたすら書き込む作業をするって感じですね。適当に書き下すとこんなイメージかな。

Python

1import asyncio 2import websockets 3import threading 4import sql # 適当なやーつ 5import queue 6import functools 7import time 8 9async def echo(websocket, path, q): 10 async for message in websocket: 11 await q.put(message) 12 13def serve_websocket(q): 14 handler = functools.partial(push_and_echo, q=q) # キューと DB を注入(適用と呼ぶ)した関数を作る 15 server = websockets.serve(handler, "localhost", 8765) 16 asyncio.get_event_loop().run_until_complete(server) 17 asyncio.get_event_loop().run_forever() 18 19def push(q, conn): 20 # バッファー処理をしているが、最近のライブラリは賢いからしなくてもいいかもしれないし、必要かもしれない 21 buffer = [] 22 prev = time.time() 23 MAX_BUF = 10 24 TIMEOUT_SEC = 0.5 25 While True: 26 now = time.time() 27 duration = now - prev 28 if len(buffer) < MAX_BUF && duration < TIMEOUT_SEC: 29 buffer.append(q.get()) 30 else: 31 conn.inserts(buffer) 32 buffer = [] 33 prev = now 34 35if __name__ == '__main__': 36 db = sql.conn('/tmp/sqlite3.db') 37 q = queue.Queue() 38 # daemon=True にしておくと親スレッド(= メインスレッド)が死んだときに子スレッドも死ぬので楽 39 t_server = threading.Thread(target=serve_websocket,args=(q,), daemon=True) 40 t_push = threading.Thread(target=push,args=(q, db), daemon=True) 41 t_server.start() 42 t_push.start() 43 44 While True: 45 time.sleep(1)

ほぼ霊感で書いてますがだいたいこんな感じです。まあバッファの部分については今日日の気の利いた DB や、ORM 使ってのアクセスだったらこんなん気にしなくてもよく、echo 関数から直接ぶん投げても大丈夫なので、そっちを見直したほうがよいかもしれません。

投稿2020/12/26 04:25

A_kirisaki

総合スコア2853

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

giseta

2020/12/27 16:25

一次回答になりますが、非常に参考になりました!ありがとうございます。 ふわっとしていた疑問を書き出していただいたようです。ご教示いただいたコードを参考に試行錯誤してみます。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.35%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問