質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

90.75%

  • Python 2.7

    1203questions

    Python 2.7は2.xシリーズでは最後のメジャーバージョンです。Python3.1にある機能の多くが含まれています。

ProcessPoolExecutor(Python2.7) Queue error.

解決済

回答 1

投稿

  • 評価
  • クリップ 0
  • VIEW 92

spike0726

score 2

Python2.7.12の環境で、concurrent.futures.ProcessPoolExecutor を使い
並列処理を実装しています。
(訳あって、multiprocessing の並列処理関数は使っていません。Queue関数は使用。)

class A():
    def __init__(self):
        self.queue =  multiprocessing.Queue(maxsize=1024)


    def proc():
        todo = []
        proc_list = get_classes()  # 処理をさせたいクラス群を取得
        with concurrent.futures.ProcessPoolExecutor() as executor: 
           for p in proc_list:
               future = executor.submit(p.func, self.queue) # ★
               todo.append(future)

        for future in futures.as_completed(todo):
            res = future.result()

上記のコードを実行すると★部分で以下のエラーが発生します。
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
File "/usr/lib/python2.7/multiprocessing/queues.py", line 77, in getstate
assert_spawning(self)
File "/usr/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
' through inheritance' % type(self).name
RuntimeError: Queue objects should only be shared between processes through inheritance

ProcessPoolExecutorをThreadPoolExecutorに変更するとエラーなく正常処理します。

以下の条件で回避方法がありましたらご教授下さい。
(a)マルチコアでもGlobal Interpreter Lock機構の制限を受けるThreadPoolExecutorは使わない
(b)multiprocessingの並列処理も使わない
(c)ProcessPoolExecutorまたは上記(a)(b)以外の手法で並列化を実現したい
(d)multiprocessing.Queueは、並列化する関数(p.func)で参照したいのでできれば使いたいが
使わなくても同等の機能(キュー機構)が使えればそちらでも構わない

よろしくお願い致します。
以上です。

  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

回答 1

checkベストアンサー

+1

骨が折れますが、リモートマネージャー経由でmultiprocessing.Queueオブジェクトを共有するのが現実的な回避方法かと。

https://docs.python.org/ja/3/library/multiprocessing.html#using-a-remote-manager

ProcessPoolExecutorでタスクをsubmitする側でリモートマネージャーのサーバを動かしてmultiprocessing.Queueオブジェクトを登録しておき、タスク側でリモートマネージャーのクライアントを使用してそのQueueオブジェクト(のProxyオブジェクト)を取得する流れになると思います。

  • GILフリー => CPythonでやるならマルチプロセス一択
  • マルチプロセス対応のキュー機構 => multiprocessing.Queue or 標準ライブラリ外の何か

なので、multiprocessingのマネージャー機構を使うことを勧めています。ProcessPoolExecutorではありませんが、Windows環境でもLockやValueをプロセス間で共有できたので、他の環境でも動くはずです。

あと余談ですが、concurrentパッケージを使うなら3.5や3.6あたりで試したほうが良いと思います。2.7のconcurrentパッケージはbackportですよね?

投稿

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2018/05/30 21:40

    丁寧な回答ありがとうございます。
    ご提案のmultiprocessingのマネージャー機構というのを使ってみます!
    また、Pythonのバージョンについては、開発環境の制限で、2.7に限定されています。
    すみません、backportというのが理解できておりませんが、
    concurrent.futures は、「pip install futures」でインストールしました。

    以上、よろしくお願い致します。

    キャンセル

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 90.75%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる

関連した質問

同じタグがついた質問を見る

  • Python 2.7

    1203questions

    Python 2.7は2.xシリーズでは最後のメジャーバージョンです。Python3.1にある機能の多くが含まれています。