Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@vjache
Last active August 29, 2015 14:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vjache/895329369b2da1054ac2 to your computer and use it in GitHub Desktop.
Save vjache/895329369b2da1054ac2 to your computer and use it in GitHub Desktop.
Dead-simple shared ring buffer in Erlang.
%% Shared Ring Buffer
-module(srb).
-export([new/2, push/2, pop/2, pop/1]).
-compile(export_all).
%% Bookkeeping row kept in the same ETS table as the buffered entries.
%% new/2 inserts it and push/2 addresses it by the key 'counters'
%% (the record tag, i.e. the first tuple element).
-record(counters,
%% clock: starts at 0 in new/2 and is incremented by one on every push/2;
%% the incremented value becomes the key of the pushed entry.
{ clock :: non_neg_integer(),
%% max_size: capacity given to new/2; push/2 deletes key (Clock - MaxSize).
max_size :: non_neg_integer() }).
%% @doc Create the shared ring buffer named Id with capacity MaxSize.
%%
%% The buffer is a public, named ordered_set ETS table. Besides the data
%% entries it holds a single #counters{} row carrying the push clock
%% (initially 0) and the capacity. Returns Id so it can be passed straight
%% to push/2 and pop/1,2.
-spec new(atom(), non_neg_integer()) -> atom().
new(Id, MaxSize) ->
    TableOpts = [named_table, ordered_set, public, {write_concurrency, true}],
    Bookkeeping = #counters{clock = 0, max_size = MaxSize},
    ets:new(Id, TableOpts),
    ets:insert(Id, Bookkeeping),
    Id.
%% @doc Append Object to the buffer Id. Always returns ok.
%%
%% A single update_counter call bumps the clock and reads the capacity
%% (an increment of 0 just returns the current value). The new clock value
%% is the key of the stored entry {Clock, Object, 0}; the entry written
%% MaxSize pushes earlier, if still present, is then deleted.
-spec push(atom(), term()) -> ok.
push(Id, Object) ->
    BumpClock = {#counters.clock, 1},
    ReadCapacity = {#counters.max_size, 0},
    [Seq, Capacity] = ets:update_counter(Id, 'counters', [BumpClock, ReadCapacity]),
    Entry = {Seq, Object, 0},
    ets:insert(Id, Entry),
    Expired = Seq - Capacity,
    ets:delete(Id, Expired),
    ok.
%% @doc Pop the single oldest entry of buffer Id.
%%
%% Returns {ok, Obj} when pop/2 delivers one object, or the atom empty
%% when it delivers none.
-spec pop(atom()) -> {ok, term()} | empty.
pop(Id) ->
    case pop(Id, 1) of
        [Oldest] -> {ok, Oldest};
        [] -> empty
    end.
%% @doc Pop up to N of the oldest entries from buffer Id, oldest first.
%%
%% Returns the list of popped objects. The list is shorter than N (possibly
%% empty) when the buffer runs out of data. Only N =:= 0 ends the recursion
%% early, so a negative N drains everything currently in the buffer.
%%
%% Each entry is claimed with ets:take/2 (OTP 18+), which returns and
%% deletes the object in one atomic step. Exactly one concurrent reader can
%% win a given key; the losers simply see [] and retry on the next key. No
%% per-entry lock counter is needed, so a reader that dies mid-pop cannot
%% leave the head entry locked and make other readers spin on it.
-spec pop(atom(), integer()) -> [term()].
pop(_Id, 0) ->
    [];
pop(Id, N) ->
    %% Data keys are integers and sort before the atom 'counters' in an
    %% ordered_set, so ets:first/1 yields the oldest entry whenever one exists.
    case ets:first(Id) of
        %% Only the bookkeeping row is left: no data in the buffer.
        'counters' ->
            [];
        %% The table holds nothing at all.
        '$end_of_table' ->
            [];
        Key ->
            case ets:take(Id, Key) of
                %% We own the entry now; it is already removed from the table.
                [{Key, Obj, _Unused}] ->
                    [Obj | pop(Id, N - 1)];
                %% Entry vanished between first/1 and take/2 (taken by another
                %% reader or evicted by push/2): retry with the new head.
                [] ->
                    pop(Id, N)
            end
    end.
%%
%% Tests
%%
%% Demo: create buffer srb_pt (capacity 100) and start one linked producer
%% that pushes forever with a 1 ms pause. Returns the producer's pid.
producer_test() ->
    Buffer = new(srb_pt, 100),
    Producer = fun() -> producer_loop(Buffer, 0, 1) end,
    spawn_link(Producer).
%% Demo: create buffer srb_mpt (capacity 100) and start four linked
%% producers pushing into it concurrently. Returns the list of their pids.
many_producers_test() ->
    Buffer = new(srb_mpt, 100),
    Producer = fun() -> producer_loop(Buffer, 0, 1) end,
    lists:map(fun(_) -> spawn_link(Producer) end, [1, 2, 3, 4]).
%% Demo: create buffer srb_ct (capacity 100) and start one linked consumer
%% that polls it forever with a 1 ms pause. Returns the consumer's pid.
consumer_test() ->
    Buffer = new(srb_ct, 100),
    Consumer = fun() -> consumer_loop(Buffer, 1) end,
    spawn_link(Consumer).
%% Demo: one producer and one consumer working on buffer srb_cnc
%% (capacity 100) at the same time. Returns the consumer's pid.
concurrent_test() ->
    Buffer = new(srb_cnc, 100),
    Producer = fun() -> producer_loop(Buffer, 0, 1) end,
    Consumer = fun() -> consumer_loop(Buffer, 1) end,
    spawn_link(Producer),
    spawn_link(Consumer).
%% Demo: four producers and four consumers sharing one buffer (capacity 100).
%% Returns the list of consumer pids.
%%
%% The buffer has its own table name (srb_m2m). Reusing srb_cnc, the name
%% concurrent_test/0 creates, made ets:new/2 fail with badarg whenever both
%% demos were started on the same node.
m2m_concurrent_test() ->
    Id = new(srb_m2m, 100),
    Workers = [1, 2, 3, 4],
    %% Producer pids are not needed by the caller, so do not build a list.
    lists:foreach(
        fun(_) ->
            spawn_link(fun() -> producer_loop(Id, 0, 1) end)
        end,
        Workers),
    [spawn_link(fun() -> consumer_loop(Id, 1) end) || _ <- Workers].
%% Endless producer: push {self(), N} into buffer Id, print it, sleep Delay
%% milliseconds, then continue with N + 1. Never returns.
producer_loop(Id, N, Delay) ->
    Self = self(),
    Item = {Self, N},
    push(Id, Item),
    io:format("~p | Pushed: ~p~n", [Self, Item]),
    timer:sleep(Delay),
    producer_loop(Id, N + 1, Delay).
%% Endless consumer: pop one entry from buffer Id, print the pop/1 result
%% ({ok, Obj} or empty), sleep Delay milliseconds, repeat. Never returns.
consumer_loop(Id, Delay) ->
    Result = pop(Id),
    Args = [self(), Result],
    io:format("~p | Poped: ~p~n", Args),
    timer:sleep(Delay),
    consumer_loop(Id, Delay).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment