質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
ソケット

TCP/IPにおいて、IPアドレスとサブアドレスであるポート番号を組み合わせたネットワークアドレスのことを呼びます。また、ソフトウェアアプリケーションにおいて、TCP/IP通信を行う為の仮想的なインターフェースという意味もある。

TCP

TCP(Transmission Control Protocol)とは、トランスポート層のプロトコルで、コネクション型のデータサービスです。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Amazon EC2

Amazon EC2は“Amazon Elastic Compute Cloud”の略称です。Amazon Web Services(AWS)の一部であり、仮想化されたWebサーバーのコンピュータリソースをレンタルできるサービスです。

Q&A

0回答

1914閲覧

Pythonでasyncioを使ってP2P通信をTCPで実現したい(お互いのIPアドレスは既知)

attache-case

総合スコア6

ソケット

TCP/IPにおいて、IPアドレスとサブアドレスであるポート番号を組み合わせたネットワークアドレスのことを呼びます。また、ソフトウェアアプリケーションにおいて、TCP/IP通信を行う為の仮想的なインターフェースという意味もある。

TCP

TCP(Transmission Control Protocol)とは、トランスポート層のプロトコルで、コネクション型のデータサービスです。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Amazon EC2

Amazon EC2は“Amazon Elastic Compute Cloud”の略称です。Amazon Web Services(AWS)の一部であり、仮想化されたWebサーバーのコンピュータリソースをレンタルできるサービスです。

0グッド

0クリップ

投稿2020/01/03 07:18

編集2020/01/03 07:45

前提・実現したいこと

  • 決められた台数(2台〜)のAWS ec2インスタンス同士がP2PでTCP通信できるようにしたいです
    ※送受信する情報はPythonで処理するため,通信部分もPythonで実現しようと考えています
    ※お互いのIPアドレス・待ち受けポートは既知とします
  • 各インスタンスは特定のポート(例えば50008)で他のインスタンスからの通信を待ち受けていると同時に,一定の周期で他のインスタンスの特定のポートへ通信をします
    ※待ち受けていたポートにメッセージが来た→メッセージを処理して返信用のメッセージを返す
    ※相手にメッセージを送る→相手がメッセージを処理して返信用のメッセージを返す→帰ってきたメッセージをもとに変数の増減などの処理を行う
  • サーバーとしての受信待ちとクライアントとしての送信待ちが共存して走る形になるため,ブロッキングが起きないようasyncioを使いたいと考えています
  • サーバー的な役割を果たすループでは一定の条件(例えば自分以外の全員から1回以上通信されるなど)を満たしたら止めたい
  • クライアント的な役割を果たすループでは送信しようとして一定時間応答がなければキャンセルする処理をしたい

発生している問題・エラーメッセージ

実現したいことの前段階として
1台 vs 1台で20個のメッセージを相手に送信しつつ,
相手からのメッセージを待ち受けるコードを書いたものの,
①待ち受けしている関数で相手から届いた情報がPrintされない
②待ち受けしている関数の止め方が分からない
(loop.stop()のような記述をどこに埋め込めば良いか分からない)

該当のソースコード

https://docs.python.org/ja/3/library/asyncio-stream.html#tcp-echo-client-using-streams
https://docs.python.org/ja/3/library/asyncio-stream.html#tcp-echo-server-using-streams
などを参考に作成

py

1#encoding: utf-8 2import asyncio 3 4ip_to = 'x.x.x.x' # Public IPv4 Address 5 6async def tcp_echo_client(message): 7 reader, writer = await asyncio.wait_for(asyncio.open_connection( 8 ip_to, 50008), timeout=5) 9 10 print(f'Send: {message!r}') 11 writer.write(message.encode()) 12 13 data = await reader.read(100) 14 print(f'Received: {data.decode()!r}') 15 16 print('Close the connection') 17 writer.close() 18 19async def handle_echo(reader, writer): 20 data = await reader.read(100) 21 message = data.decode() 22 addr = writer.get_extra_info('peername') 23 24 print(f"Received {message!r} from {addr!r}") 25 26 print(f"Send: {message!r}") 27 writer.write(data) 28 await writer.drain() 29 30 print("Close the connection") 31 writer.close() 32 33async def main(): 34 server = await asyncio.start_server( 35 handle_echo, '0.0.0.0', 50008, 36 reuse_address=True, reuse_port=True) 37 38 addr = server.sockets[0].getsockname() 39 print(f'Serving on {addr}') 40 41 async with server: 42 await server.serve_forever() 43 44async def multiple_request(loop): 45 tasks = [] 46 tasks.append(main()) 47 for i in range(20): 48 tasks.append(tcp_echo_client(f'{i}: Hello, World!\n')) 49 done, pending = await asyncio.wait(tasks) 50 51loop = asyncio.get_event_loop() 52loop.run_until_complete(multiple_request(loop)) 53loop.close()

2台のインスタンスで同じコードを実行したところ,出力は以下のようになりました

(python3) [ec2-user@ip-x-x-x-x ~]$ python "/home/ec2-user/p2p_test/p2p.py" Serving on ('0.0.0.0', 50008) Send: '13: Hello, World!\n' Send: '0: Hello, World!\n' Send: '3: Hello, World!\n' Send: '14: Hello, World!\n' Send: '15: Hello, World!\n' ...(略)... Received: '13: Hello, World!\n' Close the connection Received: '0: Hello, World!\n' Close the connection Received: '3: Hello, World!\n' Close the connection Received: '14: Hello, World!\n' Close the connection Received: '15: Hello, World!\n' Close the connection ...(略)... □(待ち受けが続いている)

handle_echo側の関数によって以下のような出力が20個printされるはずですがprintされません

Received '0: Hello,World!\n' from ('x.x.x.x', 61827) Send: '0: Hello,World!\n'

試したこと

asyncioを使う前はThreadingでサーバー的な関数('0.0.0.0', 50008をbind)とクライアント的な関数を並行して実行させていましたが,サーバーの待ち受け部分conn, addr = s.accept()とクライアントの送信待ちs.connect((addr, dest_port))にブロッキングされていたようで,メッセージが相手に届きませんでした.s.setblocking(False)を設定すると,IOErrorとしてポート50008にバインドできないという旨のエラーが出たため,方針を切り替えてasyncioを使ってみようとしたところ,非同期処理でServerとClientを共存させているコード例が見当たらず詰まってしまいました.
初心者ながら,参考になる知識やコードの修正をいただけますと大変ありがたいです.

補足情報(FW/ツールのバージョンなど)

Python 3.7.4

追記[2020/01/03 16:40]:

Threadingで1台vs1台のP2P通信の導通確認をするコードとエラーを補足として載せます
実際には10台以上でこのようなコードを実行し,一定の失敗回数以内で一定数以上のノードに通信できる(send_status='A')&一定時間以内に一定数以上のノードから通信があった(receive_status='A')という結果をハンドリングして,この後に本格的なP2P通信を開始したいというコードになります.
Threadingとasyncioに関わらず,クライアントとサーバーの機能が共存するようなコードを書くヒントを頂きたく思います.

py

1# -*- coding: utf-8 -*- 2import asyncio 3import socket 4import threading 5from time import time 6import queue 7 8 9send_queue = queue.Queue() 10receive_set = set() 11 12send_status = None 13receive_status = None 14 15sendable_ips = [] 16receivable_ips = [] 17 18 19def try_init(my_ip, ips, dest_port=50008): 20 """ 21 他のノードに自身の情報を伝え導通確認をする 22 """ 23 global send_status 24 25 send_status = 'A' 26 fail_count = 0 27 n_nodes = len(ips) - 1 28 for addr in list(set(ips) - {my_ip}): 29 send_queue.put(addr) 30 31 print('[initial send_queue]') 32 print(list(set(ips) - {my_ip})) 33 34 while True: 35 next_queue = queue.Queue() 36 while send_queue.empty() is False: 37 addr = send_queue.get() 38 try: 39 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 40 s.settimeout(3) 41 s.setblocking(False) 42 print('trying to connect: ' + addr + ', ' + str(dest_port)) 43 s.connect((addr, dest_port)) 44 print('connected to: ' + addr + ', ' + str(dest_port)) 45 46 # サーバにメッセージを送る 47 s.sendall(b'test msg') 48 49 # サーバからの文字列を取得する。 50 data = s.recv(4096) 51 if data: 52 sendable_ips.append(addr) 53 except (ConnectionRefusedError, TimeoutError, socket.timeout) as e: 54 print(e) 55 next_queue.put(addr) 56 fail_count += 1 57 except: 58 send_status = 'Z' 59 raise 60 if next_queue.empty(): 61 break 62 elif fail_count < n_nodes//3: 63 while next_queue.empty() is False: 64 send_queue.put(next_queue.get()) 65 continue 66 elif next_queue.qsize() < n_nodes//10: 67 send_status = 'B' 68 break 69 else: 70 send_status = 'C' 71 break 72 73 print('send_status: ' + send_status) 74 return 75 76 77def listen_init(my_ip, ips, listen_ip='0.0.0.0', listen_port=50008): 78 """ 79 他のノードから通信が来るのを待ち、受信する 80 """ 81 global receive_status 82 83 n_nodes = len(ips) - 1 84 receive_status = 'A' 85 t_listen_start_sec = time() 86 87 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 88 # IPアドレスとポートを指定してbindする 89 # FWやセキュリティポリシーで解放されているIP/Portにするべきである 90 s.bind((listen_ip, listen_port)) 91 # 接続待ち受け 20秒何も来なかったら抜ける 92 s.settimeout(20) 93 s.listen(10) 94 s.setblocking(False) 95 96 # connectionするまで待つ 97 while True: 98 try: 99 # 接続 100 print('listening at: ' + listen_ip + ', ' + str(listen_port)) 101 conn, addr = s.accept() 102 print('got connection from: ' + addr[0] + ', ' + str(addr[1])) 103 data = conn.recv(4096) 104 if data: 105 conn.sendall(b'OK: Received your info.') 106 receive_set.add(addr[0]) # 実際は相手にPublic IPv4 Addressをメッセージに入れてもらっている 107 else: 108 conn.sendall(b'NG: Nothing received.') 109 except socket.timeout: 110 print('listen timeout') 111 pass 112 except: 113 receive_status = 'Z' 114 raise 115 116 if len(receive_set) == n_nodes: 117 break 118 else: 119 t_listen_current_sec = time() 120 t_listen_elapsed_sec = t_listen_current_sec - t_listen_start_sec 121 if t_listen_elapsed_sec < 100: 122 continue 123 elif len(receive_set) > 9*n_nodes//10: 124 receive_status = 'B' 125 break 126 else: 127 receive_status = 'C' 128 break 129 130 for addr in receive_set: 131 receivable_ips.append(addr) 132 133 print('receive_status: ' + receive_status) 134 return 135 136 137def p2p_setup_main(): 138 my_ip = 'x.x.x.x' 139 ips = ['x.x.x.x', 'y.y.y.y'] 140 try: 141 t_try_init = threading.Thread(target=try_init, args=(my_ip, ips)) 142 t_listen_init = threading.Thread(target=listen_init, args=(my_ip, ips)) 143 t_try_init.start() 144 t_listen_init.start() 145 t_try_init.join() 146 t_listen_init.join() 147 148 except: 149 raise 150 151 return 152 153if __name__ == '__main__': 154 p2p_setup_main()

出力(エラー)

(python3) [ec2-user@ip-x-x-x-x ~]$ /home/ec2-user/venv/python3/bin/python /home/ec2-user/p2p_test/threading_p2p.py [initial send_queue] ['y.y.y.y'] trying to connect: y.y.y.y, 50008 Exception in thread Thread-1: Traceback (most recent call last): File "/usr/lib64/python3.7/threading.py", line 926, in _bootstrap_inner self.run() File "/usr/lib64/python3.7/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/home/ec2-user/p2p_test/threading_p2p.py", line 43, in try_init s.connect((addr, dest_port)) BlockingIOError: [Errno 115] Operation now in progress listening at: 0.0.0.0, 50008 Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib64/python3.7/threading.py", line 926, in _bootstrap_inner self.run() File "/usr/lib64/python3.7/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/home/ec2-user/p2p_test/threading_p2p.py", line 101, in listen_init conn, addr = s.accept() File "/usr/lib64/python3.7/socket.py", line 212, in accept fd, addr = self._accept() BlockingIOError: [Errno 11] Resource temporarily unavailable

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

まだ回答がついていません

会員登録して回答してみよう

アカウントをお持ちの方は

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問