Skip to content

Instantly share code, notes, and snippets.

@erikleitch
Last active April 21, 2016 11:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save erikleitch/4d97cb5ff943a4cc2ee115264ecff246 to your computer and use it in GitHub Desktop.
Save erikleitch/4d97cb5ff943a4cc2ee115264ecff246 to your computer and use it in GitHub Desktop.
-module(mrts).
-compile([export_all]).
%-----------------------------------------------------------------------
% Buckets we know about
%-----------------------------------------------------------------------
%% The ts_weather_demo table as a two-element tuple of identical binaries
%% (presumably {BucketType, Bucket} — TODO confirm).
bucket() ->
    Name = <<"ts_weather_demo">>,
    {Name, Name}.
%% Name of the locationupdateevents table in the form each key-listing
%% method expects: mr -> {Name, Name} tuple, lk -> plain binary name.
%% locationbucket/0 defaults to the mr form.
locationbucket() ->
    locationbucket(mr).

locationbucket(mr) ->
    Table = locationbucket(lk),
    {Table, Table};
locationbucket(lk) ->
    <<"locationupdateevents">>.
%% Name of the GeoCheckin table in the form each key-listing method
%% expects: mr -> {Name, Name} tuple, lk -> plain binary name.
%% geobucket/0 defaults to the mr form.
geobucket() ->
    geobucket(mr).

geobucket(mr) ->
    Table = geobucket(lk),
    {Table, Table};
geobucket(lk) ->
    <<"GeoCheckin">>.
%-----------------------------------------------------------------------
% Use map reduce to count all keys in the bucket
%-----------------------------------------------------------------------
%% Count the objects in a bucket with a map/reduce job on dev node DevNo.
%% count/1 targets the GeoCheckin bucket.
count(DevNo) ->
    Bucket = geobucket(),
    count(Bucket, DevNo).

count(Bucket, DevNo) ->
    mrtsTest(Bucket, count, DevNo).
%-----------------------------------------------------------------------
% Use map reduce to return all keys in the bucket
%-----------------------------------------------------------------------
%% Run a map/reduce job with an empty query (no phases) over the GeoCheckin
%% bucket on dev node 1 and return riakc_pb_socket:mapred_bucket/3's result.
anonkeys() ->
    %% pb_socket(1) connects to the same port (10017) that used to be
    %% hard-coded here, keeping port selection in one place.
    Riak = pb_socket(1),
    Result = riakc_pb_socket:mapred_bucket(Riak, geobucket(), []),
    %% The call is synchronous, so close the connection now instead of
    %% leaking one linked socket per call.
    riakc_pb_socket:stop(Riak),
    Result.
%% List the keys of a bucket/table on dev node DevNo.
%%   mr -> map/reduce via mrtsTest/3 (Stat = keys)
%%   lk -> riakc_ts streaming list-keys; chunks are collected by receive_keys/1
%% keys/2 uses the GeoCheckin table in the form Method expects.
keys(Method, DevNo) ->
    keys(Method, geobucket(Method), DevNo).

keys(mr, Bucket, DevNo) ->
    mrtsTest(Bucket, keys, DevNo);
keys(lk, Bucket, DevNo) ->
    Riak = pb_socket(DevNo),
    riakc_ts:stream_list_keys(Riak, Bucket, []),
    Keys = receive_keys([]),
    %% Close the per-call connection. analysis/1 reaches this in a loop and
    %% would otherwise accumulate one open socket per iteration.
    riakc_pb_socket:stop(Riak),
    Keys.
%% Collect streamed key chunks from the mailbox until the stream ends.
%% Keys is the initial key list. Chunks arrive as {_, {keys, KeyList}} and
%% the end of stream as {_, done}; the result is Keys followed by every
%% chunk in arrival order. Any other message is printed to the user
%% console and stops collection (returning ok).
receive_keys(Keys) ->
    receive_key_chunks([Keys]).

%% Accumulate chunks newest-first (O(1) per chunk instead of repeated ++)
%% and concatenate them once, in arrival order, at the end.
receive_key_chunks(Chunks) ->
    receive
        {_, {keys, KeyList}} ->
            receive_key_chunks([KeyList | Chunks]);
        {_, done} ->
            lists:append(lists:reverse(Chunks));
        Ret ->
            io:format(user, "Ret = ~p~n", [Ret])
    end.
%-----------------------------------------------------------------------
% General MR request
%-----------------------------------------------------------------------
%% Run a map/reduce job over Bucket on dev node DevNo.
%%   keys  -> map every object through riak_kv_mapreduce:map_object_value and
%%            return the phase-0 results (note: object values, not key names)
%%   count -> map every object to [1], reduce with reduce_sum, return the total
%% Crashes (badmatch) if the job result does not have the expected shape.
mrtsTest(Bucket, Stat, DevNo) ->
    Riak = pb_socket(DevNo),
    Result =
        case Stat of
            keys ->
                {ok, [{0, Ret}]} =
                    riakc_pb_socket:mapred_bucket(
                      Riak, Bucket,
                      [{map, {modfun, riak_kv_mapreduce, map_object_value}, none, true}]),
                Ret;
            count ->
                {ok, [{1, [Nkey]}]} =
                    riakc_pb_socket:mapred_bucket(
                      Riak, Bucket,
                      %% Arguments are ignored; underscores avoid the
                      %% unused-variable warning the old `I` produced.
                      [{map, {qfun, fun(_, _, _) -> [1] end}, none, false},
                       {reduce, {modfun, riak_kv_mapreduce, reduce_sum}, none, true}]),
                Nkey
        end,
    %% Close the per-call connection; keycount/4 can call this repeatedly.
    riakc_pb_socket:stop(Riak),
    Result.
%% Open a linked protobuf connection to local dev node DevNo (port from
%% dev_port/1) and return its pid; crashes with badmatch if the connection
%% cannot be established.
pb_socket(DevNo) ->
    Port = dev_port(DevNo),
    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", Port),
    Pid.
%% PB port of dev node DevNo: node 1 listens on 10017, each following node
%% is 10 higher.
dev_port(DevNo) ->
    BasePort = 10017,
    PortStride = 10,
    BasePort + PortStride * (DevNo - 1).
%-----------------------------------------------------------------------
% Build a formatted query string (does not execute it)
%-----------------------------------------------------------------------
%% Build (but do not run) a ts_weather_demo SELECT for time in [Tmin, Tmax]
%% and the given weather/family values. Tmin and Tmax must be integers.
%% Weather and Fam are strings spliced verbatim between single quotes (no
%% escaping), so they must not contain a quote character.
query(Tmin, Tmax, Weather, Fam) ->
    "SELECT * FROM ts_weather_demo "
    "WHERE time >= " ++ integer_to_list(Tmin) ++
    " AND time <= " ++ integer_to_list(Tmax) ++
    %% The leading space matters: without it the SQL read "...<= 400AND weather".
    " AND weather = '" ++ Weather ++ "' " ++
    "AND family = '" ++ Fam ++ "' ".
%-----------------------------------------------------------------------
% Issue an arbitrary query
%-----------------------------------------------------------------------
%% Run a query on dev node 1 over a fresh connection and return
%% riakc_ts:query/2's result. Canned queries are chosen by atom (location,
%% default, native, nonnative); any other argument is sent as the query
%% text itself.
query(location) ->
    query("SELECT * FROM locationupdateevents "
          "WHERE tstamp >= 1416321188291 AND tstamp <= 1416321228319 "
          "AND userid = 'anon:' "
          "AND eventid = 'a11fbcae-23bf-4535-b3f8-e5013b35e366' ");
query(default) ->
    query("SELECT * FROM ts_weather_demo "
          "WHERE time >= 0 AND time <= 400 "
          "AND weather = 'crap' "
          "AND family = 'family' ");
query(native) ->
    query("SELECT * FROM ts_weather_demo "
          "WHERE time >= 0 AND time <= 400 "
          "AND weather = 'native' "
          "AND family = 'family' ");
query(nonnative) ->
    query("SELECT * FROM ts_weather_demo "
          "WHERE time >= 0 AND time <= 400 "
          "AND weather = 'non-native' "
          "AND family = 'family' ");
query(Query) ->
    %% pb_socket(1) targets the same port (10017) that was hard-coded here.
    %% query/2 does the logging and the actual call.
    Riak = pb_socket(1),
    Result = query(Riak, Query),
    %% The call is synchronous; close the connection instead of leaking it.
    riakc_pb_socket:stop(Riak),
    Result.
%% Log Sql, then run it on the existing connection Conn; returns
%% riakc_ts:query/2's result unchanged.
query(Conn, Sql) ->
    io:format("Executing query: ~p~n", [Sql]),
    riakc_ts:query(Conn, Sql).
%-----------------------------------------------------------------------
% List buckets
%-----------------------------------------------------------------------
%% List the buckets on the node at 127.0.0.1:8087 (a fixed port, not one of
%% the dev_port/1 nodes).
listBuckets() ->
    Host = "127.0.0.1",
    {ok, Conn} = riakc_pb_socket:start_link(Host, 8087),
    riakc_pb_socket:list_buckets(Conn).
%% Connection to dev node 1, with the connection's encoding left at its
%% default. putNormal/0, putTs/0 and writeToKvPartitions/2 call getClient/0,
%% so it must exist for the module to compile.
getClient() ->
    pb_socket(1).

%% Connection to dev node 1 with native encoding switched on or off
%% according to UseNativeEncoding.
getClient(UseNativeEncoding) ->
    C = pb_socket(1),
    riakc_pb_socket:use_native_encoding(C, UseNativeEncoding),
    C.
%% Write one fixed test row into ts_weather_demo through connection C.
putC(C) ->
    Row = [<<"test">>, 54.0, <<"family">>, 0.2, 100, 500.0],
    riakc_ts:put(C, <<"ts_weather_demo">>, [Row]).
%% Write one ts_weather_demo row via a new connection to 127.0.0.1:8087 with
%% native encoding set to UseNativeEncoding. The weather field records which
%% encoding was used: <<"native">> when it is true, <<"non-native">> otherwise.
putTs(UseNativeEncoding) ->
    {ok, C} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    riakc_pb_socket:use_native_encoding(C, UseNativeEncoding),
    Weather =
        case UseNativeEncoding of
            true -> <<"native">>;
            _ -> <<"non-native">>
        end,
    Row = [Weather, <<"family">>, 100, 54.0, 0.2, 500.0],
    riakc_ts:put(C, <<"ts_weather_demo">>, [Row]).
%% Put three small objects (myKey1..3 -> val1..3) into TestBucket on the
%% node at 127.0.0.1:8087, in order; returns the result of the last put.
putTestKeys() ->
    {ok, Riak} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    Pairs = [{<<"myKey1">>, <<"val1">>},
             {<<"myKey2">>, <<"val2">>},
             {<<"myKey3">>, <<"val3">>}],
    PutOne = fun({Key, Val}, _Previous) ->
                     riakc_pb_socket:put(Riak, riakc_obj:new(<<"TestBucket">>, Key, Val))
             end,
    lists:foldl(PutOne, undefined, Pairs).
%% Number of elements in L (same as docount/1).
listLength(L) ->
    docount(L).

%% Count the elements of a list. The empty tuple {} is also accepted and
%% counts as 0.
docount(L) ->
    docount(L, 0).

%% Acc is the running count. Matching the cons cell in the head removes the
%% unused-variable warning the old `[H|R] = L` match produced.
docount({}, Acc) ->
    Acc;
docount([], Acc) ->
    Acc;
docount([_ | Rest], Acc) ->
    docount(Rest, Acc + 1).
%% Call keycount/2 N times and assert, by matching, that every call returns
%% Nexpected. Crashes with badmatch on the first mismatch; otherwise
%% returns ok. A non-positive N means no calls (previously a negative N
%% recursed forever).
keycount(_Method, _DevNo, N, _Nexpected) when N =< 0 ->
    ok;
keycount(Method, DevNo, N, Nexpected) ->
    Nexpected = keycount(Method, DevNo),
    keycount(Method, DevNo, N - 1, Nexpected).

%% Number of keys returned by keys(Method, DevNo).
keycount(Method, DevNo) ->
    Keys = keys(Method, DevNo),
    docount(Keys).
%% Append one line per key (userid immediately followed by timestamp) to
%% keys.out in the current directory and return the number of keys written.
%% Method selects both the key source (see keys/2) and the key shape (see
%% listkey/4).
keylist(Method, DevNo) ->
    Keys = keys(Method, DevNo),
    {ok, Log} = file:open("keys.out", [append]),
    %% Close the file even if writing a line crashes; it used to stay open.
    try
        listkey(Keys, 0, Log, Method)
    after
        file:close(Log)
    end.

%% Write the 1st (userid) and 3rd (timestamp, an integer) fields of each
%% key to Log; Acc counts the keys written. The empty tuple {} is treated
%% like an empty list.
%%   mr: each key is a list whose 1st and 3rd elements are {_, Value} pairs
%%   lk: each key is a tuple
listkey({}, Acc, _Log, _Method) ->
    Acc;
listkey([], Acc, _Log, _Method) ->
    Acc;
listkey([H | R], Acc, Log, mr) ->
    {_, Userid} = lists:nth(1, H),
    {_, Tstamp} = lists:nth(3, H),
    io:format(Log, "~s~s~n", [Userid, integer_to_list(Tstamp)]),
    listkey(R, Acc + 1, Log, mr);
listkey([H | R], Acc, Log, lk) ->
    T = tuple_to_list(H),
    Userid = lists:nth(1, T),
    Tstamp = lists:nth(3, T),
    io:format(Log, "~s~s~n", [Userid, integer_to_list(Tstamp)]),
    listkey(R, Acc + 1, Log, lk).
%% Open two connections to dev node 1, one with native encoding (Cttb) and
%% one without (Cpb), unlink both from the caller, and return {Cpb, Cttb}.
%% The atom only picks the creation order: pb opens Cttb first, ttb opens
%% Cpb first.
clientTest(pb) ->
    Cttb = getClient(true),
    Cpb = getClient(false),
    unlink_pair(Cpb, Cttb);
clientTest(ttb) ->
    Cpb = getClient(false),
    Cttb = getClient(true),
    unlink_pair(Cpb, Cttb).

%% Unlink both connections (Cttb first) and hand them back as {Cpb, Cttb}.
unlink_pair(Cpb, Cttb) ->
    unlink(Cttb),
    unlink(Cpb),
    {Cpb, Cttb}.
%% Print the calling process's {current_function, MFA} info; returns ok.
pt() ->
    Info = erlang:process_info(self(), current_function),
    io:format("Process info = ~p~n", [Info]).
%% Canned GeoCheckin query: time in [0, 1000) for family1/seriesX.
%% dq/0 runs it through query/1; dq/1 opens its own connection to port
%% 10017 and runs it through query/2. dq/1 ignores its argument because
%% the native-encoding toggle is currently disabled.
dq() ->
    query(dq_query()).

dq(_UseNative) ->
    {ok, Conn} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    query(Conn, dq_query()).

%% SQL text shared by dq/0 and dq/1.
dq_query() ->
    "SELECT * FROM GeoCheckin WHERE time >= 0 AND time < 1000 AND myfamily = 'family1' AND myseries = 'seriesX'".
%% Write one GeoCheckin test row at Time (default 1) through a new connection
%% to port 10017.
%%   dp/1, dp/2: the UseNative argument is ignored (encoding toggle disabled).
%%   dp/3: sets the connection's encoding to UseNativeInit and forwards
%%         UseNativeSend to qfPutC/3.
dp(UseNative) ->
    dp(UseNative, 1).

dp(_UseNative, Time) ->
    {ok, Conn} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    qfPutC(Conn, Time).

dp(UseNativeInit, UseNativeSend, Time) ->
    {ok, Conn} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    riakc_pb_socket:use_native_encoding(Conn, UseNativeInit),
    qfPutC(Conn, UseNativeSend, Time).
%% Write the fixed GeoCheckin test row stamped with Time through connection C.
%% qfPutC/3 also passes UseNativeSend as riakc_ts:put/4's fourth argument.
qfPutC(C, Time) ->
    riakc_ts:put(C, <<"GeoCheckin">>, [geo_test_row(Time)]).

qfPutC(C, UseNativeSend, Time) ->
    riakc_ts:put(C, <<"GeoCheckin">>, [geo_test_row(Time)], UseNativeSend).

%% The GeoCheckin test row: family1/seriesX at Time plus fixed values.
geo_test_row(Time) ->
    [<<"family1">>, <<"seriesX">>, Time, 1, <<"test1">>, 1.0, true].
%% Write the same GeoCheckin row three times over port 10017: through a
%% native-encoding connection, then a non-native one, then the native one
%% again. Returns the result of the last put.
queryfailtest() ->
    Table = <<"GeoCheckin">>,
    Rows = [[<<"family1">>, <<"seriesX">>, 100, 1, <<"test1">>, 1.0, true]],
    {ok, Cttb} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    riakc_pb_socket:use_native_encoding(Cttb, true),
    riakc_ts:put(Cttb, Table, Rows),
    {ok, Cpb} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    riakc_pb_socket:use_native_encoding(Cpb, false),
    riakc_ts:put(Cpb, Table, Rows),
    riakc_ts:put(Cttb, Table, Rows).
%% Append Bytes (iodata) to Filename, creating the file if it does not
%% exist. Returns file:write/2's result (ok or {error, Reason}) so write
%% failures are no longer hidden behind file:close/1's ok. If the file
%% cannot be opened, prints the reason and returns ok.
append_file(Filename, Bytes) ->
    case file:open(Filename, [append]) of
        {ok, IoDevice} ->
            %% Close the device even if the write raises.
            try
                file:write(IoDevice, Bytes)
            after
                file:close(IoDevice)
            end;
        {error, Reason} ->
            %% ~p rather than ~s: an error reason is not guaranteed to be a
            %% plain atom or string, and ~s crashes on other terms.
            io:format("~s open error reason:~p~n", [Filename, Reason])
    end.
%% Convert each {Index, _} coverage entry to an id with
%% riak_core_coverage_plan:index_to_id/2. The second argument is fixed at 64
%% (presumably the ring size; TODO confirm). Entries not shaped {_, _} are
%% skipped by the generator pattern.
getids(CoverageVNodes) ->
    RingSize = 64,
    [riak_core_coverage_plan:index_to_id(Index, RingSize) || {Index, _} <- CoverageVNodes].
%% Plain KV put: store a fixed GeoCheckin-shaped row as the value of key
%% <<"test">> in bucket {GeoCheckin, GeoCheckin}, via getClient/0.
putNormal() ->
    C = getClient(),
    Bucket = {<<"GeoCheckin">>, <<"GeoCheckin">>},
    Value = [[<<"family1">>, <<"seriesX">>, 100, 1, <<"test1">>, 1.0, true]],
    riakc_pb_socket:put(C, riakc_obj:new(Bucket, <<"test">>, Value)).
%% TS put of one fixed GeoCheckin row (time 100) via getClient/0.
putTs() ->
    C = getClient(),
    Row = [<<"family1">>, <<"seriesX">>, 100, 1, <<"test1">>, 1.0, true],
    riakc_ts:put(C, <<"GeoCheckin">>, [Row]).
%-----------------------------------------------------------------------
% Return the partition indices ("node ids") of a freshly created ring
%-----------------------------------------------------------------------
%% Return the partition indices of a freshly created ring
%% (riak_core_ring:fresh/0). This is not the running cluster's ring.
getringnodes() ->
    Ring = riak_core_ring:fresh(),
    %% all_owners/1 returns [{Index, OwnerNode}]. Using the public API avoids
    %% matching the ring record's internal 11-tuple layout, and drops the
    %% hard-coded requirement that the ring have exactly 8 partitions.
    [Index || {Index, _} <- riak_core_ring:all_owners(Ring)].
%-----------------------------------------------------------------------
% Get the ID of the vnode to which this Bucket,Key pair will be hashed
%-----------------------------------------------------------------------
%% Primary preflist of length 1 (riak_kv service) for {Bucket, Key}, from
%% riak_core_apl:get_primary_apl/3. findkeys/4 expects it to look like
%% [{{Index, _}, primary}].
getnodeid(Bucket, Key) ->
    DocIdx = riak_core_util:chash_key({Bucket, Key}),
    riak_core_apl:get_primary_apl(DocIdx, 1, riak_kv).
%-----------------------------------------------------------------------
% Find one key per partition of a freshly created ring
%-----------------------------------------------------------------------
%% One probe key per partition index returned by getringnodes/0.
findkeys() ->
    Nodes = getringnodes(),
    findkeys(Nodes).
%-----------------------------------------------------------------------
% Find unique keys for the passed vnode ids
%-----------------------------------------------------------------------
%% Probe keys <<"0">>, <<"1">>, <<"2">>, ... For each probe, if its primary
%% vnode id is still unclaimed, keep the key and remove that id from the
%% unclaimed set. Stop once every id in Nodes has a key and return the kept
%% keys in the order found. Never terminates if some id in Nodes is never
%% produced.
%% NOTE(review): only getnodeid/2 (Bucket, Key) is defined in this file, so
%% the getnodeid(Key) call below does not compile as written. TODO: decide
%% which bucket the probe keys should be hashed against.
findkeys(Nodes) ->
    findkeys(Nodes, Nodes, [], 0).

findkeys(_Nodes, [], Keys, _Acc) ->
    Keys;
findkeys(Nodes, RemainingNodes, Keys, Acc) ->
    Key = integer_to_binary(Acc),
    [{{NodeId, _}, primary}] = getnodeid(Key),
    IsUnclaimed = lists:any(fun(Elem) -> Elem == NodeId end, RemainingNodes),
    {NodesLeft, NewKeys} =
        case IsUnclaimed of
            true -> {lists:delete(NodeId, RemainingNodes), lists:append(Keys, [Key])};
            false -> {RemainingNodes, Keys}
        end,
    findkeys(Nodes, NodesLeft, NewKeys, Acc + 1).
%-----------------------------------------------------------------------
% Write keys to the specified bucket (KV write)
%-----------------------------------------------------------------------
%% KV-put each key into Bucket, using the key itself as the value, over one
%% getClient/0 connection. Crashes (badmatch) on the first put that does not
%% return ok; otherwise returns a list of ok, one per key.
writeToKvPartitions(Bucket, Keys) ->
    C = getClient(),
    [ok = riakc_pb_socket:put(C, riakc_obj:new(Bucket, Key, Key)) || Key <- Keys].
%-----------------------------------------------------------------------
% Write one key per partition to the specified bucket
%-----------------------------------------------------------------------
%% Write the keys found by findkeys/0 (one per ring partition) into Bucket.
writeOneKeyPerKvPartition(Bucket) ->
    writeToKvPartitions(Bucket, findkeys()).
%%=======================================================================
%% Torben's additions below
%%=======================================================================
%% adapted from riak_test ts_cluster_coverage
%% Quantum length in milliseconds (15 minutes).
quantum_ms() ->
    timer:minutes(15).

%% Spacing in milliseconds between consecutive expected timestamps.
step() ->
    3124.

%% Exclusive upper bound (ms) covering QuantaTally quanta.
upper_bound_excl(QuantaTally) ->
    quantum_ms() * QuantaTally.

%% Expected timestamps: 1, 1 + step(), 1 + 2*step(), ... strictly below
%% upper_bound_excl(QuantaTally).
timestamps(QuantaTally) ->
    Last = upper_bound_excl(QuantaTally) - 1,
    lists:seq(1, Last, step()).

%% Expected timestamps for the default 100 quanta.
timestamps() ->
    timestamps(100).
%% key analysis
%% Third element of a 3-tuple key (presumably {Family, Series, Time};
%% TODO confirm against riakc_ts list-keys output).
key_timestamp({_, _, Timestamp}) ->
    Timestamp.

%% Timestamps of all Keys, in the same order.
timestamps_of_keys(Keys) ->
    lists:map(fun(Key) -> key_timestamp(Key) end, Keys).
%% Compare received timestamps with timestamps() and return a proplist of
%% counts plus the missing and duplicated values. "Duplicates" is the
%% multiset difference Received -- Expected, so it also contains any
%% received value that was never expected.
timestamp_analysis(TSList) ->
    Received = lists:sort(TSList),
    Expected = timestamps(),
    Duplicates = Received -- Expected,
    Missing = Expected -- Received,
    UniqueDuplicates = lists:usort(Duplicates),
    [{received_count, length(Received)},
     {unique_count, length(lists:usort(Received))},
     {missing_count, length(Missing)},
     {duplicates_count, length(Duplicates)},
     {unique_duplicates_count, length(UniqueDuplicates)},
     {missing, Missing},
     {duplicates, Duplicates},
     {unique_duplicates, UniqueDuplicates}].
%% Stream the GeoCheckin keys from dev node DevNo and analyse their
%% timestamps with timestamp_analysis/1.
get_keys_and_analyse(DevNo) ->
    %% Local calls rather than mrts:... so this does not depend on the
    %% module's name, matching every other internal call in the file.
    TSList = timestamps_of_keys(keys(lk, DevNo)),
    timestamp_analysis(TSList).
%% Repeat run_once/0 for about DurationInSecs seconds (always at least one
%% run), rotating over dev nodes via the registered mrts_pick_dev process,
%% and return the accumulated stats tuple (see update_stats/2).
analysis(DurationInSecs) ->
    StartTime = current_time(),
    EndTime = end_time(StartTime, DurationInSecs),
    start_mrts_pick_dev(),
    try
        run_analysis(initial_stats(), EndTime)
    after
        %% Always stop the picker. Otherwise a crash mid-run leaves
        %% mrts_pick_dev registered, and the next analysis/1 fails in register/2.
        stop_mrts_pick_dev()
    end.
%% One analysis pass on the next dev node. Returns
%% {phash2 of the result, Result, DevNo, local start time}.
run_once() ->
    StartedAt = current_time(),
    DevNo = pick_dev(),
    Result = get_keys_and_analyse(DevNo),
    {erlang:phash2(Result), Result, DevNo, StartedAt}.
%% Ask the registered mrts_pick_dev process for the next dev node number.
pick_dev() ->
    mrts_pick_dev ! {pick, self()},
    %% Match only the tagged reply, so an unrelated message already in the
    %% mailbox (e.g. a leftover stream message) is not taken as a DevNo.
    receive
        {mrts_pick_dev, DevNo} ->
            DevNo
    end.

%% Dev node numbers to rotate over.
devs() ->
    lists:seq(1, 3).

%% Spawn and register the picker process. The first round hands out devs()
%% in order; every later round uses a fresh shuffle. Returns true.
start_mrts_pick_dev() ->
    Pid = spawn(fun() -> pick_dev_loop(devs()) end),
    register(mrts_pick_dev, Pid).

%% Kill the picker if it is registered; returns ok when it is not.
stop_mrts_pick_dev() ->
    case whereis(mrts_pick_dev) of
        Pid when is_pid(Pid) ->
            exit(Pid, kill);
        _ ->
            ok
    end.

%% Picker loop: answer each {pick, From} with the next dev of the current
%% round, starting a newly shuffled round when the current one is used up.
pick_dev_loop([]) ->
    pick_dev_loop(shuffle(devs()));
pick_dev_loop([Dev | Devs]) ->
    receive
        {pick, From} ->
            From ! {mrts_pick_dev, Dev},
            pick_dev_loop(Devs)
    end.

%% Random permutation of L. Uses rand, which seeds itself per process. The
%% deprecated random module used a fixed default seed in a freshly spawned
%% process, so every picker produced the same "random" order.
shuffle(L) ->
    [X || {_, X} <- lists:sort([{rand:uniform(), N} || N <- L])].
%% Keep calling run_once/0 and folding each result into Stats until a run
%% starts after EndTime (datetime tuples compare chronologically); always
%% performs at least one run. Returns the final stats.
run_analysis(Stats, EndTime) ->
    Result = run_once(),
    {_, _, _, StartedAt} = Result,
    Updated = update_stats(Stats, Result),
    if
        StartedAt > EndTime -> Updated;
        true -> run_analysis(Updated, EndTime)
    end.
%% Empty stats {HashRes, DevCounts, DevTimes, Counts, Times}, each an orddict.
initial_stats() ->
    list_to_tuple(lists:duplicate(5, orddict:new())).
%% Fold one run_once/0 result {Hash, Res, DevNo, Start} into the stats:
%%   HashRes   - Hash -> latest Res
%%   DevCounts - {Hash, DevNo} -> number of runs
%%   DevTimes  - {Hash, DevNo} -> start times, in arrival order
%%   Counts    - Hash -> number of runs
%%   Times     - Hash -> start times, in arrival order
update_stats({HashRes, DevCounts, DevTimes, Counts, Times}, {Hash, Res, DevNo, Start}) ->
    DevKey = {Hash, DevNo},
    {orddict:store(Hash, Res, HashRes),
     inc_count(DevKey, DevCounts),
     add_time(DevKey, Start, DevTimes),
     inc_count(Hash, Counts),
     add_time(Hash, Start, Times)}.

%% Increment Key's counter (missing keys start at 0).
inc_count(Key, Counts) ->
    orddict:update_counter(Key, 1, Counts).

%% Append Start to Key's list of start times.
add_time(Key, Start, Times) ->
    orddict:append(Key, Start, Times).
%% Local wall-clock time as a calendar datetime {{Y, M, D}, {H, Mi, S}}.
current_time() ->
    calendar:local_time().

%% The datetime DurationInSecs seconds after StartTime.
end_time(StartTime, DurationInSecs) ->
    StartSecs = calendar:datetime_to_gregorian_seconds(StartTime),
    calendar:gregorian_seconds_to_datetime(StartSecs + DurationInSecs).
%% Write every {Hash, Result} entry of the HashRes orddict to "<Hash>.txt"
%% in the current directory; returns ok. Iterates over orddict:to_list/1
%% (previously computed into an unused variable, which triggered a warning).
write_results(HashRes) ->
    lists:foreach(fun write_res/1, orddict:to_list(HashRes)).

%% Overwrite "<Hash>.txt" with Res pretty-printed (~p) plus a newline;
%% returns file:write_file/2's result. Hash must be an integer.
write_res({Hash, Res}) ->
    Filename = io_lib:format("~B.txt", [Hash]),
    ResStr = io_lib:format("~p~n", [Res]),
    file:write_file(Filename, ResStr).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment