質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Socket.IO

Socket.IOはNode.js上で動くライブラリであり、すべてのブラウザとモバイルデバイスでリアルタイムのアプリを作動させる事を目的としています。

Q&A

解決済

1回答

5349閲覧

Python 並列処理によるソケット通信 高速化

goki_gottan

総合スコア168

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Socket.IO

Socket.IOはNode.js上で動くライブラリであり、すべてのブラウザとモバイルデバイスでリアルタイムのアプリを作動させる事を目的としています。

0グッド

1クリップ

投稿2021/02/10 08:37

編集2021/02/17 02:54

回答がなかなか得られなかったので、ちょっと質問を変えさせていただきます。
ソケットが込み合っている可能性があり、
https://qiita.com/maueki/items/8f1e190681682ea11c98
↑こちらのサイトのようにブロッキング、ノンブロッキングを入れる必要があるのでしょうか。

Pythonで、ソケット通信にて、ソフト2個を並列処理にて動作させようとしておりますが、
並列処理はできておるのですが、非常に重く、遅いです。また、固まり、最終的にソフトが落ちるときがあります。

1個では落ちません。おそらく、並列処理が悪さしてると思うのですが、通信をどのようにしたら良いか、socket以外のライブラリー、または通信待機など入れるべきでしょうか?

import glob from pathlib import Path import shutil import os,sys,time from concurrent import futures import socket from collections import namedtuple r_1=r"*****" input_dir=Path(r_1) target_ip1 = "localhost" target_port1 = int(9494) target_ip2 = "localhost" target_port2 = int(9595) def func1(path1): global j1 j1=j1+1 if j==1: tcp_client1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_client1.connect((target_ip1, target_port1)) v = "***," + str(path1) tcp_client1.send(v.encode(encoding='utf-8')) response1 = tcp_client1.recv(buffer_size) response1 = response1.decode(encoding='utf-8') if j1==last: tcp_client1.shutdown(socket.SHUT_RDWR) tcp_client1.close() def func2(path2): global j2 j2 = j2+1 if j2==1: tcp_client2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_client2.connect((target_ip2, target_port2)) v = "***," + str(path2) tcp_client2.send(v.encode(encoding='utf-8')) response2 = tcp_client2.recv(buffer_size) response2 = response2.decode(encoding='utf-8') if j2==last: tcp_client2.shutdown(socket.SHUT_RDWR) tcp_client2.close() paths = list(input_dir.glob("**/*.jpg")) border, last = len(paths)//2, len(paths)-len(paths)%2 buffer_size = 4096 t1=time.time() future_list = [] global j1 global j2 j1=0 j2=0 with futures.ThreadPoolExecutor(2) as executor: for path1, path2 in zip(paths[:border], paths[border:last]): # タスクを追加する。 future1 = executor.submit(func1, str(path1)) future2 = executor.submit(func2, str(path2)) # Future オブジェクトを記録する。 future_list.append(future1) future_list.append(future2) t2 = time.time() elapsed_time = t2-t1 print(f"経過時間:{elapsed_time}")

落ちないようにどこを改善し、高速通信できますでしょうか。

宜しくお願いいたします。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

ブロッキング、ノンブロッキングを入れる必要があるのでしょうか。

見た感じの問題点は、ソケットの扱いで send/recv は、データの送受信を保証しません。
バッファに対する操作の為、期待するデータが送受信できていない可能性があります。
(ローカルホスト内のテストでは問題が起こりにくく、発覚しない事があるので注意)

詳しくは、Python FAQ: ソケットプログラミング HOWTO

ブロッキングで行う場合は、送受信完了を待つ場合は、
sendall や makefile と read/write を使います。

ノンブロッキングの場合は、select 等の通知の仕組を併用するのが一般的な使い方です。
(それをより使いやすくしたものが asyncio で、select は、ライブラリ内部で使われてます。)
URL の記事に関しては asyncio でソケットを扱う場合なので、質問のコードに関しては該当しません。


他の問題に関しては、ThreadPoolExecutor と socket の使い方、

複数のスレッドから2つのsocket 接続を共有して使ってますが、これはスレッドセーフな操作では有りません。

複数のスレッドから同じ接続を利用しようとしてるので、スレッドが有効活用できてない。(混雑の原因?)
通常は、(サーバーが複数の同時接続に対応していれば)スレッド毎に接続をひとつ割り当てます。

解決策については、サーバー側の実装次第な部分もあるので、
クライアント側だけで対応可能かどうかは、質問で提示されている情報だけでは、解りかねます。

サーバー側で データ長に続いてデータ本体 の様な構造のデータを読む場合等、
データ長が想定しない不正な値になり、パフォーマンス低下の原因になる等も考えられます。
ログを取ってデータが期待通りのタイミングで送受信されてることを確認しましょう。


改善方法について、問題が広範囲に渡りそうので具体的な方法は出せませんが、
方向性・案のみ

  • サーバーが2つになってる理由は何ですか?

 もし、サーバーが単一の接続しか受け付けないようであれば、複数接続対応が第一の課題です。

  • executor.submit ではなく、executor.map を使います。

 1接続:1スレッドとして、接続数を制限するには、ThreadPoolExecutor のmax_workers で指定。

  • ThreadPoolExecutor の initializer で、初期化処理を行えます。
  • スレッド別に接続を持つには、

 threading.local を使い、スレッド固有データに socket を初期化・接続

※ 切断時の処理は含みません。大量のデータ&接続数の場合は、他にエラー時のリカバリも必要になってきます。

投稿2021/02/14 03:34

編集2021/02/15 03:51
teamikl

総合スコア8664

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

goki_gottan

2021/02/15 02:14

ありがとうございます。 max_workers=2で、とりあえずは処理中固まらず動きました。複数スレッドが乱立し込み合ってしまっていたのかもしれません。ありがとうございます。 initialzierを入れてよりスレッドをクリーン化するのは良いですね、参考にいたします。 最後にスレッド処理が終わったのかソケット通信は終わったと思われるのですが、 print(f"経過時間:{elapsed_time}") こちらが出力されないようで、スレッドがまだ立ってるからでしょうか。処理が終われば、スレッドをjoinするようなコードを教えていただけないでしょうか。
goki_gottan

2021/02/15 02:42

すみません、ソケットを強制的に切断すると、 print(f"経過時間:{elapsed_time}") こちらが出力されました。ということは、切断が切れていないという事でしょうか。
teamikl

2021/02/15 03:46

先に確認ですが、「サーバーが2つになってる理由は何ですか?」 もしくは、サーバーは複数の同時接続に対応してますか。 回答に書いた他の要素については解消済なのでしょうか。 それとも、max_workers を指定したのみですか? ---- 現状のコードで max_workers=2 にするだけでは、 一時的な問題解消になっても、安全なコードになってません。 処理内容次第では、別スレッドから同ソケットへの 同時アクセスが起こり、問題が再発する可能性があります。 例: スレッドA,B の2個で A:func1, B:func2(時間がかかったと仮定) A:func1, A:func2(時間のかかっている接続への割り込みになる) それぞれのタスクの処理時間が均等だった場合、 偶然うまく交互に処理が噛み合ってるだけになります。 不確定な要素なので、安全なコードとは言えません。 > スレッドをjoinするようなコード with ThreadPoolExecutor(...) as executor: の文脈では、with文を抜ける時に後始末として thread が join されます。 既に スレッドをjoinするようなコードとなってるので、 追加で明示的な threadのjoinは不要です。 > 最後にスレッド処理が終わったのかソケット通信は終わったと思われるのですが、 どのように確認しましたか?(ログを出力する等) 状況から言えることは、スレッドの処理が終わっていないということなので、 スレッドで実行する関数の処理が終わってるかどうか、 詳細なログを取り「どの部分で止まっているかを確認」してください。 送受信では、ブロッキングI/O を用いてるので、よくありそうなのは データの受信で、指定バイト数受信する迄待っているケースがあります。 回答にも書きましたが、ログを取ってデータが期待通りのタイミングで送受信されてることを確認しましょう。
goki_gottan

2021/02/15 04:28

ありがとうございます。 あれから、コードを少し変更し、目的の数値になったら切断と、def func内に設けました。 with構文から出しておりましたが、def func内に入れることで切断は完了できました。 しかし、ご指摘の通り、func1内で重複が見られました。そこで、今回はfunc2内にtime.sleep(0.02)で一応、重複なく解決できました。要するに確実にfunc2とfunc1の同期を避けるようにしました。 しかしながら、func1が込み合った場合は同期するでしょうし、割り込みで最高速を実現することは難しいと思いました。
teamikl

2021/02/15 06:06

sleep での調整は確実な方法ではないので、対応方法としては悪手です。 処理内容が変わると時間も変わってくるし、 再現性の低いバグの原因になるので、お勧めしません。 現状の構造で対応するには、 排他制御(ロックやセマフォ等)といった方法が必要になってきます。 ですが、そもそもスレッドとソケットの接続を1:1 に対応させれば、 割り込みと言ったことを気にする必要はなくなるので、 構造的な見直しをしたほうが良いと思います。 現状の問題点: メインスレッドで生成したソケットを、  サブスレッドで使っている。  スレッドセーフでない点(スレッドを跨ぐリソースの扱い) 改善案: 各スレッドでソケットを生成し、自スレッド内でのみ利用する。  サブスレッドで使うソケットは、そのサブスレッド内で生成する。 ---- 再度の質問になりますが、サーバー側は同時接続に対応してるのでしょうか? (サーバー側の実装情報があれば、もう少し具体的な解決策を提示できます)
goki_gottan

2021/02/15 06:34

サーバー側は対応していないかと思います。分かりません。 ソフト同時起動用の別ソフトが立ち上がって初めて多重起動でき、ソケット通信もポートを設定することで実現できる状態です。 おっしゃる通り、sleepの調整はよくないと思います。 ご指摘の通り、ソケットをそれぞれ別スレッドで作成するようにします。そうすれば、ソケットの跨ぎはないですね。ただ、割り込みができなくなりませんでしょうか。 画像を空いているスレッドに送りこむという割り込みができるという事でしょうか。 ロックやセマフォも学習してみたいところですが、「画像を空いているスレッドに送りこむ」という概念で、各スレッドにそれぞれ固有のソケットを作成するようにいたします。
goki_gottan

2021/02/15 07:11

ソケットを別スレッドで宣言すると、1回目は接続できますが、2回目以降は通信できていないようです。
teamikl

2021/02/15 09:54

なるほど、サーバー側は既成のプログラムなのですね。 サーバー側に関しては、サーバーが同時接続に対応していれば、 max_workers の数を増やすだけで同時接続数を増やす構成にできて プログラムもシンプルにできるので、まずは確認をお勧めします。 現状では、サーバーを別ポートで立てているようですが、 この方式だと、同時に接続出来る数 = サーバー数に制限されてしまいます。 (高速化を目指す上では制限) >画像を空いているスレッドに送りこむという割り込みができるという事でしょうか。 そのスレッドの管理をしてくれるのが、ThreadPoolExecutor の仕事です。 >ソケットを別スレッドで宣言すると、1回目は接続できますが、2回目以降は通信できていないようです。 2回目とは、毎回接続しようとしてますか? ソケット自体にそのような制限はないので、 前回の接続が残っていてサーバー側で接続数に制限があるか、 やりたいことと実際のコードで何か齟齬があると思います。
goki_gottan

2021/02/15 12:56

一回目だけ、ソケット登録、接続をし、二回目以降は登録も接続のコードは入れてないのですが、なぜかスルーするようです。 込み合ってるのかと思い、待機時間を設けたのですが、変化ありませんでした。
teamikl

2021/02/16 00:24 編集

実際のコードがないと、情報不足です。 返答としては、>やりたいことと実際のコードで何か齟齬があると思います。 私の視点からは、話の流れから回答に書いた - initializer で初期化(一度のみ) - threading.local でスレッド固有領域に変数をいれる といったコードを想定してますが、実際のコードはどのようにされてるのでしょう。 - 通信できない理由は?(エラーなのか、送受信の反応がないのか) - 初回、生成したソケットは何処の変数に格納してるのか - 二回目以降、接続しない事はどのように確認したのか 等、不明な点がいくつもあるので、質問のコードを編集して下さい。
goki_gottan

2021/02/17 02:53

質問コードを修正してみました。func内に設定を設け、ある数値になると接続切れを指示しています。 この場合ですと、1回のみ処理して終わります。
teamikl

2021/02/17 04:47

ちなみにどのようなエラーになりました? この場合のtcp_client1 は、「ローカル変数」なので、 変数の生存期間は、他の何処からも参照されていなければその関数の終了まで。 2回目以降の呼び出しでは有効ではありません。 グローバル変数にすれば、変数の生存期間の問題は解消しますが、状況はもう少し複雑で マルチスレッドでは、スレッドを跨ぐ変数の書き換えは安全な操作では有りません。 (別スレッドからアクセスにより、意図しないタイミングでの変数の書き換えが起こります) global j1, j2 についても同様で、 テストではうまくいくが本運用で問題になるといったコードになりやすいです。 解決策として提示できるのは、回答で提示してる方法 - initializer で呼び出す関数内で初回一回のみ実行(ソケットの生成・接続) - threading.local() で、スレッド毎のデータ 但し、もうひとつ課題があって、(優先度的には、こちらが先) 質問のコードでは func1, func2 其々別スレッドで動かす必要がありますが、 ThreadPoolExecutor では、どちらのスレッドでどの関数を実行するかを選べません。 手の空いてるスレッドから使われるため、複数のスレッドから同じソケットを使う可能性があります。 これを解消するには、コードの構造から見直す必要があります。 そこで話が戻るのですが、 > サーバー側は対応していないかと思います。分かりません。 ここの、サーバー側の実装情報が必要になってきます。 理由は、サーバー側が複数同時接続に対応してるかどうかで、アプローチが変わってくる為。 どのように確認してわからなかったのか、等の詳細な情報が必要です。 実際にテストして確かめて下さい。 (同じポートに2つのソケットを接続して、2つめが成功するかどうかを確認する) もしくは、テスト用のサーバーを作り、サーバー側のコードを提示してください。
goki_gottan

2021/02/17 08:44

サーバー側1:1対応のようです。ですので、スレッドで複数アクセスした場合はフリーズする恐れがあります。 threading.local()気になります。
teamikl

2021/02/17 09:35

サーバー側の情報が不明瞭なので、現状の情報では 質問のコードを元にこれ以上詰めるのは難しいと思います。 >サーバー側1:1対応のようです。ですので、スレッドで複数アクセスした場合はフリーズする恐れがあります。 一応、確認しておきたいのは、どのように確認したのか。 「フリーズの恐れがある」→ 実際に試して確認して下さい。繰り返しますが、 実際のサーバーでテスト出来ない場合は、テスト用のサーバーを作り試す等、 先にテスト出来る環境を整備した方が良いでしょう。 サーバー側の実装詳細を把握することが、 現状のコードがうまくいくようにどうこう対応するよりも、優先すべき事です。 (何度もサーバーの情報を確認した理由: サーバーの実装に対して、間違った方向で最適化をしようとしてる可能性もあります) 質問自体が「並列処理によるソケット通信」なのに、 「複数アクセスした場合フリーズの恐れがある」では 質問の前提から覆ってきますが、その点の認識は大丈夫でしょうか。 恐らく、1つのポートあたりの接続が 1:1 ということだと思いますが、 その場合、並列処理するには1スレッド毎にひとつのサーバーが必要になってきます。 サーバー側のプログラムを複数同時起動しても安全な設計になってるかどうかも確認が必要です。 (複数起動した時に、複数のプロセスから同一ファイルに同時書き込みがないか等) >threading.local()気になります。 ここは、サンプルコード出せるので、説明より実際のコードを見てもらったほうが早いかな。 解答欄に別回答として投稿します。
teamikl

2021/02/18 07:25 編集

長くなったので外部サイトにて (コードの提示のみ。調べる際の参考程度で。詳細な説明は出来ないので、ご了承ください。) https://repl.it/@MiKLTea/EchoServerClient#README.md 総括しておくと、 私の回答欄で言及した項目は、どちらの場合にも共通する明らかに問題となる部分です。 (ソケットの使い方や、スレッドを跨いだリソースへのアクセス等) 改善案で示した方法は、回答内で記したように「複数接続対応が第一の課題です」 現状提示されてる限りのサーバーの情報(複数同時接続に対応していない)では、 質問のコードのアプローチ(ThreadPoolExecutor)による解決は適してません。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問