Skip to content

Instantly share code, notes, and snippets.

@binarytemple
Last active February 2, 2016 21:47
Show Gist options
  • Save binarytemple/31ce133fb703e620c592 to your computer and use it in GitHub Desktop.
Save binarytemple/31ce133fb703e620c592 to your computer and use it in GitHub Desktop.
Query objects on the real time queue (with sizes)

Load 10 MB of data into /tmp/foo:

sudo dd if=/dev/disk0 of=/tmp/foo bs=1m count=10

Start loading it into the dev1 node:

# PUT the contents of /tmp/foo 10000 times under distinct keys in bucket "food3".
# --data-binary sends the file byte-for-byte; plain -d strips CR/LF from the file
# and posts it as form data, which silently alters a raw binary payload.
for i in `seq 1 10000`
do curl -XPUT "http://localhost:10018/buckets/food3/keys/ooopp$i" -H 'Content-Type: application/octet-stream' --data-binary @/tmp/foo
done

Kill dev4 (the single-node cluster that is the replication target):

ps awux | grep dev4 | awk '{print $2}' |xargs kill -9 

Now the queue should be building up...

Load in the queue querying function:

%% DumpRtQ(Dest) -> ok | {error, Reason}
%%
%% Dumps the current realtime replication queue (riak_repl2_rtq:dumpq/0) to the
%% file Dest, one entry per line, formatted as
%%   Size:Bucket/Key LocalForwards,RoutedClusters
%% where Size is the byte size of the object re-encoded with term_to_binary/1.
%% Every entry is preceded by "\n", so the file starts with an empty line.
%% Bucket and Key are written as-is and are expected to be binaries.
DumpRtQ = fun(Dest) ->

%% Render one queue entry as an iolist. Building iolists instead of appending
%% to one growing string with ++ keeps the dump linear in the queue length.
FormatEntry = fun({_, _, WireBin, Meta}) ->
    LocalForwards  = proplists:get_value(local_forwards, Meta),
    RoutedClusters = proplists:get_value(routed_clusters, Meta),
    %% The queued binary decodes to a list; only its first element is inspected.
    [Wire | _] = erlang:binary_to_term(WireBin),
    RO = riak_repl_util:from_wire(Wire),
    Size = erlang:byte_size(term_to_binary(RO)),
    ["\n", erlang:integer_to_list(Size), ":",
     riak_object:bucket(RO), "/", riak_object:key(RO), " ",
     io_lib:format("~p", [LocalForwards]), ",",
     io_lib:format("~p", [RoutedClusters])]
end,

%% file:write_file/2 accepts iodata, so no flattening is needed.
file:write_file(Dest, lists:map(FormatEntry, riak_repl2_rtq:dumpq()))

end.

Execute it

DumpRtQ("/tmp/foo.txt").
ok

Example output

head -n5 /tmp/foo.txt

534:foo/3177bar1 ["riak2"],undefined
534:foo/3178bar1 ["riak2"],undefined
534:foo/3179bar1 ["riak2"],undefined
534:foo/3180bar1 ["riak2"],undefined
534:foo/3181bar1 ["riak2"],undefined
534:foo/3182bar1 ["riak2"],undefined
534:foo/3183bar1 ["riak2"],undefined
534:foo/3184bar1 ["riak2"],undefined
532:foo/3185bar1 ["riak2"],undefined

Analysis of the utility performance:

I modified the utility slightly so that the queue could be passed in as an argument. I did this because the queue only lives for a while before being thrown away, so I needed to grab a copy I could play with. I did the following:



(dev1@127.0.0.1)1> Q = riak_repl2_rtq:dumpq().
[{9775,1,
  <<131,108,0,0,0,1,109,0,0,0,230,42,1,0,0,0,3,102,111,111,
    0,0,0,8,57,...>>,
.......
 {...}|...]


%% DumpRtQ(Queue, Dest) -> ok | {error, Reason}
%%
%% Writes a previously captured realtime queue (a list of 4-tuples as returned
%% by riak_repl2_rtq:dumpq/0) to the file Dest, one entry per line:
%%   Size:Bucket/Key LocalForwards,RoutedClusters
%% Size is the byte size of the object re-encoded with term_to_binary/1.
%% Every entry is preceded by "\n", so the file starts with an empty line.
%% Bucket and Key are written as-is and are expected to be binaries.
DumpRtQ = fun(Queue,Dest) ->

%% Render one queue entry as an iolist. The profile of the ++-based version
%% showed list appending as a top cost; iolists avoid recopying the output.
FormatEntry = fun({_, _, WireBin, Meta}) ->
    LocalForwards  = proplists:get_value(local_forwards, Meta),
    RoutedClusters = proplists:get_value(routed_clusters, Meta),
    %% The queued binary decodes to a list; only its first element is inspected.
    [Wire | _] = erlang:binary_to_term(WireBin),
    RO = riak_repl_util:from_wire(Wire),
    Size = erlang:byte_size(term_to_binary(RO)),
    ["\n", erlang:integer_to_list(Size), ":",
     riak_object:bucket(RO), "/", riak_object:key(RO), " ",
     io_lib:format("~p", [LocalForwards]), ",",
     io_lib:format("~p", [RoutedClusters])]
end,

%% file:write_file/2 accepts iodata, so no flattening is needed.
file:write_file(Dest, lists:map(FormatEntry, Queue))

end.
#Fun<erl_eval.12.82930912>
(dev1@127.0.0.1)3> eprof:start().
{ok,<0.31204.3>}

(dev1@127.0.0.1)11> eprof:profile( fun() -> DumpRtQ(Q,"/tmp/stats"), ok end ).
{ok,ok}

(dev1@127.0.0.1)12> eprof:analyze().

The result

FUNCTION                                     CALLS      %   TIME  [uS / CALLS]
--------                                     -----    ---   ----  [----------]
orddict:'-from_list/1-fun-0-'/2                  5   0.00      0  [      0.00]
gb_sets:is_element/2                             2   0.00      0  [      0.00]
gb_sets:is_member/2                              2   0.00      0  [      0.00]
erl_eval:'-expr/5-fun-1-'/4                      1   0.00      0  [      0.00]
sets:mk_seg/1                                    2   0.00      0  [      0.00]
erl_lint:guard/3                                 2   0.00      0  [      0.00]
erl_lint:unused_vars/3                           4   0.00      0  [      0.00]
erl_lint:canonicalize_string/1                   2   0.00      0  [      0.00]
erl_lint:check_format_3/2                        2   0.00      0  [      0.00]
erl_lint:extract_sequences/2                     4   0.00      0  [      0.00]
string:substr2/2                                 4   0.00      0  [      0.00]
file:write_file/2                                1   0.00      0  [      0.00]
gb_sets:is_member_1/2                            2   0.00      1  [      0.50]
gb_sets:from_ordset/1                            4   0.00      1  [      0.25]
gen:call/4                                       1   0.00      1  [      1.00]
gen:do_call/4                                    1   0.00      1  [      1.00]
erl_eval:'-expr/5-fun-0-'/3                      5   0.00      1  [      0.20]
lists:rumergel/3                                 2   0.00      1  [      0.50]
dict:get_bucket/2                               13   0.00      1  [      0.08]
packages:concat_1/1                             10   0.00      1  [      0.10]
erl_lint:start/2                                 2   0.00      1  [      0.50]
erl_lint:pattern/3                               6   0.00      1  [      0.17]
erl_lint:reject_bin_alias_expr/3                 6   0.00      1  [      0.17]
erl_lint:default_types/0                         2   0.00      1  [      0.50]
erl_lint:fun_clauses/3                           2   0.00      1  [      0.50]
erl_lint:fun_clause/3                            2   0.00      1  [      0.50]
erl_lint:pat_var/5                              14   0.00      1  [      0.07]
erl_lint:shadow_vars/4                           2   0.00      1  [      0.50]
erl_lint:check_old_unused_vars/3                 2   0.00      1  [      0.50]
erl_lint:warn_unused_vars/3                      4   0.00      1  [      0.25]
erl_lint:vtnew/2                                 8   0.00      1  [      0.13]
erl_lint:vtsubtract/2                            4   0.00      1  [      0.25]
erl_lint:vt_no_unsafe/1                          2   0.00      1  [      0.50]
erl_lint:check_format_1/1                        2   0.00      1  [      0.50]
erl_lint:check_format_2/2                        2   0.00      1  [      0.50]
erl_lint:check_format_2a/2                       2   0.00      1  [      0.50]
erl_lint:args_list/1                             4   0.00      1  [      0.25]
erl_lint:check_format_string/1                   2   0.00      1  [      0.50]
erl_lint:extract_sequence/3                      8   0.00      1  [      0.13]
erl_lint:control_type/2                          2   0.00      1  [      0.50]
erl_lint:is_local_function/2                     1   0.00      1  [      1.00]
erl_lint:is_autoimport_suppressed/2              1   0.00      1  [      1.00]
erl_lint:bif_clash_specifically_disabled/2       1   0.00      1  [      1.00]
erl_lint:'-fun_clauses/3-fun-0-'/3               2   0.00      1  [      0.50]
erl_lint:'-nowarn_function/2-lc$^0/1-0-'/2       1   0.00      1  [      1.00]
erl_lint:'-used_vars/2-fun-1-'/2                 5   0.00      1  [      0.20]
erl_lint:'-used_vars/2-fun-0-'/2                 5   0.00      1  [      0.20]
string:chr/2                                     4   0.00      1  [      0.25]
string:chr/3                                     4   0.00      1  [      0.25]
file:file_name/1                                 1   0.00      1  [      1.00]
file:make_binary/1                               1   0.00      1  [      1.00]
file:call/2                                      1   0.00      1  [      1.00]
file:check_and_call/2                            1   0.00      1  [      1.00]
file:check_args/1                                3   0.00      1  [      0.33]
file:native_name_encoding/0                      1   0.00      1  [      1.00]
orddict:from_list/1                              2   0.00      2  [      1.00]
ordsets:is_element/2                            19   0.00      2  [      0.11]
erl_eval:hide_calls/2                            2   0.00      2  [      1.00]
sets:new/0                                       2   0.00      2  [      1.00]
erl_lint:value_option/7                          2   0.00      2  [      1.00]
erl_lint:pseudolocals/0                          6   0.00      2  [      0.33]
erl_lint:used_vars/2                             2   0.00      2  [      1.00]
erl_lint:start/0                                 2   0.00      2  [      1.00]
erl_lint:is_warn_enabled/2                       3   0.00      2  [      0.67]
erl_lint:zip_file_and_line/2                     2   0.00      2  [      1.00]
erl_lint:nowarn_function/2                       1   0.00      2  [      2.00]
erl_lint:imported/3                              1   0.00      2  [      2.00]
erl_lint:pattern_list/5                          2   0.00      2  [      1.00]
erl_lint:guard_tests/3                           2   0.00      2  [      1.00]
erl_lint:modify_line/2                           2   0.00      2  [      1.00]
erl_lint:is_format_function/2                   13   0.00      2  [      0.15]
erl_lint:args_length/1                           4   0.00      2  [      0.50]
erl_lint:'-vt_no_unsafe/1-lc$^0/1-0-'/1          7   0.00      2  [      0.29]
gen_server:call/3                                1   0.00      2  [      2.00]
file:file_name_1/2                              11   0.00      2  [      0.18]
lists:member/2                                   6   0.00      2  [      0.33]
erlang:whereis/1                                 1   0.00      2  [      2.00]
gb_sets:empty/0                                  6   0.00      3  [      0.50]
gb_sets:balance_list/2                           4   0.00      3  [      0.75]
gb_sets:from_list/1                              4   0.00      3  [      0.75]
ordsets:from_list/1                              7   0.00      3  [      0.43]
lists:usplit_2_1/6                               4   0.00      3  [      0.75]
lists:umerge2_1/5                                8   0.00      3  [      0.38]
lists:umerge2_2/4                                6   0.00      3  [      0.50]
dict:find_val/2                                 13   0.00      3  [      0.23]
packages:concat/1                               10   0.00      3  [      0.30]
erl_lint:format_function/5                      13   0.00      3  [      0.23]
erl_lint:'-pattern_list/5-fun-0-'/5              9   0.00      3  [      0.33]
erl_lint:'-start/2-lc$^1/1-1-'/1                 8   0.00      3  [      0.38]
otp_internal:obsolete_1/3                       14   0.00      3  [      0.21]
string:substr/2                                  2   0.00      3  [      1.50]
erlang:demonitor/2                               1   0.00      3  [      3.00]
lists:usort/1                                    7   0.00      4  [      0.57]
lists:usplit_2/5                                 8   0.00      4  [      0.50]
lists:umergel/3                                  6   0.00      4  [      0.67]
erl_lint:head/4                                  5   0.00      4  [      0.80]
erl_lint:expr_var/4                             17   0.00      4  [      0.24]
erl_lint:check_unused_vars/3                     2   0.00      4  [      2.00]
erl_lint:'-unused_vars/3-fun-0-'/2              19   0.00      4  [      0.21]
otp_internal:obsolete/3                         14   0.00      4  [      0.29]
gb_sets:balance_list_1/2                        12   0.00      5  [      0.42]
dict:get_bucket_s/2                             13   0.00      5  [      0.38]
packages:concat/2                               10   0.00      5  [      0.50]
erl_lint:check_qlc_hrl/5                        13   0.00      5  [      0.38]
erlang:monitor/2                                 1   0.00      5  [      5.00]
erlang:fun_info/2                                3   0.00      5  [      1.67]
erl_lint:exprs/3                                14   0.00      6  [      0.43]
erl_lint:expr_list/3                            27   0.00      6  [      0.22]
erl_lint:vtold/2                                 8   0.00      6  [      0.75]
erl_lint:expand_package/2                       13   0.00      6  [      0.46]
dict:find/2                                     13   0.00      7  [      0.54]
packages:is_valid_1/1                           93   0.00      7  [      0.08]
erl_lint:'-start/2-lc$^0/1-0-'/1                28   0.00      7  [      0.25]
packages:is_valid/1                             10   0.00      8  [      0.80]
erl_lint:bool_option/4                          26   0.00      8  [      0.31]
erl_lint:pattern/5                              18   0.00      8  [      0.44]
erl_lint:check_remote_function/5                13   0.00      8  [      0.62]
erl_lint:deprecated_function/5                  14   0.00      8  [      0.57]
erl_lint:'-vtnew/2-fun-0-'/3                    29   0.00      9  [      0.31]
erl_lint:merge_used/2                           50   0.00     10  [      0.20]
erl_lint:'-expr_list/3-fun-0-'/3                48   0.00     10  [      0.21]
erlang:apply/2                                   2   0.00     11  [      5.50]
erlang:tuple_to_list/1                         124   0.00     11  [      0.09]
erlang:list_to_atom/1                           10   0.00     11  [      1.10]
ordsets:union/2                                 90   0.00     13  [      0.14]
erl_lint:vtupdate/2                             42   0.00     13  [      0.31]
erl_lint:vtmerge/2                              58   0.00     13  [      0.22]
erlang:send/3                                    1   0.00     13  [     13.00]
erl_scan:set_attr/3                            124   0.00     14  [      0.11]
erl_lint:'-vtold/2-fun-0-'/3                    51   0.00     14  [      0.27]
orddict:is_key/2                               140   0.01     15  [      0.11]
erl_lint:vtmerge_pat/2                          24   0.01     15  [      0.63]
erl_lint:merge_lines/2                          50   0.01     15  [      0.30]
erl_scan:set_attribute/3                       124   0.01     18  [      0.15]
erl_parse:set_line/2                           124   0.01     20  [      0.16]
erl_lint:'-vtupdate/2-fun-0-'/3                 50   0.01     21  [      0.42]
orddict:filter/2                               126   0.01     23  [      0.18]
erl_lint:'-default_types/0-lc$^0/1-0-'/1       112   0.01     23  [      0.21]
erl_eval:do_apply/5                              1   0.01     25  [     25.00]
erl_lint:'-zip_file_and_line/2-fun-0-'/2       124   0.01     25  [      0.20]
erl_lint:'-zip_file_and_line/2-fun-1-'/2       124   0.01     31  [      0.25]
erl_lint:expr/3                                 64   0.01     36  [      0.56]
riak_object:bucket/1                           594   0.02     54  [      0.09]
erl_lint:modify_line1/2                        299   0.02     61  [      0.20]
io_lib:write/1                                 594   0.02     63  [      0.11]
riak_object:key/1                              594   0.02     64  [      0.11]
io_lib_pretty:print_length_list/3              594   0.02     70  [      0.12]
lists:nth/2                                    594   0.03     72  [      0.12]
orddict:merge/3                                318   0.03     74  [      0.23]
io_lib:write_atom/1                            594   0.03     78  [      0.13]
io_lib_pretty:list_length_tail/2               594   0.03     85  [      0.14]
erlang:iolist_to_binary/1                        1   0.03     97  [     97.00]
io_lib:write/2                                 594   0.03     99  [      0.17]
riak_object:sibs_of_binary/2                   594   0.04    101  [      0.17]
lists:reverse/2                                599   0.04    110  [      0.18]
erlang:integer_to_list/1                       594   0.04    110  [      0.19]
erl_eval:eval_fun/5                           1189   0.04    111  [      0.09]
proplists:get_value/2                         1188   0.04    114  [      0.10]
io_lib_pretty:write_tail/2                     594   0.04    118  [      0.20]
io_lib_pretty:print/4                         1188   0.04    120  [      0.10]
erl_lint:keyword_warning/3                      29   0.04    123  [      4.24]
orddict:new/0                                 1190   0.04    125  [      0.11]
erl_eval:guard0/4                             1190   0.04    126  [      0.11]
erlang:list_to_tuple/1                         719   0.05    130  [      0.18]
io_lib:format/2                               1188   0.05    138  [      0.12]
lists:map/2                                    595   0.05    140  [      0.24]
erl_eval:'-expr/5-fun-2-'/5                    594   0.05    141  [      0.24]
io_lib:quote_atom/2                            594   0.05    142  [      0.24]
io_lib_pretty:write_list/2                     594   0.05    144  [      0.24]
io_lib_pretty:max_cs/2                         594   0.05    154  [      0.26]
riak_object:last_mod_meta/2                    594   0.06    165  [      0.28]
dict:from_list/1                               600   0.06    165  [      0.28]
io_lib_pretty:list_length/2                    594   0.06    169  [      0.28]
riak_object:deleted_meta/2                     594   0.06    170  [      0.29]
io_lib:write_string/2                          594   0.06    171  [      0.29]
riak_object:vtag_meta/2                        594   0.06    171  [      0.29]
erl_scan:reserved_word/1                       594   0.06    178  [      0.30]
riak_object:from_binary/3                      594   0.06    183  [      0.31]
riak_object:sibs_of_binary/3                  1188   0.07    193  [      0.16]
io_lib_format:encoding/2                      1188   0.07    200  [      0.17]
io_lib_format:pad_char/2                      1188   0.07    210  [      0.18]
io_lib_format:collect_cseq/2                  1188   0.08    218  [      0.18]
io_lib_format:field_width/3                   1188   0.08    223  [      0.19]
io_lib_format:pcount/1                        1188   0.08    224  [      0.19]
erl_eval:eval_fun/6                           1190   0.08    235  [      0.20]
erl_eval:hide/3                               1089   0.08    240  [      0.22]
io_lib_pretty:write/1                         1188   0.09    245  [      0.21]
erl_eval:new_bindings/0                       1190   0.09    247  [      0.21]
dict:mk_seg/1                                 1204   0.09    248  [      0.21]
io_lib_format:precision/2                     1188   0.09    251  [      0.21]
binary:bin_to_list/1                          1188   0.09    254  [      0.21]
erl_eval:'-expr/5-fun-3-'/6                    594   0.09    261  [      0.44]
io_lib_pretty:print/6                         1188   0.09    264  [      0.22]
io_lib_pretty:print_length_list1/3            1188   0.09    267  [      0.22]
erl_eval:guard/4                              1190   0.10    278  [      0.23]
io_lib_format:field_width/2                   1188   0.10    279  [      0.23]
io_lib_format:decr_pc/2                       1188   0.10    281  [      0.24]
riak_repl_util:from_wire/1                     594   0.10    283  [      0.48]
dict:new/0                                    1204   0.10    288  [      0.24]
io_lib_format:fwrite/2                        1188   0.10    293  [      0.25]
io_lib_format:pcount/2                        2376   0.10    301  [      0.13]
io_lib_pretty:print_length/3                  1188   0.11    305  [      0.26]
erl_eval:bif/5                                2376   0.11    317  [      0.13]
io_lib_pretty:printable_list/2                1188   0.11    319  [      0.27]
erl_eval:add_bindings/2                       1190   0.12    334  [      0.28]
io_lib_format:control/8                       1188   0.12    346  [      0.29]
riak_object:sib_of_binary/1                    594   0.12    352  [      0.59]
io_lib_format:field_value/2                   1188   0.12    358  [      0.30]
dict:maybe_expand/2                           4273   0.13    374  [      0.09]
io_lib:string_char/4                          2970   0.13    375  [      0.13]
io_lib_format:collect_cc/2                    1188   0.13    375  [      0.32]
io_lib_format:print/7                         3564   0.14    389  [      0.11]
erl_internal:bif/2                            2378   0.14    391  [      0.16]
proplists:get_value/3                         1782   0.14    399  [      0.22]
io_lib_format:build/3                         2376   0.15    430  [      0.18]
erlang:binary_to_list/1                        594   0.15    439  [      0.74]
io_lib_format:collect/2                       2376   0.16    473  [      0.20]
io_lib:printable_list/1                       4158   0.17    499  [      0.12]
erl_eval:match_list/4                         2974   0.17    500  [      0.17]
io_lib:name_chars/1                           5346   0.18    527  [      0.10]
dict:store/3                                  4273   0.18    528  [      0.12]
io_lib:write_string1/3                        3564   0.22    623  [      0.17]
erl_eval:eval_op/6                            5940   0.23    651  [      0.11]
riak_object:meta_of_binary/2                  2970   0.23    652  [      0.22]
erl_eval:exprs/5                              4757   0.24    680  [      0.14]
erl_eval:match/3                              3566   0.25    722  [      0.20]
erl_eval:match/4                              5350   0.25    723  [      0.14]
dict:'-from_list/1-fun-0-'/2                  3678   0.25    724  [      0.20]
dict:maybe_expand_aux/2                       4273   0.27    780  [      0.18]
erlang:phash/2                                4286   0.29    825  [      0.19]
io_lib:name_char/1                            4752   0.29    826  [      0.17]
erl_eval:'-add_bindings/2-fun-0-'/2           4754   0.29    849  [      0.18]
dict:'-store/3-fun-0-'/3                      4273   0.30    867  [      0.20]
erl_eval:match_tuple/5                        6534   0.32    908  [      0.14]
dict:store_bkt_val/3                          5153   0.33    941  [      0.18]
riak_object:decode_maybe_binary/1             5346   0.41   1179  [      0.22]
dict:get_slot/2                               4286   0.48   1386  [      0.32]
erl_eval:add_binding/3                        8320   0.48   1395  [      0.17]
erlang:binary_to_term/1                       3564   0.49   1406  [      0.39]
dict:on_bucket/3                              4273   0.49   1414  [      0.33]
packages:is_segmented/1                       7735   0.59   1712  [      0.22]
erl_internal:bif/3                            7725   0.62   1777  [      0.23]
erlang:atom_to_list/1                         8334   0.65   1859  [      0.22]
lists:reverse/1                               9510   0.68   1950  [      0.21]
erl_eval:expand_module_name/2                 7725   0.71   2032  [      0.26]
erlang:setelement/3                          17096   0.76   2199  [      0.13]
erl_eval:do_apply/6                          14259   0.78   2255  [      0.16]
erl_eval:expr_list/4                          8914   0.79   2267  [      0.25]
shell:'-eval_loop/3-fun-0-'/3                14259   0.81   2342  [      0.16]
shell:apply_fun/3                            14259   0.92   2648  [      0.19]
erlang:term_to_binary/1                        594   0.98   2813  [      4.74]
erl_eval:match1/4                            10696   1.00   2881  [      0.27]
erl_eval:expr_list/6                         23179   1.09   3138  [      0.14]
erl_eval:binding/2                           26149   1.73   4987  [      0.19]
erl_eval:ret_expr/3                          51700   1.79   5162  [      0.10]
orddict:to_list/1                            30308   1.88   5404  [      0.18]
orddict:store/3                              32685   1.94   5577  [      0.17]
packages:is_segmented_1/1                    67826   2.21   6352  [      0.09]
erl_eval:merge_bindings/2                    29118   2.45   7061  [      0.24]
erl_eval:expr/5                              52295   4.85  13955  [      0.27]
lists:foldl/3                               199302   6.83  19674  [      0.10]
erl_eval:'-merge_bindings/2-fun-0-'/2       159226   9.82  28281  [      0.18]
erlang:'++'/2                                 5940  11.27  32441  [      5.46]
orddict:find/2                              645307  32.23  92821  [      0.14]
ok
(dev1@127.0.0.1)13>

Continuing on: I had dumped the queue to a file yesterday, and I edited it quickly (adding a full stop to the end of the file) so that it could be read in by file:consult/1. This saves me from having to fire up replication every time I want to get my hands on that queue data.

Let's read that back in again:

(....)39> Sample=lists:sublist(hd(hd(tl( tuple_to_list(     file:consult("./dumped.queue.erl")  ) ))),2).
[{9775,1,
  <<131,108,0,0,0,1,109,0,0,0,230,42,1,0,0,0,3,102,111,111,
    0,0,0,8,57,...>>,
  [{local_forwards,["riak2"]}]},
 {9776,1,
  <<131,108,0,0,0,1,109,0,0,0,231,42,1,0,0,0,3,102,111,111,
    0,0,0,8,...>>,
  [{local_forwards,["riak2"]}]}]

And I test it with the jerry-rigged anonymous function:

%% DumpRtQ(Queue, Dest, Count) -> ok | {error, Reason}
%%
%% Writes entries of a captured realtime queue (a list of 4-tuples as returned
%% by riak_repl2_rtq:dumpq/0) to the file Dest, one entry per line:
%%   Size:Bucket/Key LocalForwards,RoutedClusters
%% Count is the atom 'all' to dump every entry, or an integer N to dump only
%% the first N entries.
%% Size is the byte size of the object re-encoded with term_to_binary/1.
%% Every entry is preceded by "\n", so the file starts with an empty line.
%% Bucket and Key are written as-is and are expected to be binaries.
DumpRtQ = fun(Queue,Dest,Count) ->

Entries = case Count of
    all -> Queue;
    Num -> lists:sublist(Queue, Num)
end,

%% Render one queue entry as an iolist. Building iolists instead of appending
%% to one growing string with ++ keeps the dump linear in the queue length.
FormatEntry = fun({_, _, WireBin, Meta}) ->
    LocalForwards  = proplists:get_value(local_forwards, Meta),
    RoutedClusters = proplists:get_value(routed_clusters, Meta),
    %% The queued binary decodes to a list; only its first element is inspected.
    [Wire | _] = erlang:binary_to_term(WireBin),
    RO = riak_repl_util:from_wire(Wire),
    Size = erlang:byte_size(term_to_binary(RO)),
    ["\n", erlang:integer_to_list(Size), ":",
     riak_object:bucket(RO), "/", riak_object:key(RO), " ",
     io_lib:format("~p", [LocalForwards]), ",",
     io_lib:format("~p", [RoutedClusters])]
end,

%% file:write_file/2 accepts iodata, so no flattening is needed.
file:write_file(Dest, lists:map(FormatEntry, Entries))

end.
(dev1@127.0.0.1)61> DumpRtQ(Sample,"/tmp/test.all",all).
ok
(dev1@127.0.0.1)62> DumpRtQ(Sample,"/tmp/test.1",1).
ok

I check on the console that the function operates as expected.

[~%]cat /tmp/test.all

532:foo/9407bar1 ["riak2"],undefined
534:foo/9408bar1 ["riak2"],undefined%                                                                            [~%]cat /tmp/test.1

532:foo/9407bar1 ["riak2"],undefined%

Then I rewrite the function to use the riak_repl2_rtq:dumpq/0 function. And I test it one last time as before.

%% DumpRtQ(Dest, Count) -> ok | {error, Reason}
%%
%% Dumps the current realtime replication queue (riak_repl2_rtq:dumpq/0) to the
%% file Dest, one entry per line:
%%   Size:Bucket/Key LocalForwards,RoutedClusters
%% Count is the atom 'all' to dump every entry, or an integer N to dump only
%% the first N entries.
%% Size is the byte size of the object re-encoded with term_to_binary/1.
%% Every entry is preceded by "\n", so the file starts with an empty line.
%% Bucket and Key are written as-is and are expected to be binaries.
DumpRtQ = fun(Dest,Count) ->

Queue = riak_repl2_rtq:dumpq(),

Entries = case Count of
    all -> Queue;
    Num -> lists:sublist(Queue, Num)
end,

%% Render one queue entry as an iolist. Building iolists instead of appending
%% to one growing string with ++ keeps the dump linear in the queue length.
FormatEntry = fun({_, _, WireBin, Meta}) ->
    LocalForwards  = proplists:get_value(local_forwards, Meta),
    RoutedClusters = proplists:get_value(routed_clusters, Meta),
    %% The queued binary decodes to a list; only its first element is inspected.
    [Wire | _] = erlang:binary_to_term(WireBin),
    RO = riak_repl_util:from_wire(Wire),
    Size = erlang:byte_size(term_to_binary(RO)),
    ["\n", erlang:integer_to_list(Size), ":",
     riak_object:bucket(RO), "/", riak_object:key(RO), " ",
     io_lib:format("~p", [LocalForwards]), ",",
     io_lib:format("~p", [RoutedClusters])]
end,

%% file:write_file/2 accepts iodata, so no flattening is needed.
file:write_file(Dest, lists:map(FormatEntry, Entries))

end.
(dev1@127.0.0.1)3> DumpRtQ("/tmp/all.txt",all).
ok
(dev1@127.0.0.1)4> DumpRtQ("/tmp/1.txt",1).
ok
[%]wc -l /tmp/all.txt
     469 /tmp/all.txt
[%]wc -l /tmp/1.txt
       1 /tmp/1.txt

Success!

It is worth noting that I also scripted the creation and setup of the two clusters (one node each) like so:

# Recursively delete the listed per-node data subdirectories (anti_entropy,
# bitcask, kv_vnode, leveldb, riak_kv_exchange_fsm, riak_repl, ring) under
# ./dev/dev*/data, relative to the current directory.
# The trailing (N) glob qualifier makes the pattern expand to nothing instead
# of raising a "no matches" error when none of the directories exist.
riak_remove_data_dir_contents () {
        rm -rf ./dev/dev*/data/(anti_entropy|bitcask|kv_vnode|leveldb|riak_kv_exchange_fsm|riak_repl|ring)(N)
}

# Run "riak stop" for every dev node found below the current directory.
# The glob searches recursively for dev[0-9]/bin/riak; the (:h) modifier is
# intended to reduce each match to its containing bin directory, which is why
# the loop body appends /riak again. (N) lets the loop run zero times when
# nothing matches.
# NOTE(review): confirm that zsh accepts the two stacked qualifier groups
# "(:h)(N)" with the glob options in use.
riak_stop_dev_nodes () {
        for i in ./**/dev[0-9]/bin/riak(:h)(N)
        do
                $i/riak stop
        done
}

# Rebuild the dev release and wire dev1 -> dev4 for realtime replication.
# Must be run from the directory that contains ./dev (it calls "make devrel").
# Steps: stop any running dev nodes, wipe their data, rebuild six dev nodes,
# start dev1 and dev4, name their clusters riak1 and riak2, connect dev1 to
# dev4, then enable and start realtime replication towards riak2.
riak_make_two_node_rt_repl () {
        riak_stop_dev_nodes
        riak_remove_data_dir_contents
        make devrel DEVNODES=6
        # Delete every line containing "sfwi" from all vm.args files
        # (presumably the +sfwi emulator flag -- confirm why it must go).
        sed -i'' -e '/sfwi/{d;}' **/vm.args
        ./dev/dev1/bin/riak start
        ./dev/dev4/bin/riak start
        # Block until riak_kv is up on both nodes before configuring replication.
        ./dev/dev1/bin/riak-admin wait-for-service riak_kv
        ./dev/dev4/bin/riak-admin wait-for-service riak_kv
        ./dev/dev1/bin/riak-repl clustername riak1
        ./dev/dev4/bin/riak-repl clustername riak2
        # 127.0.0.1:10046 is presumably dev4's cluster manager address -- verify
        # against the generated dev4 configuration.
        ./dev/dev1/bin/riak-repl connect 127.0.0.1:10046
        ./dev/dev1/bin/riak-repl connections
        ./dev/dev1/bin/riak-repl realtime enable
        ./dev/dev1/bin/riak-repl realtime enable riak2
        ./dev/dev1/bin/riak-repl realtime start riak2
}

That code only runs in the Z shell; adapt and survive!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment