Last active
August 29, 2015 14:04
-
-
Save vmx/ec04d67416276e69a02d to your computer and use it in GitHub Desktop.
Performing the same actions as the Couchbase view engine updater does
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env escript
% This script can be run from any directory with write permission.
% You need to set the ERL_LIBS environment variable to the location of the
% installed erlang apps. For example:
% ERL_LIBS=/opt/couchbase/lib/couchdb/erlang/lib PATH=$PATH:/opt/couchbase/bin escript initial_index.escript

% Port (as a string) stored under the "upr"/"port" configuration key by init/0.
-define(UPR_PORT, "11209").
% Alternative port value kept from the original script; swap it with the
% definition above to use it instead.
%-define(UPR_PORT, "12000").
% First argument passed to couch_upr_client:start/5.
-define(STREAM_NAME, <<"faking_the_view-engine">>).
% Passed as both the second and third argument to couch_upr_client:start/5.
-define(BUCKET, <<"bucket-1">>).
% Fourth argument passed to couch_upr_client:start/5.
% NOTE(review): looks like a placeholder credential -- adjust to the target
% cluster before running.
-define(PASSWORD, <<"password">>).
% flow control buffer size 20 MB (20 * 1024 * 1024 bytes)
-define(UPR_CONTROL_BUFFER_SIZE, 20971520).
% Name of the sub-directory start_server/1 creates below the configured
% database and view index directories.
-define(LOCAL_BUCKET_NAME, <<"initial_index">>).
% Stream request flags; main/1 combines them with `bor` and hands the result
% to couch_upr_client:enum_docs_since/8.
-define(UPR_FLAG_DISKONLY, 16#02).
-define(UPR_FLAG_USELATEST_ENDSEQNO, 16#04).
% Returns the list of configuration files to load, in order: the default.ini
% under /opt/couchbase/etc/couchdb, followed by "initial_index.ini" resolved
% against the current working directory.
config_files() ->
    WorkDir = filename:absname("."),
    LocalIni = filename:join(WorkDir, "initial_index.ini"),
    ["/opt/couchbase/etc/couchdb/default.ini", LocalIni].
% Writes "initial_index.ini" into the current working directory, replacing
% any existing file of that name.
%
% The file sets the database and view index directories to "./", binds the
% HTTP server to 127.0.0.1 on port 0 and sends the log to
% "initial_index.log".
%
% Returns ok. Crashes with a badmatch if the file cannot be written, so a
% failed write is never silently reported as success.
create_config() ->
    Local = filename:join([filename:absname("."), "initial_index.ini"]),
    Content = [<<"[couchdb]\n">>,
               <<"database_dir = ./\n">>,
               <<"view_index_dir = ./\n">>,
               <<"\n">>,
               <<"[httpd]\n">>,
               <<"port = 0\n">>,
               <<"bind_address = 127.0.0.1\n">>,
               <<"\n">>,
               <<"[log]\n">>,
               <<"file = initial_index.log\n">>],
    % write_file/2 opens, writes and closes in one call and reports any
    % failure through its return value.
    ok = file:write_file(Local, Content).
% Prepares the local environment: starts the configuration server and
% creates the per-bucket directories (start_server/1), stores the UPR port
% in the configuration (not persisted) and starts couch_server_sup with the
% same configuration files.
%
% Returns ok. Crashes with a badmatch if start_server/1 or the configuration
% update does not return ok, matching the assertive style start_server/1
% already uses for couch_config:set/4.
init() ->
    ok = start_server(?LOCAL_BUCKET_NAME),
    ok = couch_config:set("upr", "port", ?UPR_PORT, false),
    % NOTE(review): the result of start_link/1 is not checked, exactly as in
    % the original; confirm whether {ok, _} should be asserted here.
    couch_server_sup:start_link(config_files()),
    ok.
% Returns the ids (integers) of all partitions whose "vbucket" stat value is
% <<"active">>.
%
% The stats are requested through get_stats/3. Entries that are not a
% {Key, <<"active">>} pair are skipped. Keys of active entries must have the
% form <<"vb_N">>; anything else crashes with a badmatch (wrong prefix) or
% badarg (N is not an integer).
get_active_partitions(UprPid) ->
    {ok, VbucketStats} = get_stats(UprPid, <<"vbucket">>, nil),
    ToPartId = fun(Key) ->
        [<<"vb">>, Id] = binary:split(Key, <<"_">>),
        binary_to_integer(Id)
    end,
    [ToPartId(Key) || {Key, <<"active">>} <- VbucketStats].
% escript entry point; the command line arguments are ignored.
%
% Steps:
%   1. write the local configuration file and start the local servers,
%   2. start a UPR client for ?BUCKET,
%   3. look up the active partitions and their sequence numbers,
%   4. for every partition call couch_upr_client:enum_docs_since/8 from
%      sequence 0 up to the reported end sequence number, counting the
%      mutations passed to the callback,
%   5. log the per-partition counts and the total elapsed time in seconds.
%
% Any step that does not return the expected {ok, ...} shape crashes the
% script with a badmatch.
main(_) ->
    ok = create_config(),
    ok = init(),
    {ok, UprPid} = couch_upr_client:start(
        ?STREAM_NAME, ?BUCKET, ?BUCKET, ?PASSWORD,
        ?UPR_CONTROL_BUFFER_SIZE),
    Partitions = get_active_partitions(UprPid),
    {ok, EndSeqs} = couch_upr_client:get_sequence_numbers(UprPid, Partitions),
    Flags = ?UPR_FLAG_DISKONLY bor ?UPR_FLAG_USELATEST_ENDSEQNO,
    PartVersions = [{0, 0}],
    Since = 0,
    % The callback only counts; the mutation itself is discarded.
    ChangesWrapper = fun(_Mutation, Acc) -> Acc + 1 end,
    % os:timestamp/0 replaces the deprecated now/0; it yields the same
    % {MegaSecs, Secs, MicroSecs} shape that timer:now_diff/2 expects.
    T1 = os:timestamp(),
    % The loop runs purely for its side effects, so foreach/2 is used instead
    % of building a throw-away result list. lists:zip/2 still crashes when
    % the two lists differ in length, as lists:zipwith/3 did.
    lists:foreach(
        fun({PartId, EndSeq}) ->
            {ok, Count, _} = couch_upr_client:enum_docs_since(
                UprPid, PartId, PartVersions, Since, EndSeq,
                Flags, ChangesWrapper, 0),
            log("vmx: part ~p received ~p messages", [PartId, Count])
        end, lists:zip(Partitions, EndSeqs)),
    log("vmx: Receiving all mutations took ~ps",
        [timer:now_diff(os:timestamp(), T1) / 1000000]),
    ok.
% From couch_set_view_test_util
%
% Starts the configuration server with config_files/0, creates a directory
% named after SetName (a binary) below both the configured database
% directory and the configured view index directory, and then points both
% configuration entries at the new directories (not persisted).
%
% An already existing directory is fine; any other make_dir error is thrown
% as {error, Reason}. Returns ok.
start_server(SetName) ->
    couch_config:start_link(config_files()),
    BaseDbDir = couch_config:get("couchdb", "database_dir"),
    BaseIndexDir = couch_config:get("couchdb", "view_index_dir"),
    SetDbDir = filename:join([BaseDbDir, binary_to_list(SetName)]),
    SetIndexDir = filename:join([BaseIndexDir, binary_to_list(SetName)]),
    EnsureDir = fun(Dir) ->
        case file:make_dir(Dir) of
            ok -> ok;
            {error, eexist} -> ok;
            Failure -> throw(Failure)
        end
    end,
    % Database directory first, then the index directory.
    lists:foreach(EnsureDir, [SetDbDir, SetIndexDir]),
    ok = couch_config:set("couchdb", "database_dir", SetDbDir, false),
    ok = couch_config:set("couchdb", "view_index_dir", SetIndexDir, false).
% Waits for the answer to a stats request tagged with the monitor reference
% MRef.
%
% Returns the reply carried by a {get_stats, MRef, Reply} message. Exits with
% {upr_client_died, Pid, Reason} when the monitor reports that Pid is down.
% If neither message arrives within 60 seconds, a notice is logged and the
% wait starts over, so this function never gives up on its own.
-spec get_stats_reply(pid(), reference()) -> term().
get_stats_reply(Pid, MRef) ->
    TimeoutMs = 60000,
    receive
        {get_stats, MRef, Reply} ->
            Reply;
        {'DOWN', MRef, process, Pid, Reason} ->
            exit({upr_client_died, Pid, Reason})
    after TimeoutMs ->
        log("upr client (~p): vbucket-seqno stats timed out after ~p seconds."
            " Waiting...",
            [Pid, TimeoutMs / 1000]),
        get_stats_reply(Pid, MRef)
    end.
% Requests the stat Name for partition PartId (or nil) from the process Pid
% and returns whatever that process replies.
%
% The target is monitored for the duration of the request so that
% get_stats_reply/2 can exit if it goes down; the monitor is removed, and any
% pending 'DOWN' message flushed, before returning.
-spec get_stats(pid(), binary(), non_neg_integer() | nil) -> term().
get_stats(Pid, Name, PartId) ->
    Monitor = erlang:monitor(process, Pid),
    % The monitor reference doubles as the tag that identifies the reply.
    ReplyTo = {Monitor, self()},
    Pid ! {get_stats, Name, PartId, ReplyTo},
    Stats = get_stats_reply(Pid, Monitor),
    erlang:demonitor(Monitor, [flush]),
    Stats.
% Formats Raw with Args (as io_lib:format/2 does) and prints the result to
% standard output, prefixed with the local time as
% YYYY-MM-DDTHH:MM:SS.mmm. Returns ok.
log(Raw, Args) ->
    Msg = io_lib:format(Raw, Args),
    % os:timestamp/0 replaces the deprecated now/0 and has the same
    % {MegaSecs, Secs, MicroSecs} shape.
    {_, _, Micros} = Time = os:timestamp(),
    {{Year, Month, Day}, {Hour, Minute, Second}} =
        calendar:now_to_local_time(Time),
    Millis = Micros div 1000,
    % The hour is zero-padded like the other time fields, so early-morning
    % timestamps keep a fixed width (T09:04:05.123 rather than T9:04:05.123).
    io:format("~B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B ~s~n",
              [Year, Month, Day, Hour, Minute, Second, Millis, Msg]).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment