Skip to content

Instantly share code, notes, and snippets.

@elcritch
Last active July 17, 2019 16:40
Show Gist options
  • Save elcritch/9ab3aed211abf42e391e0caa02275d07 to your computer and use it in GitHub Desktop.
Save elcritch/9ab3aed211abf42e391e0caa02275d07 to your computer and use it in GitHub Desktop.
defmodule Cluster.Strategy.BroadcastGossip do
@moduledoc """ Here's the module I made for libcluster that uses local network broadcasting. Useful for nerves local devices. """
require Logger
use GenServer
use Cluster.Strategy
alias Cluster.Strategy.State
@default_ifname "eth0"
@default_port 45893
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
def ifname_info(ifname) do
{:ok, ifaddrs} = :inet.getifaddrs()
Map.new(ifaddrs)
|> Map.get(to_charlist(ifname), [])
end
@impl true
def init([%State{config: config} = state]) do
Process.sleep(1_000)
ifname = Keyword.get(config, :ifname, @default_ifname)
config = Keyword.merge(ifname_info(ifname), config)
ip = Keyword.get(config, :addr, {0, 0, 0, 0})
port = Keyword.get(config, :port, @default_port)
broadcast_addr = Keyword.get(config, :broadaddr, {255, 255, 255, 255})
options = [
:binary,
active: true,
ip: ip,
broadcast: true
]
{:ok, socket} = :gen_udp.open(port, options)
secret = Keyword.get(config, :secret, nil)
state = %State{state | :meta => {broadcast_addr, port, socket, secret}}
info = [ifname: ifname, ip: ip, broadcast: broadcast_addr]
Logger.info("Starting broadcast gossip: #{inspect(info)} ")
{:ok, state, 0}
end
# Send stuttered heartbeats
@impl true
def handle_info(:timeout, state), do: handle_info(:heartbeat, state)
def handle_info(:heartbeat, %State{meta: {broadcast_addr, port, socket, _}} = state) do
# Logger.debug "#{__MODULE__}:: heartbeat broadcast"
:gen_udp.send(socket, broadcast_addr, port, heartbeat(node(), state))
Process.send_after(self(), :heartbeat, :rand.uniform(5_000))
{:noreply, state}
end
# Handle received heartbeats
def handle_info(
{:udp, _socket, _ip, _port, <<"heartbeat::", _::binary>> = packet},
%State{meta: {_, _, _, secret}} = state
)
when is_nil(secret) do
handle_heartbeat(state, packet)
{:noreply, state}
end
def handle_info(
{:udp, _socket, _ip, _port, <<iv::binary-size(16)>> <> ciphertext},
%State{meta: {_, _, _, secret}} = state
)
when is_binary(secret) do
case decrypt(ciphertext, secret, iv) do
{:ok, plaintext} ->
handle_heartbeat(state, plaintext)
{:noreply, state}
_ ->
{:noreply, state}
end
end
def handle_info({:udp, _socket, _ip, _port, _}, state) do
{:noreply, state}
end
def terminate(_type, _reason, %State{meta: {_, _, socket, _}}) do
:gen_udp.close(socket)
:ok
end
# Construct iodata representing packet to send
defp heartbeat(node_name, %State{meta: {_, _, _, secret}})
when is_nil(secret) do
["heartbeat::", :erlang.term_to_binary(%{node: node_name})]
end
defp heartbeat(node_name, %State{meta: {_, _, _, secret}}) when is_binary(secret) do
message = "heartbeat::" <> :erlang.term_to_binary(%{node: node_name})
{:ok, iv, msg} = encrypt(message, secret)
[iv, msg]
end
# Upon receipt of a heartbeat, we check to see if the node
# is connected to us, and if not, we connect to it.
# If the connection fails, it's likely because the cookie
# is different, and thus a node we can ignore
@spec handle_heartbeat(State.t(), binary) :: :ok
defp handle_heartbeat(%State{} = state, <<"heartbeat::", rest::binary>> = _msg) do
self = node()
connect = state.connect
list_nodes = state.list_nodes
topology = state.topology
case :erlang.binary_to_term(rest) do
%{node: ^self} ->
:ok
%{node: :nonode@nohost} ->
# Logger.debug("#{__MODULE__}:: ignore received heartbeat from #{n}")
:ok
%{node: n} when is_atom(n) ->
# Logger.debug("#{__MODULE__}:: received heartbeat from #{n}")
Cluster.Strategy.connect_nodes(topology, connect, list_nodes, [n])
:ok
_ ->
:ok
end
end
defp handle_heartbeat(_state, _packet) do
:ok
end
defp encrypt(plaintext, password) do
iv = :crypto.strong_rand_bytes(16)
key = :crypto.hash(:sha256, password)
ciphertext = :crypto.block_encrypt(:aes_cbc256, key, iv, pkcs7_pad(plaintext))
{:ok, iv, ciphertext}
end
defp decrypt(ciphertext, password, iv) do
key = :crypto.hash(:sha256, password)
with {:unpadding, {:ok, padded}} <- {:unpadding, safe_decrypt(key, iv, ciphertext)},
{:decrypt, {:ok, _plaintext} = res} <- {:decrypt, pkcs7_unpad(padded)} do
res
else
{:unpadding, :error} -> {:error, :decrypt}
{:decrypt, :error} -> {:error, :unpadding}
end
end
defp safe_decrypt(key, iv, ciphertext) do
try do
{:ok, :crypto.block_decrypt(:aes_cbc256, key, iv, ciphertext)}
rescue
ArgumentError ->
:error
end
end
#
# Pads a message using the PKCS #7 cryptographic message syntax.
#
# from: https://github.com/izelnakri/aes256/blob/master/lib/aes256.ex
#
# See: https://tools.ietf.org/html/rfc2315
# See: `pkcs7_unpad/1`
defp pkcs7_pad(message) do
bytes_remaining = rem(byte_size(message), 16)
padding_size = 16 - bytes_remaining
message <> :binary.copy(<<padding_size>>, padding_size)
end
#
# Unpads a message using the PKCS #7 cryptographic message syntax.
#
# from: https://github.com/izelnakri/aes256/blob/master/lib/aes256.ex
#
# See: https://tools.ietf.org/html/rfc2315
# See: `pkcs7_pad/1`
defp pkcs7_unpad(<<>>), do: :error
defp pkcs7_unpad(message) do
padding_size = :binary.last(message)
if padding_size <= 16 do
message_size = byte_size(message)
if binary_part(message, message_size, -padding_size) ===
:binary.copy(<<padding_size>>, padding_size) do
{:ok, binary_part(message, 0, message_size - padding_size)}
else
:error
end
else
:error
end
end
end
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment