kqueue -> epoll 版

勉強がてら epoll 版を書いてみました。とりあえずエコーはうまくいっています。
そして sshfs の偉大さを知った。sshfs でマウントして gvim で書いてます。

epoll の方が register/unregister なので、TCP と スレッドプールの相性は良さそう。

# vim: fileencoding=utf8

# Only Python 2.6.x

from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

from future_builtins import *

import os
import sys
import socket
import logging
import multiprocessing
import Queue

# Linux Kernel 2.6 Only
from select import *

# Root logger: emit everything from DEBUG up, tagging each record with the
# timestamp, source path, line number and level.
_LOG_FORMAT = '%(asctime)s %(pathname)s %(lineno)d %(levelname)-6s %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)

class Server(object):
  """UDP echo server: an epoll-driven receiver feeding worker processes.

  The parent process waits on the UDP socket with epoll, reads each
  datagram and puts ``(packet, address)`` on a multiprocessing queue.
  One worker process per CPU takes items off the queue and sends the
  packet back to its sender through the same (inherited) socket.
  """

  def __init__(self, address, port):
    """Bind the UDP socket and register it with a fresh epoll object.

    Args:
      address: local IPv4 address to bind to.
      port: local UDP port to bind to.
    """
    self.address = address
    self.port = port
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.sock.bind((address, port))
    self.queue = multiprocessing.Queue()
    self.processes = []
    self.ep = epoll(1024)
    # Register for readability only.  Without an explicit mask CPython
    # registers EPOLLIN | EPOLLOUT | EPOLLPRI, and because a UDP socket is
    # practically always writable, poll() would return immediately every
    # time and the receive loop would spin at 100% CPU.
    self.ep.register(self.sock.fileno(), EPOLLIN)

  def f(self, queue):
    """Worker loop: echo every queued datagram back to its sender.

    Runs until the process is terminated or interrupted.

    Args:
      queue: queue yielding ``(packet, address)`` tuples.
    """
    pid = os.getpid()
    try:
      while True:
        try:
          (packet, address) = queue.get(block=True, timeout=60)
        except Queue.Empty:
          logging.info('queue empty: %d' % (pid,))
          continue
        try:
          # %r (not %s) for the payload: under unicode_literals, formatting
          # a non-ASCII byte string with %s raises UnicodeDecodeError on
          # Python 2, which used to drop the packet before it was echoed.
          logging.info('%d:%s ... %r', pid, address, packet)
          self.sock.sendto(packet, address)
        except Exception:
          # Keep the worker alive, but leave a trace of what went wrong.
          logging.exception('worker %d: echo to %s failed', pid, address)
    except KeyboardInterrupt:
      # Ctrl-C reaches the whole process group; just leave quietly and let
      # the parent do the cleanup.
      pass

  def start(self):
    """Spawn one worker per CPU and run the receive loop.

    Blocks until the loop is interrupted or fails; stop() is always
    called before returning.
    """
    logging.info('start: %s:%d' % (self.address, self.port,))
    try:
      for _ in range(multiprocessing.cpu_count()):
        p = multiprocessing.Process(target=self.f, args=(self.queue,))
        p.start()
        # Track only processes that really started, so stop() never calls
        # terminate() on an unstarted Process object.
        self.processes.append(p)
      while True:
        for (fd, ev) in self.ep.poll():
          if ev & EPOLLIN:
            packet, address = self.sock.recvfrom(8192)
            self.queue.put((packet, address))
    except KeyboardInterrupt:
      logging.info('interrupted: %s:%d' % (self.address, self.port,))
    except Exception:
      # Still shut down without raising (as before), but record the cause.
      logging.exception('server loop aborted')
    finally:
      self.stop()

  def stop(self):
    """Terminate and reap the workers, then close the epoll object and socket."""
    logging.info('stop: %s:%d' % (self.address, self.port,))
    for p in self.processes:
      logging.info('process terminate: %d' % (p.pid,))
      p.terminate()
    # Reap the children so they do not linger as zombies.
    for p in self.processes:
      p.join()
    self.processes = []
    self.ep.close()
    logging.info('epoll close')
    self.sock.close()
    logging.info('socket close')

# Script entry point: run the echo server on localhost, UDP port 5000.
if __name__ == '__main__':
  server = Server('127.0.0.1', 5000)
  server.start()