Skip to content

Instantly share code, notes, and snippets.

@mcobzarenco
Created August 3, 2015 10:57
Show Gist options
  • Save mcobzarenco/5b1e47daadf3aac1c941 to your computer and use it in GitHub Desktop.
Save mcobzarenco/5b1e47daadf3aac1c941 to your computer and use it in GitHub Desktop.
MapReduce Riak job for doing a join
%% @doc Emits a join record for a stored triple that carries exactly one
%% `left_id_s' value.
%%
%% `Obj' is a riak_object whose value is decoded with `mochijson2:decode/1';
%% the decoded term is expected to be a `{struct, Props}' tuple (its second
%% element is read as a proplist). The second argument is ignored.
%% `TargetBucket' is paired with `<<"maps">>' to form the destination bucket
%% (presumably `<<"maps">>' is a bucket type -- confirm against the cluster
%% configuration).
%%
%% Returns `[{{JoinBucket, Id}, {JoinBucket, Id, Key, Triple}}]' when exactly
%% one `left_id_s' value is present, and `[]' when there are none or several.
%% If decoding throws `invalid_utf8', the error is logged and `[]' is returned
%% so the offending object is skipped.
emit_left_id(Obj, _, TargetBucket) ->
    try
        Key = riak_object:key(Obj),
        Triple = mochijson2:decode(riak_object:get_value(Obj)),
        case proplists:get_all_values(<<"left_id_s">>, element(2, Triple)) of
            [Id] ->
                JoinBucket = {<<"maps">>, TargetBucket},
                [{{JoinBucket, Id}, {JoinBucket, Id, Key, Triple}}];
            _ ->
                []
        end
    catch
        throw:invalid_utf8 ->
            %% Use the same riak_object accessors as the happy path
            %% (bucket/1, key/1); the previous get_bucket/get_key calls would
            %% fail with undef inside the handler instead of logging.
            io:format("ERROR in join_triples b=~p k=~p: invalid JSON, illegal UTF-8 character~n",
                      [riak_object:bucket(Obj), riak_object:key(Obj)]),
            []
    end.
%% @doc Merges one emitted triple into the CRDT map stored at `{Bucket, Id}'.
%%
%% The first and third arguments are ignored. The second argument is the
%% `{Bucket, Id, TripleId, Triple}' tuple produced by `emit_left_id/3'.
%%
%% The map object is fetched through the local Riak client, or created with
%% `riak_kv_crdt:new/3' when it does not exist yet. The triple is stored as
%% JSON in a `riak_dt_lwwreg' field named `TripleId', together with any extra
%% register updates returned by `indexes_for_triple/1'.
%%
%% Always returns `[]'. A failed put is logged rather than silently dropped.
%% Any result from `get' other than `{ok, _}' or `{error, notfound}' still
%% crashes with `case_clause', as before.
merge_triple(_Obj, {Bucket, Id, TripleId, Triple} = _Emitted, _) ->
    {ok, Client} = riak:local_client(),
    Joined =
        case Client:get(Bucket, Id) of
            {error, notfound} -> riak_kv_crdt:new(Bucket, Id, riak_dt_map);
            {ok, O} -> O
        end,
    %% Only the causal context is needed here; the current map value is unused.
    {{Ctx, _}, _} = riak_kv_crdt:value(Joined, riak_dt_map),
    TripleJson = iolist_to_binary(mochijson2:encode(Triple)),
    AddTripleOps = [{update, {TripleId, riak_dt_lwwreg}, {assign, TripleJson}}],
    IndexOps = indexes_for_triple(Triple),
    Ops = {crdt_op, riak_dt_map, {update, AddTripleOps ++ IndexOps}, Ctx},
    %% NOTE(review): TripleId is passed as the second argument of
    %% riak_kv_crdt:update/3 -- confirm this is the intended value there.
    NewJoined = riak_kv_crdt:update(Joined, TripleId, Ops),
    %% Report write failures instead of discarding them; the return value of
    %% this function is unchanged so the job keeps running.
    case Client:put(NewJoined) of
        {error, Reason} ->
            io:format("ERROR in join_triples b=~p k=~p: put failed: ~p~n",
                      [Bucket, Id, Reason]);
        _ ->
            ok
    end,
    [].
%% Hidden:
%% @doc Builds the extra map-update operations derived from a decoded triple.
%%
%% `Triple' must be a `{struct, Props}' term. When its `<<"relation">>' is
%% `<<"has_name">>', the first element of the `<<"text">>' list inside the
%% `<<"right">>' struct is assigned to the `<<"has_name_s">>' register.
%% Every other relation (including a missing one) yields `[]'.
indexes_for_triple(Triple) ->
    {struct, Fields} = Triple,
    relation_index_ops(proplists:get_value(<<"relation">>, Fields), Fields).

%% Update operations for a single relation; only `has_name' produces any.
relation_index_ops(<<"has_name">>, Fields) ->
    {struct, RightFields} = proplists:get_value(<<"right">>, Fields),
    [FirstText | _] = proplists:get_value(<<"text">>, RightFields),
    [{update, {<<"has_name_s">>, riak_dt_lwwreg}, {assign, FirstText}}];
relation_index_ops(_Other, _Fields) ->
    [].
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment