読者です 読者をやめる 読者になる 読者になる

Broker を使った N:N な REQ/REP パターン (XREQ/XREP)

概要

Broker を間に置いた REQ/REP パターンを作ってみます。
基本的にはどう頑張っても 1:N しか出来ないのですが中間に Broker をおくことで、 N:N を実現しています。

実現したいこと

REQ と REP どちらかの 1:N 構成ではなく、N:N 構成をする

ただ、今回は REQ 側はめんどくさいので 1 構成になっています。

構成

1 つの Broker に対して複数の REP と REQ がぶら下がります。Broker が死ぬと破綻します。

broker.py

Broker を実現するには ROUTER と DEALER を使います。
簡単に言えば集約して、再度配るといった感じです。

Broker は ROUTER と DEALER 両方で bind しています。

import zmq
from zmq.eventloop import ioloop

loop = ioloop.IOLoop.instance()

context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
backend = context.socket(zmq.DEALER)
frontend.bind('tcp://*:6000')
backend.bind('tcp://*:6001')

def frontend_handler(sock, events):
  message = sock.recv()
  more = sock.getsockopt(zmq.RCVMORE)
  if more:
    backend.send(message, zmq.SNDMORE)
  else:
    backend.send(message)

def backend_handler(sock, events):
  message = sock.recv()
  more = sock.getsockopt(zmq.RCVMORE)
  if more:
    frontend.send(message, zmq.SNDMORE)
  else:
    frontend.send(message)

loop.add_handler(frontend, frontend_handler, zmq.POLLIN)
loop.add_handler(backend, backend_handler, zmq.POLLIN)

loop.start()

rep.py

REP 側は Connect で Broker につなぎます。こうすることで複数の REP を立てられます。

import zmq
from zmq.eventloop import ioloop

loop = ioloop.IOLoop.instance()

context = zmq.Context()
sock = context.socket(zmq.REP)
sock.connect("tcp://localhost:6001")

def rep_handler(sock, events):
  [i, message] = sock.recv_json()
  print "Received request ", i, "[", message, "]"
  sock.send("World")

loop.add_handler(sock, rep_handler, zmq.POLLIN)

loop.start()

req.py

REQ 側も Connect で Broker につなぎます。こうすることで複数の REQ を立てられます。

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REQ)
sock.connect('tcp://127.0.0.1:6000')

for i in range(1,10):
  sock.send_json([i, "Hello"])
  message = sock.recv()
  print "Received reply ", i, "[", message, "]"

実行例

まず broker.py を起動して rep.py を 3 つほど引っかけます。
そこで req.py を実行してみます。

req.py

Received reply 1 [ World ]
Received reply 2 [ World ]
Received reply 3 [ World ]
Received reply 4 [ World ]
Received reply 5 [ World ]
Received reply 6 [ World ]
Received reply 7 [ World ]
Received reply 8 [ World ]
Received reply 9 [ World ]

1: rep.py

Received request 2 [ Hello ]
Received request 5 [ Hello ]
Received request 8 [ Hello ]

2: rep.py

Received request 1 [ Hello ]
Received request 4 [ Hello ]
Received request 7 [ Hello ]

3: rep.py

Received request 3 [ Hello ]
Received request 6 [ Hello ]
Received request 9 [ Hello ]

無事 Broker を介してレスポンスが帰ってきました。ただこれリクエスト側がマルチに送ってないので、どれか一個でもブロックすると残念な感じになります。

なので基本的には REQ 側も複数立てて 1 リクエスト 1 メッセージの仕組みを使うべきです。

感想

Broker と REQ 側は Erlang で書く予定だったのですが、面倒だったので Python で書いてしまいました。ZMQ は API がとてもシンプルになっているので、サクサク書き進められますね。

ZMQ を理解するには色々なルーティングパターンを作って触ってみるのが早いです。

今回の仕組みを使う事で簡単に処理を分散させることが出来ました。

次やりたいこと

Broker 経由 + PUB/SUB を使った仕組みなどをやってみたいと思います。