Created
November 7, 2010 20:41
-
-
Save gdamjan/666499 to your computer and use it in GitHub Desktop.
Connect to the real-time Twitter stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(twit2). | |
-author('gdamjan@gmail.com'). | |
-export([start/0]). | |
%%% http://dev.twitter.com/pages/streaming_api_concepts#connecting | |
%%% TODO: | |
%%% - handle timeouts | |
%%% - handle errors | |
%%% - backoff (exponential, start 10 secs, up to 240 secs) | |
%% Entry point: start the applications this client depends on, then open a
%% long-lived POST to the Twitter filter stream and print every status that
%% arrives. Returns whatever request_stream/6 returns once the stream stops.
%% Replace the USERNAME/PASSWORD placeholders before running.
start() ->
    %% Return values are ignored, so an already-running application is fine.
    lists:foreach(fun application:start/1, [sasl, ibrowse]),
    StreamUrl = "http://stream.twitter.com/1/statuses/filter.json",
    FilterParams = [{"track", "erlang"},
                    {"follow", "783214"}],   % id of @twitter
    RequestBody = ircbot_lib:url_encode(FilterParams),
    ReqHeaders = [{"content-type", "application/x-www-form-urlencoded"}],
    ReqOptions = [{basic_auth, {"USERNAME", "PASSWORD"}}],
    OnChunk = crlf_buffering(fun print_message/1),
    request_stream(StreamUrl, ReqHeaders, post, RequestBody, ReqOptions, OnChunk).
%% Render one raw stream message with prepare_message/1 and print it as a
%% single line. prepare_message/1 returns [] for anything it could not turn
%% into a tweet line (for example non-status notices or malformed JSON).
%% Those are skipped rather than printed as blank lines. Always returns ok.
print_message(Json) ->
    case prepare_message(Json) of
        [] ->
            ok;
        Line ->
            io:format("~ts~n", [Line])
    end.
%% Turn one JSON-encoded status into the line
%%   http://twitter.com/<screen_name>/status/<id_str> - <text>
%% as a UTF-8 binary. Runs of whitespace in the text (including embedded
%% newlines) collapse to a single space so every tweet fits on one line.
%%
%% Any failure is logged and turned into [] so one bad message cannot kill
%% the stream. Failures include undecodable JSON, a non-object payload, or a
%% message without text/id_str/user (e.g. a delete or limit notice).
%% Class:Reason is used because a bare pattern would only catch throws.
prepare_message(Json) ->
    try
        {struct, Twit} = mochijson2:decode(Json),
        Msg = proplists:get_value(<<"text">>, Twit),
        %% `unicode` makes the regex work on code points of the UTF-8 text,
        %% not on individual bytes of multi-byte sequences.
        Msg1 = re:replace(Msg, "\\s+", " ", [global, unicode, {return, binary}]),
        Id = proplists:get_value(<<"id_str">>, Twit), % twitter is confused now; maybe use <<"new_id_str">> here
        {struct, User} = proplists:get_value(<<"user">>, Twit),
        Name = proplists:get_value(<<"screen_name">>, User),
        <<"http://twitter.com/", Name/binary, "/status/", Id/binary, " - ", Msg1/binary>>
    catch
        Class:Reason ->
            io:format("Err: ~p~n", [{Class, Reason}]),
            []
    end.
%% Wrap a per-message Callback in a chunk handler that reassembles
%% CRLF-delimited messages split across HTTP chunks.
%%
%% The returned fun takes one chunk (binary or iodata). It calls Callback once
%% for every complete, non-empty line, in arrival order, and returns the
%% handler to use for the next chunk. The unterminated trailing fragment is
%% carried over inside that new handler. Empty lines (bare CRLFs) are skipped.
crlf_buffering(Callback) ->
    fun(Chunk) -> crlf_buffering(<<>>, Chunk, Callback) end.

%% Append NewData to the carried-over fragment PrevData, dispatch every
%% complete line to Callback, and return a handler holding the new fragment.
crlf_buffering(PrevData, NewData, Callback) ->
    %% iolist_to_binary/1 accepts binaries and iodata alike.
    Buffer = iolist_to_binary([PrevData, NewData]),
    %% The last part is never CRLF-terminated, so it is the carry-over.
    {Lines, Rest} = split_last(binary:split(Buffer, <<"\r\n">>, [global])),
    lists:foreach(Callback, [Line || Line <- Lines, Line =/= <<>>]),
    fun(Chunk) -> crlf_buffering(Rest, Chunk, Callback) end.

%% Split a list into {AllButLast, Last}, keeping AllButLast in order.
split_last([]) ->
    {[], <<>>};
split_last(Parts) ->
    [Last | RevInit] = lists:reverse(Parts),
    {lists:reverse(RevInit), Last}.
%% Start an asynchronous, flow-controlled ibrowse request and consume it with
%% loop/2. Caller Options are passed to ibrowse:send_req/6 after forcing
%% binary chunks and {stream_to, {self(), once}}, so the calling process
%% receives one message at a time. Returns loop/2's result. If ibrowse
%% rejects the request up front, returns {error, Reason}, matching the error
%% shape loop/2 uses for asynchronous failures.
request_stream(Url, Headers, Method, Body, Options, Callback) ->
    StreamOptions = [{response_format, binary},
                     {stream_to, {self(), once}}
                     | Options],
    case ibrowse:send_req(Url, Headers, Method, Body, StreamOptions, infinity) of
        {ibrowse_req_id, ReqId} ->
            loop(ReqId, Callback);
        {error, _Reason} = Error ->
            Error
    end.
%% Receive loop for one streaming ibrowse request identified by ReqId.
%%
%% Callback is a chunk handler that returns the handler for the next chunk
%% (see crlf_buffering/1). In {stream_to, {Pid, once}} mode ibrowse waits for
%% ibrowse:stream_next/1 before delivering more data. More is therefore
%% requested after the headers message as well as after every body chunk;
%% otherwise the stream can stall right after the headers.
%%
%% Returns:
%%   ok               - the server ended the response
%%   {error, Reason}  - ibrowse reported an asynchronous error
%%   {x, Other}       - any unrelated message arrived; the loop stops
loop(ReqId, Callback) ->
    receive
        {ibrowse_async_headers, ReqId, Status, Headers} ->
            io:format("~p ~p~n", [Status, Headers]),
            ibrowse:stream_next(ReqId),
            loop(ReqId, Callback);
        {ibrowse_async_response, ReqId, {error, Reason}} ->
            {error, Reason};
        {ibrowse_async_response, ReqId, Chunk} ->
            NewCallback = Callback(Chunk),
            ibrowse:stream_next(ReqId),
            loop(ReqId, NewCallback);
        {ibrowse_async_response_end, ReqId} ->
            ok;
        Other -> % catch all
            {x, Other}
    end.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See also https://gist.github.com/1202289, which uses the json_stream_parse module instead of waiting for a CRLF separator (an approach that turned out to be unreliable).