Skip to content

Instantly share code, notes, and snippets.

@takkkun
Created January 13, 2010 18:41
Show Gist options
  • Save takkkun/276446 to your computer and use it in GitHub Desktop.
Save takkkun/276446 to your computer and use it in GitHub Desktop.
-module(twitter.streaming).
-author("KONDO Takahiro <heartery@gmail.com>").
-export([behaviour_info/1]).
-export([start_link/4, start_link/5, start/4, start/5]).
-export([call/2, call/3, cast/2, reply/2]).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-include("object.hrl").
-record(args, {user, password, method, params = [], format = record}).
-record(state, {module, id, format, env}).
%% @doc Describes the callbacks a module implementing this behaviour
%% must export, as a list of `{FunctionName, Arity}' pairs.
%%
%% Any key other than `callbacks' yields `undefined' instead of crashing
%% with `function_clause', which is the conventional contract for
%% `behaviour_info/1'.
behaviour_info(callbacks) ->
    [{init, 1},
     {handle_start, 2},
     {handle_info, 2},
     {terminate, 2}];
behaviour_info(_Other) ->
    undefined.
%% @doc Starts an unregistered streaming server linked to the caller.
%%
%% `StreamingArgs' is validated and normalised by `boot/4'; only when that
%% succeeds is the underlying gen_server started with `Options'.
start_link(Module, InitArgs, StreamingArgs, Options) ->
    Starter = fun(ServerArgs) ->
                      .gen_server:start_link(?MODULE, ServerArgs, Options)
              end,
    boot(Starter, Module, InitArgs, StreamingArgs).
%% @doc Starts a streaming server registered as `ServerName' and linked
%% to the caller.
%%
%% `StreamingArgs' is validated and normalised by `boot/4'; only when that
%% succeeds is the underlying gen_server started with `Options'.
start_link(ServerName, Module, InitArgs, StreamingArgs, Options) ->
    Starter = fun(ServerArgs) ->
                      .gen_server:start_link(ServerName, ?MODULE, ServerArgs, Options)
              end,
    boot(Starter, Module, InitArgs, StreamingArgs).
%% @doc Starts an unregistered streaming server without linking it to
%% the caller.
%%
%% `StreamingArgs' is validated and normalised by `boot/4'; only when that
%% succeeds is the underlying gen_server started with `Options'.
start(Module, InitArgs, StreamingArgs, Options) ->
    Starter = fun(ServerArgs) ->
                      .gen_server:start(?MODULE, ServerArgs, Options)
              end,
    boot(Starter, Module, InitArgs, StreamingArgs).
%% @doc Starts a streaming server registered as `ServerName' without
%% linking it to the caller.
%%
%% `StreamingArgs' is validated and normalised by `boot/4'; only when that
%% succeeds is the underlying gen_server started with `Options'.
start(ServerName, Module, InitArgs, StreamingArgs, Options) ->
    Starter = fun(ServerArgs) ->
                      .gen_server:start(ServerName, ?MODULE, ServerArgs, Options)
              end,
    boot(Starter, Module, InitArgs, StreamingArgs).
%% @doc Synchronous request to the streaming server with no time limit.
%% Equivalent to `call(ServerRef, Request, infinity)'.
call(ServerRef, Request) ->
call(ServerRef, Request, infinity).
%% @doc Synchronous request to the streaming server.
%%
%% Returns whatever the server replies. Two exit reasons raised by the
%% underlying gen_server call are translated into error tuples:
%% `noproc' becomes `{error, closed}' and `timeout' becomes
%% `{error, timeout}'. Any other exit propagates to the caller.
call(ServerRef, Request, Timeout) ->
    try
        .gen_server:call(ServerRef, Request, Timeout)
    catch
        exit:{timeout, _} -> {error, timeout};
        exit:{noproc, _} -> {error, closed}
    end.
%% @doc Sends an asynchronous request to the streaming server.
%% Thin wrapper around the root-package `gen_server:cast/2'.
cast(ServerRef, Request) ->
.gen_server:cast(ServerRef, Request).
%% @doc Sends `Reply' to `Client', the caller handle a callback received
%% with a synchronous request.
%% Thin wrapper around the root-package `gen_server:reply/2'.
reply(Client, Reply) ->
.gen_server:reply(Client, Reply).
%%% callback functions
%% @doc gen_server `init/1' callback.
%%
%% Opens the streaming HTTP request described by the `#args{}' record and
%% then initialises the callback module with `Module:init(InitArgs)'.
%%
%% The `filter' method is requested with POST, every other method with
%% GET. The `record' format is fetched as JSON on the wire; the format
%% stored in the state is still the one the caller asked for.
%%
%% Returns `{ok, State}' or `{ok, State, Timeout}' when the callback
%% module succeeds. Any other callback result is returned unchanged, a
%% callback exception becomes `{stop, Reason}', and a failure to open the
%% stream becomes `{stop, Reason}'.
%%
%% When the stream was opened but the callback module does not return an
%% `ok' tuple, the request is cancelled here: `terminate/2' (which would
%% normally cancel it) is not invoked for a server that fails to start.
init([Module, InitArgs, #args{user = User,
                              password = Password,
                              method = Method,
                              params = Params,
                              format = Format}]) ->
    RequestMethod = if Method == filter -> post; true -> get end,
    WireFormat = if Format == record -> json; true -> Format end,
    Url = api:stream_url("/statuses/" ++ atom_to_list(Method), WireFormat),
    case api:stream(RequestMethod, Url, Params, {basic, User, Password}) of
        {ok, Id} ->
            State = #state{module = Module, id = Id, format = Format},
            try Module:init(InitArgs) of
                {ok, Env} ->
                    {ok, State#state{env = Env}};
                {ok, Env, Timeout} ->
                    {ok, State#state{env = Env}, Timeout};
                Result ->
                    %% Server will not start: release the open stream.
                    .http:cancel_request(Id),
                    Result
            catch
                _:Reason ->
                    %% Server will not start: release the open stream.
                    .http:cancel_request(Id),
                    {stop, Reason}
            end;
        {error, Reason} ->
            {stop, Reason}
    end.
%% @doc gen_server `handle_call/3' callback.
%% Delegates to the callback module's `handle_call(Request, From, Env)'
%% through `sync_call/3', which maps the returned environment back into
%% the server state.
handle_call(Request, From, State) ->
sync_call(handle_call, [Request, From], State).
%% @doc gen_server `handle_cast/2' callback.
%%
%% Delegates to the callback module's `handle_cast(Request, Env)' through
%% `sync_call/3', which maps the returned environment back into the
%% server state. A module that does not export `handle_cast/2' is
%% tolerated: `sync_call/3' turns the resulting `undef' into a no-op.
handle_cast(Request, State) ->
    %% Must dispatch to handle_cast, not handle_call: the latter would
    %% invoke the module's handle_call with the wrong arity.
    sync_call(handle_cast, [Request], State).
%% @doc gen_server `handle_info/2' callback.
%%
%% Messages tagged `http' whose request id equals the id stored in the
%% state are treated as events of the streaming request; everything else
%% is forwarded to the callback module. Clause order matters: the
%% keep-alive and `record'-format clauses must precede the generic
%% `stream' clause.
%%
%% Callbacks invoked from here run in separately spawned processes and
%% receive the current `env'; their return values are not fed back into
%% the server state.

%% Stream opened: hand the response headers to Module:handle_start/2.
handle_info({http, {Id, stream_start, Headers}}, #state{id = Id} = State) ->
async_call(handle_start, [Headers], State),
{noreply, State};
%% Stream closed by the remote side: stop the server with reason `disconnect'.
handle_info({http, {Id, stream_end, _}}, #state{id = Id} = State) ->
{stop, disconnect, State};
%% A bare CRLF chunk carries no data and is dropped.
handle_info({http, {Id, stream, <<"\r\n">>}}, #state{id = Id} = State) ->
{noreply, State};
%% `record' format: decode the chunk and dispatch it as a typed record.
handle_info({http, {Id, stream, Part}}, #state{id = Id, format = record} = State) ->
parse_and_call(Part, State),
{noreply, State};
%% Any other format: pass the raw chunk to Module:handle_stream/2.
handle_info({http, {Id, stream, Part}}, #state{id = Id} = State) ->
async_call(handle_stream, [Part], State),
{noreply, State};
%% Error reported for the request: stop with `{http_error, Reason}'.
handle_info({http, {Id, {error, Reason}}}, #state{id = Id} = State) ->
{stop, {http_error, Reason}, State};
%% Anything else (including `http' messages for other request ids) is
%% forwarded to Module:handle_info/2.
handle_info(Info, State) ->
async_call(handle_info, [Info], State),
{noreply, State}.
%% @doc gen_server `terminate/2' callback.
%%
%% Lets the callback module clean up via `Module:terminate(Reason, Env)'
%% and then cancels the streaming HTTP request held in the state.
%%
%% The cancellation sits in an `after' section so the request is released
%% even when the callback module's `terminate/2' raises; the exception
%% itself still propagates.
terminate(Reason, #state{module = Module, id = Id, env = Env}) ->
    try
        Module:terminate(Reason, Env)
    after
        .http:cancel_request(Id)
    end.
%% @doc gen_server `code_change/3' callback.
%% No state migration is performed: the state is returned unchanged.
code_change(_, State, _) ->
{ok, State}.
%%% private functions
%% @doc Shared start-up path for `start/4,5' and `start_link/4,5'.
%%
%% Folds the `StreamingArgs' option list into an `#args{}' record, picks a
%% default streaming method when none was given, and verifies that
%% `Module' exports the callbacks required for the chosen format.
%%
%% Returns the result of `Fun([Module, InitArgs, Args])' when the
%% callbacks are present, otherwise `{error, bad_callback}'.
boot(Fun, Module, InitArgs, StreamingArgs) ->
    Args = default_method(.lists:foldl(fun args/2, #args{}, StreamingArgs)),
    case exported(Module, Args#args.format) of
        true -> Fun([Module, InitArgs, Args]);
        false -> {error, bad_callback}
    end.

%% Fills in the streaming method when the caller did not choose one:
%% `filter' if a `follow' or `track' parameter is present, otherwise
%% `sample'. An explicitly chosen method is left untouched.
default_method(#args{method = undefined, params = Params} = As) ->
    HasFilterParam = .lists:any(fun({Key, _}) ->
                                        Key == follow orelse Key == track
                                end, Params),
    Method = case HasFilterParam of
                 true -> filter;
                 false -> sample
             end,
    As#args{method = Method};
default_method(As) ->
    As.
%% @doc Tells whether `Module' exports the stream callbacks required for
%% the given format.
%%
%% The `record' format needs `handle_status/2', `handle_delete/2' and
%% `handle_limit/2'; every other format needs `handle_stream/2'.
exported(Module, Format) ->
    %% function_exported/3 answers false for a module that is not loaded
    %% yet, so trigger loading first. The result is deliberately ignored:
    %% a module that cannot be loaded simply fails the export check below.
    _ = .code:ensure_loaded(Module),
    .lists:all(fun({Name, Arity}) ->
                       .erlang:function_exported(Module, Name, Arity)
               end, required_callbacks(Format)).

%% Callbacks a module must export for a given stream format.
required_callbacks(record) ->
    [{handle_status, 2}, {handle_delete, 2}, {handle_limit, 2}];
required_callbacks(_) ->
    [{handle_stream, 2}].
%% @doc Fold step that applies one streaming option to an `#args{}'
%% record.
%%
%% Accepted options:
%%   `{user, U}', `{password, P}'          - credentials
%%   `{method, M}' or the bare atoms
%%   `filter' | `firehose' | `sample'      - streaming method
%%   `{follow, [integer()]}'               - user ids, joined with ","
%%   `{track, [string()]}'                 - keywords, joined with ","
%%   `{count, C}'                          - stored as given
%%   `{format, F}' or the bare atoms
%%   `json' | `xml'                        - stream format
%%
%% Request parameters are prepended to the `params' list. An option not
%% listed above fails with `function_clause'.
args({user, User}, Acc) ->
    Acc#args{user = User};
args({password, Password}, Acc) ->
    Acc#args{password = Password};
args({method, Method}, Acc) ->
    Acc#args{method = Method};
args(Method, Acc) when Method =:= filter;
                       Method =:= firehose;
                       Method =:= sample ->
    Acc#args{method = Method};
args({follow, UserIds}, #args{params = Params} = Acc) ->
    IdStrings = .lists:map(fun integer_to_list/1, UserIds),
    Joined = .string:join(IdStrings, ","),
    Acc#args{params = [{follow, Joined} | Params]};
args({track, Keywords}, #args{params = Params} = Acc) ->
    Joined = .string:join(Keywords, ","),
    Acc#args{params = [{track, Joined} | Params]};
args({count, Count}, #args{params = Params} = Acc) ->
    Acc#args{params = [{count, Count} | Params]};
args({format, Format}, Acc) ->
    Acc#args{format = Format};
args(Format, Acc) when Format =:= json;
                       Format =:= xml ->
    Acc#args{format = Format}.
%% @doc Invokes `Module:Callback' with `Args' followed by the current
%% environment and translates the result into a gen_server return value.
%%
%% Wherever the callback returns a new environment inside a `reply',
%% `noreply' or `stop' tuple, that environment is stored in the `env'
%% field of the server state. Any other result is returned unchanged.
%%
%% An `undef' error raised by the call is treated as a no-op and yields
%% `{noreply, State}'.
%% NOTE(review): this also covers `undef' raised from inside the callback
%% itself, not only a missing callback - confirm that is intended.
sync_call(Callback, Args, #state{module = Module, env = Env} = State) ->
    WithEnv = fun(UpdatedEnv) -> State#state{env = UpdatedEnv} end,
    try apply(Module, Callback, Args ++ [Env]) of
        {reply, Answer, E} ->
            {reply, Answer, WithEnv(E)};
        {reply, Answer, E, Timeout} ->
            {reply, Answer, WithEnv(E), Timeout};
        {noreply, E} ->
            {noreply, WithEnv(E)};
        {noreply, E, Timeout} ->
            {noreply, WithEnv(E), Timeout};
        {stop, Why, Answer, E} ->
            {stop, Why, Answer, WithEnv(E)};
        {stop, Why, E} ->
            {stop, Why, WithEnv(E)};
        Other ->
            Other
    catch
        error:undef ->
            {noreply, State}
    end.
%% @doc Runs `Module:Callback' in a freshly spawned, unlinked process.
%%
%% The callback receives `Args' followed by the current environment.
%% Returns the pid of the spawned process; the callback's own result is
%% not collected.
async_call(Callback, Args, #state{module = Module, env = Env}) ->
    CallArgs = Args ++ [Env],
    spawn(Module, Callback, CallArgs).
%% @doc Decodes one stream chunk and dispatches it to the callback
%% module, all inside a freshly spawned, unlinked process.
%%
%% Returns the pid of the spawned process.
parse_and_call(Part, #state{module = Module, env = Env}) ->
    spawn(fun() -> decode_and_dispatch(Part, Module, Env) end).

%% Decodes `Part' as JSON. On success the decoded term is classified by
%% recognize/1 and passed to the matching callback together with `Env';
%% on failure the module's handle_info/2 receives `{bad_part, Part}'.
decode_and_dispatch(Part, Module, Env) ->
    case .rfc4627:decode(Part) of
        {ok, Json, _} ->
            {Callback, Record} = recognize(Json),
            Module:Callback(Record, Env);
        {error, _} ->
            Module:handle_info({bad_part, Part}, Env)
    end.
%% @doc Classifies a decoded JSON term from the stream.
%%
%% Returns `{CallbackName, Record}':
%%   an object whose only member is "delete" wrapping a single "status"
%%   member maps to `handle_delete';
%%   an object whose only member is "limit" maps to `handle_limit';
%%   anything else is treated as a status and maps to `handle_status'.
%% The record is built by the matching `object:to_*' conversion.
recognize(Json) ->
    case Json of
        {obj, [{"delete", {obj, [{"status", Delete}]}}]} ->
            {handle_delete, object:to_delete(Delete)};
        {obj, [{"limit", Limit}]} ->
            {handle_limit, object:to_limit(Limit)};
        Status ->
            {handle_status, object:to_status(Status)}
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment