質問編集履歴

1

threadingでP2P通信を実現しようとして失敗した際のコードを追記

2020/01/03 07:45

投稿

attache-case
attache-case

スコア6

test CHANGED
File without changes
test CHANGED
@@ -180,35 +180,7 @@
180
180
 
181
181
  Send: '15: Hello, World!\n'
182
182
 
183
- Send: '16: Hello, World!\n'
184
-
185
- Send: '17: Hello, World!\n'
186
-
187
- Send: '18: Hello, World!\n'
188
-
189
- Send: '19: Hello, World!\n'
190
-
191
- Send: '6: Hello, World!\n'
183
+ ...(略)...
192
-
193
- Send: '2: Hello, World!\n'
194
-
195
- Send: '4: Hello, World!\n'
196
-
197
- Send: '7: Hello, World!\n'
198
-
199
- Send: '1: Hello, World!\n'
200
-
201
- Send: '8: Hello, World!\n'
202
-
203
- Send: '9: Hello, World!\n'
204
-
205
- Send: '10: Hello, World!\n'
206
-
207
- Send: '5: Hello, World!\n'
208
-
209
- Send: '11: Hello, World!\n'
210
-
211
- Send: '12: Hello, World!\n'
212
184
 
213
185
  Received: '13: Hello, World!\n'
214
186
 
@@ -230,65 +202,7 @@
230
202
 
231
203
  Close the connection
232
204
 
233
- Received: '16: Hello, World!\n'
234
-
235
- Close the connection
205
+ ...(略)...
236
-
237
- Received: '17: Hello, World!\n'
238
-
239
- Close the connection
240
-
241
- Received: '18: Hello, World!\n'
242
-
243
- Close the connection
244
-
245
- Received: '19: Hello, World!\n'
246
-
247
- Close the connection
248
-
249
- Received: '6: Hello, World!\n'
250
-
251
- Close the connection
252
-
253
- Received: '2: Hello, World!\n'
254
-
255
- Close the connection
256
-
257
- Received: '4: Hello, World!\n'
258
-
259
- Close the connection
260
-
261
- Received: '7: Hello, World!\n'
262
-
263
- Close the connection
264
-
265
- Received: '1: Hello, World!\n'
266
-
267
- Close the connection
268
-
269
- Received: '8: Hello, World!\n'
270
-
271
- Close the connection
272
-
273
- Received: '9: Hello, World!\n'
274
-
275
- Close the connection
276
-
277
- Received: '10: Hello, World!\n'
278
-
279
- Close the connection
280
-
281
- Received: '5: Hello, World!\n'
282
-
283
- Close the connection
284
-
285
- Received: '11: Hello, World!\n'
286
-
287
- Close the connection
288
-
289
- Received: '12: Hello, World!\n'
290
-
291
- Close the connection
292
206
 
293
207
  □(待ち受けが続いている)
294
208
 
@@ -323,3 +237,387 @@
323
237
 
324
238
 
325
239
  Python 3.7.4
240
+
241
+
242
+
243
+ 追記[2020/01/03 16:40]:
244
+
245
+
246
+
247
+ Threadingで1台vs1台のP2P通信の導通確認をするコードとエラーを補足として載せます
248
+
249
+ 実際には10台以上のノードでこのようなコードを実行し,「一定の失敗回数以内で一定数以上のノードに通信できた(send_status='A')」かつ「一定時間以内に一定数以上のノードから通信があった(receive_status='A')」という結果をハンドリングしたうえで,その後に本格的なP2P通信を開始するコードにしたいと考えています.
250
+
251
+ Threadingかasyncioかを問わず,クライアントとサーバーの機能が共存するようなコードを書くためのヒントを頂きたく思います.
252
+
253
+ ```py
254
+
255
+ # -*- coding: utf-8 -*-
256
+
257
+ import asyncio
258
+
259
+ import socket
260
+
261
+ import threading
262
+
263
+ from time import time
264
+
265
+ import queue
266
+
267
+
268
+
269
+
270
+
271
+ send_queue = queue.Queue()
272
+
273
+ receive_set = set()
274
+
275
+
276
+
277
+ send_status = None
278
+
279
+ receive_status = None
280
+
281
+
282
+
283
+ sendable_ips = []
284
+
285
+ receivable_ips = []
286
+
287
+
288
+
289
+
290
+
291
def try_init(my_ip, ips, dest_port=50008):
    """Announce this node to every other node and check connectivity.

    Connects to each address in ``ips`` other than ``my_ip``, sends a short
    test message and waits for a reply.  Every peer that answers with a
    non-empty reply is appended to the module-level ``sendable_ips``.  The
    overall outcome is stored in the module-level ``send_status``:

    * ``'A'`` - the loop ended with no peer left waiting for a retry.
    * ``'B'`` - retries were exhausted with fewer than ``n_nodes // 10``
      peers still failing.
    * ``'C'`` - retries were exhausted with at least that many peers still
      failing.
    * ``'Z'`` - an unexpected exception occurred (it is re-raised).

    Args:
        my_ip: Address of this node; it is excluded from the targets.
        ips: Addresses of all nodes (expected to include ``my_ip``).
        dest_port: TCP port the peers are listening on.
    """
    global send_status

    send_status = 'A'
    fail_count = 0
    # Number of peers, on the assumption that my_ip is one of the ips.
    n_nodes = len(ips) - 1
    peers = list(set(ips) - {my_ip})
    for addr in peers:
        send_queue.put(addr)

    print('[initial send_queue]')
    print(peers)

    while True:
        # Peers that failed in this round and may be retried in the next.
        next_queue = queue.Queue()
        while not send_queue.empty():
            addr = send_queue.get()
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    # Use a timeout only.  Calling s.setblocking(False) after
                    # settimeout() resets the timeout to 0, which makes
                    # connect() fail at once with
                    # "BlockingIOError: [Errno 115] Operation now in progress".
                    s.settimeout(3)
                    print('trying to connect: ' + addr + ', ' + str(dest_port))
                    s.connect((addr, dest_port))
                    print('connected to: ' + addr + ', ' + str(dest_port))

                    # Send a message to the server side of the peer.
                    s.sendall(b'test msg')

                    # Read the peer's reply; any non-empty reply counts.
                    data = s.recv(4096)
                    if data:
                        sendable_ips.append(addr)
            except (ConnectionRefusedError, TimeoutError, socket.timeout) as e:
                print(e)
                next_queue.put(addr)
                fail_count += 1
            except BaseException:
                # Record the unexpected failure, then let it propagate.
                send_status = 'Z'
                raise

        if next_queue.empty():
            break
        if fail_count < n_nodes // 3:
            # Still within the failure budget: retry the failed peers.
            while not next_queue.empty():
                send_queue.put(next_queue.get())
            continue
        send_status = 'B' if next_queue.qsize() < n_nodes // 10 else 'C'
        break

    print('send_status: ' + send_status)
402
+
403
+
404
+
405
+
406
+
407
def listen_init(my_ip, ips, listen_ip='0.0.0.0', listen_port=50008):
    """Wait for the other nodes to connect and record who was heard from.

    Binds a listening TCP socket and accepts connections until every peer
    has been heard from or the time budget is used up.  The source address
    of each peer that sends data is added to the module-level
    ``receive_set`` and finally copied into ``receivable_ips``.  The overall
    outcome is stored in the module-level ``receive_status``:

    * ``'A'`` - all ``n_nodes`` peers were heard from.
    * ``'B'`` - 100 seconds elapsed with more than ``9 * n_nodes // 10``
      peers heard from.
    * ``'C'`` - 100 seconds elapsed with fewer peers than that.
    * ``'Z'`` - an unexpected exception occurred (it is re-raised).

    Args:
        my_ip: Address of this node (not used inside this function).
        ips: Addresses of all nodes (expected to include this node).
        listen_ip: Local address to bind to.
        listen_port: Local TCP port to bind to.
    """
    global receive_status

    # Number of peers, on the assumption that this node is one of the ips.
    n_nodes = len(ips) - 1
    receive_status = 'A'
    t_listen_start_sec = time()

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Bind to the given IP address and port.  These should be ones that
        # the firewall / security policy leaves open.
        s.bind((listen_ip, listen_port))
        # accept() gives up after 20 seconds without an incoming connection.
        # Do not call s.setblocking(False) afterwards: it resets the timeout
        # to 0 and accept() then fails at once with
        # "BlockingIOError: [Errno 11] Resource temporarily unavailable".
        s.settimeout(20)
        s.listen(10)

        # Keep accepting until a stop condition below is met.
        while True:
            try:
                print('listening at: ' + listen_ip + ', ' + str(listen_port))
                conn, addr = s.accept()
                # Close the accepted socket when done with this peer.
                with conn:
                    # The accepted socket does not inherit the listener's
                    # timeout; bound recv() so a silent peer cannot stall
                    # the loop indefinitely.
                    conn.settimeout(3)
                    print('got connection from: ' + addr[0] + ', ' + str(addr[1]))
                    data = conn.recv(4096)
                    if data:
                        conn.sendall(b'OK: Received your info.')
                        # In the real system the peer puts its public IPv4
                        # address in the message instead.
                        receive_set.add(addr[0])
                    else:
                        conn.sendall(b'NG: Nothing received.')
            except socket.timeout:
                print('listen timeout')
            except BaseException:
                # Record the unexpected failure, then let it propagate.
                receive_status = 'Z'
                raise

            if len(receive_set) == n_nodes:
                break

            t_listen_elapsed_sec = time() - t_listen_start_sec
            if t_listen_elapsed_sec < 100:
                continue
            if len(receive_set) > 9 * n_nodes // 10:
                receive_status = 'B'
            else:
                receive_status = 'C'
            break

    receivable_ips.extend(receive_set)

    print('receive_status: ' + receive_status)
522
+
523
+
524
+
525
+
526
+
527
def p2p_setup_main():
    """Run the outgoing and incoming connectivity checks side by side.

    Starts one thread running ``try_init`` and one running ``listen_init``
    with the same ``(my_ip, ips)`` arguments, then waits for both to finish.
    """
    my_ip = 'x.x.x.x'
    ips = ['x.x.x.x', 'y.y.y.y']

    # Sender first, listener second: both are created, then started, then
    # joined in that order.
    workers = [
        threading.Thread(target=target, args=(my_ip, ips))
        for target in (try_init, listen_init)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
556
+
557
+
558
+
559
+ if __name__ == '__main__':
560
+
561
+ p2p_setup_main()
562
+
563
+ ```
564
+
565
+
566
+
567
+ 出力(エラー)
568
+
569
+ ```
570
+
571
+ (python3) [ec2-user@ip-x-x-x-x ~]$ /home/ec2-user/venv/python3/bin/python /home/ec2-user/p2p_test/threading_p2p.py
572
+
573
+ [initial send_queue]
574
+
575
+ ['y.y.y.y']
576
+
577
+ trying to connect: y.y.y.y, 50008
578
+
579
+ Exception in thread Thread-1:
580
+
581
+ Traceback (most recent call last):
582
+
583
+ File "/usr/lib64/python3.7/threading.py", line 926, in _bootstrap_inner
584
+
585
+ self.run()
586
+
587
+ File "/usr/lib64/python3.7/threading.py", line 870, in run
588
+
589
+ self._target(*self._args, **self._kwargs)
590
+
591
+ File "/home/ec2-user/p2p_test/threading_p2p.py", line 43, in try_init
592
+
593
+ s.connect((addr, dest_port))
594
+
595
+ BlockingIOError: [Errno 115] Operation now in progress
596
+
597
+
598
+
599
+ listening at: 0.0.0.0, 50008
600
+
601
+ Exception in thread Thread-2:
602
+
603
+ Traceback (most recent call last):
604
+
605
+ File "/usr/lib64/python3.7/threading.py", line 926, in _bootstrap_inner
606
+
607
+ self.run()
608
+
609
+ File "/usr/lib64/python3.7/threading.py", line 870, in run
610
+
611
+ self._target(*self._args, **self._kwargs)
612
+
613
+ File "/home/ec2-user/p2p_test/threading_p2p.py", line 101, in listen_init
614
+
615
+ conn, addr = s.accept()
616
+
617
+ File "/usr/lib64/python3.7/socket.py", line 212, in accept
618
+
619
+ fd, addr = self._accept()
620
+
621
+ BlockingIOError: [Errno 11] Resource temporarily unavailable
622
+
623
+ ```