回答ではありません
質問が全然良くならないので内容をサマリして、回答を加えておきます。
質問サマリ
前提
- FastAPIを使用したREST APIを作成
- CPUがボトルネックとなる処理で、1リクエスト0.5秒程度(嘘っぽい)
- asyncを使っているがI/O waitがないので単一スレッドでは限界
試したこと
- いくつか試した(ことになってる)が、(多分やってないので)コードが出せない
質問
同時リクエストに対する処理を速くする方法はないか?
仮定
- 0.5秒は暗号化とのことでhoge.pyとして暗号化サンプルを用意したが、数MB程度では0.5秒もかからない
→ 0.5秒程度かかるループ処理を作成し、プログラム開始時に回数を測定、計測時にその回数で同様な処理を流すことで同等の負荷をCPUにかけることにした(example_async.py)
コード
venv環境に入りpython example_async.py
でwebサーバが起動すれば、http://localhost:3000/encrypt/encryptでREST APIにアクセス出来る。
別端末からsh wget.sh
をすれば(サーバーをバックグラウンド起動でも可)、上記URLに複数リクエストを同時に投げる(50リクエスト)ことで、捌く時間を計測できる。手元の環境だと27.5秒程度かかる。
example_async.py(環境構築スクリプトで出力されます)
python
1from fastapi import FastAPI, Request
2from pydantic import BaseModel
3import time
4from threading import current_thread
5
6app = FastAPI()
7
8def measure(func, args):
9 s = time.perf_counter()
10 ret = func(*args)
11 e = time.perf_counter()
12 return (ret, e - s)
13
14class EncryptRequest(BaseModel):
15 body: str
16
17def f(b, count = 0, t=0.5):
18 number = 0
19 s = time.perf_counter()
20 while True:
21 is_timeup = (time.perf_counter() - s >= t)
22 number += 1
23 if count > 0:
24 if count == number:
25 return b.decode('utf8')
26 elif is_timeup:
27 return number
28
29count_500ms = 0
30@app.post("/encrypt/encrypt")
31async def encrypt_encrypt(request: EncryptRequest):
32 ret, t = measure(f, [request.body.encode('utf8'), count_500ms])
33 return {"encrypted": ret, "time": t, "thread": current_thread().name, "ident": current_thread().ident}
34
35if __name__ == "__main__":
36 count_500ms = f(b'', 0, 0.5)
37 print(count_500ms)
38 import uvicorn
39 uvicorn.run(app, host="0.0.0.0", port=8000)
wget.sh(環境構築スクリプトで出力されます)
bash
1for _ in `seq 50`; do
2 wget -q -O - --method=post --header 'Content-Type: application/json; charset=utf-8' --body-data='{"body":"こんにちは、世界!"}' 'http://localhost:8000/encrypt/encrypt'&
3done
4wait
調査
スレッドの使用
FastAPIは同期関数にすると、スレッドプールを使用するので、example_async.pyからasyncを除いただけのものです。通常の言語ではスレッドにするとCPUをコア数まで使用できるので、他に激しく動いているものがなければ、CPUxコア数近くの速度上昇が望めます。しかし、2023年4月18日現在のpythonは仕様としてnativeコードを使わない限り、スレッドによるCPU処理の並列化による高速化は期待できません。
https://docs.python.org/ja/3/glossary.html#term-global-interpreter-lock
コード
環境構築で出力されますが、僅かな変更なのでここに掲載はしません。
手元の環境では27.1秒程度かかり、asyncと変わりません(誤差程度)。
スレッドのプール数は30個。
プロセスプール(ProcessPoolExecutor)の使用
実際のCPUを使う処理を別プロセスのpythonで処理する方法です。pickleによりシリアライズしたデータ授受をQueue経由でプロセス間通信します。プロセスで並列化すればGILで頭打ちにならないため、CPUxコア数分に近い速度向上が望めます。
コード
プロセスのプール数はCPUxコア数です。
手元の環境ではCPU1個でコア数は2個(VirtualBoxで割り当てた数)で17.6秒程度。
example_async_process.py
python
1from fastapi import FastAPI, Request
2from pydantic import BaseModel
3import time
4from threading import current_thread
5from concurrent.futures import ProcessPoolExecutor
6import asyncio
7
8app = FastAPI()
9
10def measure(func, args):
11 s = time.perf_counter()
12 ret = func(*args)
13 e = time.perf_counter()
14 return (ret, e - s)
15
16class EncryptRequest(BaseModel):
17 body: str
18
19def f(b, count = 0, t=0.5):
20 number = 0
21 s = time.perf_counter()
22 while True:
23 is_timeup = (time.perf_counter() - s >= t)
24 number += 1
25 if count > 0:
26 if count == number:
27 return b.decode('utf8')
28 elif is_timeup:
29 return number
30
31count_500ms = 0
32@app.post("/encrypt/encrypt")
33async def encrypt_encrypt(request: EncryptRequest):
34 loop = asyncio.get_event_loop()
35 ret, t = await loop.run_in_executor(app.state.executor, measure, f, [request.body.encode('utf8'), count_500ms])
36 return {"encrypted": ret, "time": t, "thread": current_thread().name, "ident": current_thread().ident}
37
38if __name__ == "__main__":
39 app.state.executor = ProcessPoolExecutor()
40 count_500ms = f(b'', 0, 0.5)
41 print(count_500ms)
42 import uvicorn
43 uvicorn.run(app, host="0.0.0.0", port=8000)
回答
タイプ | ファイル名 | リクエスト数 | 時間[s] |
---|
非同期 | example_async.py | 50 | 27.5 |
スレッドプール | example_thread.py | 50 | 27.1 |
プロセスプール | example_async_process.py | 50 | 17.6 |
CPUxコア数=2
→プロセスプールを使用すればCPUバウンドな処理でもCPUxコア数に近い速度向上が望める
環境構築スクリプト
今回は面倒なのでUnixライクなOS用しか用意していません。
空のディレクトリを作ってbashで以下を実行するだけです。venvを使っているので、中のpythonスクリプトを使う場合はenv環境に入ってから使ってください。
create_env.sh
bash
1python3 -m venv env
2. env/bin/activate
3pip install -U pip
4pip install -U setuptools
5pip install cryptography==40.0.2 fastapi==0.95.1 uvicorn==0.21.1
6cat >hoge.py <<EOF
7from fastapi import FastAPI, Request
8from pydantic import BaseModel
9from cryptography.fernet import Fernet
10import time
11
12key = Fernet.generate_key()
13f = Fernet(key)
14app = FastAPI()
15
16def measure(func, args):
17 s = time.perf_counter()
18 ret = func(*args)
19 e = time.perf_counter()
20 return (ret, e - s)
21
22@app.get("/")
23async def root():
24 return {"message": "Hello World"}
25
26class EncryptRequest(BaseModel):
27 body: str
28
29@app.post("/encrypt/encrypt")
30async def encrypt_encrypt(request: EncryptRequest):
31 ret, t = measure(f.encrypt, [request.body.encode('utf8')])
32 return {"encrypted": ret, "time": t}
33
34@app.post("/encrypt/decrypt")
35async def encrypt_decrypt(request: EncryptRequest):
36 ret, t = measure(f.decrypt, [request.body.encode('utf8')])
37 return {"decrypted": ret, "time": t}
38
39if __name__ == "__main__":
40 import uvicorn
41 uvicorn.run(app, host="0.0.0.0", port=8000)
42EOF
43python hoge.py&
44sleep 5
45for _ in `seq 10`; do
46 wget -q -O - --method=post --header 'Content-Type: application/json; charset=utf-8' --body-data='{"body":"こんにちは、世界!"}' 'http://localhost:8000/encrypt/encrypt'
47done
48kill $(jobs -p)
49wait
50cat >wget.sh <<EOF
51for _ in \`seq 50\`; do
52 wget -q -O - --method=post --header 'Content-Type: application/json; charset=utf-8' --body-data='{"body":"こんにちは、世界!"}' 'http://localhost:8000/encrypt/encrypt'&
53done
54wait
55EOF
56cat >example_async.py <<EOF
57from fastapi import FastAPI, Request
58from pydantic import BaseModel
59import time
60from threading import current_thread
61
62app = FastAPI()
63
64def measure(func, args):
65 s = time.perf_counter()
66 ret = func(*args)
67 e = time.perf_counter()
68 return (ret, e - s)
69
70class EncryptRequest(BaseModel):
71 body: str
72
73def f(b, count = 0, t=0.5):
74 number = 0
75 s = time.perf_counter()
76 while True:
77 is_timeup = (time.perf_counter() - s >= t)
78 number += 1
79 if count > 0:
80 if count == number:
81 return b.decode('utf8')
82 elif is_timeup:
83 return number
84
85count_500ms = 0
86@app.post("/encrypt/encrypt")
87async def encrypt_encrypt(request: EncryptRequest):
88 ret, t = measure(f, [request.body.encode('utf8'), count_500ms])
89 return {"encrypted": ret, "time": t, "thread": current_thread().name, "ident": current_thread().ident}
90
91if __name__ == "__main__":
92 count_500ms = f(b'', 0, 0.5)
93 print(count_500ms)
94 import uvicorn
95 uvicorn.run(app, host="0.0.0.0", port=8000)
96EOF
97cp -p example_async.py example_thread.py
98patch -p1 <<EOF
99diff -up a/example_thread.py b/example_thread.py
100--- a/example_thread.py 2023-04-18 12:00:00.000000000 +0900
101+++ b/example_thread.py 2023-04-18 12:00:00.000000000 +0900
102@@ -28,7 +28,7 @@ def f(b, count = 0, t=0.5):
103
104 count_500ms = 0
105 @app.post("/encrypt/encrypt")
106-async def encrypt_encrypt(request: EncryptRequest):
107+def encrypt_encrypt(request: EncryptRequest):
108 ret, t = measure(f, [request.body.encode('utf8'), count_500ms])
109 return {"encrypted": ret, "time": t, "thread": current_thread().name, "ident": current_thread().ident}
110
111EOF
112cp -p example_async.py example_async_process.py
113patch -p1 <<EOF
114diff -up a/example_async_process.py b/example_async_process.py
115--- a/example_async_process.py 2023-04-18 12:00:00.000000000 +0900
116+++ b/example_async_process.py 2023-04-18 12:00:00.000000000 +0900
117@@ -2,6 +2,8 @@ from fastapi import FastAPI, Request
118 from pydantic import BaseModel
119 import time
120 from threading import current_thread
121+from concurrent.futures import ProcessPoolExecutor
122+import asyncio
123
124 app = FastAPI()
125
126@@ -29,10 +31,12 @@ def f(b, count = 0, t=0.5):
127 count_500ms = 0
128 @app.post("/encrypt/encrypt")
129 async def encrypt_encrypt(request: EncryptRequest):
130- ret, t = measure(f, [request.body.encode('utf8'), count_500ms])
131+ loop = asyncio.get_event_loop()
132+ ret, t = await loop.run_in_executor(app.state.executor, measure, f, [request.body.encode('utf8'), count_500ms])
133 return {"encrypted": ret, "time": t, "thread": current_thread().name, "ident": current_thread().ident}
134
135 if __name__ == "__main__":
136+ app.state.executor = ProcessPoolExecutor()
137 count_500ms = f(b'', 0, 0.5)
138 print(count_500ms)
139 import uvicorn
140EOF
141for f in example_*.py; do
142 echo "[$f]"
143 python "$f"&
144 sleep 5
145 time sh wget.sh
146 kill $(jobs -p)
147 wait
148done
149deactivate