amq.direct のサンプル

fanout はサンプルであるので、direct のサンプルを。
メッセージを10個送ってみています。たんに for で回しただけですが。
一応二つ受け取りキューを生成して、送り先一つ、受け取り二つという amq.direct で routing key を使った multicast をやってみました。分かりにくいですが、とりあえずサンプル実行画像を貼り付けておきます。

PyCon 2009 でも AMQP ネタがあがっていて、「Twisted, AMQP and Thrift: Bridging messaging and RPC」という話が出ています。

Thrift についてはまぁ、この辺でしょうか。

pyamqplib
pyamqplib posted by (C)voluntas

amqp_receive.py

# vim: fileencoding=utf8

from amqplib import client_0_8 as amqp

#kUserId = 'guest'
#kPassword = 'guest'
#kHost = 'dev.rabbitmq.com'

def callback(message):
  for key, val in message.properties.items():
    print '%s: %s' % (key, str(val))
  for key, val in message.delivery_info.items():
    print '> %s: %s' % (key, str(val))

  print ''
  print message.body
  print '-------'
  message.channel.basic_ack(message.delivery_tag)

  if message.body == 'quit':
    message.channel.basic_cancel(message.consumer_tag)

def main():
  connection = amqp.Connection(kHost, userid=kUserId, password=kPassword, ssl=False)

  channel = connection.channel()
  channel.access_request('/data', active=True, read=True)

  channel.exchange_declare('mydirect', 'direct', auto_delete=True)
  qname, _, _ = channel.queue_declare()
  channel.queue_bind(qname, 'mydirect', routing_key='multi')
  channel.basic_consume(qname, callback=callback)

  while channel.callbacks:
    channel.wait()

  channel.close()
  connection.close()

if __name__ == '__main__':
  main()

amqp_send.py

# vim: fileencoding=utf8

from datetime import datetime

from amqplib import client_0_8 as amqp

#kUserId = 'guest'
#kPassword = 'guest'
#kHost = 'dev.rabbitmq.com'

def main():
  message_body = datetime.now().isoformat()

  connection = amqp.Connection(kHost, userid=kUserId, password=kPassword, ssl=False)

  channel = connection.channel()
  channel.access_request('/data', active=True, write=True)

  channel.exchange_declare('mydirect', 'direct', auto_delete=True)

  application_headers = {'spam': 7, 'eggs': 'bacon'}

  message = amqp.Message(message_body,
      content_type='text/plain', application_headers=application_headers)

  for i in range(10):
    channel.basic_publish(message, 'mydirect', routing_key='multi')

  channel.close()
  connection.close()

if __name__ == '__main__':
  main()

非同期で取り出したり、非同期で送ったりがまだできてない感じで、さらに tx_commit のようなトランザクションを明示的につかってるかどうかもまだわかりません。おそらく同期しているのので tx 系は使っている ... ような気がしますが。