Skip to content

Instantly share code, notes, and snippets.

@jebu
Created December 1, 2011 06:38
Show Gist options
  • Save jebu/1414361 to your computer and use it in GitHub Desktop.
Save jebu/1414361 to your computer and use it in GitHub Desktop.
ZMQ Streamer
%% @doc ZMQ streamer: a gen_fsm that owns one PULL socket and one XREP
%% socket. Each time a complete request arrives on the XREP socket, one
%% message is read from the PULL socket and sent back to the requester.
-module(zmq_streamer).
-behaviour(gen_fsm).
%% Public API: start the streamer (the 3-argument form defaults Mode to bind).
-export([start_link/3, start_link/4]).
%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
terminate/3, code_change/4]).
%% Public API: read the served-request counter, stop the process.
-export([get_count/1, stop/1]).
%% Process state:
%%   zmq_context        - erlzmq context created in init/1
%%   pull_socket        - passive PULL socket that data is read from
%%   xrep_socket        - active XREP socket that requests arrive on
%%   next_cons_identity - identity frame of the request currently being
%%                        parsed; undefined between requests
%%   count              - number of requests served so far
-record(state, {zmq_context=undefined, pull_socket=undefined, xrep_socket=undefined, next_cons_identity, count=0}).
%% NOTE(review): SERVER is not referenced anywhere in the visible code.
-define(SERVER, ?MODULE).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% @doc Starts the streamer and registers it locally under `Name'.
%%
%% `start_link/3' is shorthand for `start_link/4' with `Mode' set to
%% `bind'. `PullEndpoint', `XrepEndpoint' and `Mode' are handed to
%% init/1, which uses `Mode' to attach both sockets to their endpoints.
%%
%% Returns whatever `gen_fsm:start_link/4' returns.
%% @end
%%--------------------------------------------------------------------
start_link(Name, PullEndpoint, XrepEndpoint) ->
    start_link(Name, PullEndpoint, XrepEndpoint, bind).

start_link(Name, PullEndpoint, XrepEndpoint, Mode) ->
    InitArgs = [PullEndpoint, XrepEndpoint, Mode],
    gen_fsm:start_link({local, Name}, ?MODULE, InitArgs, []).
%
%% @doc Synchronously asks the streamer `Server' for its counter of
%% served requests. The matching handle_sync_event/4 clause replies
%% with `{ok, Count}'.
get_count(Server) ->
    Request = get_count,
    gen_fsm:sync_send_all_state_event(Server, Request).
%
%% @doc Synchronously asks the streamer `Server' to shut down. The
%% matching handle_sync_event/4 clause replies with `ok' and stops the
%% process.
stop(Server) ->
    Request = stop,
    gen_fsm:sync_send_all_state_event(Server, Request).
%%====================================================================
%% gen_fsm callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% @doc gen_fsm init callback.
%%
%% Expects `[PullEndpoint, XrepEndpoint, Mode]' where `Mode' is `bind'
%% or `connect'. Creates the erlzmq context, a passive PULL socket and
%% an active XREP socket, attaches both sockets to their endpoints and
%% returns `{ok, initial, State}'.
%%
%% Any other `Mode' fails with function_clause before a context is
%% created. A failing erlzmq call crashes init with badmatch, so
%% start_link returns an error.
%% @end
%%--------------------------------------------------------------------
init([PullEndpoint, XrepEndpoint, Mode]) when Mode =:= bind; Mode =:= connect ->
    %% Trap exits so that terminate/3 (which closes the sockets and the
    %% context) also runs when the parent or supervisor shuts us down.
    process_flag(trap_exit, true),
    {ok, Context} = erlzmq:context(),
    %% Passive: the PULL socket is only read on demand from handle_info/3.
    {ok, Pull} = erlzmq:socket(Context, [pull, {active, false}]),
    %% Active: XREP frames are delivered to this process as messages.
    {ok, Xrep} = erlzmq:socket(Context, [xrep, {active, true}]),
    ok = attach_socket(Mode, Pull, PullEndpoint),
    ok = attach_socket(Mode, Xrep, XrepEndpoint),
    {ok, initial, #state{zmq_context=Context,
                         pull_socket=Pull,
                         xrep_socket=Xrep,
                         count=0,
                         next_cons_identity=undefined}}.

%% Attaches Socket to Endpoint using a static call, so that only the two
%% supported modes can ever be dispatched to erlzmq.
attach_socket(bind, Socket, Endpoint) ->
    erlzmq:bind(Socket, Endpoint);
attach_socket(connect, Socket, Endpoint) ->
    erlzmq:connect(Socket, Endpoint).
%%--------------------------------------------------------------------
%% @doc gen_fsm callback for synchronous all-state events.
%%
%%   get_count - replies `{ok, Count}' and stays in the current state.
%%   stop      - replies `ok' and stops the process.
%%   other     - replies `ok' and stays in the current state.
handle_sync_event(get_count, _From, StateName, State) ->
    {reply, {ok, State#state.count}, StateName, State};
handle_sync_event(stop, _From, _StateName, State) ->
    %% A requested stop is a clean exit. Any reason other than normal,
    %% shutdown or {shutdown, _} is treated by OTP as a crash: it is
    %% logged as an error and propagated to the linked parent.
    {stop, normal, ok, State};
handle_sync_event(_Event, _From, StateName, State) ->
    {reply, ok, StateName, State}.
%%--------------------------------------------------------------------
%% @doc gen_fsm callback for asynchronous all-state events. Every such
%% event is dropped: the state name and state data are returned as is.
handle_event(_Ignored, CurrentState, Data) ->
    {next_state, CurrentState, Data}.
%%--------------------------------------------------------------------
%% @doc gen_fsm callback for raw messages; parses the frames that the
%% active XREP socket delivers as `{zmq, Socket, Frame, Flags}'.
%%
%% A request is expected as: identity frame, empty delimiter frame,
%% then a final frame (empty flags). The states track the progress:
%%
%%   initial       - waiting for the identity frame
%%   awaiting_null - identity stored, waiting for the empty delimiter
%%   awaiting_body - waiting for the final frame of the request
%%
%% When the final frame arrives in `awaiting_body', one message is read
%% from the PULL socket and sent to the stored identity, and the
%% counter is incremented. A final frame in any other state means the
%% request ended early; the machine then returns to `initial' so that a
%% stale identity is never paired with a later request. Everything
%% else is ignored.
%% @end
%%--------------------------------------------------------------------
handle_info({zmq, S, Id, [rcvmore]}, initial, State=#state{xrep_socket=S}) ->
    {next_state, awaiting_null, State#state{next_cons_identity=Id}};
handle_info({zmq, S, <<>>, [rcvmore]}, awaiting_null, State=#state{xrep_socket=S}) ->
    {next_state, awaiting_body, State};
handle_info({zmq, S, _, []}, awaiting_body,
            State=#state{xrep_socket=S, pull_socket=P, count=C, next_cons_identity=Id}) ->
    %% NOTE(review): recv is called without noblock, so this presumably
    %% blocks the whole process until upstream data arrives - confirm.
    {ok, Data} = erlzmq:recv(P),
    %% Reply envelope: identity, empty delimiter, payload. The sends are
    %% best effort: their results are not checked.
    erlzmq:send(S, Id, [sndmore, noblock]),
    erlzmq:send(S, <<>>, [sndmore, noblock]),
    erlzmq:send(S, Data, [noblock]),
    {next_state, initial, State#state{next_cons_identity=undefined, count=C+1}};
handle_info({zmq, S, _, []}, _StateName, State=#state{xrep_socket=S}) ->
    %% The request ended before reaching awaiting_body (for example, no
    %% empty delimiter was sent). Resynchronise at the message boundary
    %% instead of staying stuck with the old identity.
    {next_state, initial, State#state{next_cons_identity=undefined}};
handle_info(_Info, StateName, State) ->
    {next_state, StateName, State}.
%%--------------------------------------------------------------------
%% @doc gen_fsm terminate callback. When the state record holds a
%% context, closes the XREP socket, then the PULL socket, then
%% terminates the context. In every other case there is nothing to
%% release. Always returns `ok'.
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _StateName, #state{zmq_context=undefined}) ->
    ok;
terminate(_Reason, _StateName,
          #state{zmq_context=Context, xrep_socket=Xrep, pull_socket=Pull}) ->
    erlzmq:close(Xrep),
    erlzmq:close(Pull),
    erlzmq:term(Context),
    ok;
terminate(_Reason, _StateName, _Other) ->
    ok.
%%--------------------------------------------------------------------
%% @doc gen_fsm code_change callback. No conversion is performed: the
%% current state name and state data are returned unchanged as
%% `{ok, StateName, State}'.
%% @end
%%--------------------------------------------------------------------
code_change(_FromVsn, CurrentState, Data, _Extra) ->
    {ok, CurrentState, Data}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment