Skip to content

Instantly share code, notes, and snippets.

@aurora
Forked from adamhunter/pbstream.erl
Created October 26, 2012 14:41
Show Gist options
  • Save aurora/3959204 to your computer and use it in GitHub Desktop.
Save aurora/3959204 to your computer and use it in GitHub Desktop.
riak-erlang-client streaming mapreduce examples
-module(pbstream).
-export([load/1, keys/0, bucket/0, map/3, pb_link/0, pb_link/2]).
-define(QUERY, [{map, {modfun, riak_kv_mapreduce, map_object_value}, <<"filter_notfound">>, true}]).
load(HowMany) ->
lists:foreach(fun(Index) ->
BIndex = list_to_binary(integer_to_list(Index)),
RObj = riakc_obj:new(<<"examples">>, <<"key",BIndex/binary>>, <<"Value ",BIndex/binary>>),
riakc_pb_socket:put(pb_link(), RObj)
end, lists:seq(1, HowMany)).
keys() ->
Inputs = [{<<"examples">>, <<"key1">>},
{<<"examples">>, <<"key2">>},
{<<"examples">>, <<"key3">>}],
riakc_pb_socket:mapred_stream(pb_link(), Inputs, ?QUERY, self()),
loop().
bucket() ->
riakc_pb_socket:mapred_bucket_stream(pb_link(), <<"examples">>, ?QUERY, self(), 60000),
loop().
pb_link() ->
pb_link("127.0.0.1", 8087).
pb_link(Host, Port) ->
case get(pb_link) of
undefined ->
put(pb_link, riakc_pb_socket:start_link(Host, Port)),
pb_link(Host, Port);
{ok, Client} ->
Client;
Error ->
io:format("Error linking to pb socket ~p~n", [Error]),
{pb_link_error, Error}
end.
%% @private
loop() ->
receive
{_ReqId, done} ->
ok;
{_ReqId, {mapred,_Phase,Results}} ->
io:format("Streaming Results: ~p~n", [Results]),
loop();
{_ReqId, {error, Reason}} ->
io:format("Something bad happened! ~p~n", [Reason])
end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment