Skip to content

Instantly share code, notes, and snippets.

@gdamjan
Created November 7, 2010 20:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gdamjan/666499 to your computer and use it in GitHub Desktop.
Save gdamjan/666499 to your computer and use it in GitHub Desktop.
Connect to the real-time Twitter stream
-module(twit2).
-author('gdamjan@gmail.com').
-export([start/0]).
%%% http://dev.twitter.com/pages/streaming_api_concepts#connecting
%%% TODO:
%%% - handle timeouts
%%% - handle errors
%%% - backoff (exponential, start 10 secs, up to 240 secs)
%% Entry point: start the applications this script uses, then open a filtered
%% streaming request and print each complete line received.
%% Blocks inside loop/2 for as long as the stream keeps delivering data; the
%% return value is whatever request_stream/6 returns.
start() ->
    %% Start results are ignored, exactly as before.
    lists:foreach(fun application:start/1, [sasl, ibrowse]),
    StreamUrl = "http://stream.twitter.com/1/statuses/filter.json",
    FormBody = ircbot_lib:url_encode([
        {"track", "erlang"},
        {"follow", "783214"} % id of @twitter
    ]),
    ReqHeaders = [{"content-type","application/x-www-form-urlencoded"}],
    %% NOTE(review): placeholder credentials; replace before running.
    AuthOpts = [{basic_auth, {"USERNAME", "PASSWORD"}}],
    OnChunk = crlf_buffering(fun print_message/1),
    request_stream(StreamUrl, ReqHeaders, post, FormBody, AuthOpts, OnChunk).
%% Print the formatted line for one JSON document (built by prepare_message/1)
%% followed by a newline. When prepare_message/1 returns [], this prints an
%% empty line.
print_message(Json) ->
    Line = prepare_message(Json),
    io:format("~ts~n", [Line]).
%% Turn one JSON status document into a printable line:
%%   <<"http://twitter.com/", ScreenName, "/status/", Id, " - ", Text>>
%% Runs of whitespace in the text are collapsed to single spaces so that each
%% status prints on one line.
%% Returns the line as a binary, or [] (after logging the exception) when the
%% document cannot be decoded or lacks the expected fields.
prepare_message(Json) ->
    try
        {struct, Twit} = mochijson2:decode(Json),
        Msg = proplists:get_value(<<"text">>, Twit),
        %% `unicode` treats the subject as UTF-8 so matching is per code point,
        %% not per byte (assumes mochijson2 yields UTF-8 binaries — confirm).
        Msg1 = re:replace(Msg, "\\s+", " ", [global, unicode, {return, binary}]),
        Id = proplists:get_value(<<"id_str">>, Twit), % twitter is confused now; maybe use <<"new_id_str">> here
        {struct, User} = proplists:get_value(<<"user">>, Twit),
        Name = proplists:get_value(<<"screen_name">>, User),
        <<"http://twitter.com/",Name/binary,"/status/",Id/binary," - ",Msg1/binary>>
    catch
        %% Decode failures, shape mismatches (badmatch) and missing fields
        %% (undefined passed to re:replace or a /binary segment) are raised in
        %% the `error` class, not `throw`. Catch every class so that one bad
        %% message does not tear down the stream loop.
        Class:Reason ->
            io:format("Err: ~p:~p~n", [Class, Reason]),
            []
    end.
%% Wrap a per-line Callback into a chunk handler that starts with an empty
%% buffer. Each call of the returned fun consumes one chunk and returns the
%% handler to use for the next chunk (see crlf_buffering/3).
crlf_buffering(Callback) ->
    EmptyBuffer = <<>>,
    fun(FirstChunk) -> crlf_buffering(EmptyBuffer, FirstChunk, Callback) end.
%% Append NewData to the incomplete tail PrevData, split the result on CRLF,
%% and pass every complete non-empty line to Callback in arrival order.
%% Returns a fun(NextChunk) that continues buffering with the new incomplete
%% tail (the bytes after the last CRLF, or <<>> if the buffer ends with CRLF).
crlf_buffering(PrevData, NewData, Callback) ->
    Buffer = <<PrevData/binary, NewData/binary>>,
    %% With [global], binary:split/3 returns every part, keeping empty ones, and
    %% always returns at least one part (the last part is the unterminated tail).
    Parts = binary:split(Buffer, <<"\r\n">>, [global]),
    {Lines, [Rest]} = lists:split(length(Parts) - 1, Parts),
    %% Deliver lines in the order they were received. Empty lines are skipped
    %% (presumably keep-alive newlines from the server — confirm).
    lists:foreach(Callback, [Line || Line <- Lines, Line =/= <<>>]),
    fun(Chunk) -> crlf_buffering(Rest, Chunk, Callback) end.
%% Send an asynchronous, streamed ibrowse request and pump its messages
%% through loop/2.
%% Url/Headers/Method/Body are passed to ibrowse:send_req/6 unchanged.
%% Options are appended after the streaming options, and the request has no
%% timeout (infinity). Crashes with badmatch if send_req does not return
%% {ibrowse_req_id, _}.
request_stream(Url, Headers, Method, Body, Options, Callback) ->
    %% Binary chunks are sent to this process, one at a time ({self(), once}).
    StreamOpts = [{response_format, binary}, {stream_to, {self(), once}}],
    {ibrowse_req_id, ReqId} =
        ibrowse:send_req(Url, Headers, Method, Body, StreamOpts ++ Options, infinity),
    loop(ReqId, Callback).
%% Receive ibrowse messages for request ReqId and dispatch them.
%% Returns:
%%   ok              when the response ends,
%%   {error, Reason} when ibrowse reports an error,
%%   {x, Msg}        when any other message reaches the mailbox first.
%% Has no `after` clause, so it waits indefinitely for the next message.
loop(ReqId, Callback) ->
    receive
        Msg -> dispatch_stream_msg(Msg, ReqId, Callback)
    end.

%% Handle one mailbox message for loop/2. Streaming clauses recurse into
%% loop/2 with the (possibly updated) chunk callback.
dispatch_stream_msg({ibrowse_async_headers, ReqId, Status, Headers}, ReqId, Callback) ->
    io:format("~p ~p~n", [Status, Headers]),
    loop(ReqId, Callback);
dispatch_stream_msg({ibrowse_async_response, ReqId, {error, Reason}}, ReqId, _Callback) ->
    {error, Reason};
dispatch_stream_msg({ibrowse_async_response, ReqId, Chunk}, ReqId, Callback) ->
    %% The chunk callback returns the callback to use for the next chunk.
    NextCallback = Callback(Chunk),
    %% Ask for the next chunk (the request was made with {stream_to, {self(), once}}).
    ibrowse:stream_next(ReqId),
    loop(ReqId, NextCallback);
dispatch_stream_msg({ibrowse_async_response_end, ReqId}, ReqId, _Callback) ->
    ok;
dispatch_stream_msg(Other, _ReqId, _Callback) ->
    {x, Other}.
@gdamjan
Copy link
Author

gdamjan commented Sep 8, 2011

See also https://gist.github.com/1202289, which uses the json_stream_parse module instead of waiting for a CRLF separator (which turned out to be unreliable).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment