読者です 読者をやめる 読者になる 読者になる

amq.topic のサンプル

fanout と direct は理解したので topic をのメモを。
topic は # と * を使って一つの routing key で複数の Queue に格納するような仕組みです。

例えば routing key が 'spam.eggs.bacon' という key で、 queue の binding key が 'spam.*' と '#.eggs.bacon' という binding key の場合は両方にメッセージが格納されれます。

# は一単語にマッチ、 * は複数単語マッチです。メッセージを柔軟に送ることができます。たとえば '#.eggs.#' という binding key を作成することも可能です。

簡単なサンプルを作ってみました、direct とほとんど変わっていません。

amqp_receive.py

# vim: fileencoding=utf8

from amqplib import client_0_8 as amqp

#kUserId = 'guest'
#kPassword = 'guest'
#kHost = 'dev.rabbitmq.com'

def callback(message):
  for key, val in message.properties.items():
    print '%s: %s' % (key, str(val))
  for key, val in message.delivery_info.items():
    print '> %s: %s' % (key, str(val))

  print ''
  print message.body
  print '-------'
  message.channel.basic_ack(message.delivery_tag)

  if message.body == 'quit':
    message.channel.basic_cancel(message.consumer_tag)

def main():
  connection = amqp.Connection(kHost, userid=kUserId, password=kPassword, ssl=False)

  channel = connection.channel()
  channel.access_request('/data', active=True, read=True)

  channel.exchange_declare('mytopic', 'topic', auto_delete=True)
  qname, _, _ = channel.queue_declare()
  channel.queue_bind(qname, 'mytopic', routing_key='#.eggs')
  channel.basic_consume(qname, callback=callback)

  while channel.callbacks:
    channel.wait()

  channel.close()
  connection.close()

if __name__ == '__main__':
  main()

amqp_send.py

# vim: fileencoding=utf8

from datetime import datetime

from amqplib import client_0_8 as amqp

#kUserId = 'guest'
#kPassword = 'guest'
#kHost = 'dev.rabbitmq.com'

def main():
  message_body = datetime.now().isoformat()

  connection = amqp.Connection(kHost, userid=kUserId, password=kPassword, ssl=False)

  channel = connection.channel()
  channel.access_request('/data', active=True, write=True)

  channel.exchange_declare('mytopic', 'topic', auto_delete=True)

  application_headers = {'spam': 7, 'eggs': 'bacon'}

  message = amqp.Message(message_body,
      content_type='text/plain', application_headers=application_headers)

  for i in range(10):
    channel.basic_publish(message, 'mytopic', routing_key='spam.eggs')

  channel.close()
  connection.close()

if __name__ == '__main__':
  main()

amq.topic
amq.topic posted by (C)voluntas