Skip to content

Instantly share code, notes, and snippets.

@redink
Last active March 29, 2017 08:54
Show Gist options
  • Save redink/6411d0084e89a62a01a2de0104cbb5ef to your computer and use it in GitHub Desktop.
Save redink/6411d0084e89a62a01a2de0104cbb5ef to your computer and use it in GitHub Desktop.
%%%-------------------------------------------------------------------
%%% @author redink
%%% @copyright (C) , redink
%%% @doc
%%%
%%% @end
%%% Created : by redink
%%%-------------------------------------------------------------------
-module(rcursor).
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([ new_group_cursor/2
, write_msg/2
, read_msg/2
, batch_read_msg/3
, ack_msg/3
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(HIBERNATE_TIMEOUT, 10000).
-record(state, {}).
%%%===================================================================
%%% API
%%%===================================================================
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
new_group_cursor(GroupID, MsgNumLimit) ->
gen_server:call(?SERVER, {new_group_cursor, GroupID, MsgNumLimit}).
write_msg(GroupID, MsgID) ->
case ets:lookup(?SERVER, GroupID) of
[] ->
{error, group_not_found};
[{GroupID, _, MsgIDTable, MsgNumLimit}] ->
ets:insert(MsgIDTable, {MsgID}),
case ets:info(MsgIDTable, size) > MsgNumLimit of
true ->
ets:delete(MsgIDTable, ets:first(MsgIDTable));
_ ->
ignore
end,
{ok, ets:info(MsgIDTable, size)}
end.
read_msg(GroupID, UserID) ->
case ets:lookup(?SERVER, GroupID) of
[] ->
{error, group_not_found};
[{_, UserCursorTable, MsgIDTable, _}] ->
case read_msg(UserCursorTable, UserID, MsgIDTable) of
{ok, '$end_of_table'} ->
{error, empty};
Any ->
Any
end
end.
batch_read_msg(GroupID, UserID, Num) ->
case ets:lookup(?SERVER, GroupID) of
[] ->
{error, group_not_found};
[{_, UserCursorTable, MsgIDTable, _}] ->
case read_msg(UserCursorTable, UserID, MsgIDTable) of
{ok, '$end_of_table'} ->
[];
{ok, CurrentMsgID} ->
batch_read_msg(MsgIDTable, Num - 1,
CurrentMsgID, [CurrentMsgID])
end
end.
ack_msg(GroupID, UserID, MsgID) ->
case ets:lookup(?SERVER, GroupID) of
[] ->
{error, group_not_found};
[{_, UserCursorTable, MsgIDTable, _}] ->
ack_msg(MsgIDTable, MsgID, UserCursorTable, UserID)
end.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
ets:new(?SERVER, [named_table, set, public]),
{ok, #state{}, ?HIBERNATE_TIMEOUT}.
%%--------------------------------------------------------------------
handle_call({new_group_cursor, GroupID, MsgNumLimit}, _From, State) ->
Res =
case ets:lookup(?SERVER, GroupID) of
[] ->
new_group_cursor_do(GroupID, MsgNumLimit),
ok;
_ ->
already_exist
end,
{reply, Res, State, ?HIBERNATE_TIMEOUT};
handle_call(_Request, _From, State) ->
{reply, ok, State, ?HIBERNATE_TIMEOUT}.
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State, ?HIBERNATE_TIMEOUT}.
%%--------------------------------------------------------------------
handle_info(timeout, State) ->
proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]),
{noreply, State, ?HIBERNATE_TIMEOUT};
handle_info(_Info, State) ->
{noreply, State, ?HIBERNATE_TIMEOUT}.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
read_msg(UserCursorTable, UserID, MsgIDTable) ->
case ets:lookup(UserCursorTable, UserID) of
[] ->
{ok, ets_first(MsgIDTable)};
[{UserID, UserCursorIndex}] ->
{ok, ets_next(MsgIDTable, UserCursorIndex)}
end.
batch_read_msg(_MsgIDTable, 0, _, Res) ->
lists:reverse(Res);
batch_read_msg(MsgIDTable, Num, Key, Res) ->
case ets_next(MsgIDTable, Key) of
'$end_of_table' ->
lists:reverse(Res);
NewKey ->
batch_read_msg(MsgIDTable, Num - 1, NewKey, [NewKey | Res])
end.
ack_msg(MsgIDTable, MsgID, UserCursorTable, UserID) ->
case ets:lookup(MsgIDTable, MsgID) of
[] ->
{error, msgid_not_found};
_ ->
{ok, ets:insert(UserCursorTable, {UserID, MsgID})}
end.
new_group_cursor_do(GroupID, MsgNumLimit) ->
UserCursorTable = ets:new(user_cursor, [set, public]),
MsgIDTable = ets:new(msgid, [ordered_set, public]),
ets:insert(?SERVER, {GroupID, UserCursorTable, MsgIDTable, MsgNumLimit}),
ok.
-ifdef(ASCENDING).
ets_first(Table) ->
ets:first(Table).
ets_next(Table, Key) ->
ets:next(Table, Key).
-else.
ets_first(Table) ->
ets:last(Table).
ets_next(Table, Key) ->
ets:prev(Table, Key).
-endif.
@redink
Copy link
Author

redink commented Mar 29, 2017

one group include:

  • user cursor table
    用于存储 {groupmember, msgidindex} 映射关系,groupmember 为group 群成员,msgidindex 为该成员最后ack 的消息ID
  • msgid table
    用于存储msgid,假设msgid 是有时序性的
---------------------------------------------------------------------------------------------------
|  1   |  2   |  3   |  4   |  5   |  6   |  7   |  8   |      |      |      |      |      |  n   |
---------------------------------------------------------------------------------------------------
   |                                                |       
   |                                                |
   |                                                |
user1 acked                                      userx acked  

所以,对于群离线消息的存储:

  • 群消息只需要保存一份
  • 群成员只需要保存各自ack 的msgid

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment