Skip to content

Instantly share code, notes, and snippets.

@gdamjan
Created September 8, 2011 00:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save gdamjan/1202289 to your computer and use it in GitHub Desktop.
Save gdamjan/1202289 to your computer and use it in GitHub Desktop.
twitter stream : ibrowse + json_stream_parse + mochiweb
-module(twitter_stream).
-author("gdamjan@gmail.com").
%% Depends on:
%% ibrowse for http
%% couchbeam for couchbeam_json_stream (a fork of damienkatz json_stream_parse)
%% mochiweb for mochiweb_util:urlencode
-export([start/0]).
% replace this with whatever you need
%% @doc Print one decoded stream object as "<status url> | <text>".
%%
%% `Json' is the property list taken out of a decoded JSON object (binary
%% keys, nested objects wrapped as `{Proplist}').
%%
%% Objects that carry no binary `text' field are skipped: the stream is
%% network input and is not guaranteed to contain only statuses, and
%% feeding `undefined' to re:replace/4 would raise `badarg' and take the
%% whole stream loop down with it.
%%
%% Returns `ok'.
print_message(Json) ->
    case proplists:get_value(<<"text">>, Json) of
        Text when is_binary(Text) ->
            %% Collapse every whitespace run (newlines included) into a
            %% single space so each status is printed on one line.
            TextStripped = re:replace(Text, "\\s+", " ", [global, {return, binary}]),
            Id = proplists:get_value(<<"id_str">>, Json),
            {User} = proplists:get_value(<<"user">>, Json),
            UserName = proplists:get_value(<<"screen_name">>, User),
            io:format("~ts~n", [<<"https://twitter.com/", UserName/binary, "/status/",
                                  Id/binary, " | ", TextStripped/binary>>]);
        _NotAStatus ->
            ok
    end.
%% @doc Hand one decoded object to the user callback and return the
%% handler for the next one.
%%
%% `UserFun' is called with `Data' for its side effect; its result is
%% discarded. The returned fun takes the next object wrapped as
%% `{Proplist}', unwraps it and re-enters event_fun/2 with the same
%% callback.
event_fun(Data, UserFun) ->
    _ = UserFun(Data),
    Next = fun({NextData}) -> event_fun(NextData, UserFun) end,
    Next.
%% @doc Open the filtered status stream and print every object received.
%%
%% Starts ibrowse, POSTs the url-encoded `track'/`follow' filter to the
%% streaming endpoint with `{stream_to, {self(), once}}', so response
%% messages are delivered to the calling process, and then enters
%% stream_loop/2. `infinity' is passed as the final argument of
%% ibrowse:send_req/6.
%%
%% Crashes with `badmatch' if send_req/6 returns anything other than
%% `{ibrowse_req_id, ReqId}'.
%%
%% NOTE(review): the basic-auth credentials are placeholders and the URL is
%% plain http -- confirm how real credentials should be supplied before use.
start() ->
    ibrowse:start(),
    ReqHeaders = [
        {"content-type", "application/x-www-form-urlencoded"},
        {"accept", "*/*"},
        {"user-agent", "ibrowse"}
    ],
    ReqOptions = [
        {basic_auth, {"MY_USER_NAME", "AND_PASSWORD"}},
        {response_format, binary},
        {stream_to, {self(), once}},
        {inactivity_timeout, 60000}
    ],
    Filter = [
        {"track", "erlang,python,ruby,c++"},
        %% some random twitter users
        {"follow", "15804774,50393960,187371887"}
    ],
    Body = mochiweb_util:urlencode(Filter),
    {ibrowse_req_id, ReqId} =
        ibrowse:send_req("http://stream.twitter.com/1/statuses/filter.json",
                         ReqHeaders, post, Body, ReqOptions, infinity),
    NextChunk = fun() -> data_fun(ReqId) end,
    OnObject = fun({Data}) -> event_fun(Data, fun print_message/1) end,
    stream_loop(NextChunk, OnObject).
%% @doc Parse top-level JSON values from the stream, one per iteration,
%% forever.
%%
%% `DataFun' is the zero-arity chunk source given to
%% couchbeam_json_stream:events/2; `UserFun' receives each collected
%% object. The first and third elements of the events/2 result are wrapped
%% into a new zero-arity source that returns `{Leftover, NextDataFun}', and
%% the loop repeats with that source. The second element of the result is
%% ignored, so every iteration starts from the original `UserFun'.
%%
%% This function has no terminating clause of its own.
stream_loop(DataFun, UserFun) ->
    OnEvent = fun(Event) -> collect_object(Event, UserFun) end,
    {NextDataFun, _Handler, Leftover} =
        couchbeam_json_stream:events(DataFun, OnEvent),
    Resume = fun() -> {Leftover, NextDataFun} end,
    stream_loop(Resume, UserFun).
%% @doc Event handler used at the top level of the stream.
%%
%% Only accepts `object_start'; any other first event raises
%% `function_clause'. Returns a one-argument handler that forwards each
%% following event to couchbeam_json_stream:collect_object/2 together with
%% `UserFun', which is to be called with the collected object.
collect_object(object_start, UserFun) ->
    fun(Event) ->
        couchbeam_json_stream:collect_object(Event, UserFun)
    end.
%% @doc Chunk source for the JSON stream parser, fed by ibrowse messages
%% for request `ReqId'.
%%
%% Returns `{Chunk, NextFun}' when a body chunk arrives (after asking
%% ibrowse for the next one), or `done' when the stream cannot or should
%% not be read any further:
%%   - response headers with any status other than "200",
%%   - an `{error, Reason}' response,
%%   - the end-of-response message.
%%
%% Headers (or the error) are printed to standard output. A "200" header
%% message is printed and then skipped so the caller only ever sees body
%% data.
data_fun(ReqId) ->
    receive
        {ibrowse_async_headers, ReqId, "200", Headers} ->
            io:format("~p ~n", [Headers]),
            data_fun(ReqId);
        %% Any other status ends the stream. Matching only 4xx/5xx would
        %% leave e.g. a 3xx header message unmatched in the mailbox and
        %% let its body reach the JSON parser.
        {ibrowse_async_headers, ReqId, _Status, Headers} ->
            io:format("~p ~n", [Headers]),
            done;
        {ibrowse_async_response, ReqId, {error, Reason}} ->
            io:format("~p ~n", [{error, Reason}]),
            done;
        {ibrowse_async_response, ReqId, Chunk} ->
            %% The request streams in `once' mode, so the next chunk has
            %% to be requested explicitly.
            ibrowse:stream_next(ReqId),
            {Chunk, fun() -> data_fun(ReqId) end};
        {ibrowse_async_response_end, ReqId} ->
            done
    end.
@gdamjan
Copy link
Author

gdamjan commented Sep 8, 2011

TODO:
from http://dev.twitter.com/pages/streaming_api_concepts#connecting :

  • handle timeouts
  • handle errors
  • backoff (exponential, start 10 secs, up to 240 secs)

@gdamjan
Copy link
Author

gdamjan commented Sep 8, 2011

And I should really use OAuth instead of HTTP basic auth.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment