質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

1回答

949閲覧

python 並列処理の実装

redp

総合スコア49

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

1グッド

3クリップ

投稿2017/11/11 13:24

###実現したいこと
現在Pythonにおいて以下のソースコードの並列処理化をしようとしています。
未だにPythonの並列処理に慣れておらずどのように書けばいいかわかりません。

ちなみにこのスクリプトは動画から顔を切り出すものです。単純に教師データの確保のためです。
C#ではTask.Runで実装してたのですがPythonでも勉強のためやってみようと思ったため書いてみた次第です。拙いコードで正直自分で見てもコンストラクタとか気持ち悪いですがどうかご容赦ください。

具体的にしたいことはrun関数にあるforを並列処理にできないかと考えています。
どなたか教えてください。

###該当のソースコード

python

1# import modules 2import cv2 3import os 4import re 5import sys 6import tkinter as tk 7from tkinter import filedialog, messagebox 8 9 10class Cutter: 11 12 def __init__(self): 13 self.cascade_path = 'lbpcascade_animeface.xml' 14 self.video_paths = list() 15 self.save_path = '' 16 self.IsChoice = None 17 self.frame = None 18 self.cap = None 19 self.num = 0 20 21 def __select_make(self): 22 self.IsChoice = True 23 root = tk.Tk() 24 root.withdraw() 25 dir_path = filedialog.askdirectory(initialdir='C:/', title='動画フォルダを選択してください') 26 paths = list() 27 try: 28 for i, filename in enumerate(os.listdir(dir_path)): 29 fn = str(dir_path + '/' + filename) 30 self.video_paths.append(fn) 31 path, ext = os.path.splitext(filename) 32 if not ext == '.mp4' or not ext == '.avi': 33 continue 34 paths = [(re.search("[0-9]+", x).group(), x) for x in self.video_paths] 35 paths.sort(key=lambda x: int(x[0])) 36 for i, p in enumerate(paths): 37 self.video_paths[i] = p[1] 38 del root 39 except: 40 choice = messagebox.askquestion('Error : Can\'t find file', '動画ファイルが見つかりませんでした\n' 41 '終了しますか?') 42 if choice == 'yes': 43 sys.exit() 44 if len(self.video_paths) == 0: 45 self.IsChoice = False 46 47 def __select_save(self): 48 root = tk.Tk() 49 root.withdraw() 50 self.save_path = filedialog.askdirectory(initialdir='C:/', title='保存フォルダを選択してください') 51 if self.save_path == '': 52 choice = messagebox.askquestion('Error', '保存先が選択されていません\n' 53 '終了しますか?') 54 if choice == 'yes': 55 sys.exit() 56 self.IsChoice = True 57 58 def __cut(self, filename): 59 60 def change(p): 61 result = '' 62 for w in p: 63 if w == '\': 64 result += '/' 65 else: 66 result += w 67 return result 68 69 def face_cut(): 70 gray = cv2.cvtColor(self.frame, cv2.COLOR_BGR2GRAY) 71 gray = cv2.equalizeHist(gray) 72 73 cascade = cv2.CascadeClassifier(self.cascade_path) 74 facerect = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=3, minSize=(50, 50)) 75 76 print(facerect) 77 return facerect 78 79 self.cap = cv2.VideoCapture(filename) 80 self.num += 1 81 frame_num = 0 82 save_num = 1 83 other_num = 0 84 output_dir = change(os.path.abspath(self.save_path)) 85 if not os.path.exists(output_dir + '/faces'): 86 os.mkdir(output_dir + '/faces') 87 output_dir += '/faces' 88 89 other_dir = change(os.path.abspath(self.save_path)) 90 if not os.path.exists(other_dir + '/other'): 91 os.mkdir(other_dir + '/other') 92 other_dir += '/other' 93 94 while self.cap.isOpened(): 95 96 frame_num += 1 97 ret, self.frame = self.cap.read() 98 if not ret: 99 break 100 if frame_num % 50 == 0: 101 face = face_cut() 102 103 if len(face) == 0: 104 other_path = os.path.join(other_dir, 'd_{0}_{1}.jpg'.format(self.num, other_num)) 105 other_num += 1 106 try: 107 cv2.imwrite(other_path, self.frame) 108 except: 109 messagebox.showerror('Error', '保存に失敗しました') 110 111 for j, (x, y, w, h) in enumerate(face): 112 face_img = self.frame[y: y + h, x: x + w] 113 # face_img = cv2.resize(face_img, (28, 28)) 114 output_path = str(output_dir) + '/' + '{0}_{1}_{2}.jpg'.format(self.num, save_num, j) 115 save_num += 1 116 try: 117 cv2.imwrite(output_path, face_img) 118 print(output_path) 119 except: 120 messagebox.showerror('Error', '保存に失敗しました') 121 122 self.cap.release() 123 124 def __select(self): 125 while not self.IsChoice: 126 self.__select_make() 127 self.IsChoice = False 128 while not self.IsChoice: 129 self.__select_save() 130 131 def run(self): 132 self.__select() 133 for path in self.video_paths: 134 self.__cut(path) 135 136 137# main function 138if __name__ == '__main__': 139 cutter = Cutter() 140 cutter.run() 141

###試したこと
joblibを使いrunを

Parallel(n_jobs=-1)(delayed(self.cut(path) for path in self.video_paths))

としたら

TypeError: can't pickle generator objects

と表示されてしまった。
いまいちjoblib.Parallelに関して理解できてないのでエラーの理由も教えていただけたら嬉しいです。

###補足情報(言語/FW/ツール等のバージョンなど)
python 3.6.3

退会済みユーザー👍を押しています

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

Pythonに不慣れなうちは標準ライブラリで並列化したほうが良いですよ。

PythonにはJavaみたいなThreadPoolExecutorと、それをマルチプロセスでやってしまおうというProcessPoolExecutorがあります。

詳細は以下のコードとその実行結果、ドキュメントを参照してください。

python

1from concurrent.futures import ProcessPoolExecutor 2from concurrent.futures.process import BrokenProcessPool 3from concurrent.futures import ThreadPoolExecutor 4from os import getpid 5from time import sleep 6from threading import current_thread 7 8 9def func_result_as_str(n): 10 sleep(1) # あまりにはやいと並列化されないのでsleep 11 return "n=%d, Process ID=%d, Thread ID=%d" %( 12 n, getpid(), current_thread().ident) 13 14 15def func_result_as_generator(n): 16 yield from func_result_as_str(n) 17 18 19def func_result_as_tuple(n): 20 return tuple(func_result_as_generator(n)) 21 22 23def main(): 24 N_jobs = 10 25 26 # スレッドベースの並列化 27 # メリット - 基本的になんでも簡単に並列化できる 28 # デメリット - GILの制限から解放されない 29 with ThreadPoolExecutor() as pool: 30 print("*" * 10, "func_result_as_strの戻り値たち: Thread IDがばらばら") 31 for result in pool.map(func_result_as_str, range(N_jobs)): 32 print(result) 33 34 print("*" * 10, "func_result_as_generatorの戻り値たち: generatorが返ってくる、" 35 "でも結局generator生成だけが並列化されてそれを回すところが並列されない") 36 for result in pool.map(func_result_as_generator, range(N_jobs)): 37 print(result, "".join(result)) 38 39 # プロセスベースの並列化 40 # メリット - GILの制限から解放される 41 # デメリット - プロセス間でpicklable(シリアライズ可能)なオブジェクトしかやり取りできない 42 with ProcessPoolExecutor() as pool: 43 print("*" * 10, "strはpicklable(シリアライズ可能)なのでProcessPoolExecutor" 44 "でも並列化OK、Process IDがばらばら") 45 for result in pool.map(func_result_as_str, range(N_jobs)): 46 print(result) 47 48 print("*" * 10, "generatorはpicklable(シリアライズ可能)ではないのでなので" 49 "ProcessPoolExecutorでは並列化NG") 50 try: 51 for result in pool.map(func_result_as_generator, range(N_jobs)): 52 print(result, "".join(result)) 53 except BrokenProcessPool as e: 54 print(e) 55 56 with ProcessPoolExecutor() as pool: 57 print("*" * 10, "tupleはpicklable(シリアライズ可能)なのでなので" 58 "ProcessPoolExecutorでも並列化OK") 59 for result in pool.map(func_result_as_tuple, range(N_jobs)): 60 print(result, "".join(result)) 61 62 63if __name__ == "__main__": # マルチプロセス化には必須 64 main()
********** func_result_as_strの戻り値たち: Thread IDがばらばら n=0, Process ID=13768, Thread ID=7692 n=1, Process ID=13768, Thread ID=15164 n=2, Process ID=13768, Thread ID=13164 n=3, Process ID=13768, Thread ID=1312 n=4, Process ID=13768, Thread ID=2292 n=5, Process ID=13768, Thread ID=7260 n=6, Process ID=13768, Thread ID=6204 n=7, Process ID=13768, Thread ID=1296 n=8, Process ID=13768, Thread ID=16284 n=9, Process ID=13768, Thread ID=14668 ********** func_result_as_generatorの戻り値たち: generatorが返ってくる、でも結局generator生成だけが並列化されてそれを回すところが並列されない <generator object func_result_as_generator at 0x0000019061330410> n=0, Process ID=13768, Thread ID=12788 <generator object func_result_as_generator at 0x0000019061330468> n=1, Process ID=13768, Thread ID=12788 <generator object func_result_as_generator at 0x0000019061330570> n=2, Process ID=13768, Thread ID=12788 <generator object func_result_as_generator at 0x00000190613305C8> n=3, Process ID=13768, Thread ID=12788 <generator object func_result_as_generator at 0x00000190613304C0> n=4, Process ID=13768, Thread ID=12788 <generator object func_result_as_generator at 0x0000019061330620> n=5, Process ID=13768, Thread ID=12788 <generator object func_result_as_generator at 0x0000019061330518> n=6, Process ID=13768, Thread ID=12788 <generator object func_result_as_generator at 0x0000019061330780> n=7, Process ID=13768, Thread ID=12788 <generator object func_result_as_generator at 0x0000019061330728> n=8, Process ID=13768, Thread ID=12788 <generator object func_result_as_generator at 0x00000190613306D0> n=9, Process ID=13768, Thread ID=12788 ********** strはpicklable(シリアライズ可能)なのでProcessPoolExecutorでも並列化OK、Process IDがばらばら n=0, Process ID=8180, Thread ID=8664 n=1, Process ID=3628, Thread ID=15880 n=2, Process ID=8920, Thread ID=12636 n=3, Process ID=6400, Thread ID=908 n=4, Process ID=8180, Thread ID=8664 n=5, Process ID=3628, Thread ID=15880 n=6, Process ID=8920, Thread ID=12636 n=7, Process ID=6400, Thread ID=908 n=8, Process ID=8180, Thread ID=8664 n=9, Process ID=3628, Thread ID=15880 ********** generatorはpicklable(シリアライズ可能)ではないのでなのでProcessPoolExecutorでは並列化NG Process Process-4: Process Process-3: Process Process-1: Process Process-2: Traceback (most recent call last): File "C:\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap self.run() File "C:\Python36\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "C:\Python36\lib\concurrent\futures\process.py", line 181, in _process_worker result=r)) File "C:\Python36\lib\multiprocessing\queues.py", line 349, in put obj = _ForkingPickler.dumps(obj) File "C:\Python36\lib\multiprocessing\reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) TypeError: can't pickle generator objects Traceback (most recent call last): File "C:\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap self.run() File "C:\Python36\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "C:\Python36\lib\concurrent\futures\process.py", line 181, in _process_worker result=r)) File "C:\Python36\lib\multiprocessing\queues.py", line 349, in put obj = _ForkingPickler.dumps(obj) File "C:\Python36\lib\multiprocessing\reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) TypeError: can't pickle generator objects Traceback (most recent call last): File "C:\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap self.run() File "C:\Python36\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "C:\Python36\lib\concurrent\futures\process.py", line 181, in _process_worker result=r)) File "C:\Python36\lib\multiprocessing\queues.py", line 349, in put obj = _ForkingPickler.dumps(obj) File "C:\Python36\lib\multiprocessing\reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) Traceback (most recent call last): TypeError: can't pickle generator objects File "C:\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap self.run() File "C:\Python36\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs) File "C:\Python36\lib\concurrent\futures\process.py", line 181, in _process_worker result=r)) File "C:\Python36\lib\multiprocessing\queues.py", line 349, in put obj = _ForkingPickler.dumps(obj) File "C:\Python36\lib\multiprocessing\reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) TypeError: can't pickle generator objects A process in the process pool was terminated abruptly while the future was running or pending. ********** tupleはpicklable(シリアライズ可能)なのでなのでProcessPoolExecutorでも並列化OK ('n', '=', '0', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '2', '5', '1', '2', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '9', '9', '2', '4') n=0, Process ID=12512, Thread ID=9924 ('n', '=', '1', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '3', '8', '0', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '5', '5', '9', '6') n=1, Process ID=13808, Thread ID=15596 ('n', '=', '2', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '9', '7', '8', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '1', '2', '7', '6') n=2, Process ID=9788, Thread ID=11276 ('n', '=', '3', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '4', '9', '9', '6', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '3', '5', '2', '4') n=3, Process ID=4996, Thread ID=13524 ('n', '=', '4', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '2', '5', '1', '2', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '9', '9', '2', '4') n=4, Process ID=12512, Thread ID=9924 ('n', '=', '5', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '3', '8', '0', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '5', '5', '9', '6') n=5, Process ID=13808, Thread ID=15596 ('n', '=', '6', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '9', '7', '8', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '1', '2', '7', '6') n=6, Process ID=9788, Thread ID=11276 ('n', '=', '7', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '4', '9', '9', '6', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '3', '5', '2', '4') n=7, Process ID=4996, Thread ID=13524 ('n', '=', '8', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '2', '5', '1', '2', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '9', '9', '2', '4') n=8, Process ID=12512, Thread ID=9924 ('n', '=', '9', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '3', '8', '0', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '5', '5', '9', '6') n=9, Process ID=13808, Thread ID=15596

ちなみに多重度の初期値はProcessPoolExecutorよりもThreadPoolExecutorのほうが多くなっています。

投稿2017/11/12 05:24

YouheiSakurai

総合スコア6142

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

mkgrei

2017/11/12 06:22

今回のケースではGILの制限を受けないように実装しようとしたら、クラスのpickle化に失敗したように見えます。確かに関数を並列化するのは比較的簡単ですが、自分で定義したクラスをやり取りしなければならない場合、①そもそも容易にシリアライズできるようにクラスを実装できるのか、②できるとしたらその時に注意すべき点(シリアライズ失敗する原因)はどこにあるのでしょうか? そもそも並列化の常識としては並列にやり取りしなければならないデータを最小に留めることが重要に思うものの、並列化したいところだけをくくりだす手間を省きたいところです。 以前にライブラリもいろいろ使ったクラスをまるごとシリアライズしようとして失敗した経験があって、何か手軽にプロセス並列できないか探っているところです。
YouheiSakurai

2017/11/12 06:47

①ProcessPoolExecutorで並列化しようとする範囲が大きいと、容易ではなくなるとおもいます。まずloggingの初期化周りで問題が出ますし。私の場合は並列化対象を、関数(+引数)にしたりクラスをcollections.namedtupleのサブクラスにしたりしてシリアライズに失敗しないようなデータになるように心がけています。 ②私は、generatorやthreading.Lock/Event、キュー等々がシリアライズの失敗原因になりやすい予感がしているので気をつけています。generatorに関してはtuplifyデコレータを作ってしのいだり、その他に関してはmultithreading.Managerやmultiprocessing.managers.*を使って頑張って逃げています。 そもそもプロセス間通信自体がネットワークを介したRPCとかわらないトリッキーな事なので、これ以上のお手軽は難しいんじゃないかと思います。multiprocessingの中を覗いてみるとProcessPoolExecutorが非常に高いレベルでお手軽に仕上がっていることが感じられると思います。
mkgrei

2017/11/12 09:04

詳しく説明していただきありがとうございます。 並列化によって問題になりそうなところを全部無視して、手続きを突っ込むとマジカルに全部解決してくれるような方法はないですよね。 ちなみに、関数を直接multiprocessingに渡す方法とは他に、スクリプトをくくりだしてsubprocessでプロセスを立てて並列化することも考えられますが、同等の処理をするにおいて速度に差が大きくつくでしょうか? 例えば、今度のケースで大きくクラスを定義し直したくない場合、.run()にself.video_pathsの範囲を引数を渡して、部分的に実装を変えることが容易です。 そこにsys.argsに引数の範囲を渡すようにして、別にsubprocessを大量に投げるコードを実装することもできます。 subprocessを用いることで大きな性能の劣化が予想されるでしょうか?
YouheiSakurai

2017/11/12 10:22

subprocessを使用することによる性能劣化はそんなにないと思います。 subprocessの主なディスアドバンテージは、shellをTrueにした場合にshell起動分のオーバーヘッドがでる、exceptionが呼び出し元に伝搬されない、複数のsubprocessの結果を順不同に処理する仕組みがない、の3点だと思います。(他にもあるかもしれませんが) multiprocessing.Processとsubprocess.Popenで厳密にどれくらい性能に違いが出るかは、OSによっても結果が違いそうですし、何より難しすぎて詳細が追えません。multiprocessing.Processのほうがsubprocess.Popenよりも若干効率が良いくらいだと予想しますが、それ以上は分かりません。
redp

2017/11/12 23:30

ご丁寧に回答いただきありがとうございました。素直に標準ライブラリを使用してみます。
mkgrei

2017/11/13 00:06

勉強になります。 おもしろい話ありがとうございました。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問