kqueue のマルチソケット版

毎回 socket.fromfd やってるコストが気になることは気になる。

# vim: fileencoding=utf8

# Only Python 2.6.x

from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
#from __future__ import unicode_literals

from future_builtins import *

import os
import sys
import socket
import logging

import multiprocessing
import Queue

# FreeBSD / NetBSD / OpenBSD / MacOSX only
from select import *

# Root logger setup: emit everything from DEBUG upwards, one line per record
# with timestamp, source path, line number, padded level name and message.
logging.basicConfig(
  format='%(asctime)s %(pathname)s %(lineno)d %(levelname)-6s %(message)s',
  level=logging.DEBUG,
)

class Server(object):
  """UDP echo server that watches two sockets through a single kqueue.

  The parent process waits on the kqueue for readable datagrams on two UDP
  ports and pushes every received packet onto a multiprocessing queue.
  Worker processes (one per CPU) take packets off the queue and send them
  back to the sender through the socket they arrived on.
  """

  def new_socket(self):
    """Return a new UDP/IPv4 socket with SO_REUSEADDR enabled."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    return sock

  def __init__(self, address, port_first=5000, port_second=5001):
    """Bind both UDP sockets and register them with a new kqueue.

    address     -- local address both sockets are bound to
    port_first  -- UDP port of the first socket
    port_second -- UDP port of the second socket
    """
    self.address = address

    self.port_first = port_first
    self.port_second = port_second

    self.sock_first = self.new_socket()
    self.sock_second = self.new_socket()

    self.sock_first.bind((self.address, port_first))
    self.sock_second.bind((self.address, port_second))

    self.kq = kqueue()
    # kevent() defaults to filter=KQ_FILTER_READ and flags=KQ_EV_ADD, so
    # passing these events to control() subscribes to "socket is readable".
    self.ke_first = kevent(ident=self.sock_first.fileno())
    self.ke_second = kevent(ident=self.sock_second.fileno())
    self.kq.control([self.ke_first, self.ke_second], 0)

    self.queue = multiprocessing.Queue()
    self.processes = []

  def _socket_for(self, ident):
    """Return the server socket whose file descriptor is ident, else None.

    Looking up the already-open socket avoids calling socket.fromfd() (and
    therefore dup()-ing the descriptor) for every single packet.
    """
    for sock in (self.sock_first, self.sock_second):
      if sock.fileno() == ident:
        return sock
    return None

  def run(self):
    """Worker loop: send every queued packet back to its sender.

    Meant to run in a child process that inherited the server's sockets.
    Each queue item is a tuple (ident, packet, address, port) where ident is
    the file descriptor of the socket the packet was received on.  Ordinary
    errors are logged and the loop continues; KeyboardInterrupt ends the
    loop so the worker can actually be stopped.
    """
    try:
      while True:
        try:
          (ident, packet, address, port) = self.queue.get(block=True, timeout=60)
          sock = self._socket_for(ident)
          if sock is None:
            logging.error('unknown socket ident: %r' % (ident,))
            continue
          sock.sendto(packet, (address, port))
        except Queue.Empty:
          logging.info('quque empty: %d' % os.getpid())
        except Exception as e:
          logging.error(e)
    except KeyboardInterrupt:
      # A terminal Ctrl-C is delivered to the workers as well as the parent;
      # leave the loop instead of swallowing it.
      logging.info('worker interrupted: %d' % os.getpid())

  def start(self):
    """Spawn the workers and dispatch incoming packets until interrupted.

    Blocks in the kqueue loop.  Whatever ends the loop, stop() is called so
    that the workers, the kqueue and both sockets are released.
    """
    logging.info('start: %s:%d' % (self.address, self.port_first,))
    logging.info('start: %s:%d' % (self.address, self.port_second,))
    try:
      for i in range(multiprocessing.cpu_count()):
        p = multiprocessing.Process(target=self.run)
        p.start()
        # Only remember processes that really started, so stop() can join them.
        self.processes.append(p)
      while True:
        for event in self.kq.control([], 1):
          # kqueue filters are distinct negative identifiers, not bit flags;
          # "filter & KQ_FILTER_READ" would be true for every filter.
          if event.filter != KQ_FILTER_READ:
            continue
          sock = self._socket_for(event.ident)
          if sock is None:
            continue
          packet, (address, port) = sock.recvfrom(8192)
          self.queue.put((event.ident, packet, address, port))
    except KeyboardInterrupt:
      logging.warning('KeyboardInterrupt')
    except Exception as e:
      logging.exception(e)
    finally:
      self.stop()

  def stop(self):
    """Terminate the workers, then close the kqueue and both sockets."""
    logging.info('stop')
    # Workers loop forever, so they must be terminated explicitly; otherwise
    # the parent would wait for them at interpreter exit.
    for p in self.processes:
      if p.is_alive():
        p.terminate()
      p.join()
    self.processes = []
    self.kq.close()
    logging.info('kqueue close')
    self.sock_first.close()
    self.sock_second.close()
    logging.info('socket close')

# Script entry point: serve on the loopback interface, UDP ports 5000 and 5001.
if __name__ == '__main__':
  Server(address='127.0.0.1', port_first=5000, port_second=5001).start()