質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

90.33%

  • Python

    9214questions

    Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

  • Python 3.x

    7404questions

    Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

pyzmq ブローカーを作成したい

解決済

回答 1

投稿

  • 評価
  • クリップ 0
  • VIEW 624
退会済みユーザー

退会済みユーザー

クライアント→サーバー←クライアントという風にクライアントの仲介役を作成したいです

http://pyzmq.readthedocs.io/en/latest/index.html
前回の質問で貼ってくださったこれに目を通してみたものの自分には難しくて手がかりが掴めず

仲介役を挟みたいクライアントのコードはこんな感じです

mport zmq

#クライアント

host="127.0.0.1"
port=6789

context=zmq.Context()
client=context.socket(zmq.REQ)
client.connect("tcp://%s:%s"%(host,port))

y=90
client.send_pyobj(y)

x=client.recv_pyobj()
print(x)
import zmq

#クライアント

host="127.0.0.1"
port=6789

context=zmq.Context()
client=context.socket(zmq.REQ)
client.connect("tcp://%s:%s"%(host,port))

y=50
client.send_pyobj(y)

x=client.recv_pyobj()
print(x)

片方は50の数値を送り、もう片方は90の数値を送るプログラムですが
これらを起動しても、両方が同期要求をする為、何も動きません
この間に、仲介役のサーバーを作成して反対側のクライアントに情報を渡したいわけです

自分がイメージしているサーバーはこんな感じです

import zmq

#サーバー

host="127.0.0.1"
port=6789

context=zmq.Context()
server=context.socket(zmq.REP)
server.bind("tcp://%s:%s"%(host,port))

x=server.recv_pyobj()

server.send_pyobj(x)

受け取った値をそのまま返すサーバーです
後はこれを複数のメッセージや、複数のクライアントとやり取りが出来るように変形すればいいのかなと思っているのですが、その複数のやり取りがどうすればいいのかわからなくて...

わかる方いらしたらよろしくお願いします

  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

回答 1

checkベストアンサー

+2

REQ/REPが一方向の場合のブローカ/クライアント/ワーカー(サーバー)例は2.2.7 共有キュー (DEALER and ROUTER sockets)Shared Queue (DEALER and ROUTER sockets)
にあるとおり、比較的簡単に記述できます。

しかし、お互いがREQ/REPしあう場合はちょっと複雑になりそうです。
とりあえず以下のような動作をするサンプルを作成してみました。

  • AliceBobという2人がいる。
  • Aliceclient(REQ)Bobworker(REP)であるが、途中で仕事を代わることができる。
  • clientが送信した数値をworkerは+1して返す。
  • ブローカはAlice->Bob と Bob->Alice の両方向を仲介する。

client_worker.py

import zmq

# client動作 : 入力値を送信し結果を取得する
# 入力値が0ならworkerになる
def clientProc( client):
    print("client start")
    send = int(input("input number(0 to change worker):"))
    client.send_pyobj(send)
    recv = client.recv_pyobj()
    print("client recv[%s] end" %(str(recv)))
    return send

# worker動作 : 受信値に+1した値を返す
# 受信値が0ならclientになる
def workerProc( worker):
    print("worker start")
    recv = worker.recv_pyobj()
    print("worker recv[%s]"%(str(recv)))
    send = recv + 1
    worker.send_pyobj(send)
    print("worker end")
    return recv

if __name__ == '__main__':

    ctx = zmq.Context()
    role = input("role ? [a]=Alice or [b]=Bob :")

    if role == "a":    # Alice : 最初はclient
        port = [6710,6721];  job = "client"
    elif role == "b":  # Bob   : 最初はworker
        port = [6720,6711];  job = "worker"

    client = ctx.socket(zmq.REQ); client.connect("tcp://127.0.0.1:%d"%(port[0]))
    worker = ctx.socket(zmq.REP); worker.connect("tcp://127.0.0.1:%d"%(port[1]))

    while True:
        if job == "client":
            ret = clientProc( client)
            if ret == 0: job = "worker"
        else:
            ret = workerProc(worker)
            if ret == 0: job = "client"

    print("end")

broker.py : Alice/Bob間の相方向にブローカ動作する

import zmq

if __name__ == '__main__':
    ctx = zmq.Context()

    # Alice -> Bob
    reqA2B = ctx.socket(zmq.ROUTER); reqA2B.bind("tcp://127.0.0.1:6710")
    resA2B = ctx.socket(zmq.DEALER); resA2B.bind("tcp://127.0.0.1:6711")

    # Bob -> Alice
    reqB2A = ctx.socket(zmq.ROUTER); reqB2A.bind("tcp://127.0.0.1:6720")
    resB2A = ctx.socket(zmq.DEALER); resB2A.bind("tcp://127.0.0.1:6721")

    print("broker start")

    # Initialize poll set
    poller = zmq.Poller()
    poller.register(reqB2A, zmq.POLLIN);  poller.register(resB2A, zmq.POLLIN)
    poller.register(reqA2B, zmq.POLLIN);  poller.register(resA2B, zmq.POLLIN)

    # Switch messages between sockets
    while True:
        socks = dict(poller.poll())

        if socks.get(reqB2A) == zmq.POLLIN:
            msg = reqB2A.recv_multipart()   # from Bob
            print("Request : Bob -> Alice[%s]"%(str(msg)))
            resB2A.send_multipart(msg)      # to Alice

        if socks.get(resB2A) == zmq.POLLIN:
            msg = resB2A.recv_multipart()   # from Alice
            print("Response : Alice -> Bob[%s]"%(str(msg)))
            reqB2A.send_multipart(msg)      # to Bob

        if socks.get(reqA2B) == zmq.POLLIN:
            msg = reqA2B.recv_multipart()   # from Alice
            print("Request : Alice -> Bob[%s]"%(str(msg)))
            resA2B.send_multipart(msg)      # to Bob

        if socks.get(resA2B) == zmq.POLLIN:
            msg = resA2B.recv_multipart()   # from Bob
            print("Response : Bob -> Alice[%s]"%(str(msg)))
            reqA2B.send_multipart(msg)      # to Alice

    print("broker end")

以上のように、ちょっと複雑です。
そもそも、もっとよいメッセージングパターンがあるような気がしますので、ガイドブックを一読することを勧めます。

ブローカ例を追加

REQ/REP一方向だけであれば、以下のように簡潔に記述できます。
両方向分、2プロセス立ち上げる必要ありますが、こちらのほうがお勧めです。

broker2.py : Alice->Bob or Bob->Aliceいずれか一方向のブローカ動作する。起動時に選択する。

import zmq
if __name__ == '__main__':
    ctx = zmq.Context()
    role = input("Broker role ? [a]=Alice->Bob or [b]=Bob->Alice :")
    if role == "a":     port = [6710,6711]
    elif role == "b":   port = [6720,6721]

    print("broker start")
    req = ctx.socket(zmq.ROUTER); req.bind("tcp://127.0.0.1:%d"%(port[0]))
    res = ctx.socket(zmq.DEALER); res.bind("tcp://127.0.0.1:%d"%(port[1]))
    zmq.proxy( req,res)
    print("broker end")

投稿

編集

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2017/03/29 19:35

    なるほど、ありがとうございます 自分が思ってたよりも複雑になるんですね...
    もっと簡単に出来るもんだと思ってました...
    サンプルコードまでありがとうございます
    現状読み解けそうにないので、理解できるまで勉強してみようと思います

    キャンセル

  • 2017/03/29 20:16

    ブローカは一方向でプロセス(.py)毎にすると簡潔になるのでサンプル追加しました。
    実際に動かすと動作は理解しやすいかと思います。

    キャンセル

  • 2017/03/29 22:31

    ありがとうございます!
    元々、色々検索しても英語のサイトばかりだったり、言葉が難しかったりして、入門python3の書籍に書いてあった数少ない説明と、サンプルコードから自分なりに読み解いた程度の知識しかなかったのですが、追記してくださったコードを眺めてたら、色んなもやもやが解けました

    まず、プロセス毎に同期応答やら同期要求やらが一括りになっているものだと思っていましたがメッセージ毎に決められるんですね

    そして複数のメッセージのやり取りの仕方もわかっていなかったのですが、これをするにはもう一つポートを作ってやればいいんですね

    自分が張ったコードの下の3行に関しては決まり事として認識していたんですが、おかげ様で何をしているか理解する事が出来ました

    context=zmq.Context()
    client=context.socket(zmq.REQ)
    client.connect("tcp://%s:%s"%(host,port))

    正直自分が理解できる情報が少なくて、どうすればいいのか手がかりがまったく掴めていなかったのですが、何をすればいいか見えてきました
    改めてありがとうございます 本当に助かりました

    キャンセル

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 90.33%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる

同じタグがついた質問を見る

  • Python

    9214questions

    Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

  • Python 3.x

    7404questions

    Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。