質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

87.61%

python 並列処理の実装

解決済

回答 1

投稿

  • 評価
  • クリップ 3
  • VIEW 5,981

score 49

実現したいこと

現在Pythonにおいて以下のソースコードの並列処理化をしようとしています。
未だにPythonの並列処理に慣れておらずどのように書けばいいかわかりません。

ちなみにこのスクリプトは動画から顔を切り出すものです。単純に教師データの確保のためです。
C#ではTask.Runで実装してたのですがPythonでも勉強のためやってみようと思ったため書いてみた次第です。拙いコードで正直自分で見てもコンストラクタとか気持ち悪いですがどうかご容赦ください。

具体的にしたいことはrun関数にあるforを並列処理にできないかと考えています。
どなたか教えてください。

該当のソースコード

# import modules
import cv2
import os
import re
import sys
import tkinter as tk
from tkinter import filedialog, messagebox


class Cutter:

    def __init__(self):
        self.cascade_path = 'lbpcascade_animeface.xml'
        self.video_paths = list()
        self.save_path = ''
        self.IsChoice = None
        self.frame = None
        self.cap = None
        self.num = 0

    def __select_make(self):
        self.IsChoice = True
        root = tk.Tk()
        root.withdraw()
        dir_path = filedialog.askdirectory(initialdir='C:/', title='動画フォルダを選択してください')
        paths = list()
        try:
            for i, filename in enumerate(os.listdir(dir_path)):
                fn = str(dir_path + '/' + filename)
                self.video_paths.append(fn)
                path, ext = os.path.splitext(filename)
                if not ext == '.mp4' or not ext == '.avi':
                    continue
                paths = [(re.search("[0-9]+", x).group(), x) for x in self.video_paths]
            paths.sort(key=lambda x: int(x[0]))
            for i, p in enumerate(paths):
                self.video_paths[i] = p[1]
            del root
        except:
            choice = messagebox.askquestion('Error : Can\'t find file', '動画ファイルが見つかりませんでした\n'
                                                                        '終了しますか?')
            if choice == 'yes':
                sys.exit()
        if len(self.video_paths) == 0:
            self.IsChoice = False

    def __select_save(self):
        root = tk.Tk()
        root.withdraw()
        self.save_path = filedialog.askdirectory(initialdir='C:/', title='保存フォルダを選択してください')
        if self.save_path == '':
            choice = messagebox.askquestion('Error', '保存先が選択されていません\n'
                                                     '終了しますか?')
            if choice == 'yes':
                sys.exit()
        self.IsChoice = True

    def __cut(self, filename):

        def change(p):
            result = ''
            for w in p:
                if w == '\\':
                    result += '/'
                else:
                    result += w
            return result

        def face_cut():
            gray = cv2.cvtColor(self.frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.equalizeHist(gray)

            cascade = cv2.CascadeClassifier(self.cascade_path)
            facerect = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=3, minSize=(50, 50))

            print(facerect)
            return facerect

        self.cap = cv2.VideoCapture(filename)
        self.num += 1
        frame_num = 0
        save_num = 1
        other_num = 0
        output_dir = change(os.path.abspath(self.save_path))
        if not os.path.exists(output_dir + '/faces'):
            os.mkdir(output_dir + '/faces')
        output_dir += '/faces'

        other_dir = change(os.path.abspath(self.save_path))
        if not os.path.exists(other_dir + '/other'):
            os.mkdir(other_dir + '/other')
        other_dir += '/other'

        while self.cap.isOpened():

            frame_num += 1
            ret, self.frame = self.cap.read()
            if not ret:
                break
            if frame_num % 50 == 0:
                face = face_cut()

                if len(face) == 0:
                    other_path = os.path.join(other_dir, 'd_{0}_{1}.jpg'.format(self.num, other_num))
                    other_num += 1
                    try:
                        cv2.imwrite(other_path, self.frame)
                    except:
                        messagebox.showerror('Error', '保存に失敗しました')

                for j, (x, y, w, h) in enumerate(face):
                    face_img = self.frame[y: y + h, x: x + w]
                    # face_img = cv2.resize(face_img, (28, 28))
                    output_path = str(output_dir) + '/' + '{0}_{1}_{2}.jpg'.format(self.num, save_num, j)
                    save_num += 1
                    try:
                        cv2.imwrite(output_path, face_img)
                        print(output_path)
                    except:
                        messagebox.showerror('Error', '保存に失敗しました')

        self.cap.release()

    def __select(self):
        while not self.IsChoice:
            self.__select_make()
        self.IsChoice = False
        while not self.IsChoice:
            self.__select_save()

    def run(self):
        self.__select()
        for path in self.video_paths:
            self.__cut(path)


# main function
if __name__ == '__main__':
    cutter = Cutter()
    cutter.run()

試したこと

joblibを使いrunを

Parallel(n_jobs=-1)(delayed(self.cut(path) for path in self.video_paths))


としたら

TypeError: can't pickle generator objects


と表示されてしまった。
いまいちjoblib.Parallelに関して理解できてないのでエラーの理由も教えていただけたら嬉しいです。

補足情報(言語/FW/ツール等のバージョンなど)

python 3.6.3

  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 過去に投稿した質問と同じ内容の質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

回答 1

checkベストアンサー

+4

Pythonに不慣れなうちは標準ライブラリで並列化したほうが良いですよ。

PythonにはJavaみたいなThreadPoolExecutorと、それをマルチプロセスでやってしまおうというProcessPoolExecutorがあります。

詳細は以下のコードとその実行結果、ドキュメントを参照してください。

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from concurrent.futures import ThreadPoolExecutor
from os import getpid
from time import sleep
from threading import current_thread


def func_result_as_str(n):
    sleep(1)  # あまりにはやいと並列化されないのでsleep
    return "n=%d, Process ID=%d, Thread ID=%d" %(
        n, getpid(), current_thread().ident)


def func_result_as_generator(n):
    yield from func_result_as_str(n)


def func_result_as_tuple(n):
    return tuple(func_result_as_generator(n))


def main():
    N_jobs = 10

    # スレッドベースの並列化
    # メリット - 基本的になんでも簡単に並列化できる
    # デメリット - GILの制限から解放されない
    with ThreadPoolExecutor() as pool:
        print("*" * 10, "func_result_as_strの戻り値たち: Thread IDがばらばら")
        for result in pool.map(func_result_as_str, range(N_jobs)):
            print(result)

        print("*" * 10, "func_result_as_generatorの戻り値たち: generatorが返ってくる、"
              "でも結局generator生成だけが並列化されてそれを回すところが並列されない")
        for result in pool.map(func_result_as_generator, range(N_jobs)):
            print(result, "".join(result))

    # プロセスベースの並列化
    # メリット - GILの制限から解放される
    # デメリット - プロセス間でpicklable(シリアライズ可能)なオブジェクトしかやり取りできない
    with ProcessPoolExecutor() as pool:
        print("*" * 10, "strはpicklable(シリアライズ可能)なのでProcessPoolExecutor"
              "でも並列化OK、Process IDがばらばら")
        for result in pool.map(func_result_as_str, range(N_jobs)):
            print(result)

        print("*" * 10, "generatorはpicklable(シリアライズ可能)ではないのでなので"
              "ProcessPoolExecutorでは並列化NG")
        try:
            for result in pool.map(func_result_as_generator, range(N_jobs)):
                print(result, "".join(result))
        except BrokenProcessPool as e:
            print(e)

    with ProcessPoolExecutor() as pool:
        print("*" * 10, "tupleはpicklable(シリアライズ可能)なのでなので"
              "ProcessPoolExecutorでも並列化OK")
        for result in pool.map(func_result_as_tuple, range(N_jobs)):
            print(result, "".join(result))


if __name__ == "__main__":  # マルチプロセス化には必須
    main()
********** func_result_as_strの戻り値たち: Thread IDがばらばら
n=0, Process ID=13768, Thread ID=7692
n=1, Process ID=13768, Thread ID=15164
n=2, Process ID=13768, Thread ID=13164
n=3, Process ID=13768, Thread ID=1312
n=4, Process ID=13768, Thread ID=2292
n=5, Process ID=13768, Thread ID=7260
n=6, Process ID=13768, Thread ID=6204
n=7, Process ID=13768, Thread ID=1296
n=8, Process ID=13768, Thread ID=16284
n=9, Process ID=13768, Thread ID=14668
********** func_result_as_generatorの戻り値たち: generatorが返ってくる、でも結局generator生成だけが並列化されてそれを回すところが並列されない
<generator object func_result_as_generator at 0x0000019061330410> n=0, Process ID=13768, Thread ID=12788
<generator object func_result_as_generator at 0x0000019061330468> n=1, Process ID=13768, Thread ID=12788
<generator object func_result_as_generator at 0x0000019061330570> n=2, Process ID=13768, Thread ID=12788
<generator object func_result_as_generator at 0x00000190613305C8> n=3, Process ID=13768, Thread ID=12788
<generator object func_result_as_generator at 0x00000190613304C0> n=4, Process ID=13768, Thread ID=12788
<generator object func_result_as_generator at 0x0000019061330620> n=5, Process ID=13768, Thread ID=12788
<generator object func_result_as_generator at 0x0000019061330518> n=6, Process ID=13768, Thread ID=12788
<generator object func_result_as_generator at 0x0000019061330780> n=7, Process ID=13768, Thread ID=12788
<generator object func_result_as_generator at 0x0000019061330728> n=8, Process ID=13768, Thread ID=12788
<generator object func_result_as_generator at 0x00000190613306D0> n=9, Process ID=13768, Thread ID=12788
********** strはpicklable(シリアライズ可能)なのでProcessPoolExecutorでも並列化OK、Process IDがばらばら
n=0, Process ID=8180, Thread ID=8664
n=1, Process ID=3628, Thread ID=15880
n=2, Process ID=8920, Thread ID=12636
n=3, Process ID=6400, Thread ID=908
n=4, Process ID=8180, Thread ID=8664
n=5, Process ID=3628, Thread ID=15880
n=6, Process ID=8920, Thread ID=12636
n=7, Process ID=6400, Thread ID=908
n=8, Process ID=8180, Thread ID=8664
n=9, Process ID=3628, Thread ID=15880
********** generatorはpicklable(シリアライズ可能)ではないのでなのでProcessPoolExecutorでは並列化NG
Process Process-4:
Process Process-3:
Process Process-1:
Process Process-2:
Traceback (most recent call last):
  File "C:\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python36\lib\concurrent\futures\process.py", line 181, in _process_worker
    result=r))
  File "C:\Python36\lib\multiprocessing\queues.py", line 349, in put
    obj = _ForkingPickler.dumps(obj)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle generator objects
Traceback (most recent call last):
  File "C:\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python36\lib\concurrent\futures\process.py", line 181, in _process_worker
    result=r))
  File "C:\Python36\lib\multiprocessing\queues.py", line 349, in put
    obj = _ForkingPickler.dumps(obj)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle generator objects
Traceback (most recent call last):
  File "C:\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python36\lib\concurrent\futures\process.py", line 181, in _process_worker
    result=r))
  File "C:\Python36\lib\multiprocessing\queues.py", line 349, in put
    obj = _ForkingPickler.dumps(obj)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
Traceback (most recent call last):
TypeError: can't pickle generator objects
  File "C:\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python36\lib\concurrent\futures\process.py", line 181, in _process_worker
    result=r))
  File "C:\Python36\lib\multiprocessing\queues.py", line 349, in put
    obj = _ForkingPickler.dumps(obj)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle generator objects
A process in the process pool was terminated abruptly while the future was running or pending.
********** tupleはpicklable(シリアライズ可能)なのでなのでProcessPoolExecutorでも並列化OK
('n', '=', '0', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '2', '5', '1', '2', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '9', '9', '2', '4') n=0, Process ID=12512, Thread ID=9924
('n', '=', '1', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '3', '8', '0', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '5', '5', '9', '6') n=1, Process ID=13808, Thread ID=15596
('n', '=', '2', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '9', '7', '8', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '1', '2', '7', '6') n=2, Process ID=9788, Thread ID=11276
('n', '=', '3', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '4', '9', '9', '6', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '3', '5', '2', '4') n=3, Process ID=4996, Thread ID=13524
('n', '=', '4', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '2', '5', '1', '2', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '9', '9', '2', '4') n=4, Process ID=12512, Thread ID=9924
('n', '=', '5', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '3', '8', '0', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '5', '5', '9', '6') n=5, Process ID=13808, Thread ID=15596
('n', '=', '6', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '9', '7', '8', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '1', '2', '7', '6') n=6, Process ID=9788, Thread ID=11276
('n', '=', '7', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '4', '9', '9', '6', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '3', '5', '2', '4') n=7, Process ID=4996, Thread ID=13524
('n', '=', '8', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '2', '5', '1', '2', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '9', '9', '2', '4') n=8, Process ID=12512, Thread ID=9924
('n', '=', '9', ',', ' ', 'P', 'r', 'o', 'c', 'e', 's', 's', ' ', 'I', 'D', '=', '1', '3', '8', '0', '8', ',', ' ', 'T', 'h', 'r', 'e', 'a', 'd', ' ', 'I', 'D', '=', '1', '5', '5', '9', '6') n=9, Process ID=13808, Thread ID=15596

ちなみに多重度の初期値はProcessPoolExecutorよりもThreadPoolExecutorのほうが多くなっています。

投稿

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2017/11/12 19:22

    subprocessを使用することによる性能劣化はそんなにないと思います。

    subprocessの主なディスアドバンテージは、shellをTrueにした場合にshell起動分のオーバーヘッドがでる、exceptionが呼び出し元に伝搬されない、複数のsubprocessの結果を順不同に処理する仕組みがない、の3点だと思います。(他にもあるかもしれませんが)

    multiprocessing.Processとsubprocess.Popenで厳密にどれくらい性能に違いが出るかは、OSによっても結果が違いそうですし、何より難しすぎて詳細が追えません。multiprocessing.Processのほうがsubprocess.Popenよりも若干効率が良いくらいだと予想しますが、それ以上は分かりません。

    キャンセル

  • 2017/11/13 08:30

    ご丁寧に回答いただきありがとうございました。素直に標準ライブラリを使用してみます。

    キャンセル

  • 2017/11/13 09:06

    勉強になります。
    おもしろい話ありがとうございました。

    キャンセル

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 87.61%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる

関連した質問

同じタグがついた質問を見る