Skip to content

Instantly share code, notes, and snippets.

@greggy
Last active February 23, 2017 18:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save greggy/7d7fa3102d89673019410c6e244650cd to your computer and use it in GitHub Desktop.
Save greggy/7d7fa3102d89673019410c6e244650cd to your computer and use it in GitHub Desktop.
-module(user_update_metadata).
-export([update_metadata/0]).
-include_lib("riak_pipe/include/riak_pipe.hrl").
-include("user_dispatch.hrl").
-define(PERIOD, 86400*10).
%% Default start date, 2008-01-01 00:00:00.
-define(START_DATE, 1199145600).
%%====================================================================
%% API
%%====================================================================
%% @doc Entry point for the metadata refresh.
%% Kicks off processing at the first time window, which begins at
%% ?START_DATE and ends ?PERIOD later, and delegates to update_metadata/2.
%% Returns whatever update_metadata/2 returns.
update_metadata() ->
    update_metadata(?START_DATE, ?START_DATE + ?PERIOD).
%% @private
%% @doc Processes one time window and then moves on to the next one.
%%
%% Builds a two-stage riak_pipe pipeline:
%%   1. `mapred_items'  - insert_mapred_items/1 turns the `{Start, End}'
%%      input into the list of items found for that window;
%%   2. `update_users'  - update_users/1 is applied to every such item.
%%
%% The window bounds are queued as the single pipeline input, end-of-inputs
%% is signalled, and the call blocks until the pipeline reports `eoi'.
%% While the current time (std_util_date:unix_timestamp/0) is past `End',
%% the function recurses on the following window; otherwise it returns `ok'.
update_metadata(Start, End) ->
    Spec = [
        stream_to(mapred_items, fun insert_mapred_items/1, group_to(<<"item">>)),
        stream_to(update_users, fun update_users/1, group_to(<<"update_users">>))
    ],
    {ok, Pipe} = riak_pipe:exec(Spec, [{log, lager}]),
    try
        ok = riak_pipe:queue_work(Pipe, {Start, End}),
        riak_pipe:eoi(Pipe),
        {eoi, _Results, _} = riak_pipe:collect_results(Pipe, infinity)
    after
        %% Always tear the pipe down, including when one of the assertive
        %% matches above fails; otherwise its fittings would be left behind.
        riak_pipe:destroy(Pipe)
    end,
    %% The recursive call stays outside the try so it remains a tail call.
    case std_util_date:unix_timestamp() > End of
        true ->
            %% The index range used by insert_mapred_items/1 includes both
            %% bounds, so the next window starts one past End; reusing End
            %% would process items stamped exactly at End twice.
            NextStart = End + 1,
            update_metadata(NextStart, NextStart + ?PERIOD);
        false ->
            ok
    end.
%%====================================================================
%% Internal functions
%%====================================================================
%% @private
%% @doc Pipeline stage: recomputes the Riak metadata for one user object.
%%
%% `Item' is the object handed over by the previous pipeline stage. Its stored
%% value is decoded, turned into a project_schema_user record, and the
%% metadata derived from that record is set on the object via
%% riak_object:update_metadata/2.
%%
%% Returns `[]', i.e. this stage emits nothing downstream.
update_users(Item) ->
    %% NOTE(review): binary_to_term/1 is applied to data read back from the
    %% store; if that data can ever be written by an untrusted party, switch
    %% to binary_to_term(Bin, [safe]).
    %% NOTE(review): assumes the stored value is a serialized statebox whose
    %% resolved value is a single {Key, Value} pair - confirm against writers.
    Statebox = binary_to_term(riak_object:get_value(Item)),
    [{_Key, Value}] = statebox:value(Statebox),
    Record = project_schema_user:new(Value),
    NewMetadata = project_schema_user:riak_metadata(Record),
    %% TODO: the updated object is built but not yet written anywhere; the
    %% original author's open question ("What's next?") still stands.
    _NewItem = riak_object:update_metadata(Item, NewMetadata),
    [].
%% @private
%% @doc Pipeline stage: produces the items for one time window.
%%
%% Runs a map/reduce job over ?USER_BUCKET, selecting entries whose
%% `created-on_int' index lies between `Start' and `End', using the
%% map_identity / reduce_set_union phases of riak_kv_mapreduce.
%% Returns the results of the single kept phase, or `[]' for any other
%% `{ok, _}' reply. Any non-`ok' reply is not handled and crashes the caller.
insert_mapred_items({Start, End}) ->
    Inputs = {index, ?USER_BUCKET, <<"created-on_int">>, Start, End},
    MapPhase = {map, {modfun, riak_kv_mapreduce, map_identity}, none, false},
    ReducePhase = {reduce, {modfun, riak_kv_mapreduce, reduce_set_union}, none, true},
    Timeout = timer:hours(24),
    case riak_client:mapred(Inputs, [MapPhase, ReducePhase], Timeout) of
        {ok, [{_, Results}]} ->
            Results;
        {ok, _} ->
            []
    end.
%% @private
%% @doc Builds the consistent-hashing fun for a fitting.
%% The returned fun takes one output term and hashes the pair made of `Key'
%% and the serialized output with riak_core_util:chash_key/1.
group_to(Key) ->
    fun(Output) ->
        Serialized = term_to_binary(Output),
        riak_core_util:chash_key({Key, Serialized})
    end.
%% @private
%% @doc Builds the #fitting_spec{} for one pipeline stage.
%% `Name' labels the fitting, `Fun' is wrapped by streamer/1 and run through
%% the riak_pipe_w_xform worker module, and `ChashFun' is stored as the
%% fitting's chashfun.
stream_to(Name, Fun, ChashFun) ->
    Xform = streamer(Fun),
    #fitting_spec{
        name = Name,
        module = riak_pipe_w_xform,
        arg = Xform,
        chashfun = ChashFun
    }.
%% @private
%% @doc Wraps `Fun' as a three-argument transform fun.
%% The wrapper calls `Fun' on the incoming input and sends every element of
%% the resulting list downstream, one at a time, with
%% riak_pipe_vnode_worker:send_output/3. Each send must return `ok'.
%% The wrapper itself returns `ok'.
streamer(Fun) ->
    fun(Input, Partition, FittingDetails) ->
        Emit = fun(Element) ->
            ok = riak_pipe_vnode_worker:send_output(Element, Partition, FittingDetails)
        end,
        lists:foreach(Emit, Fun(Input))
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment