Multiprocessing の使いどころ。

Pool はなんか上手いこと行かなそうな雰囲気だったので素直に Process と Queue で書いてみた。
pyevent は svn 版。約 1 ミリセック間隔パケット投げて取りこぼし0なのでまぁ大丈夫か?
プロセス管理もしてないし、いろいろ怖いなぁ。いかに Erlang がこれ系の処理に特化していて書きやすいかを思い知らされる。

# vim: fileencoding=utf8

from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

from future_builtins import *

# Only Python 2.6.x

import signal
import socket
import traceback
import multiprocessing
import Queue
import time
import logging

# pyevent
import event

logging.basicConfig(
  level=logging.DEBUG,
  format='%(asctime)s %(pathname)s %(lineno)d %(levelname)-6s %(message)s',
)

class Server(object):
  def __init__(self):
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.sock.bind(('127.0.0.1', 5000))
    self.queue = multiprocessing.Queue()
    #self.manager = multiprocessing.Manager()
    self.processes = []

  def f(self, queue):
    while True:
      if self.queue.empty():
        time.sleep(0.01)
        continue
      try:
        (packet, address) = self.queue.get(timeout=1)
      except Queue.Empty, e:
        logging.error('Queue Empty.')
      except:
        logging.error('Error.')

      pid = multiprocessing.current_process().pid
      logging.info('%d:%s' % (pid, address))
      self.sock.sendto(packet, address)

  def start(self):
    logging.info('start')
    for i in range(multiprocessing.cpu_count()):
      p = multiprocessing.Process(target=self.f, args=(self.queue,),
          name=i)
      self.processes.append(p)
      p.start()

    event.init()
    event.event(self.handle_read, handle=self.sock,
        evtype=event.EV_READ | event.EV_PERSIST).add()
    event.signal(signal.SIGINT, self.stop, signal.SIGINT).add()
    event.dispatch()

  def stop(self, sig):
    logging.info('stop')
    event.abort()
    #raise KeyboardInterrupt()

  def handle_read(self, ev, sock, entype, args):
    packet, address = sock.recvfrom(8192)
    self.queue.put((packet, address))

if __name__ == '__main__':
  Server().start()