teratail header banner
teratail header banner
質問するログイン新規登録

回答編集履歴

2

Queue不要のため修正

2020/12/30 03:24

投稿

退会済みユーザー
answer CHANGED
@@ -1,5 +1,3 @@
1
- キューにポンポン投入して順番に実行していきます。
2
-
3
1
  実行を止めるためのフラグのところはお行儀が悪いかもしれませんがとりあえずの実装です。
4
2
 
5
3
  現状、clientでの処理(5秒間隔で1つずつ)に対して、serverからの送出が早いため、どんどんタスクはたまっていきます。
@@ -8,24 +6,14 @@
8
6
  ```python
9
7
  (loggerとかの定義は質問でも省略されているので略)
10
8
 
11
- from asyncio import Queue
12
-
13
- # タスクを先入先出処理するためのキュー 
14
- que = Queue()
15
-
16
9
  # タスク処理屋
17
10
  executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="thread")
18
11
 
19
- # 実行を止めるためのフラグ
12
+ # 処理を止めるためのフラグ
20
13
  working = {'state':True}
21
14
 
22
- # 処理
23
- async def process():
24
- while working['state']:
25
- board = await que.get()
26
- logger.info(f"[TASKID:{board.get('id')}] 処理を開始します。")
27
- executor.submit(event, board)
28
15
 
16
+
29
17
  # 受信部分
30
18
  async def stream():
31
19
  uri = 'ws://127.0.0.1:5679/'
@@ -34,7 +22,7 @@
34
22
  response = await ws.recv()
35
23
  board = json.loads(response)
36
24
  logger.info(f"[TASKID:{board.get('id')}] 受信しました。")
37
- await que.put(board)
25
+ executor.submit(event, board)
38
26
  logger.info(f"[TASKID:{board.get('id')}] タスクをキューに投入しました。")
39
27
 
40
28
 
@@ -48,7 +36,7 @@
48
36
 
49
37
  loop = asyncio.get_event_loop()
50
38
  loop.create_task(stream())
51
- loop.create_task(process())
39
+
52
40
  try:
53
41
  loop.run_forever()
54
42
  except KeyboardInterrupt:

1

修正

2020/12/30 03:24

投稿

退会済みユーザー
answer CHANGED
@@ -33,6 +33,7 @@
33
33
  while not ws.closed:
34
34
  response = await ws.recv()
35
35
  board = json.loads(response)
36
+ logger.info(f"[TASKID:{board.get('id')}] 受信しました。")
36
37
  await que.put(board)
37
38
  logger.info(f"[TASKID:{board.get('id')}] タスクをキューに投入しました。")
38
39