回答編集履歴
6
コードレイアウト調整、不要な処理を消去。内容に変更はありません。
test
CHANGED
@@ -49,7 +49,6 @@
|
|
49
49
|
|
50
50
|
追記: 動作サンプルコード
|
51
51
|
```python
|
52
|
-
|
53
52
|
import time
|
54
53
|
import asyncio
|
55
54
|
from concurrent.futures import ThreadPoolExecutor
|
@@ -60,7 +59,6 @@
|
|
60
59
|
TASK_LIMIT = 5
|
61
60
|
QUEUE_SIZE = 10
|
62
61
|
INTERVAL = 10
|
63
|
-
|
64
62
|
DATA_LIST = [
|
65
63
|
{"key": "a", "value": 1},
|
66
64
|
{"key": "b", "value": 2},
|
@@ -106,11 +104,9 @@
|
|
106
104
|
await semaphore.acquire() # 複数実行中のタスク完了を待つ
|
107
105
|
future = loop.run_in_executor(executor, heavy_task, item)
|
108
106
|
future.add_done_callback(done)
|
109
|
-
await asyncio.sleep(0)
|
110
107
|
|
111
108
|
def debug(func):
|
112
109
|
# タスクの実行タイム計測
|
113
|
-
import time
|
114
110
|
from datetime import datetime
|
115
111
|
from functools import wraps
|
116
112
|
|
5
コード修正: 変数名修正、キューサイズ等の調整用変数をファイル冒頭へ移動
test
CHANGED
@@ -49,6 +49,7 @@
|
|
49
49
|
|
50
50
|
追記: 動作サンプルコード
|
51
51
|
```python
|
52
|
+
|
52
53
|
import time
|
53
54
|
import asyncio
|
54
55
|
from concurrent.futures import ThreadPoolExecutor
|
@@ -56,13 +57,17 @@
|
|
56
57
|
|
57
58
|
logger = logging.getLogger(__name__)
|
58
59
|
|
60
|
+
TASK_LIMIT = 5
|
61
|
+
QUEUE_SIZE = 10
|
62
|
+
INTERVAL = 10
|
63
|
+
|
59
64
|
DATA_LIST = [
|
60
65
|
{"key": "a", "value": 1},
|
61
66
|
{"key": "b", "value": 2},
|
62
67
|
{"key": "c", "value": 3},
|
63
68
|
]
|
64
69
|
|
65
|
-
def producer(data=DATA_LIST, interval=
|
70
|
+
def producer(data=DATA_LIST, interval=INTERVAL):
|
66
71
|
# sync generator
|
67
72
|
while True:
|
68
73
|
yield from data
|
@@ -85,7 +90,7 @@
|
|
85
90
|
|
86
91
|
async def consumer_task(queue):
|
87
92
|
logger.info("consumer start")
|
88
|
-
semapho = asyncio.Semaphore(
|
93
|
+
semaphore = asyncio.Semaphore(TASK_LIMIT) # 同時実行数
|
89
94
|
loop = asyncio.get_event_loop()
|
90
95
|
|
91
96
|
def done(future):
|
@@ -93,12 +98,12 @@
|
|
93
98
|
result = future.result()
|
94
99
|
logger.debug(f"task done {result=}")
|
95
100
|
queue.task_done()
|
96
|
-
semapho.release()
|
101
|
+
semaphore.release()
|
97
102
|
|
98
103
|
with ThreadPoolExecutor(thread_name_prefix="worker") as executor:
|
99
104
|
async for item in consumer(queue):
|
100
105
|
logger.debug(f"C: {item=}")
|
101
|
-
await semapho.acquire() # 複数実行中のタスク完了を待つ
|
106
|
+
await semaphore.acquire() # 複数実行中のタスク完了を待つ
|
102
107
|
future = loop.run_in_executor(executor, heavy_task, item)
|
103
108
|
future.add_done_callback(done)
|
104
109
|
await asyncio.sleep(0)
|
@@ -131,7 +136,7 @@
|
|
131
136
|
|
132
137
|
async def main():
|
133
138
|
# producer->consumer へ値を渡す、非同期キュー
|
134
|
-
queue = asyncio.Queue(maxsize=
|
139
|
+
queue = asyncio.Queue(maxsize=QUEUE_SIZE)
|
135
140
|
|
136
141
|
async with asyncio.TaskGroup() as tg:
|
137
142
|
task1 = tg.create_task(producer_task(queue, producer()))
|
@@ -148,5 +153,4 @@
|
|
148
153
|
pass
|
149
154
|
finally:
|
150
155
|
logger.info("Done.")
|
151
|
-
|
152
156
|
```
|
4
文章・用語の校正
test
CHANGED
@@ -17,10 +17,10 @@
|
|
17
17
|
別スレッドでの実行は to_thread, run_in_executor 等。
|
18
18
|
|
19
19
|
|
20
|
-
追記2: 要点が埋もれてしまったので、
|
20
|
+
追記2: 要点が埋もれてしまったので、ブロッキング処理を含むジェネレータを別スレッドで実行し非同期読み出し。for文は使わずに
|
21
21
|
|
22
22
|
```python
|
23
|
-
#
|
23
|
+
# ジェネレーターからの非同期読み出し (別スレッドでジェネレーター内部の処理は進む ⇒ asyncio のイベントループはブロックされない)
|
24
24
|
value = await asyncio.to_thread(next, gen)
|
25
25
|
```
|
26
26
|
|
3
追記: 解決策の要点だった、同期ジェネレーターから値を非同期に読み出す方法を回答内に追記
test
CHANGED
@@ -17,6 +17,14 @@
|
|
17
17
|
別スレッドでの実行は to_thread, run_in_executor 等。
|
18
18
|
|
19
19
|
|
20
|
+
追記2: 要点が埋もれてしまったので、同期ジェネレータから非同期読み出し。for文は使わずに
|
21
|
+
|
22
|
+
```python
|
23
|
+
# 同期ジェネレーターからの非同期読み出し (別スレッドでジェネレーター内部の処理は進む)
|
24
|
+
value = await asyncio.to_thread(next, gen)
|
25
|
+
```
|
26
|
+
|
27
|
+
----
|
20
28
|
解決策B
|
21
29
|
タスク生成スケジュールの見直し、
|
22
30
|
現状のコードではタスクの内容次第では、処理速度が追いつかないことがあるかもしれません。
|
2
回答例の実装サンプルコードを追記
test
CHANGED
@@ -9,7 +9,7 @@
|
|
9
9
|
解決策A;
|
10
10
|
同期 <=> 非同期のやりとりとして、まず思いつくのは、
|
11
11
|
janus というライブラリの同期<=>非同期キューを用いると
|
12
|
-
例えば別スレッドで無限ループを処理し、非同期側で値を受け取ることが可能です。
|
12
|
+
例えば別スレッドで無限ループを処理し、非同期側で値を受け取ることが可能です。(追記: 標準ライブラリの asyncio.Queue でも工夫すれば可能)
|
13
13
|
|
14
14
|
実質、同期ジェネレーターで生成する値を、
|
15
15
|
コルーチン側で非同期ジェネレーターとして扱う事が可能になります。
|
@@ -37,3 +37,108 @@
|
|
37
37
|
何か重たい処理として仮置きしてるのかもしれませんが、
|
38
38
|
コルーチン内での重たい処理は別スレッドで行いましょう。
|
39
39
|
実際のコード次第では問題になる場合があります。
|
40
|
+
|
41
|
+
|
42
|
+
追記: 動作サンプルコード
|
43
|
+
```python
|
44
|
+
import time
|
45
|
+
import asyncio
|
46
|
+
from concurrent.futures import ThreadPoolExecutor
|
47
|
+
import logging
|
48
|
+
|
49
|
+
logger = logging.getLogger(__name__)
|
50
|
+
|
51
|
+
DATA_LIST = [
|
52
|
+
{"key": "a", "value": 1},
|
53
|
+
{"key": "b", "value": 2},
|
54
|
+
{"key": "c", "value": 3},
|
55
|
+
]
|
56
|
+
|
57
|
+
def producer(data=DATA_LIST, interval=10):
|
58
|
+
# sync generator
|
59
|
+
while True:
|
60
|
+
yield from data
|
61
|
+
logger.info(f"waiting {interval} sec")
|
62
|
+
time.sleep(interval)
|
63
|
+
|
64
|
+
async def producer_task(queue, sync_gen):
|
65
|
+
# 非同期: ジェネレーターから値を取り出す
|
66
|
+
logger.info("producer start")
|
67
|
+
while True:
|
68
|
+
item = await asyncio.to_thread(next, sync_gen)
|
69
|
+
logger.debug(f"P: {item=}")
|
70
|
+
await queue.put(item) # キューの空を待つ (maxsize)
|
71
|
+
|
72
|
+
async def consumer(queue):
|
73
|
+
# 非同期 generator: async for で扱う為の wrapper
|
74
|
+
while True:
|
75
|
+
item = await queue.get() # キューに値が入るのを待つ
|
76
|
+
yield item
|
77
|
+
|
78
|
+
async def consumer_task(queue):
|
79
|
+
logger.info("consumer start")
|
80
|
+
semapho = asyncio.Semaphore(5) # 同時実行数
|
81
|
+
loop = asyncio.get_event_loop()
|
82
|
+
|
83
|
+
def done(future):
|
84
|
+
# 同期タスクの実行完了時
|
85
|
+
result = future.result()
|
86
|
+
logger.debug(f"task done {result=}")
|
87
|
+
queue.task_done()
|
88
|
+
semapho.release()
|
89
|
+
|
90
|
+
with ThreadPoolExecutor(thread_name_prefix="worker") as executor:
|
91
|
+
async for item in consumer(queue):
|
92
|
+
logger.debug(f"C: {item=}")
|
93
|
+
await semapho.acquire() # 複数実行中のタスク完了を待つ
|
94
|
+
future = loop.run_in_executor(executor, heavy_task, item)
|
95
|
+
future.add_done_callback(done)
|
96
|
+
await asyncio.sleep(0)
|
97
|
+
|
98
|
+
def debug(func):
|
99
|
+
# タスクの実行タイム計測
|
100
|
+
import time
|
101
|
+
from datetime import datetime
|
102
|
+
from functools import wraps
|
103
|
+
|
104
|
+
@wraps(func)
|
105
|
+
def _func(*args, **kw):
|
106
|
+
result = None
|
107
|
+
start_time = datetime.now()
|
108
|
+
logger.debug(f"start {func.__name__}")
|
109
|
+
try:
|
110
|
+
result = func(*args, **kw)
|
111
|
+
except Exception as exc:
|
112
|
+
logger.error(f"error ", exc_info=exc)
|
113
|
+
end_time = datetime.now()
|
114
|
+
logger.debug(f"{result=} (time: {end_time - start_time})")
|
115
|
+
return result
|
116
|
+
return _func
|
117
|
+
|
118
|
+
@debug
|
119
|
+
def heavy_task(arg):
|
120
|
+
# 時間のかかるタスク。run_in_executor で実行する
|
121
|
+
time.sleep(5)
|
122
|
+
return arg
|
123
|
+
|
124
|
+
async def main():
|
125
|
+
# producer->consumer へ値を渡す、非同期キュー
|
126
|
+
queue = asyncio.Queue(maxsize=10)
|
127
|
+
|
128
|
+
async with asyncio.TaskGroup() as tg:
|
129
|
+
task1 = tg.create_task(producer_task(queue, producer()))
|
130
|
+
task2 = tg.create_task(consumer_task(queue))
|
131
|
+
|
132
|
+
if __name__ == '__main__':
|
133
|
+
logging.basicConfig(
|
134
|
+
format="[%(asctime)s][%(threadName)-10s][%(levelname)-8s] %(message)s",
|
135
|
+
level=logging.DEBUG,
|
136
|
+
)
|
137
|
+
try:
|
138
|
+
asyncio.run(main())
|
139
|
+
except (KeyboardInterrupt, SystemExit):
|
140
|
+
pass
|
141
|
+
finally:
|
142
|
+
logger.info("Done.")
|
143
|
+
|
144
|
+
```
|
1
タスクの数量制限の解決策の追記
test
CHANGED
@@ -27,6 +27,7 @@
|
|
27
27
|
next() でジェネレーターから値を順次呼び出すことは可能です。
|
28
28
|
(time.sleep(10) は不要になると思うので見直し候補)
|
29
29
|
|
30
|
+
追記: Semaphore 等で数量制限
|
30
31
|
|
31
32
|
----
|
32
33
|
```python
|