Skip to content

Instantly share code, notes, and snippets.

@vmx
Last active August 29, 2015 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vmx/ec04d67416276e69a02d to your computer and use it in GitHub Desktop.
Save vmx/ec04d67416276e69a02d to your computer and use it in GitHub Desktop.
Performing the same actions as the Couchbase view engine updater does
#!/usr/bin/env escript
% This script can be run from any directory with write permission.
% You need to set the ERL_LIBS environment variable to the location of the
% installed erlang apps. For example:
% ERL_LIBS=/opt/couchbase/lib/couchdb/erlang/lib PATH=$PATH:/opt/couchbase/bin escript initial_index.escript
-define(UPR_PORT, "11209").
%-define(UPR_PORT, "12000").
-define(STREAM_NAME, <<"faking_the_view-engine">>).
-define(BUCKET, <<"bucket-1">>).
-define(PASSWORD, <<"password">>).
% flow control buffer size 20 MB
-define(UPR_CONTROL_BUFFER_SIZE, 20971520).
-define(LOCAL_BUCKET_NAME, <<"initial_index">>).
-define(UPR_FLAG_DISKONLY, 16#02).
-define(UPR_FLAG_USELATEST_ENDSEQNO, 16#04).
config_files() ->
Local = filename:join([filename:absname("."), "initial_index.ini"]),
["/opt/couchbase/etc/couchdb/default.ini",
Local].
create_config() ->
Local = filename:join([filename:absname("."), "initial_index.ini"]),
{ok, Fd} = file:open(Local, [write]),
file:write(Fd, <<"[couchdb]\ndatabase_dir = ./\nview_index_dir = ./\n\n[httpd]\nport = 0\nbind_address = 127.0.0.1\n\n[log]\nfile = initial_index.log\n">>),
file:close(Fd),
ok.
init() ->
start_server(?LOCAL_BUCKET_NAME),
couch_config:set("upr", "port", ?UPR_PORT, false),
couch_server_sup:start_link(config_files()),
ok.
get_active_partitions(UprPid) ->
{ok, Partitions} = get_stats(UprPid, <<"vbucket">>, nil),
lists:filtermap(
fun
({Partition, <<"active">>}) ->
[<<"vb">>, PartId] = binary:split(Partition, <<"_">>),
{true, list_to_integer(binary_to_list(PartId))};
(_) ->
false
end, Partitions).
main(_) ->
ok = create_config(),
ok = init(),
{ok, UprPid} = couch_upr_client:start(
?STREAM_NAME, ?BUCKET, ?BUCKET, ?PASSWORD,
?UPR_CONTROL_BUFFER_SIZE),
Partitions = get_active_partitions(UprPid),
{ok, EndSeqs} = couch_upr_client:get_sequence_numbers(UprPid, Partitions),
Flags = ?UPR_FLAG_DISKONLY bor ?UPR_FLAG_USELATEST_ENDSEQNO,
PartVersions = [{0, 0}],
Since = 0,
ChangesWrapper = fun(_Mutation, Acc) -> Acc + 1 end,
T1 = now(),
lists:zipwith(
fun(PartId, EndSeq) ->
{ok, Count, _} = couch_upr_client:enum_docs_since(
UprPid, PartId, PartVersions, Since,EndSeq,
Flags, ChangesWrapper, 0),
log("vmx: part ~p received ~p messages", [PartId, Count])
end, Partitions, EndSeqs),
log("vmx: Receiving all mutations took ~ps",
[timer:now_diff(now(), T1)/1000000]),
ok.
% From couch_set_view_test_util
start_server(SetName) ->
couch_config:start_link(config_files()),
DbDir = couch_config:get("couchdb", "database_dir"),
IndexDir = couch_config:get("couchdb", "view_index_dir"),
NewDbDir = filename:join([DbDir, binary_to_list(SetName)]),
NewIndexDir = filename:join([IndexDir, binary_to_list(SetName)]),
case file:make_dir(NewDbDir) of
ok ->
ok;
{error, eexist} ->
ok;
Error ->
throw(Error)
end,
case file:make_dir(NewIndexDir) of
ok ->
ok;
{error, eexist} ->
ok;
Error2 ->
throw(Error2)
end,
ok = couch_config:set("couchdb", "database_dir", NewDbDir, false),
ok = couch_config:set("couchdb", "view_index_dir", NewIndexDir, false).
-spec get_stats_reply(pid(), reference()) -> term().
get_stats_reply(Pid, MRef) ->
receive
{get_stats, MRef, Reply} ->
Reply;
{'DOWN', MRef, process, Pid, Reason} ->
exit({upr_client_died, Pid, Reason})
after 60000 ->
log("upr client (~p): vbucket-seqno stats timed out after ~p seconds."
" Waiting...",
[Pid, 60000 / 1000]),
get_stats_reply(Pid, MRef)
end.
-spec get_stats(pid(), binary(), non_neg_integer() | nil) -> term().
get_stats(Pid, Name, PartId) ->
MRef = erlang:monitor(process, Pid),
Pid ! {get_stats, Name, PartId, {MRef, self()}},
Reply = get_stats_reply(Pid, MRef),
erlang:demonitor(MRef, [flush]),
Reply.
log(Raw, Args) ->
Msg = io_lib:format(Raw, Args),
Time = now(),
{{Year, Month, Day}, {Hour, Minute, Second}} =
calendar:now_to_local_time(Time),
Millis = erlang:element(3, Time) div 1000,
io:format("~B-~2.10.0B-~2.10.0BT~B:~2.10.0B:~2.10.0B.~3.10.0B ~s~n",
[Year, Month, Day, Hour, Minute, Second, Millis, Msg]).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment