第二回 一人 Multiprocessing 勉強会

続けるつもりは無かったんですが、勉強しておいて損のないだろうなぁと思ったので。
いままでは time.sleep で空かどうかを判定していたんですが、
それはいけてないらしく queue.get(block=True) をすることで read 状態にしてブロックさせてみていました。

また、終了処理は結構適当で process の list から porcess を持ってきて terminate() しています。
その後 event.abort して pyevent 側も終了しています。

queue.get() を無理矢理終わらせているので、キレイに終了処理は出来ていません。
except: pass とかないよなぁ、ここどうにかキレイに終わらせられないかなぁ。

# vim: fileencoding=utf8

from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

from future_builtins import *

# Only Python 2.6.x

import signal
import socket
import traceback
import multiprocessing
import Queue
import time
import logging

# pyevent
import event

logging.basicConfig(
  level=logging.DEBUG,
  format='%(asctime)s %(pathname)s %(lineno)d %(levelname)-6s %(message)s',
)

class Server(object):
  def __init__(self):
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.sock.bind(('127.0.0.1', 5000))
    self.queue = multiprocessing.Queue()
    self.processes = []

  def f(self, queue):
    while True:
      try:
        (packet, address) = self.queue.get(block=True)
        pid = multiprocessing.current_process().pid
        logging.info('%d:%s' % (pid, address))
        self.sock.sendto(packet, address)
      except Queue.Empty, e:
        logging.error('Queue Empty.')
      except:
        pass

  def start(self):
    logging.info('start')
    for i in range(multiprocessing.cpu_count()):
      p = multiprocessing.Process(target=self.f, args=(self.queue,))
      self.processes.append(p)
      p.start()

    event.init()
    event.event(self.handle_read, handle=self.sock,
        evtype=event.EV_READ | event.EV_PERSIST).add()
    event.signal(signal.SIGINT, self.stop, signal.SIGINT).add()
    event.dispatch()

  def stop(self, sig):
    logging.info('stop')
    for p in self.processes:
      p.terminate()
    event.abort()

  def handle_read(self, ev, sock, entype, args):
    packet, address = sock.recvfrom(8192)
    self.queue.put((packet, address))

if __name__ == '__main__':
  Server().start()