Created
June 16, 2011 12:16
-
-
Save krestenkrab/1029117 to your computer and use it in GitHub Desktop.
Ramblings on a gen_pipe DSL for riak_pipe
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%%
%% Generic riak_pipe_vnode_worker worker, so you can write multiple fittings in one file.
%%
%% Each fitting is a function which must accept
%%
%% ImplFun(init, InitArg) -> {ok, State}
%% ImplFun({process, Input}, State) -> {ok, State} | forward_preflist
%% ImplFun(done, State) -> ok
%%
%% gen_pipe also has a friendlier API:
%%
%% gen_pipe:exec(Module, PipeInitArgs) -> {ok, #gen_pipe{}}
%% gen_pipe:enqueue(#gen_pipe{}, Data) %% enqueue data
%% gen_pipe:eoi(#gen_pipe{}) %% mark end of input data
%% gen_pipe:emit(Data) -> ok %% send Data to next fitting
%%
%% gen_pipe:exec/2 applies Module:pipeline to the PipeInitArgs list to get the
%% gen_pipe-style list of fitting specs. That's a list of
%%
%% {Name, InitArg, Options}
%%
%% By default, ImplFun is assumed to be Module:Name, unless one of the
%% options is {function, ImplFun}.
%%
%% NOTE(review): the #fitting{}, #fitting_spec{} and #fitting_details{} records
%% used in this module are not defined in this file; presumably they come from
%% riak_pipe's public header, which would have to be included -- confirm.
-module(gen_pipe).
-behavior(riak_pipe_vnode_worker).

%% Friendly API for building and feeding a pipe.
-export([exec/1, exec/2, exec/3,
         enqueue/2, eoi/1, sink_ref/1]).
%% Helpers meant to be called from inside a fitting function.
-export([emit/1, last_in_preflist/0]).
%% riak_pipe_vnode_worker callbacks.
-export([init/2, process/3, done/1]).

%% Per-worker state kept by the generic fitting.
-record(state, {implfun :: fun(),     % user fitting function, called with 2 arguments
                istate :: term(),     % state owned by implfun
                partition,            % first argument given to init/2
                fitting_details}).    % second argument given to init/2

%% Handle returned (wrapped in {ok, _}) by exec/1,2,3.
-record(gen_pipe, {head, sink}).
%% Start the pipeline described by Module with no pipeline arguments and no
%% riak_pipe options.
exec(Module) ->
    exec(Module, [], []).

%% Start the pipeline described by Module; InitArgs is the argument LIST that
%% is applied to Module:pipeline.
exec(Module, InitArgs) ->
    exec(Module, InitArgs, []).

%% Start the pipeline described by Module.
%%
%% Module:pipeline is applied to InitArgs and must return the list of
%% gen_pipe-style fittings; each one is converted by make_fitting_spec/2 and
%% the resulting specs are handed to riak_pipe:exec/2 together with Options.
%%
%% Returns {ok, #gen_pipe{}} holding the head and sink that riak_pipe:exec/2
%% returned. Crashes with badmatch if riak_pipe:exec/2 returns anything other
%% than {ok, Head, Sink}.
exec(Module, InitArgs, Options) ->
    Fittings = erlang:apply(Module, pipeline, InitArgs),
    %% make_fitting_spec/2 needs Module to resolve each fitting's function.
    Specs = [make_fitting_spec(Module, Fitting) || Fitting <- Fittings],
    {ok, Head, Sink} = riak_pipe:exec(Specs, Options),
    {ok, #gen_pipe{head = Head, sink = Sink}}.
%%
%% Emit Output from inside a gen_pipe fitting.
%%
%% The partition and fitting details are read from the process dictionary
%% entry gen_pipe_state, which process/3 and done/1 set for the duration of
%% the user callback. Returns whatever
%% riak_pipe_vnode_worker:send_output/3 returns.
%%
%% Exits with {error, Message} when no gen_pipe state is present in the
%% process dictionary.
%%
emit(Output) ->
    case erlang:get(gen_pipe_state) of
        #state{partition = Partition, fitting_details = FittingDetails} ->
            riak_pipe_vnode_worker:send_output(Output, Partition, FittingDetails);
        _ ->
            exit({error, "emit can only be called from gen_pipe process callback"})
    end.
%% Queue Value as input to the head fitting of the pipe.
%% Returns whatever riak_pipe_vnode:queue_work/2 returns.
enqueue(Pipe = #gen_pipe{}, Value) ->
    riak_pipe_vnode:queue_work(Pipe#gen_pipe.head, Value).
%% Mark the end of input: forwards the pipe's head fitting to
%% riak_pipe_fitting:eoi/1 and returns its result.
eoi(Pipe = #gen_pipe{}) ->
    riak_pipe_fitting:eoi(Pipe#gen_pipe.head).
%% Return the ref field of the pipe's sink fitting.
sink_ref(#gen_pipe{sink = Sink = #fitting{}}) ->
    Sink#fitting.ref.
%% Return the boolean stored under gen_pipe_last_in_preflist in the process
%% dictionary (process/3 stores its LastInPreflist argument there).
%% Crashes with case_clause when the stored value is not a boolean,
%% e.g. when nothing has been stored yet.
last_in_preflist() ->
    case erlang:get(gen_pipe_last_in_preflist) of
        Flag when is_boolean(Flag) ->
            Flag
    end.
%% Convert one gen_pipe-style fitting into a #fitting_spec{}.
%%
%% A fitting is Name, {Name, InitArg} or {Name, InitArg, Options}; the short
%% forms default InitArg and Options to [].
%%
%% Recognised options:
%%   {follow, true}        -> chashfun is the atom follow
%%   {chash, Fun}          -> chashfun is Fun (default: fun chash:key_of/1)
%%   {nval, N}             -> nval is N (default: 1)
%%   {function, Fun | Atom} -> fitting function; an arity-2 fun is used as is,
%%                            an atom names a function in Module
%%                            (default: Module:Name/2)
%%
%% NOTE(review): the spec points at module gen_pipe_fitting, while the
%% riak_pipe_vnode_worker callbacks live in this module -- confirm the
%% intended module name.
make_fitting_spec(Module, Name) when is_atom(Name) ->
    make_fitting_spec(Module, {Name, [], []});
make_fitting_spec(Module, {Name, InitArg}) ->
    make_fitting_spec(Module, {Name, InitArg, []});
make_fitting_spec(Module, {Name, InitArg, Options}) ->
    #fitting_spec{name = Name,
                  module = gen_pipe_fitting,
                  arg = {impl_fun(Module, Name, Options), InitArg},
                  chashfun = chash_fun(Options),
                  nval = proplists:get_value(nval, Options, 1)}.

%% Pick the consistent-hash setting: follow wins over an explicit chash option.
chash_fun(Options) ->
    case proplists:get_bool(follow, Options) of
        true ->
            follow;
        false ->
            proplists:get_value(chash, Options, fun chash:key_of/1)
    end.

%% Resolve the fitting function from the function option, falling back to
%% Module:Name/2 when the option is absent or is neither an arity-2 fun nor
%% an atom.
impl_fun(Module, Name, Options) ->
    case proplists:lookup(function, Options) of
        {function, Fun} when is_function(Fun, 2) ->
            Fun;
        {function, FunName} when is_atom(FunName) ->
            fun Module:FunName/2;
        _ ->
            fun Module:Name/2
    end.
%%
%% callbacks for riak_pipe_vnode_worker
%%

%% Initialise a worker: run the fitting function's init step with the
%% InitArg carried in the fitting details, and remember the function, its
%% state, the partition and the fitting details.
%% Crashes with badmatch unless the fitting function returns {ok, IState}.
init(Partition, #fitting_details{arg = {ImplFun, InitArg}} = FittingDetails) ->
    {ok, IState} = ImplFun(init, InitArg),
    WorkerState = #state{implfun = ImplFun,
                         istate = IState,
                         partition = Partition,
                         fitting_details = FittingDetails},
    {ok, WorkerState}.
%% Feed one Input to the fitting function.
%%
%% While the user callback runs, LastInPreflist and the worker state are
%% published in the process dictionary so that last_in_preflist/0 and emit/1
%% can read them. The gen_pipe_state entry is reset to undefined afterwards,
%% even if the callback raises.
%%
%% Returns {ok, NewState} or forward_preflist, mirroring what the fitting
%% function returned; any other return value crashes with case_clause.
process(Input, LastInPreflist, State = #state{implfun = ImplFun, istate = IState}) ->
    erlang:put(gen_pipe_last_in_preflist, LastInPreflist),
    erlang:put(gen_pipe_state, State),
    try
        case ImplFun({process, Input}, IState) of
            {ok, NewIState} ->
                {ok, State#state{istate = NewIState}};
            forward_preflist ->
                forward_preflist
        end
    after
        erlang:put(gen_pipe_state, undefined)
    end.
%% Tell the fitting function that there is no more input.
%%
%% The worker state is published in the process dictionary for the duration
%% of the call so the fitting may still emit/1 results; the entry is reset to
%% undefined afterwards, even if the callback raises.
%% Returns whatever the fitting function returns for done.
done(State = #state{implfun = ImplFun, istate = IState}) ->
    erlang:put(gen_pipe_state, State),
    try
        ImplFun(done, IState)
    after
        erlang:put(gen_pipe_state, undefined)
    end.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%% | |
%% Example using gen_pipe | |
%% | |
%% Beware: this is all pseudo code which does not compile ... | |
%% | |
%% NOTE(review): the #pipe_result{} and #pipe_eoi{} records matched in
%% collect/2 are not defined in this file; presumably they come from
%% riak_pipe's public header -- confirm.
-module(mypipe).

%% entry points of the example
-export([countsiblings/1, mapreduce/3]).
%% export callbacks for gen_pipe
-export([pipeline/2, kv_get/2, kv_map/2]).
%% Example: iterate BKeys (a list of {Bucket,Key} pairs) and build a
%% histogram of sibling counts: each object is mapped to {NumSiblings, 1}
%% and the counts for equal NumSiblings are summed with erlang:'+'/2.
%% Returns whatever mapreduce/3 collects.
countsiblings(BKeys) ->
    SiblingCount =
        fun(RObj) ->
            {length(riak_object:values(RObj)), 1}
        end,
    mapreduce(BKeys, SiblingCount, fun erlang:'+'/2).
%% Run this module's pipeline over BKeys.
%% Starts the pipe with [MapFun, RedFun] as pipeline arguments and the
%% {log, sasl} option, enqueues every element of BKeys, signals end of input
%% and collects the results arriving for the pipe's sink ref.
mapreduce(BKeys, MapFun, RedFun) ->
    PipelineArgs = [MapFun, RedFun],
    {ok, Pipe} = gen_pipe:exec(?MODULE, PipelineArgs, [{log, sasl}]),
    _ = [gen_pipe:enqueue(Pipe, BKey) || BKey <- BKeys],
    gen_pipe:eoi(Pipe),
    SinkRef = gen_pipe:sink_ref(Pipe),
    collect(SinkRef, []).
%% Gather the results sent to this process for pipe Ref.
%% Every #pipe_result{} for Ref is pushed onto the accumulator; when the
%% #pipe_eoi{} for Ref arrives the accumulated results are returned in
%% arrival order. Blocks until that end-of-input message is received.
collect(Ref, Acc) ->
    receive
        #pipe_eoi{ref = Ref} ->
            lists:reverse(Acc);
        #pipe_result{ref = Ref, result = Result} ->
            collect(Ref, [Result | Acc])
    end.
%%
%% Describe the pipeline generated by this module.
%%
%% Returns four gen_pipe-style fittings, each {Name, InitArg, Options}:
%%   kv_get  - hashed with riak_core_util:chash_key/1, nval 3
%%   kv_map  - initialised with MapFun, chash follow
%%   reduce1 - initialised with RedFun, chash follow, implemented by reduce/2
%%   reduce2 - initialised with RedFun, hashed on the first element of the
%%             {K, _} input, implemented by reduce/2
%%
pipeline(MapFun, RedFun) ->
    [{kv_get, [], [{chash, fun riak_core_util:chash_key/1},
                   {nval, 3}]},
     {kv_map, MapFun, [{chash, follow}]},
     {reduce1, RedFun, [{chash, follow},
                        {function, fun reduce/2}]},
     {reduce2, RedFun, [{chash, fun({K, _}) -> chash:key_of(K) end},
                        {function, fun reduce/2}]}].
%% Fitting that fetches the object for each {Bucket, Key} input and emits
%% the result of the get (whatever Client:get/2 returns) to the next fitting.
%% The fitting state is the local Riak client.
kv_get(init, []) ->
    %% riak:local_client/0 already returns {ok, Client}; wrapping it again
    %% would make the state {ok, Client} and break Client:get/2 below.
    {ok, Client} = riak:local_client(),
    {ok, Client};
kv_get({process, {Bucket, Key}}, Client) ->
    gen_pipe:emit(Client:get(Bucket, Key)),
    {ok, Client};
kv_get(done, _Client) ->
    ok.
%%
%% The map phase applies a function to the object we got above
%%
%% fun(riak_object() -> {Key,Value})
%%
%% The fitting state is MapFun itself. For an {ok, RObj} input, MapFun(RObj)
%% is emitted when it is a 2-tuple; any other result is dropped. Inputs that
%% are not {ok, _} (failed gets) are ignored.
%%
kv_map(init, MapFun) ->
    {ok, MapFun};
kv_map({process, {ok, RObj}}, MapFun) ->
    case MapFun(RObj) of
        {_Key, _Value} = Pair ->
            gen_pipe:emit(Pair);
        _ ->
            ignore
    end,
    {ok, MapFun};
kv_map({process, _Error}, MapFun) ->
    %% ignore
    {ok, MapFun};
kv_map(done, _MapFun) ->
    ok.
%%
%% The reduce phase is described by this
%%
%% State: the 2-ary reduce function plus a dict of the running value per key.
%% dict:dict() replaces the dict() built-in type, which current OTP releases
%% no longer provide.
-record(redstate, {reduce :: fun(),
                   dict :: dict:dict()}).

%% reduce(init, Fun)               -> {ok, State} with an empty dict
%% reduce({process, {K, V}}, State) -> {ok, State'}: the first value seen for
%%                                    K is stored as is, later ones are
%%                                    combined as Fun(V, Old)
%% reduce(done, State)             -> ok, after emitting every {Key, Value}
%%                                    held in the dict
reduce(init, Fun) when is_function(Fun, 2) ->
    {ok, #redstate{reduce = Fun, dict = dict:new()}};
reduce({process, {Key, Value}}, State = #redstate{reduce = Fun, dict = Dict}) ->
    RedVal = case dict:find(Key, Dict) of
                 {ok, Old} -> Fun(Value, Old);
                 error -> Value
             end,
    {ok, State#redstate{dict = dict:store(Key, RedVal, Dict)}};
reduce(done, #redstate{dict = Dict}) ->
    lists:foreach(fun(Pair) -> gen_pipe:emit(Pair) end, dict:to_list(Dict)),
    ok.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment