REQ/REPが一方向の場合のブローカ/クライアント/ワーカー(サーバー)例は2.2.7 共有キュー (DEALER and ROUTER sockets) やShared Queue (DEALER and ROUTER sockets)
にあるとおり、比較的簡単に記述できます。
しかし、お互いがREQ/REPしあう場合はちょっと複雑になりそうです。
とりあえず以下のような動作をするサンプルを作成してみました。
Alice
とBob
という2人がいる。
Alice
はclient(REQ)
、Bob
はworker(REP)
であるが、途中で仕事を代わることができる。
client
が送信した数値をworker
は+1して返す。
ブローカはAlice
->Bob
と Bob
->Alice
の両方向を仲介する。
client_worker.py
Python
1 import zmq
2
3 # client動作 : 入力値を送信し結果を取得する
4 # 入力値が0ならworkerになる
5 def clientProc ( client ) :
6 print ( "client start" )
7 send = int ( input ( "input number(0 to change worker):" ) )
8 client . send_pyobj ( send )
9 recv = client . recv_pyobj ( )
10 print ( "client recv[%s] end" % ( str ( recv ) ) )
11 return send
12
13 # worker動作 : 受信値に+1した値を返す
14 # 受信値が0ならclientになる
15 def workerProc ( worker ) :
16 print ( "worker start" )
17 recv = worker . recv_pyobj ( )
18 print ( "worker recv[%s]" % ( str ( recv ) ) )
19 send = recv + 1
20 worker . send_pyobj ( send )
21 print ( "worker end" )
22 return recv
23
24 if __name__ == '__main__' :
25
26 ctx = zmq . Context ( )
27 role = input ( "role ? [a]=Alice or [b]=Bob :" )
28
29 if role == "a" : # Alice : 最初はclient
30 port = [ 6710 , 6721 ] ; job = "client"
31 elif role == "b" : # Bob : 最初はworker
32 port = [ 6720 , 6711 ] ; job = "worker"
33
34 client = ctx . socket ( zmq . REQ ) ; client . connect ( "tcp://127.0.0.1:%d" % ( port [ 0 ] ) )
35 worker = ctx . socket ( zmq . REP ) ; worker . connect ( "tcp://127.0.0.1:%d" % ( port [ 1 ] ) )
36
37 while True :
38 if job == "client" :
39 ret = clientProc ( client )
40 if ret == 0 : job = "worker"
41 else :
42 ret = workerProc ( worker )
43 if ret == 0 : job = "client"
44
45 print ( "end" )
broker.py : Alice/Bob間の相方向にブローカ動作する
Python
1 import zmq
2
3 if __name__ == '__main__' :
4 ctx = zmq . Context ( )
5
6 # Alice -> Bob
7 reqA2B = ctx . socket ( zmq . ROUTER ) ; reqA2B . bind ( "tcp://127.0.0.1:6710" )
8 resA2B = ctx . socket ( zmq . DEALER ) ; resA2B . bind ( "tcp://127.0.0.1:6711" )
9
10 # Bob -> Alice
11 reqB2A = ctx . socket ( zmq . ROUTER ) ; reqB2A . bind ( "tcp://127.0.0.1:6720" )
12 resB2A = ctx . socket ( zmq . DEALER ) ; resB2A . bind ( "tcp://127.0.0.1:6721" )
13
14 print ( "broker start" )
15
16 # Initialize poll set
17 poller = zmq . Poller ( )
18 poller . register ( reqB2A , zmq . POLLIN ) ; poller . register ( resB2A , zmq . POLLIN )
19 poller . register ( reqA2B , zmq . POLLIN ) ; poller . register ( resA2B , zmq . POLLIN )
20
21 # Switch messages between sockets
22 while True :
23 socks = dict ( poller . poll ( ) )
24
25 if socks . get ( reqB2A ) == zmq . POLLIN :
26 msg = reqB2A . recv_multipart ( ) # from Bob
27 print ( "Request : Bob -> Alice[%s]" % ( str ( msg ) ) )
28 resB2A . send_multipart ( msg ) # to Alice
29
30 if socks . get ( resB2A ) == zmq . POLLIN :
31 msg = resB2A . recv_multipart ( ) # from Alice
32 print ( "Response : Alice -> Bob[%s]" % ( str ( msg ) ) )
33 reqB2A . send_multipart ( msg ) # to Bob
34
35 if socks . get ( reqA2B ) == zmq . POLLIN :
36 msg = reqA2B . recv_multipart ( ) # from Alice
37 print ( "Request : Alice -> Bob[%s]" % ( str ( msg ) ) )
38 resA2B . send_multipart ( msg ) # to Bob
39
40 if socks . get ( resA2B ) == zmq . POLLIN :
41 msg = resA2B . recv_multipart ( ) # from Bob
42 print ( "Response : Bob -> Alice[%s]" % ( str ( msg ) ) )
43 reqA2B . send_multipart ( msg ) # to Alice
44
45 print ( "broker end" )
以上のように、ちょっと複雑です。
そもそも、もっとよいメッセージングパターン
があるような気がしますので、ガイドブックを一読することを勧めます。
ブローカ例を追加
REQ/REP一方向だけであれば、以下のように簡潔に記述できます。
両方向分、2プロセス立ち上げる必要ありますが、こちらのほうがお勧めです。
broker2.py : Alice->Bob or Bob->Aliceいずれか一方向のブローカ動作する。起動時に選択する。
Python
1 import zmq
2 if __name__ == '__main__' :
3 ctx = zmq . Context ( )
4 role = input ( "Broker role ? [a]=Alice->Bob or [b]=Bob->Alice :" )
5 if role == "a" : port = [ 6710 , 6711 ]
6 elif role == "b" : port = [ 6720 , 6721 ]
7
8 print ( "broker start" )
9 req = ctx . socket ( zmq . ROUTER ) ; req . bind ( "tcp://127.0.0.1:%d" % ( port [ 0 ] ) )
10 res = ctx . socket ( zmq . DEALER ) ; res . bind ( "tcp://127.0.0.1:%d" % ( port [ 1 ] ) )
11 zmq . proxy ( req , res )
12 print ( "broker end" )
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
退会済みユーザー
2017/03/29 10:35
2017/03/29 11:16
退会済みユーザー
2017/03/29 13:31