読者です 読者をやめる 読者になる 読者になる

Erlang gen_server(SUB) に対して Python (PUB) からメッセージを送ってみる

zmq erlang python

Erlang 側で gen_server を複数立てておき、全てを Subscriber として待ち受け状態にして、Python 側からメッセージを送ってみます。メッセージはルートは使わないでファンアウトです。

メッセージは JSON でシリアライズしています。

ソース

$ git clone git://github.com/voluntas/snowflake.git
$ git checkout 239bdca5fbad92c2b7be8408a3c01e7a18ef5082

Python

Python 側は JSON データを投げるだけで、簡単なモノです。

import time

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)

sock.bind('tcp://127.0.0.1:5555')

time.sleep(1.0)

sock.send_json({'foo': ['bing', 2.3, True]})

Erlang

supervisor に gen_server な subscriber ワーカーを 10 個ぶら下げてます。
簡単に simple_one_for_one で supervisor:start_child/2 してるだけです。

socket 作る際 {active, true} にして handle_info にメッセージが上がってくるようにしています。
また、ファンアウトなので setsockopt の subscribe 値は <<>> に指定。

送られてくるメッセージが JSON 前提なので jiffy で decode して、画面に表示しています。

-module(zmq_subscriber).

-behaviour(gen_server).

-include_lib("eunit/include/eunit.hrl").

-define(SERVER, ?MODULE).

-define(DEFAULT_PUB, "tcp://127.0.0.1:5555").

-record(state, {zmq_context :: any(),
                zmq_socket :: any()}).

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------

-export([start_link/0]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

start_link() ->
  gen_server:start_link(?MODULE, [], []).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

init(_Args) ->
  {ok, Context} = erlzmq:context(),
  {ok, Socket} = erlzmq:socket(Context, [sub, {active, true}]),
  %% ok = erlzmq:setsockopt(Socket, identity,  pid_to_list(self())),
  ok = erlzmq:setsockopt(Socket, subscribe, <<>>),
  ok = erlzmq:connect(Socket, ?DEFAULT_PUB),
  {ok, #state{zmq_context = Context, zmq_socket = Socket}}.

handle_call(_Request, _From, State) ->
  {reply, ok, State}.

handle_cast(_Msg, State) ->
  {noreply, State}.

handle_info({zmq, _S, Msg, []}, State) ->
  Data = jiffy:decode(Msg),
  ?debugVal(Data),
  {noreply, State};
handle_info(_Info, State) ->
  {noreply, State}.

terminate(_Reason, State) ->
  ok = erlzmq:close(State#state.zmq_socket),
  ok = erlzmq:term(State#state.zmq_context),
  ok.

code_change(_OldVsn, State, _Extra) ->
  {ok, State}.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

動作確認

Erlang 側を起動します

% erl -pa ebin deps/*/ebin
Eshell V5.9 (abort with ^G)
1> application:start(snowflake).
ok

あとは Python 側を実行して Erlang 側にメッセージが出ることを確認します

$ python pub.py

Erlang 側に出力されるメッセージ

src/zmq_subscriber.erl:54:<0.50.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}
src/zmq_subscriber.erl:54:<0.43.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}
src/zmq_subscriber.erl:54:<0.48.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}
src/zmq_subscriber.erl:54:<0.44.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}
src/zmq_subscriber.erl:54:<0.49.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}
src/zmq_subscriber.erl:54:<0.46.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}
src/zmq_subscriber.erl:54:<0.45.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}
src/zmq_subscriber.erl:54:<0.42.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}
src/zmq_subscriber.erl:54:<0.47.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}
src/zmq_subscriber.erl:54:<0.40.0>: Data = {[{<<"foo">>,[<<"bing">>,2.3,true]}]}

全てのErlang 側のワーカーに配信されていることが確認出来ました。

ルーティングあたりでしょうか