質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.47%
Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

2回答

1393閲覧

asyncioでの並列処理

taro_yamada

総合スコア55

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

0クリップ

投稿2020/12/26 14:51

編集2020/12/26 23:54

websocketsとasyncioを使ってメッセージを受信する非同期処理を作成しています。
一つのメッセージを受け取った後、一定程度の処理をした後にwebに接続という流れのプログラムを考えています。

 今回相談したいのは、前回の処理メッセージを受け取り、処理している最中に、次のメッセージの処理を始めることは可能でしょうかという点です。

 clientの処理でいうと、eventメソッドのwait終わりのメッセージが表示される前に「受け取り直後」というメッセージを表示させたいと思っています。

12/27追記
gatherを使ったコードに変更してみました。
メソッド1とメソッド2が同時に始まるようになったのですが、受け取ったメッセージに対して、メソッド1と2が終わらないと、次のメッセージの受け取りが始まりません。
メソッド1だけ終わったら次のメッセージを受け取りに行くといった形にしたいのですが、可能でしょうか?

client

1import asyncio 2import json 3import websockets 4import urllib.request 5import urllib.error 6import datetime 7 8# --- 9 10 11 async with websockets.connect(uri, ping_timeout=None) as ws: 12 while not ws.closed: 13 14 response = await ws.recv() 15 board = json.loads(response) 16 await asyncio.gather( 17 event(board), 18 get_url() 19 ) 20 21async def event(board): 22 print('これから5秒待つ' + str(board)) 23 await asyncio.sleep(5) 24 print('wait終わり') 25 print(datetime.datetime.now()) 26 27 28async def get_url(): 29 url = 'http://www.fukushizaidan.jp/' 30 req = urllib.request.Request(url) 31 with urllib.request.urlopen(req) as res: 32 print(res.read()) 33 34loop = asyncio.get_event_loop() 35loop.create_task(stream()) 36try: 37 loop.run_forever() 38except KeyboardInterrupt: 39 exit()

server

1#!/usr/bin/env python 2 3# WS server that sends messages at random intervals 4 5import asyncio 6import datetime 7import random 8import websockets 9import json 10 11 12async def time(websocket, path): 13 while True: 14 now = datetime.datetime.utcnow().isoformat() + "Z" 15 char = 'longlongstr' 16 content = {'message': char, 'time': str(now)} 17 j_content = json.dumps(content) 18 await websocket.send(j_content) 19 await asyncio.sleep(random.random() * 3) 20 21start_server = websockets.serve(time, "127.0.0.1", 5678) 22 23asyncio.get_event_loop().run_until_complete(start_server) 24asyncio.get_event_loop().run_forever()

<結果>
これから5秒待つ{'message': 'longlongstr', 'time': '2020-12-26T23:43:58.316069Z'}
b'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
wait終わり
2020-12-27 08:44:03.311604
これから5秒待つ{'message': 'longlongstr', 'time': '2020-12-26T23:44:00.082327Z'}
b'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
wait終わり
2020-12-27 08:44:08.321386
これから5秒待つ{'message': 'longlongstr', 'time': '2020-12-26T23:44:02.372114Z'}
b'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答2

0

ベストアンサー

質問の文章とコードだといまいち分かりにくかったので、client.pyとserver.pyを、処理の流れがわかる形で出力するように書き直しました。
質問文に記載の通り**「何らかの処理(ここでは5秒sleep)を行った後、次のメッセージを受け取って同じように『何らかの処理』を行う。ただし裏で逐次ダウンロードを行う」**という単位動作を同時並行にやっているように見せています。

server.py

python

1#!/usr/bin/env python 2 3# WS server that sends messages at random intervals 4 5import asyncio 6import datetime 7import random 8import websockets 9import json 10 11 12async def time(websocket, path): 13 count = 0 14 while True: 15 now = datetime.datetime.utcnow().isoformat() + "Z" 16 cid = f"{count:>6}" 17 char = 'longlongstr' 18 content = {'id':cid, 'message': char, 'time': str(now)} 19 j_content = json.dumps(content) 20 count += 1 21 await websocket.send(j_content) 22 await asyncio.sleep(random.random() * 3) 23 24start_server = websockets.serve(time, "127.0.0.1", 5678) 25 26asyncio.get_event_loop().run_until_complete(start_server) 27asyncio.get_event_loop().run_forever()

client.py

python

1# https://teratail.com/questions/312558#reply-436434 2import asyncio 3import json 4import websockets 5import urllib.request 6import urllib.error 7import datetime 8import asyncio 9import datetime 10# --- 11 12 13def print_t(text): 14 print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} {text}") 15 16 17def finish(board): 18 print_t(f"[TASKID:{board.result()}] タスクが終了しました") 19 20 21async def stream(): 22 23 uri = 'ws://127.0.0.1:5678/' 24 async with websockets.connect(uri, ping_timeout=None) as ws: 25 while not ws.closed: 26 response = await ws.recv() 27 board = json.loads(response) 28 await event(board) 29 30 31async def event(board): 32 loop = asyncio.get_event_loop() 33 print_t(f"[TASKID:{board.get('id')}] 受け取ったメッセージの「何らかの処理」を開始します(ダミーとして5秒sleep) {str(board)}") 34 await asyncio.sleep(5) 35 print_t(f"[TASKID:{board.get('id')}] 「何らかの処理」が完了しました。(「wait終わり」)続いてダウンロードを開始します。") 36 task = loop.create_task(get_url(board.get('id'))) 37 task.add_done_callback(finish) 38 39 40async def get_url(cid): 41 url = 'http://www.example.com/' 42 req = urllib.request.Request(url) 43 with urllib.request.urlopen(req) as res: 44 print_t(f"[TASKID:{cid}] ダウンロード完了:先頭20文字{res.read()[:20]}") 45 return cid 46 47 48loop = asyncio.get_event_loop() 49loop.create_task(stream()) 50try: 51 loop.run_forever() 52except KeyboardInterrupt: 53 exit()

タスクは、それぞれ(ほぼ)「5秒毎」で順番に処理されていきますが、同一TASKIDに対応するダウンロード処理はメッセージに対する「何らかの処理」が完了した直後に開始され、裏でダウンロード処理されていることがお分かりかと思います。
(ダウンロード時間が短いためわかりにくいですが)

投稿2020/12/27 02:26

編集2020/12/27 08:33
退会済みユーザー

退会済みユーザー

総合スコア0

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

taro_yamada

2020/12/27 07:48

ありがとうございます。まさに希望通り動いていると思います。 読み方を教えてください。 (まず、defの外にあるloop「create_task(stream()」は残して実行するということでいいと思いますが・・・) async def eventの中でもloopを宣言して、create_task「get_url」を実行し、新たなコルーチンが始まったので、並行して新たな「async def event」が、始まるというイメージでしょうか?
退会済みユーザー

退会済みユーザー

2020/12/27 08:13 編集

下半分を追記し完全なコードにしました。 前提として、質問文は「メソッド1だけ終わったら次のメッセージを受け取りに行くといった形にしたい」という文面になっている。 ですからこの仕様に沿った形にしています。   起点は、defの外にあるloop.create_task(stream())です。 呼び出したstream()のwhileループの中で、event()を呼んでいます。 そして、event()の中でまず、質問文の「メソッド1」こと「asyncio.sleep(5)」という処理を行い、 それが完了した後で、質問文の「メソッド2」こと「get_url()」を新たなtaskとして生成しています。 「メソッド2」ことget_url()は、stream()及びevent()と並行して実行されます。 たとえば、id:0の「メソッド1」(sleep)が終了したあと、serverがid:1のboardを送出していれば、すぐにclientのstream()のwhileループの中でid:1のboardの受け取りが行われ、id:1を持ったboardがevent()に送られ、id:1の「メソッド1」が実行され・・・という流れです。 TASKID:0の「メソッド2」ことダウンロード処理(get_url())は、TASKID:0の「メソッド1」が終了したあと、TASKID:1の「メソッド1」(sleep)と並行して行われています。 >async def eventの中でもloopを宣言して、create_task「get_url」を実行し、新たなコルーチンが始まったので、並行して新たな「async def event」が、始まるというイメージでしょうか? stream()及びevent()と並行して実行されるのは、get_url()です。 stream()及びevent()は、stream()及びevent()と並行して実行されることはありません。
taro_yamada

2020/12/27 08:45

ありがとうございました。非常によくわかりました。 まだ、asyncio関係のコードを書くので、また質問するかもしれません。 その際は、またよろしくお願いします。
guest

0

asyncio.gather を使えば実現できます。

Python

1async def stream(): 2 uri = 'ws://127.0.0.1:5678/' 3 4 async with websockets.connect(uri, ping_timeout=None) as ws: 5 while not ws.closed: 6 response = await ws.recv() 7 board = json.loads(response) 8 await asycio.gather( 9 event(board), 10 get_url() 11 )

ちなみに並行処理と並列処理は別物ですのでご注意ください。

投稿2020/12/26 14:59

A_kirisaki

総合スコア2853

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

taro_yamada

2020/12/26 23:55

ご回答ありがとうございます。 それを受けて、質問を編集しました。 メソッドを実行している間に次のメッセージを受け取りたいです。可能でしょうか?
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.47%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問