%% Last active February 23, 2017 18:31
%% Length of one processing window: 86400 * 10, i.e. ten days expressed in
%% seconds. It is added to the window bounds, which are compared against
%% std_util_date:unix_timestamp() (presumably seconds -- confirm).
-define(PERIOD, 86400*10).
%% Default start date, 2008-01-01 00:00:00 (UTC), as a Unix timestamp.
-define(START_DATE, 1199145600).
%% API
%% @doc Start the metadata update from ?START_DATE.
%%
%% Processes the first window, [?START_DATE, ?START_DATE + ?PERIOD], by
%% delegating to update_metadata/2, which keeps advancing window by window.
update_metadata() ->
    Start = ?START_DATE,
    %% The first window is one ?PERIOD long, the same step update_metadata/2
    %% uses when it moves on to the next window.
    End = Start + ?PERIOD,
    update_metadata(Start, End).
%% @doc Update the metadata of the items indexed between Start and End.
%%
%% Builds a two-fitting riak_pipe:
%%   1. `mapred_items' turns the `{Start, End}' input into items via
%%      insert_mapred_items/1;
%%   2. `update_users' runs update_users/1 on each of those items.
%%
%% Once the pipe has drained, the next window `{End, End + ?PERIOD}' is
%% processed as long as the current time is already past End.
%%
%% Returns `ok' when the window that was just processed reaches past the
%% current time. Crashes with a badmatch if the pipe cannot be set up, the
%% work cannot be queued, or the pipe does not end with `eoi'.
update_metadata(Start, End) ->
    Spec = [
        stream_to(mapred_items, fun insert_mapred_items/1, group_to(<<"item">>)),
        stream_to(update_users, fun update_users/1, group_to(<<"update_users">>))
    ],
    {ok, Pipe} = riak_pipe:exec(Spec, [{log, lager}]),
    ok = riak_pipe:queue_work(Pipe, {Start, End}),
    %% Signal end-of-inputs; without it the pipe never emits `eoi' and the
    %% collect_results/2 call below would wait forever.
    riak_pipe:eoi(Pipe),
    {eoi, _Results, _} = riak_pipe:collect_results(Pipe, infinity),
    case std_util_date:unix_timestamp() > End of
        true ->
            %% Tail call: walk forward one window at a time.
            update_metadata(End, End + ?PERIOD);
        false ->
            ok
    end.
%% Internal functions
%% @doc Recompute the Riak metadata of one stored user object.
%%
%% `Item0' is the object handed over by the previous fitting; it is passed to
%% riak_object:get_value/1 and riak_object:update_metadata/2, so it is
%% presumably a riak_object -- confirm against the map phase output.
%% Its value is decoded with binary_to_term/1 and given to statebox:value/1,
%% which must yield exactly one `{Key, Value}' pair (badmatch otherwise).
%%
%% Returns a one-element list holding the object with its new metadata; a
%% list is required because streamer/1 iterates over the result.
update_users(Item0) ->
    %% NOTE(review): binary_to_term/1 is used without [safe]; fine only while
    %% the stored values are trusted.
    Item = binary_to_term(riak_object:get_value(Item0)),
    [{_Key, Value}] = statebox:value(Item),
    Record = project_schema_user:new(Value),
    NewMetadata = project_schema_user:riak_metadata(Record),
    NewItem = riak_object:update_metadata(Item0, NewMetadata),
    %% What's next? The updated object is only emitted downstream here;
    %% nothing in this function writes it back to Riak yet.
    [NewItem].
%% @doc Fetch the items of ?USER_BUCKET whose `created-on_int' index value
%% lies between Start and End.
%%
%% Runs a map (map_identity, not kept) / reduce (reduce_set_union, kept)
%% query with a 24 hour timeout. Returns the result list when the reply has
%% the shape `{ok, [{_, Results}]}', and `[]' for any other `{ok, _}' reply.
%% Any other reply (e.g. an error tuple) crashes with a case_clause.
insert_mapred_items({Start, End}) ->
    Query = [{map, {modfun, riak_kv_mapreduce, map_identity}, none, false},
             {reduce, {modfun, riak_kv_mapreduce, reduce_set_union}, none, true}],
    Input = {index, ?USER_BUCKET, <<"created-on_int">>, Start, End},
    case riak_client:mapred(Input, Query, timer:hours(24)) of
        {ok, [{_, Results}]} ->
            Results;
        {ok, _} ->
            []
    end.
%% @doc Build a one-argument fun for the `chashfun' field of a fitting spec.
%%
%% The returned fun pairs `Key' with the external term encoding of the output
%% it receives and passes that pair to riak_core_util:chash_key/1.
group_to(Key) ->
    fun(Output) ->
        Encoded = term_to_binary(Output),
        riak_core_util:chash_key({Key, Encoded})
    end.
%% @doc Build the `#fitting_spec{}' for a riak_pipe_w_xform fitting.
%%
%% `Name' is the fitting name, `Fun' is wrapped by streamer/1 and used as the
%% fitting argument, and `ChashFun' is stored as the fitting's chashfun.
stream_to(Name, Fun, ChashFun) ->
    Xform = streamer(Fun),
    #fitting_spec{
        name = Name,
        module = riak_pipe_w_xform,
        arg = Xform,
        chashfun = ChashFun
    }.
%% @doc Wrap `Fun' into a three-argument fun (input, partition, fitting).
%%
%% The returned fun calls `Fun(Args)', which must return a list, and sends
%% every element of that list downstream, one by one, with
%% riak_pipe_vnode_worker:send_output/3. It returns `ok' and crashes with a
%% badmatch if any send does not return `ok'.
streamer(Fun) ->
    fun(Args, Partition, Fitting) ->
        lists:foreach(
            fun(El) ->
                ok = riak_pipe_vnode_worker:send_output(El, Partition, Fitting)
            end,
            Fun(Args)
        )
    end.
