回答編集履歴

6

コードレイアウト調整、不要な処理を消去。内容に変更はありません。

2024/12/11 23:43

投稿

teamikl
teamikl

スコア8760

test CHANGED
@@ -49,7 +49,6 @@
49
49
 
50
50
  追記: 動作サンプルコード
51
51
  ```python
52
-
53
52
  import time
54
53
  import asyncio
55
54
  from concurrent.futures import ThreadPoolExecutor
@@ -60,7 +59,6 @@
60
59
  TASK_LIMIT = 5
61
60
  QUEUE_SIZE = 10
62
61
  INTERVAL = 10
63
-
64
62
  DATA_LIST = [
65
63
  {"key": "a", "value": 1},
66
64
  {"key": "b", "value": 2},
@@ -106,11 +104,9 @@
106
104
  await semaphore.acquire() # 複数実行中のタスク完了を待つ
107
105
  future = loop.run_in_executor(executor, heavy_task, item)
108
106
  future.add_done_callback(done)
109
- await asyncio.sleep(0)
110
107
 
111
108
  def debug(func):
112
109
  # タスクの実行タイム計測
113
- import time
114
110
  from datetime import datetime
115
111
  from functools import wraps
116
112
 

5

コード修正: 変数名修正、キューサイズ等の調整用変数をファイル冒頭へ移動

2024/12/11 23:39

投稿

teamikl
teamikl

スコア8760

test CHANGED
@@ -49,6 +49,7 @@
49
49
 
50
50
  追記: 動作サンプルコード
51
51
  ```python
52
+
52
53
  import time
53
54
  import asyncio
54
55
  from concurrent.futures import ThreadPoolExecutor
@@ -56,13 +57,17 @@
56
57
 
57
58
  logger = logging.getLogger(__name__)
58
59
 
60
+ TASK_LIMIT = 5
61
+ QUEUE_SIZE = 10
62
+ INTERVAL = 10
63
+
59
64
  DATA_LIST = [
60
65
  {"key": "a", "value": 1},
61
66
  {"key": "b", "value": 2},
62
67
  {"key": "c", "value": 3},
63
68
  ]
64
69
 
65
- def producer(data=DATA_LIST, interval=10):
70
+ def producer(data=DATA_LIST, interval=INTERVAL):
66
71
  # sync generator
67
72
  while True:
68
73
  yield from data
@@ -85,7 +90,7 @@
85
90
 
86
91
  async def consumer_task(queue):
87
92
  logger.info("consumer start")
88
- semapho = asyncio.Semaphore(5) # 同時実行数
93
+ semaphore = asyncio.Semaphore(TASK_LIMIT) # 同時実行数
89
94
  loop = asyncio.get_event_loop()
90
95
 
91
96
  def done(future):
@@ -93,12 +98,12 @@
93
98
  result = future.result()
94
99
  logger.debug(f"task done {result=}")
95
100
  queue.task_done()
96
- semapho.release()
101
+ semaphore.release()
97
102
 
98
103
  with ThreadPoolExecutor(thread_name_prefix="worker") as executor:
99
104
  async for item in consumer(queue):
100
105
  logger.debug(f"C: {item=}")
101
- await semapho.acquire() # 複数実行中のタスク完了を待つ
106
+ await semaphore.acquire() # 複数実行中のタスク完了を待つ
102
107
  future = loop.run_in_executor(executor, heavy_task, item)
103
108
  future.add_done_callback(done)
104
109
  await asyncio.sleep(0)
@@ -131,7 +136,7 @@
131
136
 
132
137
  async def main():
133
138
  # producer->consumer へ値を渡す、非同期キュー
134
- queue = asyncio.Queue(maxsize=10)
139
+ queue = asyncio.Queue(maxsize=QUEUE_SIZE)
135
140
 
136
141
  async with asyncio.TaskGroup() as tg:
137
142
  task1 = tg.create_task(producer_task(queue, producer()))
@@ -148,5 +153,4 @@
148
153
  pass
149
154
  finally:
150
155
  logger.info("Done.")
151
-
152
156
  ```

4

文章・用語の校正

2024/12/11 19:42

投稿

teamikl
teamikl

スコア8760

test CHANGED
@@ -17,10 +17,10 @@
17
17
  別スレッドでの実行は to_thread, run_in_executor 等。
18
18
 
19
19
 
20
- 追記2: 要点が埋もれてしまったので、同期ジェネレータから非同期読み出し。for文は使わずに
20
+ 追記2: 要点が埋もれてしまったので、ブロッキング処理を含むジェネレータを別スレッドで実行し非同期読み出し。for文は使わずに
21
21
 
22
22
  ```python
23
- # 同期ジェネレーターからの非同期読み出し (別スレッドでジェネレーター内部の処理は進む)
23
+ # ジェネレーターからの非同期読み出し (別スレッドでジェネレーター内部の処理は進む ⇒ asyncio のイベントループはブロックされない)
24
24
  value = await asyncio.to_thread(next, gen)
25
25
  ```
26
26
 

3

追記: 解決策の要点だった、同期ジェネレーターから値を非同期に読み出す方法を回答内に追記

2024/12/11 19:39

投稿

teamikl
teamikl

スコア8760

test CHANGED
@@ -17,6 +17,14 @@
17
17
  別スレッドでの実行は to_thread, run_in_executor 等。
18
18
 
19
19
 
20
+ 追記2: 要点が埋もれてしまったので、同期ジェネレータから非同期読み出し。for文は使わずに
21
+
22
+ ```python
23
+ # 同期ジェネレーターからの非同期読み出し (別スレッドでジェネレーター内部の処理は進む)
24
+ value = await asyncio.to_thread(next, gen)
25
+ ```
26
+
27
+ ----
20
28
  解決策B
21
29
  タスク生成スケジュールの見直し、
22
30
  現状のコードではタスクの内容次第では、処理速度が追いつかないことがあるかもしれません。

2

回答例の実装サンプルコードを追記

2024/12/11 18:52

投稿

teamikl
teamikl

スコア8760

test CHANGED
@@ -9,7 +9,7 @@
9
9
  解決策A;
10
10
  同期 <=> 非同期のやりとりとして、まず思いつくのは、
11
11
  janus というライブラリの同期<=>非同期キューを用いると
12
- 例えば別スレッドで無限ループを処理し、非同期側で値を受け取ることが可能です。
12
+ 例えば別スレッドで無限ループを処理し、非同期側で値を受け取ることが可能です。(追記: 標準ライブラリの asyncio.Queue でも工夫すれば可能)
13
13
 
14
14
  実質、同期ジェネレーターで生成する値を、
15
15
  コルーチン側で非同期ジェネレーターとして扱う事が可能になります。
@@ -37,3 +37,108 @@
37
37
  何か重たい処理として仮置きしてるのかもしれませんが、
38
38
  コルーチン内での重たい処理は別スレッドで行いましょう。
39
39
  実際のコード次第では問題になる場合があります。
40
+
41
+
42
+ 追記: 動作サンプルコード
43
+ ```python
44
+ import time
45
+ import asyncio
46
+ from concurrent.futures import ThreadPoolExecutor
47
+ import logging
48
+
49
+ logger = logging.getLogger(__name__)
50
+
51
+ DATA_LIST = [
52
+ {"key": "a", "value": 1},
53
+ {"key": "b", "value": 2},
54
+ {"key": "c", "value": 3},
55
+ ]
56
+
57
+ def producer(data=DATA_LIST, interval=10):
58
+ # sync generator
59
+ while True:
60
+ yield from data
61
+ logger.info(f"waiting {interval} sec")
62
+ time.sleep(interval)
63
+
64
+ async def producer_task(queue, sync_gen):
65
+ # 非同期: ジェネレーターから値を取り出す
66
+ logger.info("producer start")
67
+ while True:
68
+ item = await asyncio.to_thread(next, sync_gen)
69
+ logger.debug(f"P: {item=}")
70
+ await queue.put(item) # キューの空を待つ (maxsize)
71
+
72
+ async def consumer(queue):
73
+ # 非同期 generator: async for で扱う為の wrapper
74
+ while True:
75
+ item = await queue.get() # キューに値が入るのを待つ
76
+ yield item
77
+
78
+ async def consumer_task(queue):
79
+ logger.info("consumer start")
80
+ semapho = asyncio.Semaphore(5) # 同時実行数
81
+ loop = asyncio.get_event_loop()
82
+
83
+ def done(future):
84
+ # 同期タスクの実行完了時
85
+ result = future.result()
86
+ logger.debug(f"task done {result=}")
87
+ queue.task_done()
88
+ semapho.release()
89
+
90
+ with ThreadPoolExecutor(thread_name_prefix="worker") as executor:
91
+ async for item in consumer(queue):
92
+ logger.debug(f"C: {item=}")
93
+ await semapho.acquire() # 複数実行中のタスク完了を待つ
94
+ future = loop.run_in_executor(executor, heavy_task, item)
95
+ future.add_done_callback(done)
96
+ await asyncio.sleep(0)
97
+
98
+ def debug(func):
99
+ # タスクの実行タイム計測
100
+ import time
101
+ from datetime import datetime
102
+ from functools import wraps
103
+
104
+ @wraps(func)
105
+ def _func(*args, **kw):
106
+ result = None
107
+ start_time = datetime.now()
108
+ logger.debug(f"start {func.__name__}")
109
+ try:
110
+ result = func(*args, **kw)
111
+ except Exception as exc:
112
+ logger.error(f"error ", exc_info=exc)
113
+ end_time = datetime.now()
114
+ logger.debug(f"{result=} (time: {end_time - start_time})")
115
+ return result
116
+ return _func
117
+
118
+ @debug
119
+ def heavy_task(arg):
120
+ # 時間のかかるタスク。run_in_executor で実行する
121
+ time.sleep(5)
122
+ return arg
123
+
124
+ async def main():
125
+ # producer->consumer へ値を渡す、非同期キュー
126
+ queue = asyncio.Queue(maxsize=10)
127
+
128
+ async with asyncio.TaskGroup() as tg:
129
+ task1 = tg.create_task(producer_task(queue, producer()))
130
+ task2 = tg.create_task(consumer_task(queue))
131
+
132
+ if __name__ == '__main__':
133
+ logging.basicConfig(
134
+ format="[%(asctime)s][%(threadName)-10s][%(levelname)-8s] %(message)s",
135
+ level=logging.DEBUG,
136
+ )
137
+ try:
138
+ asyncio.run(main())
139
+ except (KeyboardInterrupt, SystemExit):
140
+ pass
141
+ finally:
142
+ logger.info("Done.")
143
+
144
+ ```

1

タスクの数量制限の解決策の追記

2024/12/11 16:48

投稿

teamikl
teamikl

スコア8760

test CHANGED
@@ -27,6 +27,7 @@
27
27
  next() でジェネレーターから値を順次呼び出すことは可能です。
28
28
  (time.sleep(10) は不要になると思うので見直し候補)
29
29
 
30
+ 追記: Semaphore 等で数量制限
30
31
 
31
32
  ----
32
33
  ```python