Created
January 13, 2010 18:41
-
-
Save takkkun/276446 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(twitter.streaming). | |
-author("KONDO Takahiro <heartery@gmail.com>"). | |
-export([behaviour_info/1]). | |
-export([start_link/4, start_link/5, start/4, start/5]). | |
-export([call/2, call/3, cast/2, reply/2]). | |
-behaviour(gen_server). | |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, | |
terminate/2, code_change/3]). | |
-include("object.hrl"). | |
-record(args, {user, password, method, params = [], format = record}). | |
-record(state, {module, id, format, env}). | |
behaviour_info(callbacks) -> | |
[{init, 1}, | |
{handle_start, 2}, | |
{handle_info, 2}, | |
{terminate, 2}]. | |
start_link(Module, InitArgs, StreamingArgs, Options) -> | |
boot(fun(Args) -> | |
.gen_server:start_link(?MODULE, Args, Options) | |
end, Module, InitArgs, StreamingArgs). | |
start_link(ServerName, Module, InitArgs, StreamingArgs, Options) -> | |
boot(fun(Args) -> | |
.gen_server:start_link(ServerName, ?MODULE, Args, Options) | |
end, Module, InitArgs, StreamingArgs). | |
start(Module, InitArgs, StreamingArgs, Options) -> | |
boot(fun(Args) -> | |
.gen_server:start(?MODULE, Args, Options) | |
end, Module, InitArgs, StreamingArgs). | |
start(ServerName, Module, InitArgs, StreamingArgs, Options) -> | |
boot(fun(Args) -> | |
.gen_server:start(ServerName, ?MODULE, Args, Options) | |
end, Module, InitArgs, StreamingArgs). | |
call(ServerRef, Request) -> | |
call(ServerRef, Request, infinity). | |
call(ServerRef, Request, Timeout) -> | |
try .gen_server:call(ServerRef, Request, Timeout) of | |
Reply -> Reply | |
catch | |
exit:{noproc, _} -> {error, closed}; | |
exit:{timeout, _} -> {error, timeout} | |
end. | |
cast(ServerRef, Request) -> | |
.gen_server:cast(ServerRef, Request). | |
reply(Client, Reply) -> | |
.gen_server:reply(Client, Reply). | |
%%% callback functions | |
init([Module, InitArgs, #args{user = User, | |
password = Password, | |
method = Method, | |
params = Params, | |
format = Format}]) -> | |
RequestMethod = if Method == filter -> post; true -> get end, | |
F = if Format == record -> json; true -> Format end, | |
Url = api:stream_url("/statuses/" ++ atom_to_list(Method), F), | |
case api:stream(RequestMethod, Url, Params, {basic, User, Password}) of | |
{ok, Id} -> | |
State = #state{module = Module, id = Id, format = Format}, | |
try Module:init(InitArgs) of | |
{ok, Env} -> {ok, State#state{env = Env}}; | |
{ok, Env, Timeout} -> {ok, State#state{env = Env}, Timeout}; | |
Result -> Result | |
catch | |
_:Reason -> {stop, Reason} | |
end; | |
{error, Reason} -> | |
{stop, Reason} | |
end. | |
handle_call(Request, From, State) -> | |
sync_call(handle_call, [Request, From], State). | |
handle_cast(Request, State) -> | |
sync_call(handle_call, [Request], State). | |
handle_info({http, {Id, stream_start, Headers}}, #state{id = Id} = State) -> | |
async_call(handle_start, [Headers], State), | |
{noreply, State}; | |
handle_info({http, {Id, stream_end, _}}, #state{id = Id} = State) -> | |
{stop, disconnect, State}; | |
handle_info({http, {Id, stream, <<"\r\n">>}}, #state{id = Id} = State) -> | |
{noreply, State}; | |
handle_info({http, {Id, stream, Part}}, #state{id = Id, format = record} = State) -> | |
parse_and_call(Part, State), | |
{noreply, State}; | |
handle_info({http, {Id, stream, Part}}, #state{id = Id} = State) -> | |
async_call(handle_stream, [Part], State), | |
{noreply, State}; | |
handle_info({http, {Id, {error, Reason}}}, #state{id = Id} = State) -> | |
{stop, {http_error, Reason}, State}; | |
handle_info(Info, State) -> | |
async_call(handle_info, [Info], State), | |
{noreply, State}. | |
terminate(Reason, #state{module = Module, id = Id, env = Env}) -> | |
Module:terminate(Reason, Env), | |
.http:cancel_request(Id). | |
code_change(_, State, _) -> | |
{ok, State}. | |
%%% private functions | |
boot(Fun, Module, InitArgs, StreamingArgs) -> | |
Args = case .lists:foldl(fun args/2, #args{}, StreamingArgs) of | |
#args{method = undefined, params = Params} = As -> | |
L = length(.lists:filter(fun({Key, _}) -> | |
Key == follow orelse | |
Key == track | |
end, Params)), | |
As#args{method = if L > 0 -> filter; true -> sample end}; | |
As -> | |
As | |
end, | |
case exported(Module, Args#args.format) of | |
true -> Fun([Module, InitArgs, Args]); | |
false -> {error, bad_callback} | |
end. | |
exported(Module, record) -> | |
.erlang:function_exported(Module, handle_status, 2) andalso | |
.erlang:function_exported(Module, handle_delete, 2) andalso | |
.erlang:function_exported(Module, handle_limit, 2); | |
exported(Module, _) -> | |
.erlang:function_exported(Module, handle_stream, 2). | |
args({user, User}, Args) -> Args#args{user = User}; | |
args({password, Password}, Args) -> Args#args{password = Password}; | |
args({method, Method}, Args) -> Args#args{method = Method}; | |
args(filter, Args) -> Args#args{method = filter}; | |
args(firehose, Args) -> Args#args{method = firehose}; | |
args(sample, Args) -> Args#args{method = sample}; | |
args({follow, UserIds}, #args{params = Params} = Args) -> | |
Value = .string:join(.lists:map(fun integer_to_list/1, UserIds), ","), | |
Args#args{params = [{follow, Value}|Params]}; | |
args({track, Keywords}, #args{params = Params} = Args) -> | |
Args#args{params = [{track, .string:join(Keywords, ",")}|Params]}; | |
args({count, Count}, #args{params = Params} = Args) -> | |
Args#args{params = [{count, Count}|Params]}; | |
args({format, Format}, Args) -> Args#args{format = Format}; | |
args(json, Args) -> Args#args{format = json}; | |
args(xml, Args) -> Args#args{format = xml}. | |
sync_call(Callback, Args, #state{module = Module, env = Env} = State) -> | |
try apply(Module, Callback, Args ++ [Env]) of | |
{reply, Reply, NewEnv} -> | |
{reply, Reply, State#state{env = NewEnv}}; | |
{reply, Reply, NewEnv, Timeout} -> | |
{reply, Reply, State#state{env = NewEnv}, Timeout}; | |
{noreply, NewEnv} -> | |
{noreply, State#state{env = NewEnv}}; | |
{noreply, NewEnv, Timeout} -> | |
{noreply, State#state{env = NewEnv}, Timeout}; | |
{stop, Reason, Reply, NewEnv} -> | |
{stop, Reason, Reply, State#state{env = NewEnv}}; | |
{stop, Reason, NewEnv} -> | |
{stop, Reason, State#state{env = NewEnv}}; | |
Result -> | |
Result | |
catch | |
error:undef -> | |
{noreply, State} | |
end. | |
async_call(Callback, Args, #state{module = Module, env = Env}) -> | |
spawn(Module, Callback, Args ++ [Env]). | |
parse_and_call(Part, #state{module = Module, env = Env}) -> | |
spawn(fun() -> | |
case .rfc4627:decode(Part) of | |
{ok, Json, _} -> | |
{Callback, Record} = recognize(Json), | |
Module:Callback(Record, Env); | |
{error, _} -> | |
Module:handle_info({bad_part, Part}, Env) | |
end | |
end). | |
recognize({obj, [{"delete", {obj, [{"status", Delete}]}}]}) -> | |
{handle_delete, object:to_delete(Delete)}; | |
recognize({obj, [{"limit", Limit}]}) -> | |
{handle_limit, object:to_limit(Limit)}; | |
recognize(Status) -> | |
{handle_status, object:to_status(Status)}. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment