teratail header banner
teratail header banner
質問するログイン新規登録

質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.30%
Python 2.7

Python 2.7は2.xシリーズでは最後のメジャーバージョンです。Python3.1にある機能の多くが含まれています。

Q&A

解決済

1回答

3848閲覧

multiprocessing. join()が終了しない。

spike0726

総合スコア12

Python 2.7

Python 2.7は2.xシリーズでは最後のメジャーバージョンです。Python3.1にある機能の多くが含まれています。

0グッド

0クリップ

投稿2018/06/13 12:51

0

0

以下のサイトにあるmypool.pyを使い並列処理を実装しています。
http://d.hatena.ne.jp/halhorn/20100904/1283612722

処理内容としては、子プロセス(multiprocessing.Process)として、
tshark(wiresharkのCUI版)を起動し、
そのキャプチャ結果をmultiprocessing.Pipeで受け取るものです。

理由があり、終了待ちをしないといけないため、
multiprocessing.Process.joinで終了待ちをしています。
ところが、tsharkのキャプチャ数(データ量)が多くなると、
このjoinが終了しなくなります。(少ない場合はjoinは終了します)

joinの前に行っているPipe経由のrecv関数までは正常に処理できています。
送受信バッファの問題かなと考えましたが、recv関数後にpollを呼んでも
0が返る(受信すべきデータがない)ため原因がわかりません。

抽象的な文面で分かりにくくて申し訳ありませんが、
お気づきの点があり、アドバイスを頂けると助かります。

よろしくお願いします。
以上です。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

umyu

2018/06/18 12:51

>tsharkのキャプチャ数(データ量) これは64KB以上でしょうか? Popen.communicateで、Hangしている可能性もあるのでは。
spike0726

2018/06/19 00:54

コメントありがとうございます。64KBを超えた場合、Popen.communicateは使えないということでしょうか?その場合、どのような処理に置き換えれば回避できるでしょうか?
umyu

2018/06/19 01:26 編集

いえ、普通にstdoutとstderrにファイルオブジェクト)tempfile.TemporaryFileやuniqueな名前)を渡せばよいのでは。10年前の記事ですが、https://thraxil.org/users/anders/posts/2008/03/13/Subprocess-Hanging-PIPE-is-your-enemy/ この問題だとするならば、最初にも提起しましたが、止まる正確なバイト数を特定する所からではないでしょうか。
spike0726

2018/06/19 01:30

参考サイトありがとうございます。この対処と hayataka2049 さんに教えて頂いた対処でうまく行けそうな気がしてます。結果は別途報告させて頂きます!
guest

回答1

0

ベストアンサー

安直ですが、とりあえずほしい結果が得られていて、I/O等のリソース等も触っていないのであれば、強制的に殺してしまっても良いのではないでしょうか。
joinの引数に整数を入れると、タイムアウトさせられます。

17.2. multiprocessing — プロセスベースの並列処理 — Python 3.6.5 ドキュメント

投稿2018/06/13 14:03

hayataka2049

総合スコア30939

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

spike0726

2018/06/13 14:08

コメントありがとうございます! joinの引数にtimeoutを指定し、そのあとにis_alive()がtrueであれば、 terminate()をするようにしたのですが、その時のデータの内容を確認すると、データが途中までしか引き取れてません。なんか詰まっているのかデッドロックになっているような感じです。
hayataka2049

2018/06/13 14:16

困りましたね(汗)どうしましょうか。それなりに色々調べてみないとよくわからないかも とりあえずプログラムは載せてみて頂けますか。ここでデータがヘンでしたみたいなコメントも入れていただけると助かります
spike0726

2018/06/13 15:10

ありがとうございます。 掲載のURLページに載っているmypool.pyを使っているコードが、版権問題で提示できないため、問題を整理して再度追加の情報を提示致します。 あの後、こちらでも調べているのですが、親プロセスと子プロセスのデータの受け渡しにmultiprocessing.Queueを使っているのですが、これが大量データのときと少量データ(うまくいくケース)で、挙動が違ってそうで、その辺りを調べています。どちらもQueueのデフォルトMAXサイズの1024は超えていないのは確認しています。
hayataka2049

2018/06/13 15:16

そうですか。リンク先のコードをもう一回読み直してみましたが、これに引っかかってる可能性を考えるべきかも >オブジェクトは pickle でシリアライズ可能でなければなりません。 pickle が極端に大きすぎる (OS にも依りますが、およそ 32 MB+) と、 ValueError 例外が送出されることがあります。 https://docs.python.jp/3/library/multiprocessing.html#multiprocessing.Connection.send
hayataka2049

2018/06/13 15:30

でも別に例外が出てる訳じゃないのか・・・なんだろう?
spike0726

2018/06/13 17:14

ありがとうございます。 キャプチャした情報をキューに入れて、それを配列にしたときにキューの数と合わない時があります。キャプチャした文字列に制御コードが入っているのかもしれませんので、ご指摘のシリアライズ化は的確かもしれません。明日試してみます!
spike0726

2018/06/14 03:28

プロセス間のデータの受け渡しに使っているmultiprocessing.Queueは、 put/getでpickle化を叩いているようです。 putするオブジェクトは辞書型でこれもpickle対応のオブジェクト型でした。
hayataka2049

2018/06/14 06:54

pickle自体は大きくても基本的に問題なく動くはずなんですけどねぇ。何か怪しいことになってるとしか それを使うのをやめて、multiprocessing.Poolで書き直す訳にはいきませんか? なんかこんな感じでできるらしいですけど http://aflc.hatenablog.com/entry/20110825/1314366131
spike0726

2018/06/14 07:17

multiprocessing.Poolのご提案ありがとうございます。 実装に関してはマルチプロセス(スレッドはNG)で実現できれば方法は問いませんので、ご提示のURLを参考にお試し実装してみます。 ありがとうございます。
hayataka2049

2018/06/14 07:24 編集

そもそもトップレベル関数で書ければ、何もしなくてもPoolでできるのですが、それは無理ですか?(コード見ないと判断できないなやっぱり・・・)
hayataka2049

2018/06/14 07:29

あとは、疑うべきはmultiprocessingの他にもtsharkがあった・・・軽く見落としていましたが、コマンドライン引数などの設定が適切でないとけっこうシビアに情報が捨てられるのでは?
spike0726

2018/06/14 07:37

明日、加工したソースを掲載します。 (色々関係ないところを削ったりする必要があるため少し時間が掛かります) その時、お手すきであればみて頂けると大変助かります。
spike0726

2018/06/14 07:47 編集

tsharkに関しては、プログラムで実行している引数で、コマンドラインで実行しましたが、特に問題が見られませんでした。(無線のキャプチャを行っています) tshark -i wXXXX -n -t ad -I -Y 'wlan.fc.type_subtype eq 4 || wlan.fc.type_subtype eq 5 || wlan.fc.type_subtype eq 8' -T fields -E separator=";" -e _ws.col.Time -e radiotap.channel.freq -e radiotap.dbm_antsignal -e wlan.sa -e wlan.da -e _ws.col.Info -e wlan.ht.capabilities -e wlan.tag.length -e wlan.supported_rates -e wlan.extended_supported_rates -e wlan.tag.oui -e wps.manufacturer -e wps.model_name -e wps.model_number -e wps.uuid_e -a duration:10 取れる内容は以下のような感じで最大で500件程度取得できています。 {'fp': ';0,8,1,4,6,1,4,24,9;130,132,139,150,12,18,24,36;48,72,96,108;20722,895;;;;', 'sensorunit': 2, 'dbm': -75.0, 'ssid': 'XXXXX', 'subtype': 'Beacon frame', 'mac': 'XX:XX:XX:XX:XX:XX', 'flags': '........C', 'sn': 524, 'observationtime': '2018-06-14 11:49:02.126062+09:00', 'host': 'XXXXX', 'freq': 2412.0}
hayataka2049

2018/06/15 12:33 編集

ひとこと。コメント欄はmarkdownが効かないので、質問文に追記してくださいな。あとどうしようもなく長いコメントになっちゃったので、再編集して適当に消しといてください(みづらい・・・
spike0726

2018/06/15 12:33

コードが最大文字数に達したため gistに張りつけました。 https://gist.github.com/spike0726/3857f1e4ab9611758054ef7c1bc21ab5 補足説明 このプログラムは常駐プログラムです。 リモートからPyhtonRPCで以下の順序で関数コールして、 tsharkのキャプチャデータを取得します。 setup()→ wlan_capture()→queue_get() お時間あるときに見て頂けたらと思います。
hayataka2049

2018/06/15 12:34

あ、gistに貼ってくれたならそれで良いです(でもやっぱり余計なスクロールが増えるので、コメントは消しといて・・・
spike0726

2018/06/15 12:38

先ほどのコードのコメントは削除依頼を出しました。 またgistに張り付けたソースは部分的に抜粋しています。 よろしくお願いします。
hayataka2049

2018/06/15 12:46 編集

削除依頼も良いんですが、コメントの上にマウスカーソルを持っていくと鉛筆マークが出てきて、それをクリックするとコメントを編集できます。それで全選択→全消しして、代わりに「削除済み」とでも書いておいてくださいな
hayataka2049

2018/06/18 10:50

コードをざっとは見ました。MyPool自体は手元で検証してみましたが、大きめなデータを投げても正常終了するので、これにはあまり問題はないのかもしれません。そうすると怪しいのはQueueですが、これですかね https://docs.python.jp/2.7/library/multiprocessing.html#pipes-and-queues >警告 上述したように、もし子プロセスがキューへ要素を追加するなら (かつ JoinableQueue.cancel_join_thread を使用しないなら) そのプロセスはバッファーされたすべての要素がパイプへフラッシュされるまで終了しません。 >これは、そのプロセスを join しようとする場合、キューに追加されたすべての要素が消費されたことが確実でないかぎり、デッドロックを発生させる可能性があることを意味します。似たような現象で、子プロセスが非デーモンプロセスの場合、親プロセスは終了時に非デーモンのすべての子プロセスを join しようとしてハングアップする可能性があります。 >マネージャーを使用して作成されたキューではこの問題はありません。詳細は プログラミングガイドライン を参照してください。
spike0726

2018/06/18 11:50

細かく見て頂きありがとうございます。m(__)m 結論としては、今の実装を極力変えずに改修する場合は、 multiprocessing.Queue()ではなく multiprocessing.manager().Queue() にするとうまくいく、ということになるのでしょうか? (現時点ではmultiprocessing.Queue()がほぼほぼ原因であろうと思っています)
hayataka2049

2018/06/18 11:55

そのまま書き換えて動くかはちょっとわかりませんが、方向性としてはmanagerを使う方向になるのかなと
spike0726

2018/06/18 12:21

おっしゃる通りですね。 managerの使い方をきちんと調べて正しい使い方で実装してみます。 ありがとうございます!
spike0726

2018/06/19 03:32

コードは、multiprocessing.Queue()をmultiprocessing.Manager().Queue()に修正するだけで済みました。それでなんとうまくいきました!もう何か月にも渡って調べていたので大変助かりました。ありがとうございます。 なお、gistに掲載したソースはあまり見せるのはよろしくないもので削除致します。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.30%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問