Twisted + crochet はかなり渋いチョイスですね。Websocket を扱うなら websockets が良さそうです。Python の multiprocessing
はしんどいのでまずは threading
を使うと良いでしょう(それでもしんどいのだが)。ちなみに非同期処理とマルチスレッドは直接関係ないのでご注意を。イメージとしては Websocket サーバのスレッドと DB に書き込むスレッドを立てて間にキューをもたせて Websocket スレッドからどんどんキューにデータ投げ入れて DB サーバはひたすら書き込む作業をするって感じですね。適当に書き下すとこんなイメージかな。
Python
1import asyncio
2import websockets
3import threading
4import sql # 適当なやーつ
5import queue
6import functools
7import time
8
9async def echo(websocket, path, q):
10 async for message in websocket:
11 await q.put(message)
12
13def serve_websocket(q):
14 handler = functools.partial(push_and_echo, q=q) # キューと DB を注入(適用と呼ぶ)した関数を作る
15 server = websockets.serve(handler, "localhost", 8765)
16 asyncio.get_event_loop().run_until_complete(server)
17 asyncio.get_event_loop().run_forever()
18
19def push(q, conn):
20 # バッファー処理をしているが、最近のライブラリは賢いからしなくてもいいかもしれないし、必要かもしれない
21 buffer = []
22 prev = time.time()
23 MAX_BUF = 10
24 TIMEOUT_SEC = 0.5
25 While True:
26 now = time.time()
27 duration = now - prev
28 if len(buffer) < MAX_BUF && duration < TIMEOUT_SEC:
29 buffer.append(q.get())
30 else:
31 conn.inserts(buffer)
32 buffer = []
33 prev = now
34
35if __name__ == '__main__':
36 db = sql.conn('/tmp/sqlite3.db')
37 q = queue.Queue()
38 # daemon=True にしておくと親スレッド(= メインスレッド)が死んだときに子スレッドも死ぬので楽
39 t_server = threading.Thread(target=serve_websocket,args=(q,), daemon=True)
40 t_push = threading.Thread(target=push,args=(q, db), daemon=True)
41 t_server.start()
42 t_push.start()
43
44 While True:
45 time.sleep(1)
ほぼ霊感で書いてますがだいたいこんな感じです。まあバッファの部分については今日日の気の利いた DB や、ORM 使ってのアクセスだったらこんなん気にしなくてもよく、echo
関数から直接ぶん投げても大丈夫なので、そっちを見直したほうがよいかもしれません。