Comet on MochiWeb (and MochiKit)

2008/11/20 に行われる 第2回 Erlang 分散システム勉強会で Comet on MochiWeb について話します。

Comet 実装は基本的なもので、XHR を使ってイベントが起きるまでクライアントを待機させ、
イベントが起きたらレスポンスを返し、クライアントが再度 XHR を送って待機状態に戻るというものです。

プレゼンの時はソースコードを見せる事も無いと思いますので、事前に公開しておきます。

MochiKit(JS) 部分は id:jbking に依頼。
デザイン(HTML/CSS) は id:nullpobug に依頼。

コードは両方とも修正BSDライセンスです。

JS 部分はわがまま言ってタイムアウト機能まで付けて貰いました。
その割に Erlang 側は特に何もしてないというヒドイ状況。

スクリーンキャスト取ってみました。貼り付けたかったのですがとりあえず URL だけ。
ScreenToaster 便利です:-)

細かいコードが見たい人は bitbucket で公開していますのでどぞ。

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
    "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html>
<head>
<title>It Worked</title>
<script type="text/javascript" src="MochiKit/MochiKit.js"></script>
<script type="text/javascript">
/**
 * Posting form widget.
 *
 * Builds a <form> holding a single <input> inside `el` and registers the
 * signal handlers that drive the long-poll (GET) and the post (POST) chains.
 *
 * @param el  container element (or id) the form is appended to
 */
AjaxForm = function(el){
    log("form creating...");

    // MochiKit's bindMethods fixes `this` for every method of the instance,
    // so the methods can be passed around below as plain callbacks.
    bindMethods(this);

    // Build the form and attach it to the container.
    this.form = FORM(null, [INPUT(null)]);
    appendChildNodes(el, this.form);

    // A "Reconnect Now" BUTTON wired to this.reconnectNow ('onclick') used to
    // be appended next to the form; it is disabled because the resulting DOM
    // looked bad.

    // GET: signal(this, 'connect') starts the long-poll chain.
    connect(this, 'connect', this.connect);
    // POST: submitting the form sends the status update.
    connect(this.form, 'onsubmit', this.submit);
    // Display: signal(this, 'log', msg) renders a received message.
    connect(this, 'log', this.log);
};

/**
 * Methods of AjaxForm.
 *
 * Long-poll cycle (GET /timeline):
 *   connect -> validate -> receive (success) | wait (failure) -> reconnect
 * Post cycle (POST /update):
 *   submit -> submitComplete (success) | submitFailed (failure)
 *
 * Inside these methods the bare names `connect`, `wait` and `log` refer to
 * the global MochiKit functions, not to the methods of the same name.
 */
AjaxForm.prototype = merge(Object, {
    /**
     * Deferred entry point for GET: opens one long-poll request and chains
     * the handlers for its result.
     * @return the Deferred driving this poll (also kept in this.timelineChain)
     */
    connect: function() {
        log("connecting...");

        this.timelineChain = loadJSONDoc("/timeline")
            .addCallback(this.validate)
            .addCallbacks(this.receive, this.wait)
            .addBoth(this.reconnect);
        return this.timelineChain;
    },

    /**
     * Turns a server reply without an `ok` member into a failure so that it
     * is routed to the errback (`wait`) instead of `receive`.
     */
    validate: function(data) {
        log("validate...");

        return data.ok ? succeed(data) : fail(data);
    },

    /**
     * Success handler: resets the back-off and publishes the received status
     * through the 'log' signal.
     */
    receive: function(data) {
        // a successful poll resets the back-off
        this.waitingTime = 0;

        log("receive... ", data.ok);

        signal(this, 'log', data.ok);
    },

    /**
     * Failure handler: delays the next poll with an exponential back-off.
     * A cancellation is re-thrown untouched so `reconnect` can see it.
     * @return a Deferred that fires after the back-off delay
     */
    wait: function(e) {
        if(e instanceof CancelledError) {
            throw e;
        }

        // 5 seconds on the first failure, doubled on each further failure,
        // capped at 1 minute (waitingTime is 0/undefined before the first one)
        this.waitingTime = this.waitingTime * 2 || 5;
        if(this.waitingTime > 60)
            this.waitingTime = 60;

        log("waiting... ", this.waitingTime + "seconds");

        return wait(this.waitingTime);
    },

    /**
     * Runs after every poll: starts the next one unless the chain was
     * cancelled.
     */
    reconnect: function(e) {
        log("reconnect...");

        if(!(e instanceof CancelledError)) {
            return this.connect();
        } else {
            log("finish.");
        }
    },

    /**
     * User-triggered reconnect: cancels the pending poll and starts a new one
     * immediately. Currently not wired to any control.
     */
    reconnectNow: function(e) {
        log("reconnect now...");

        e.stop();
        this.timelineChain.cancel();
        signal(this, 'connect');
    },

    /**
     * Deferred entry point for POST: sends the input value as `status` and
     * disables the input until the request has finished.
     * @return the Deferred driving this post (also kept in this.updateChain)
     */
    submit: function(e) {
        log("submit...");

        e.stop();
        var input = this.form.getElementsByTagName("input")[0];
        input.disabled = true;
        this.updateChain = doXHR("/update", {
            method: 'POST',
            headers: {"Content-Type": "application/x-www-form-urlencoded"},
            sendContent: queryString({status: input.value})})
            .addCallback(this.submitComplete)
            // without an errback a failed POST would leave the input
            // disabled forever
            .addErrback(this.submitFailed);
        return this.updateChain;
    },

    /**
     * POST succeeded: clears the input and hands it back to the user.
     */
    submitComplete: function() {
        var input = this.form.getElementsByTagName("input")[0];

        log("post submit...");

        input.value = "";
        input.disabled = false;
        input.focus();
    },

    /**
     * POST failed: re-enables the input but keeps the typed text so the user
     * can retry. The error is returned so the chain stays in the failed state.
     */
    submitFailed: function(err) {
        var input = this.form.getElementsByTagName("input")[0];

        log("post failed... ", err);

        input.disabled = false;
        input.focus();
        return err;
    },

    /**
     * Handler of the 'log' signal: displays a received message.
     * render_message is not defined in this page; presumably the loaded
     * theme script provides it -- TODO confirm.
     */
    log: function(msg) {
        render_message(msg);
    }
    // no trailing comma after the last member: ES3 engines reject it
});

// Once the page has loaded, build the form inside #input and start polling.
connect(window, 'onload', function() {
    var form = new AjaxForm($("input"));
    signal(form, 'connect');
});

// Theme loader
Script = {};
Theme = function(name){
    this.name = name || 'default';
    this.css = 'theme/' + this.name + '/theme.css';
    this.js = 'theme/' + this.name + '/theme.js';
    this.style_id = 'theme-style';
    this.script_id = 'theme-script';
};
Theme.prototype = merge(Object, {
    /**
     * Activates this theme: removes whatever theme elements are currently in
     * the document, then appends a fresh <link> and <script> to <head>.
     */
    load: function() {
        log('loading theme...');

        var linkAttrs = {
            rel: 'stylesheet',
            href: this.css,
            type: 'text/css',
            media: 'screen',
            id: this.style_id
        };
        var scriptAttrs = {
            src: this.js,
            type: 'text/javascript',
            id: this.script_id
        };
        var nodes = [
            createDOM('link', linkAttrs),
            createDOM('script', scriptAttrs)
        ];

        // drop the previous theme before inserting the new elements
        this.unload();
        appendChildNodes(getFirstElementByTagAndClassName('head'), nodes);
    },

    /**
     * Deactivates the theme: calls the tearDown hook registered under this
     * theme's name in Script (if any) and removes the theme's <link> and
     * <script> elements when they are present.
     */
    unload: function() {
        var hooks = Script[this.name];
        if (hooks) {
            hooks.tearDown();
        }

        var ids = [this.style_id, this.script_id];
        for (var i = 0; i < ids.length; i++) {
            var node = $(ids[i]);
            if (node) {
                removeElement(node);
            }
        }
    }
});

// Once the page has loaded, activate the default theme.
connect(window, 'onload', function() {
    var theme = new Theme();
    theme.load();
});
</script>
</head>
<body>
<div id="options"></div>
<div id="wrap">
    <div id="messages"></div>
</div>
<div id="input"></div>
</body>
</html>

mochiweb 側の実装。

%% Message loop of the public timeline process.
%%
%% Users is the list of pids currently waiting for the next status.
%% Protocol (From is the sender's pid):
%%   {From, connect}        -> From is added to Users and gets `connected'
%%   {From, disconnect}     -> From is removed from Users and gets `disconnected'
%%   {From, update, Status} -> From gets `updated', every waiting user gets the
%%                             bare Status term, and the waiting list is reset
%% Any other message is consumed and ignored.
router(Users) ->
  receive
    Message ->
      route(Message, Users)
  end.

%% Handles one message and continues the loop with the new waiting list.
route({Client, connect}, Users) ->
  Client ! connected,
  router([Client | Users]);
route({Client, disconnect}, Users) ->
  Client ! disconnected,
  router(lists:delete(Client, Users));
route({Poster, update, Status}, Users) ->
  Poster ! updated,
  [Waiting ! Status || Waiting <- Users],
  %% every waiting client has been answered, so nobody is waiting any more
  router([]);
route(_Unknown, Users) ->
  router(Users).

%% Returns the pid of the process registered as `public_timeline', starting
%% and registering a router process first when none is registered.
%%
%% Several request processes may get here at the same time: each can see
%% `undefined' and spawn its own router, but only one register/2 succeeds.
%% The loser kills its spare router and looks the name up again instead of
%% crashing with badarg.
get_public_timeline() ->
  case whereis(public_timeline) of
    Pid when is_pid(Pid) ->
      Pid;
    undefined ->
      %% deliberately not linked: the router has to outlive the request
      %% process that happens to start it
      NewPid = spawn(fun() -> router([]) end),
      try register(public_timeline, NewPid) of
        true ->
          NewPid
      catch
        error:badarg ->
          %% another process registered the name between whereis/1 and
          %% register/2
          exit(NewPid, kill),
          get_public_timeline()
      end
  end.

%% MochiWeb request handler.
%%
%%   GET|HEAD /timeline -> long-poll: replies once a status is posted, or with
%%                         an error after a timeout
%%   GET|HEAD <other>   -> static file served from DocRoot
%%   POST /update       -> broadcasts the `status' form field to the timeline
%%   POST <other>       -> 404
%%   other methods      -> 501
%%
%% Req is the MochiWeb request object, DocRoot the directory static files are
%% served from.
loop(Req, DocRoot) ->
  "/" ++ Path = Req:get(path),
  case {Req:get(method), Path} of
    {Method, "timeline"} when Method =:= 'GET'; Method =:= 'HEAD' ->
      timeline_response(Req);
    {Method, _} when Method =:= 'GET'; Method =:= 'HEAD' ->
      Req:serve_file(Path, DocRoot);
    {'POST', "update"} ->
      update_response(Req);
    {'POST', _} ->
      Req:not_found();
    _ ->
      Req:respond({501, [], []})
  end.

%% Subscribes to the public timeline, waits for the next status and replies
%% with a JSON object holding a single `ok' or `error' member.
timeline_response(Req) ->
  PublicTimeline = get_public_timeline(),
  PublicTimeline ! {self(), connect},
  {Type, Status} = await_status(),
  leave_on_error(Type, PublicTimeline),
  H = mochiweb_headers:make([
      {"Cache-Control", "private"},
      {"Server", "Comet on MochiWeb"}
  ]),
  Req:ok({"application/json", mochiweb_headers:to_list(H),
      mochijson:encode({struct, [{Type, Status}]})
  }).

%% Waits up to 1 second for the router to acknowledge the subscription and
%% then up to 5 minutes for a status.
%% NOTE(review): statuses arrive as bare terms, so the inner receive takes
%% whatever message is first in the mailbox.
await_status() ->
  receive
    connected ->
      receive
        Status ->
          {ok, Status}
      after 300000 ->
        {error, <<"timeout">>}
      end
  after 1000 ->
    {error, <<"timeout">>}
  end.

%% After a timeout this process may still be in the router's waiting list, so
%% it unsubscribes and waits at most 1 second for the acknowledgement.
leave_on_error(ok, _PublicTimeline) ->
  ok;
leave_on_error(error, PublicTimeline) ->
  PublicTimeline ! {self(), disconnect},
  receive
    disconnected ->
      ok
  after 1000 ->
    ok
  end.

%% Posts the `status' form field to the public timeline and replies with a
%% JSON object holding a single `ok' or `failure' member. A request without a
%% `status' field is answered with 400 instead of crashing in
%% list_to_binary/1.
update_response(Req) ->
  case proplists:get_value("status", Req:parse_post()) of
    undefined ->
      Req:respond({400, [{"Content-Type", "text/plain"}],
          "missing status parameter"});
    Status ->
      PublicTimeline = get_public_timeline(),
      PublicTimeline ! {self(), update, list_to_binary(Status)},
      Body =
        receive
          updated ->
            {ok, <<"posted">>}
        after 1000 ->
          {failure, <<"timeout">>}
        end,
      H = mochiweb_headers:make([
          {"Server", "Comet on MochiWeb"}
      ]),
      Req:ok({"application/json", mochiweb_headers:to_list(H),
          mochijson:encode({struct, [Body]})})
  end.