Last active
June 5, 2020 15:30
-
-
Save tgodfrey/1a67753d51cb202ca8eb04b933cec924 to your computer and use it in GitHub Desktop.
Elixir Module for working with MessageBus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule MyApp.Redix do | |
@pool_size 5 | |
# How long, in seconds, to keep messages in the backlog | |
@max_backlog_age 604800 | |
# How many messages may be kept in the global backlog | |
@max_global_backlog_size 2000 | |
# How many messages may be kep in the per-channel backlog | |
@max_backlog_size 1000 | |
@global_id_key "__mb_global_id_n" | |
@global_backlog_key "__mb_global_backlog_n" | |
# Lua script to publish new messages on Redis. This script comes from | |
# the MessageBus source: | |
# https://github.com/SamSaffron/message_bus/blob/master/lib/message_bus/backends/redis.rb | |
@lua_publish """ | |
local start_payload = ARGV[1] | |
local max_backlog_age = ARGV[2] | |
local max_backlog_size = tonumber(ARGV[3]) | |
local max_global_backlog_size = tonumber(ARGV[4]) | |
local channel = ARGV[5] | |
local global_id_key = KEYS[1] | |
local backlog_id_key = KEYS[2] | |
local backlog_key = KEYS[3] | |
local global_backlog_key = KEYS[4] | |
local redis_channel_name = KEYS[5] | |
local global_id = redis.call("INCR", global_id_key) | |
local backlog_id = redis.call("INCR", backlog_id_key) | |
local payload = string.format("%i|%i|%s", global_id, backlog_id, start_payload) | |
local global_backlog_message = string.format("%i|%s", backlog_id, channel) | |
redis.call("ZADD", backlog_key, backlog_id, payload) | |
redis.call("EXPIRE", backlog_key, max_backlog_age) | |
redis.call("ZADD", global_backlog_key, global_id, global_backlog_message) | |
redis.call("EXPIRE", global_backlog_key, max_backlog_age) | |
redis.call("PUBLISH", redis_channel_name, payload) | |
redis.call("EXPIRE", backlog_id_key, max_backlog_age) | |
if backlog_id > max_backlog_size then | |
redis.call("ZREMRANGEBYSCORE", backlog_key, 1, backlog_id - max_backlog_size) | |
end | |
if global_id > max_global_backlog_size then | |
redis.call("ZREMRANGEBYSCORE", global_backlog_key, 1, global_id - max_global_backlog_size) | |
end | |
return backlog_id | |
""" | |
def child_spec(_args) do | |
# Specs for the Redix connections. | |
children = | |
for i <- 0..(@pool_size - 1) do | |
Supervisor.child_spec({Redix, name: :"redix_#{i}"}, id: {Redix, i}) | |
end | |
%{ | |
id: RedixSupervisor, | |
type: :supervisor, | |
start: {Supervisor, :start_link, [children, [strategy: :one_for_one]]} | |
} | |
end | |
def command(command) do | |
Redix.command(:"redix_#{random_index()}", command) | |
end | |
defp random_index() do | |
rem(System.unique_integer([:positive]), @pool_size) | |
end | |
def publish(channel, message) do | |
# MessageBus requires the message to be the value for the attribute "data" | |
data = Jason.encode!(%{data: message}) | |
# Comments below are the corresponding variable names in the Lua script | |
publish_args = [ | |
encode_without_ids(channel, data), # start_payload | |
@max_backlog_age, # max_backlog_age | |
@max_backlog_size, # max_backlog_size | |
@max_global_backlog_size, # max_global_backlog_size | |
channel # channel | |
] | |
publish_keys = [ | |
@global_id_key, # global_id_key | |
backlog_id_key(channel), # backlog_id_key | |
backlog_key(channel), # backlog_key | |
@global_backlog_key, # global_backlog_key | |
redis_channel_name() # redis_channel_name | |
] | |
cached_eval(@lua_publish, publish_keys, publish_args) | |
end | |
defp encode_without_ids(channel, data) do | |
String.replace(channel, "|", "$$123$$") <> "|" <> data | |
end | |
defp backlog_id_key(channel) do | |
"__mb_backlog_id_n_#{channel}" | |
end | |
defp backlog_key(channel) do | |
"__mb_backlog_n_#{channel}" | |
end | |
defp redis_channel_name() do | |
db = Application.get_env(:my_app, MyAppWeb.Endpoint)[:redis_db] || 0 | |
"_message_bus_#{db}" | |
end | |
# Redis has two commands for executing a Lua script, EVAL and EVALSHA. When a | |
# script is executed on a Redis server, it store the script, and uses the SHA1 | |
# of the script as its key. This means that if you know what the hash is, | |
# you can execute the script, without actually having the text of the script. The | |
# following function copies the technique in MessageBus, where the function first | |
# attempts to use EVALSHA to execute the script by passing in the SHA1 for the | |
# script. If that fails with a NOSCRIPT error, the script is passed instead. | |
defp cached_eval(script, keys, args) do | |
script_sha1 = :crypto.hash(:sha, @lua_publish) | |
# Commands are send to Redis as a list of strings, one each for the command, | |
# and each argument or key or any other value which is to be passed to that | |
# command. List.flatten/1 is used to eliminate the nested lists, but | |
# preserve their values. | |
evalsha_cmd = List.flatten(["EVALSHA", script_sha1, 5, keys, args]) | |
eval_cmd = List.flatten(["EVAL", script, 5, keys, args]) | |
case MyApp.Redix.command(evalsha_cmd) do | |
{:ok, backlog_id} -> {:ok, backlog_id} | |
{ | |
:error, | |
%Redix.Error{message: "NOSCRIPT No matching script. Please use EVAL."} | |
} | |
-> MyApp.Redix.command(eval_cmd) | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment