回答編集履歴
2
Queue不要のため修正
answer
CHANGED
@@ -1,5 +1,3 @@
|
|
1
|
-
キューにポンポン投入して順番に実行していきます。
|
2
|
-
|
3
1
|
実行を止めるためのフラグのところはお行儀が悪いかもしれませんがとりあえずの実装です。
|
4
2
|
|
5
3
|
現状、clientでの処理(5秒間隔で1つずつ)に対して、serverからの送出が早いため、どんどんタスクはたまっていきます。
|
@@ -8,24 +6,14 @@
|
|
8
6
|
```python
|
9
7
|
(loggerとかの定義は質問でも省略されているので略)
|
10
8
|
|
11
|
-
from asyncio import Queue
|
12
|
-
|
13
|
-
# タスクを先入先出処理するためのキュー
|
14
|
-
que = Queue()
|
15
|
-
|
16
9
|
# タスク処理屋
|
17
10
|
executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread")
|
18
11
|
|
19
|
-
#
|
12
|
+
# 処理を止めるためのフラグ
|
20
13
|
working = {'state':True}
|
21
14
|
|
22
|
-
# 処理
|
23
|
-
async def process():
|
24
|
-
while working['state']:
|
25
|
-
board = await que.get()
|
26
|
-
logger.info(f"[TASKID:{board.get('id')}] 処理を開始します。")
|
27
|
-
executor.submit(event, board)
|
28
15
|
|
16
|
+
|
29
17
|
# 受信部分
|
30
18
|
async def stream():
|
31
19
|
uri = 'ws://127.0.0.1:5679/'
|
@@ -34,7 +22,7 @@
|
|
34
22
|
response = await ws.recv()
|
35
23
|
board = json.loads(response)
|
36
24
|
logger.info(f"[TASKID:{board.get('id')}] 受信しました。")
|
37
|
-
|
25
|
+
executor.submit(event, board)
|
38
26
|
logger.info(f"[TASKID:{board.get('id')}] タスクをキューに投入しました。")
|
39
27
|
|
40
28
|
|
@@ -48,7 +36,7 @@
|
|
48
36
|
|
49
37
|
loop = asyncio.get_event_loop()
|
50
38
|
loop.create_task(stream())
|
51
|
-
|
39
|
+
|
52
40
|
try:
|
53
41
|
loop.run_forever()
|
54
42
|
except KeyboardInterrupt:
|
1
修正
answer
CHANGED
@@ -33,6 +33,7 @@
|
|
33
33
|
while not ws.closed:
|
34
34
|
response = await ws.recv()
|
35
35
|
board = json.loads(response)
|
36
|
+
logger.info(f"[TASKID:{board.get('id')}] 受信しました。")
|
36
37
|
await que.put(board)
|
37
38
|
logger.info(f"[TASKID:{board.get('id')}] タスクをキューに投入しました。")
|
38
39
|
|