Skip to content

Instantly share code, notes, and snippets.

@krestenkrab
Created June 16, 2011 12:16
Show Gist options
  • Save krestenkrab/1029117 to your computer and use it in GitHub Desktop.
Save krestenkrab/1029117 to your computer and use it in GitHub Desktop.
Ramblings on a gen_pipe DSL for riak_pipe
%%
%% Generic riak_pipe_vnode_worker worker, so you can write multiple fittings in one file.
%%
%% Each fitting is a function which must accept
%%
%% ImplFun(init, InitArg) -> {ok, State}
%% ImplFun({process, Input}, State) -> {ok, State} | forward_preflist
%% ImplFun(done, State) -> ok
%%
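%% gen_pipe also provides a friendlier API:
%%
%% gen_pipe:exec(Module, [PipeInitArgs]) -> {ok, #gen_pipe{}}
%% gen_pipe:exec(Module, [PipeInitArgs], PipeOptions) -> {ok, #gen_pipe{}}
%% gen_pipe:enqueue(#gen_pipe{}, Data) %% enqueue data
%% gen_pipe:eoi(#gen_pipe{}) %% mark end of input-data
%% gen_pipe:sink_ref(#gen_pipe{}) %% ref of the pipe's sink fitting
%% gen_pipe:emit(Data) -> ok %% send Data to next fitting
%% gen_pipe:last_in_preflist() -> boolean() %% only inside a process callback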
%%
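%% gen_pipe:exec/1,2,3 calls apply(Module, pipeline, PipeInitArgs), i.e.
%% Module:pipeline/N with the elements of PipeInitArgs as its arguments, to
%% get the gen_pipe-style list of fitting specs. Each element is one of
%%
%% Name | {Name, InitArg} | {Name, InitArg, Options}
%%
%% By default, ImplFun is assumed to be Module:Name/2, unless one of the
%% options is {function, ImplFun} (a fun of arity 2) or {function, FunName}
%% (meaning Module:FunName/2).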
%%
-module(gen_pipe).
-behavior(riak_pipe_vnode_worker).

%% Public API (exec/0 does not exist; exec/3 and the helpers below do).
-export([exec/1, exec/2, exec/3,
         enqueue/2, eoi/1, emit/1,
         sink_ref/1, last_in_preflist/0]).

%% riak_pipe_vnode_worker callbacks; they must be exported for riak_pipe
%% to call them.
-export([init/2, process/3, done/1]).

%% #fitting{}, #fitting_spec{} and #fitting_details{} are riak_pipe records.
-include_lib("riak_pipe/include/riak_pipe.hrl").

%% Per-worker state threaded through init/2, process/3 and done/1.
-record(state, {implfun :: fun(),   % ImplFun taken from the fitting argument
                istate :: term(),   % state returned by ImplFun(init, _) and threaded through ImplFun
                partition,          % Partition given to init/2
                fitting_details}).  % #fitting_details{} given to init/2

%% Handle returned by exec/1,2,3: the head and sink fittings of the pipe.
-record(gen_pipe, {head, sink}).
%% Build and start the pipeline described by Module:pipeline/N.
%%
%% InitArgs is the argument list for Module:pipeline, so exec(M, [A, B])
%% calls M:pipeline(A, B). Each element of the returned list is turned into
%% a #fitting_spec{} by make_fitting_spec/2, and Options are passed
%% unchanged to riak_pipe:exec/2.
%% Returns {ok, #gen_pipe{}}; crashes (badmatch) if riak_pipe:exec/2 does
%% not return {ok, Head, Sink}.
exec(Module) ->
    exec(Module, [], []).

exec(Module, InitArgs) ->
    exec(Module, InitArgs, []).

exec(Module, InitArgs, Options) ->
    Fittings = erlang:apply(Module, pipeline, InitArgs),
    %% make_fitting_spec/2 needs Module to resolve the default ImplFun
    %% (Module:Name/2) of each fitting.
    Specs = [make_fitting_spec(Module, Fitting) || Fitting <- Fittings],
    {ok, Head, Sink} = riak_pipe:exec(Specs, Options),
    {ok, #gen_pipe{head = Head, sink = Sink}}.
%%
%% emit Output from inside a gen_pipe fitting
%%
%% Forwards Output via riak_pipe_vnode_worker:send_output/3, using the
%% partition and fitting details of the worker whose process/done callback
%% is currently running (stored in the process dictionary by gen_pipe).
%% Exits with {error, ...} when called outside such a callback.
%%
emit(Output) ->
    case erlang:get(gen_pipe_state) of
        #state{partition = Partition, fitting_details = FittingDetails} ->
            riak_pipe_vnode_worker:send_output(Output, Partition, FittingDetails);
        _ ->
            exit({error, "emit can only be called from gen_pipe process callback"})
    end.
%% Queue Input as work for the head fitting of a pipe returned by exec/1,2,3.
enqueue(#gen_pipe{head = HeadFitting}, Input) ->
    riak_pipe_vnode:queue_work(HeadFitting, Input).
%% Mark the end of input for the pipe by calling riak_pipe_fitting:eoi/1
%% on its head fitting.
eoi(#gen_pipe{head = HeadFitting}) ->
    riak_pipe_fitting:eoi(HeadFitting).
%% Return the ref field of the pipe's sink #fitting{}; callers match result
%% messages against it.
sink_ref(#gen_pipe{sink = #fitting{ref = SinkRef}}) ->
    SinkRef.
%% Return the LastInPreflist flag that riak_pipe passed to process/3 for the
%% input currently being processed.
%%
%% Exits with {error, ...} (like emit/1) when the flag has not been set,
%% e.g. outside a gen_pipe process callback, instead of crashing with an
%% opaque case_clause on 'undefined'.
last_in_preflist() ->
    case erlang:get(gen_pipe_last_in_preflist) of
        Flag when is_boolean(Flag) ->
            Flag;
        _ ->
            exit({error, "last_in_preflist can only be called from gen_pipe process callback"})
    end.
%% Turn one entry of Module:pipeline/N into a #fitting_spec{}.
%%
%% Entry forms: Name | {Name, InitArg} | {Name, InitArg, Options}.
%% Recognised Options (all optional):
%%   {follow, true}      - chashfun = follow
%%   {chash, CHashFun}   - explicit chashfun (default fun chash:key_of/1)
%%   {nval, N}           - nval (default 1)
%%   {function, Fun}     - ImplFun given directly as a fun of arity 2
%%   {function, FunName} - ImplFun is Module:FunName/2
%% Without a function option, ImplFun is Module:Name/2. A malformed
%% {function, _} option raises error({badarg, {function, _}}).
%%
%% The fitting is executed by this module's riak_pipe_vnode_worker
%% callbacks, which receive {ImplFun, InitArg} as the fitting argument
%% (see init/2).
make_fitting_spec(Module, Name) when is_atom(Name) ->
    make_fitting_spec(Module, {Name, [], []});
make_fitting_spec(Module, {Name, InitArg}) ->
    make_fitting_spec(Module, {Name, InitArg, []});
make_fitting_spec(Module, {Name, InitArg, Options}) ->
    CHashFun =
        case proplists:get_bool(follow, Options) of
            true -> follow;
            false -> proplists:get_value(chash, Options, fun chash:key_of/1)
        end,
    NVal = proplists:get_value(nval, Options, 1),
    ImplFun =
        case proplists:lookup(function, Options) of
            {function, Fun} when is_function(Fun, 2) ->
                Fun;
            {function, FunName} when is_atom(FunName) ->
                fun Module:FunName/2;
            {function, Other} ->
                %% Fail loudly rather than silently falling back to Name/2.
                erlang:error({badarg, {function, Other}});
            none ->
                fun Module:Name/2
        end,
    %% ?MODULE, not a separate module: init/2, process/3 and done/1 live here.
    #fitting_spec{name = Name,
                  module = ?MODULE,
                  arg = {ImplFun, InitArg},
                  chashfun = CHashFun,
                  nval = NVal}.
%%
%% callbacks for riak_pipe_vnode_worker
%%
%% riak_pipe_vnode_worker init/2 callback: unpack the {ImplFun, InitArg}
%% fitting argument built by make_fitting_spec/2, run ImplFun(init, InitArg)
%% and keep what the process/done callbacks (and emit/1) need.
init(Partition, #fitting_details{arg = {Impl, Arg}} = Details) ->
    {ok, ImplState} = Impl(init, Arg),
    State = #state{implfun = Impl,
                   istate = ImplState,
                   partition = Partition,
                   fitting_details = Details},
    {ok, State}.
%% riak_pipe_vnode_worker process/3 callback.
%%
%% Publishes LastInPreflist and the worker state in the process dictionary
%% so ImplFun can call last_in_preflist/0 and emit/1, then runs
%% ImplFun({process, Input}, IState). Both entries are cleared in `after`,
%% even if ImplFun crashes, so they never leak into later callbacks.
%% Returns {ok, NewState} or forward_preflist, as ImplFun decides.
process(Input, LastInPreflist, State = #state{implfun = ImplFun, istate = IState}) ->
    erlang:put(gen_pipe_last_in_preflist, LastInPreflist),
    erlang:put(gen_pipe_state, State),
    try ImplFun({process, Input}, IState) of
        {ok, NewIState} ->
            {ok, State#state{istate = NewIState}};
        forward_preflist ->
            forward_preflist
    after
        erlang:erase(gen_pipe_last_in_preflist),
        erlang:erase(gen_pipe_state)
    end.
%% riak_pipe_vnode_worker done/1 callback: runs ImplFun(done, IState) with
%% the worker state published for emit/1 (e.g. so a reducer can emit its
%% accumulated results), and clears it again afterwards even if ImplFun
%% crashes. Returns whatever ImplFun(done, _) returns.
done(State = #state{implfun = ImplFun, istate = IState}) ->
    erlang:put(gen_pipe_state, State),
    try
        ImplFun(done, IState)
    after
        erlang:erase(gen_pipe_state)
    end.
%%
%% Example using gen_pipe
%%
%% Beware: this is all pseudo code which does not compile ...
%%
-module(mypipe).

%% Example entry points (countsiblings/1 was otherwise unused dead code).
-export([countsiblings/1, mapreduce/3]).

%% Callbacks for gen_pipe: pipeline/2 describes the fittings, and fittings
%% without a {function, _} option are called as ?MODULE:Name/2, so they
%% must be exported.
-export([pipeline/2, kv_get/2, kv_map/2]).

%% #pipe_result{} and #pipe_eoi{} (matched in collect/2) are riak_pipe records.
-include_lib("riak_pipe/include/riak_pipe.hrl").
%% Example: iterate BKeys (a list of {Bucket, Key} pairs) and return a list
%% of {NumSiblings, Count} pairs, i.e. a histogram of how many objects
%% (Count) have NumSiblings sibling values.
countsiblings(BKeys) ->
    SiblingCount = fun(RObj) -> {length(riak_object:values(RObj)), 1} end,
    Sum = fun erlang:'+'/2,
    mapreduce(BKeys, SiblingCount, Sum).
%% Run this module's pipeline over BKeys: MapFun is applied to each fetched
%% object and the emitted {Key, Value} pairs are folded per key with RedFun.
%% Blocks until the pipe's eoi arrives and returns the collected results in
%% arrival order.
mapreduce(BKeys, MapFun, RedFun) ->
    {ok, Pipe} = gen_pipe:exec(?MODULE, [MapFun, RedFun], [{log, sasl}]),
    %% enqueue/2 is called only for its side effect, so don't build a list.
    lists:foreach(fun(BKey) -> gen_pipe:enqueue(Pipe, BKey) end, BKeys),
    gen_pipe:eoi(Pipe),
    collect(gen_pipe:sink_ref(Pipe), []).
%% Receive every #pipe_result{} tagged with Ref until the matching
%% #pipe_eoi{} arrives; returns the results in the order they arrived.
%% NOTE(review): there is no `after` clause, so this blocks forever if the
%% eoi never arrives - consider adding a timeout.
collect(Ref, Acc) ->
    receive
        #pipe_eoi{ref = Ref} ->
            lists:reverse(Acc);
        #pipe_result{ref = Ref, result = Result} ->
            collect(Ref, [Result | Acc])
    end.
%%
%% Describe the pipeline generated by this module: fetch each {Bucket, Key},
%% map the object with MapFun, pre-reduce on the same partition, then
%% re-hash the partial results by Key for the final reduce.
%%
pipeline(MapFun, RedFun) ->
    [{kv_get, [], [{chash, fun riak_core_util:chash_key/1}, {nval, 3}]},
     {kv_map, MapFun, [{chash, follow}]},
     %% Partial reduce on the partition that produced the pairs...
     {reduce1, RedFun, [{chash, follow},
                        {function, fun reduce/2}]},
     %% ...then hash by Key so all partials for a Key reach the same partition.
     {reduce2, RedFun, [{chash, fun({K, _}) -> chash:key_of(K) end},
                        {function, fun reduce/2}]}].
%% Fetch fitting: looks up each {Bucket, Key} input with a local Riak client
%% and emits the result of Client:get/2 (consumed by kv_map/2).
kv_get(init, []) ->
    %% riak:local_client/0 already returns {ok, Client}; wrapping it in
    %% another {ok, _} would make the fitting state {ok, Client}.
    {ok, _Client} = riak:local_client();
kv_get({process, {Bucket, Key}}, Client) ->
    gen_pipe:emit(Client:get(Bucket, Key)),
    {ok, Client};
kv_get(done, _Client) ->
    ok.
%%
%% The map phase applies MapFun to each object fetched by kv_get/2.
%%
%% MapFun :: fun((riak_object()) -> {Key, Value} | term())
%% Only {Key, Value} results are emitted downstream; any other result, and
%% any input that is not {ok, RObj} (a failed fetch), is dropped.
%%
kv_map(init, MapFun) ->
    {ok, MapFun};
kv_map({process, {ok, RObj}}, MapFun) ->
    case MapFun(RObj) of
        {_Key, _Value} = Pair ->
            gen_pipe:emit(Pair);
        _ ->
            ignore
    end,
    {ok, MapFun};
kv_map({process, _Error}, MapFun) ->
    {ok, MapFun};
kv_map(done, _MapFun) ->
    ok.
%%
%% The reduce phase: folds the Values seen for each Key with the reduce fun
%% as Fun(NewValue, Accumulated), and emits one {Key, Reduced} pair per key
%% when the fitting is done.
%%
-record(redstate, {reduce :: fun((term(), term()) -> term()),
                   dict :: dict:dict()}).

reduce(init, Fun) when is_function(Fun, 2) ->
    {ok, #redstate{reduce = Fun, dict = dict:new()}};
reduce({process, {Key, Value}}, State = #redstate{reduce = Fun, dict = Dict}) ->
    %% The first Value for a Key is stored as-is; later ones are combined
    %% with the running result.
    NewDict = dict:update(Key, fun(Old) -> Fun(Value, Old) end, Value, Dict),
    {ok, State#redstate{dict = NewDict}};
reduce(done, #redstate{dict = Dict}) ->
    lists:foreach(fun gen_pipe:emit/1, dict:to_list(Dict)),
    ok.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment