質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
86.02%
Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

Python concurrent.futuresの初歩的な質問

beluga00nm
beluga00nm

総合スコア11

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

2回答

0グッド

0クリップ

377閲覧

投稿2022/12/05 11:48

前提

Pythonでマルチスレッド、マルチプロセスの処理を勉強しようと思い、concurrent.futuresを触っています。
concurrent.futuresでは原則マルチスレッドとマルチプロセスは同じ書き方でできるようですが、
ThreadPoolExecutorとProcessPoolExecutorの違いについて疑問があります。
また、ThreadPoolExecutorではうまくいってもProcessPoolExecutorではうまくいかないので何が原因なのかを知りたいです。

Windows11のsurface laptop4で
CPUは
11th Gen Intel(R) Core(TM) i7-1185G7 @ 3.00GHz 3.00 GHz
メモリは32GBのsurfaceです。
Python3.9.12です。Anacondaで実装しました。

実現したいこと

手始めに時間がかかる計算を行う関数を作りました。

Python

1def sample_func(num, result): 2 print("start:", str(num)) 3 the_list = [] 4 list1 = np.arange(0,num,1) 5 list2 = np.arange(0,num,1) 6 for i in list1: 7 for j in list2: 8 the_list = np.append(the_list, j*i) 9 10 answer = np.log10(sum(the_list)) 11 r = result 12 r.append(answer) 13 print("end:", str(num)) 14 return num

単純に1からnumまでの数字のリストを2つ作り、すべてのペアを掛け合わせて足すだけです。
num=400くらいだと10秒前後かかります。

これを

Python

1numbers_list = [397, 398, 399, 400] 2results = [] 3# 並列化 4with timer(): 5 with futures.ProcessPoolExecutor(max_workers=2) as executor: 6 task_list = [] 7 for i in numbers_list: 8 # 関数の実行部 9 task_list.append(executor.submit(sample_func, num = i, result=results)) 10 print("submit end") 11 true_results = [] 12 for future in task_list: 13 true_results.append(future.result()) 14 15 16# 結果表示 17print(results) #処理終わった順 18print(true_results) #入力した順 19print('completed')

で実行するとエラーが返ってきます。

Python

1submit end 2--------------------------------------------------------------------------- 3BrokenProcessPool Traceback (most recent call last) 4Input In [35], in <cell line: 5>() 5 12 true_results = [] 6 13 for future in task_list: 7---> 14 true_results.append(future.result()) 8 17 # 結果表示 9 18 print(results) #処理終わった順 10 11File ~\anaconda3\lib\concurrent\futures\_base.py:446, in Future.result(self, timeout) 12 444 raise CancelledError() 13 445 elif self._state == FINISHED: 14--> 446 return self.__get_result() 15 447 else: 16 448 raise TimeoutError() 17 18File ~\anaconda3\lib\concurrent\futures\_base.py:391, in Future.__get_result(self) 19 389 if self._exception: 20 390 try: 21--> 391 raise self._exception 22 392 finally: 23 393 # Break a reference cycle with the exception in self._exception 24 394 self = None 25 26BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

ProcessPoolExecutorではなくThreadPoolExecutorで行うと問題なく実行されます。
ProcessPoolExecutorの部分をThreadPoolExecutorに置き換えただけで他は何も変えていません。

ここで疑問が2つあります。


ThreadPoolExecutorで行ったときでも、通常のforループ処理で計算したときより早くなります(42.6589秒→29.5869秒)
マルチスレッドで高速化できるのはI/O処理といったCPUではない部分が律速しているときだと認識していたのですが、
なぜ42秒から29秒まで速くなったのでしょうか?
上記の関数(400*400個程度の数字のペアの計算)はCPUバインドな計算ではないということでしょうか?


ThreadPoolExecutorをProcessPoolExecutorに置き換えただけではエラーが出てくるのは何が理由でしょうか?
BrokenProcessPoolというエラーは何かが無理やりプロセスを止めているような感じだと思っているのですが、
コードを修正すればいいのか、環境を修正すればいいのか、さっぱりわかりません。
普段はJupyter Notebookですべて行っているのですが、futureは対話型ではうまくいかないと書いてあったので
.pyファイルにしてコマンドプロンプトから実行してみましたが、それでも同じエラーが返ってきました。

どなたか助けていただけないでしょうか。
初心者ですので、質問に不備があれば追記いたします。
よろしくお願いいたします。

以下のような質問にはグッドを送りましょう

  • 質問内容が明確
  • 自分も答えを知りたい
  • 質問者以外のユーザにも役立つ

グッドが多くついた質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

気になる質問をクリップする

クリップした質問は、後からいつでもマイページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

下記のような質問は推奨されていません。

  • 間違っている
  • 質問になっていない投稿
  • スパムや攻撃的な表現を用いた投稿

適切な質問に修正を依頼しましょう。

回答2

2

ベストアンサー

マルチスレッドで高速化できるのはI/O処理といったCPUではない部分が律速しているときだと認識していたのですが、なぜ42秒から29秒まで速くなったのでしょうか?

Why are numpy calculations not affected by the global interpreter lock?
に書かれてるように、numpyの計算の多くの部分はgilの制約を受けないので、マルチスレッドで高速化できます
「def sample_func(num, result):」内の計算に、そういうものがあるのでしょう

 

ThreadPoolExecutorをProcessPoolExecutorに置き換えただけではエラーが出てくるのは何が理由でしょうか?

concurrent.futures -- 並列タスク実行
の「ProcessPoolExecutor」のコード例のように、「def main():」の中に入れて、

python

1if __name__ == '__main__':

の中で「main()」を実行してみてください

投稿2022/12/05 12:47

編集2022/12/05 12:56
jbpb0

総合スコア7505

dameo, quickquip👍を押しています

良いと思った回答にはグッドを送りましょう。
グッドが多くついた回答ほどページの上位に表示されるので、他の人が素晴らしい回答を見つけやすくなります。

下記のような回答は推奨されていません。

  • 間違っている回答
  • 質問の回答になっていない投稿
  • スパムや攻撃的な表現を用いた投稿

このような回答には修正を依頼しましょう。

回答へのコメント

dameo

2022/12/05 13:24

一応引数で渡してるresultには値が入らないので、そこに対する言及もお願いします。
dameo

2022/12/05 23:58

プロセスなので、IPCがないと消えてしまう=戻り値で返さないとダメ、という話だけ触れてくれれば良かったんですが、お忙しいところ失礼しました。 一応現行CPythonで調べた結果だけ載せておきます。 プロセスプールなので、何らかのIPCを使ってないとデータの授受が出来ません。 ProcessPoolExecutorはCPython(AS-IS)だと https://github.com/python/cpython/blob/main/Lib/concurrent/futures/process.py#L8-L25 こんな形のアーキテクチャなので、Process Pool内のプロセスとはCall Q(Queue)に積まれた引数を渡し、Result Q(Queue)から返ってきたResultを受け取ります。 このコードを追うとQueueは https://github.com/python/cpython/blob/main/Lib/concurrent/futures/process.py#L706-L710 https://github.com/python/cpython/blob/main/Lib/concurrent/futures/process.py#L161 https://github.com/python/cpython/blob/main/Lib/concurrent/futures/process.py#L53 multiprocessing.queues.Queue、 https://github.com/python/cpython/blob/main/Lib/multiprocessing/__init__.py#L16 https://github.com/python/cpython/blob/main/Lib/multiprocessing/context.py#L100-L103 つまりmultiprocessing.Queueなので https://docs.python.org/ja/3/library/multiprocessing.html#pipes-and-queues にある注釈を引用すると… 「注釈: オブジェクトがキューに追加される際、そのオブジェクトは pickle 化されています。」 なっています。つまりオブジェクトを参照する形態はないということです。 引数で渡されたresultはそのときの状態のまま値でプロセスプールのプロセスに渡されるということになります。 resultの中身を計算でいくら書き換えても、戻り値以外で結果を返す手段がないということです。
beluga00nm

2022/12/07 11:38

両方の疑問に答えていただきありがとうございました。無事動きました! 加えて疑問なのですが、上記の私のコードのままでは動かず、 with futures.ProcessPoolExecutorの中の true_results = [] for future in task_list: true_results.append(future.result()) の部分を削除して withの前の時点でtrue_results = []を作ってから.resultで順に結果を取得するようにすると動きました。 これがまさしくdameo様がおっしゃっている、プロセス間の通信ができないとうことなのでしょうか? このあたりは難しそうなので自分でももう少し勉強してみるつもりですが、 もしよろしければ少し言及していただけると嬉しいです。

1

futureは対話型ではうまくいかない

はい。 なので、メインの実行部分を

python

1if __name__ == '__main__':

の後に入れてみてください。
timer()がどのような関数なのかわからかったので、それを省いたコードでこちらで動きました。

ちなみに、windowsはfork()がちゃんと実装されていないので、マルチプロセスによる並列処理がうまくうごかないことがあります。ちゃんとマルチプロセスで動かしたいのであれば、linuxなどに移行することをお勧めします。

投稿2022/12/05 12:16

TakaiY

総合スコア10535

quickquip👍を押しています

良いと思った回答にはグッドを送りましょう。
グッドが多くついた回答ほどページの上位に表示されるので、他の人が素晴らしい回答を見つけやすくなります。

下記のような回答は推奨されていません。

  • 間違っている回答
  • 質問の回答になっていない投稿
  • スパムや攻撃的な表現を用いた投稿

このような回答には修正を依頼しましょう。

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
86.02%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問

同じタグがついた質問を見る

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。