Skip to content

Instantly share code, notes, and snippets.

@tilgovi
Created August 13, 2010 01:30
Show Gist options
  • Save tilgovi/522084 to your computer and use it in GitHub Desktop.
Save tilgovi/522084 to your computer and use it in GitHub Desktop.
From becdc414824a787c267e608689bf4cfc8e608be2 Mon Sep 17 00:00:00 2001
From: Randall Leeds <randall.leeds@gmail.com>
Date: Thu, 12 Aug 2010 18:22:31 -0700
Subject: [PATCH] a reader queue makes couch_file fair to writers
---
src/couchdb/couch_db_updater.erl | 1 +
src/couchdb/couch_file.erl | 32 ++++++++++++++++++++++++++++++--
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 19a4c16..76d982e 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -21,6 +21,7 @@
init({MainPid, DbName, Filepath, Fd, Options}) ->
process_flag(trap_exit, true),
+ put(io_priority, high),
case lists:member(create, Options) of
true ->
% create a new header and writes it to the file
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 0a89171..057aeb8 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -16,6 +16,7 @@
-include("couch_db.hrl").
-define(SIZE_BLOCK, 4096).
+-define(pid_to_ioqueue(P), list_to_atom(pid_to_list(P))).
-record(file, {
fd,
@@ -85,12 +86,12 @@ append_term_md5(Fd, Term) ->
append_binary(Fd, Bin) ->
Size = iolist_size(Bin),
- gen_server:call(Fd, {append_bin,
+ gen_server:call(get_ioqueue(Fd), {append_bin,
[<<0:1/integer,Size:31/integer>>, Bin]}, infinity).
append_binary_md5(Fd, Bin) ->
Size = iolist_size(Bin),
- gen_server:call(Fd, {append_bin,
+ gen_server:call(get_ioqueue(Fd), {append_bin,
[<<1:1/integer,Size:31/integer>>, couch_util:md5(Bin), Bin]}, infinity).
@@ -237,6 +238,8 @@ init_status_error(ReturnPid, Ref, Error) ->
init({Filepath, Options, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
+ IOQueueFun = apply(fun(Fd) -> fun() -> ioqueue_loop(Fd) end end, [self()]),
+ set_ioqueue(self(), spawn_link(IOQueueFun)),
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
@@ -586,3 +589,28 @@ split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
end;
split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).
+
+ioqueue_loop(Fd) ->
+ receive
+ {'$gen_call', From, Data}=Msg ->
+ gen_server:reply(From, gen_server:call(Fd, Data));
+ Msg ->
+ exit({unexpected_message, Msg})
+ end,
+ ioqueue_loop(Fd).
+
+get_ioqueue(Fd) ->
+ case get(io_priority) of
+ high ->
+ Fd;
+ _ ->
+ try
+ [{ioqueue, IOQueue}] = ets:lookup(?pid_to_ioqueue(Fd), ioqueue),
+ IOQueue
+ catch _:_ -> undefined
+ end
+ end.
+
+set_ioqueue(Fd, IOQueue) ->
+ Tid = ets:new(?pid_to_ioqueue(Fd), [named_table]),
+ ets:insert(Tid, {ioqueue, IOQueue}).
--
1.7.0.4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment