Last active
July 17, 2019 16:40
-
-
Save elcritch/9ab3aed211abf42e391e0caa02275d07 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Cluster.Strategy.BroadcastGossip do
  @moduledoc """
  A libcluster strategy that discovers peers by broadcasting UDP heartbeats on
  the local network. Useful for Nerves devices sharing a LAN.

  Each node periodically broadcasts a heartbeat carrying its node name. When a
  heartbeat from another node is received, that node is handed to
  `Cluster.Strategy.connect_nodes/4`.

  ## Options (read from the topology's `:config`)

    * `:ifname` - interface whose address information is looked up via
      `:inet.getifaddrs/0` (default: `"eth0"`).
    * `:addr` - IP address to bind the socket to. Defaults to the interface's
      address, or `{0, 0, 0, 0}` when none is found.
    * `:broadaddr` - address heartbeats are sent to. Defaults to the
      interface's broadcast address, or `{255, 255, 255, 255}`.
    * `:port` - UDP port used for sending and receiving (default: `45893`).
    * `:secret` - optional binary. When set, heartbeats are encrypted with
      AES-256-CBC using the SHA-256 hash of the secret as the key, and
      unencrypted heartbeats are ignored.
  """

  use GenServer
  use Cluster.Strategy

  alias Cluster.Strategy.State

  require Logger

  @default_ifname "eth0"
  @default_port 45893

  @doc """
  Starts the strategy process, registered under the module name.
  """
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  @doc """
  Returns the option list reported by `:inet.getifaddrs/0` for `ifname`, or `[]`
  when no interface with that name exists.

  Raises a `MatchError` if `:inet.getifaddrs/0` itself fails.
  """
  @spec ifname_info(String.t() | charlist() | atom()) :: keyword()
  def ifname_info(ifname) do
    {:ok, ifaddrs} = :inet.getifaddrs()
    name = to_charlist(ifname)

    case List.keyfind(ifaddrs, name, 0) do
      {^name, opts} -> opts
      nil -> []
    end
  end

  @impl true
  def init([%State{config: config} = state]) do
    # NOTE(review): presumably gives the network interface time to come up
    # before its addresses are queried - confirm before removing. This blocks
    # the starting supervisor for one second.
    Process.sleep(1_000)

    ifname = Keyword.get(config, :ifname, @default_ifname)
    # Explicit configuration wins over whatever the interface reports.
    config = Keyword.merge(ifname_info(ifname), config)

    ip = bind_addr(config)
    port = Keyword.get(config, :port, @default_port)
    broadcast_addr = Keyword.get(config, :broadaddr, {255, 255, 255, 255})

    options = [
      :binary,
      active: true,
      ip: ip,
      broadcast: true
    ]

    {:ok, socket} = :gen_udp.open(port, options)

    secret = Keyword.get(config, :secret, nil)
    state = %State{state | meta: {broadcast_addr, port, socket, secret}}

    info = [ifname: ifname, ip: ip, broadcast: broadcast_addr]
    Logger.info("Starting broadcast gossip: #{inspect(info)} ")

    # A zero timeout triggers the first heartbeat immediately.
    {:ok, state, 0}
  end

  # Send stuttered heartbeats
  @impl true
  def handle_info(:timeout, state), do: handle_info(:heartbeat, state)

  def handle_info(:heartbeat, %State{meta: {broadcast_addr, port, socket, _}} = state) do
    # Best effort: a failed send is simply retried on the next heartbeat.
    :gen_udp.send(socket, broadcast_addr, port, heartbeat(node(), state))
    # Randomised interval (1..5000 ms) so nodes do not broadcast in lockstep.
    Process.send_after(self(), :heartbeat, :rand.uniform(5_000))
    {:noreply, state}
  end

  # Handle received plaintext heartbeats (only when no secret is configured)
  def handle_info(
        {:udp, _socket, _ip, _port, <<"heartbeat::", _::binary>> = packet},
        %State{meta: {_, _, _, secret}} = state
      )
      when is_nil(secret) do
    handle_heartbeat(state, packet)
    {:noreply, state}
  end

  # Handle received encrypted heartbeats: a 16-byte IV followed by ciphertext
  def handle_info(
        {:udp, _socket, _ip, _port, <<iv::binary-size(16), ciphertext::binary>>},
        %State{meta: {_, _, _, secret}} = state
      )
      when is_binary(secret) do
    # Packets that fail to decrypt or unpad are silently dropped.
    with {:ok, plaintext} <- decrypt(ciphertext, secret, iv) do
      handle_heartbeat(state, plaintext)
    end

    {:noreply, state}
  end

  # Anything else arriving on the socket is ignored
  def handle_info({:udp, _socket, _ip, _port, _}, state) do
    {:noreply, state}
  end

  # GenServer invokes terminate/2; close the UDP socket on shutdown.
  @impl true
  def terminate(_reason, %State{meta: {_, _, socket, _}}) do
    :gen_udp.close(socket)
    :ok
  end

  # Kept only for backward compatibility with the previous (never invoked by
  # GenServer) three-argument form.
  @doc false
  def terminate(_type, reason, state), do: terminate(reason, state)

  # Picks the address to bind to: the first IPv4 `:addr` entry if there is one
  # (the socket is opened without `:inet6`), otherwise the first `:addr` entry,
  # otherwise all interfaces.
  defp bind_addr(config) do
    addrs = Keyword.get_values(config, :addr)
    fallback = List.first(addrs) || {0, 0, 0, 0}
    Enum.find(addrs, fallback, &match?({_, _, _, _}, &1))
  end

  # Construct iodata representing packet to send
  defp heartbeat(node_name, %State{meta: {_, _, _, secret}})
       when is_nil(secret) do
    ["heartbeat::", :erlang.term_to_binary(%{node: node_name})]
  end

  defp heartbeat(node_name, %State{meta: {_, _, _, secret}}) when is_binary(secret) do
    message = "heartbeat::" <> :erlang.term_to_binary(%{node: node_name})
    {:ok, iv, msg} = encrypt(message, secret)
    [iv, msg]
  end

  # Upon receipt of a heartbeat, we check to see if the node
  # is connected to us, and if not, we connect to it.
  # If the connection fails, it's likely because the cookie
  # is different, and thus a node we can ignore
  @spec handle_heartbeat(State.t(), binary) :: :ok
  defp handle_heartbeat(%State{} = state, <<"heartbeat::", rest::binary>>) do
    self = node()

    case decode_term(rest) do
      {:ok, %{node: ^self}} ->
        :ok

      {:ok, %{node: :nonode@nohost}} ->
        :ok

      {:ok, %{node: n}} when is_atom(n) ->
        Cluster.Strategy.connect_nodes(state.topology, state.connect, state.list_nodes, [n])
        :ok

      _ ->
        :ok
    end
  end

  defp handle_heartbeat(_state, _packet) do
    :ok
  end

  # Decodes an external-term-format payload received from the network.
  #
  # Malformed payloads must not crash the strategy, so decoding errors are
  # turned into `:error`. The `:safe` option is intentionally NOT used: the node
  # name of a not-yet-seen peer is a new atom, which `:safe` would reject.
  # NOTE(review): this means any host on the broadcast domain can make this node
  # create atoms; configure `:secret` on untrusted networks.
  defp decode_term(binary) do
    {:ok, :erlang.binary_to_term(binary)}
  rescue
    ArgumentError -> :error
  end

  # Encrypts `plaintext` with AES-256-CBC under a fresh random IV. The key is
  # the SHA-256 hash of `password`. Note that the ciphertext is not
  # authenticated (no MAC).
  defp encrypt(plaintext, password) do
    iv = :crypto.strong_rand_bytes(16)
    key = :crypto.hash(:sha256, password)
    ciphertext = :crypto.crypto_one_time(:aes_256_cbc, key, iv, pkcs7_pad(plaintext), true)
    {:ok, iv, ciphertext}
  end

  # Returns `{:ok, plaintext}`, `{:error, :decrypt}` when the ciphertext cannot
  # be decrypted, or `{:error, :unpadding}` when the padding is invalid.
  defp decrypt(ciphertext, password, iv) do
    key = :crypto.hash(:sha256, password)

    with {:ok, padded} <- safe_decrypt(key, iv, ciphertext) do
      pkcs7_unpad(padded)
    end
  end

  # CBC without library-side padding needs a non-empty whole number of 16-byte
  # blocks; anything else is rejected up front. The rescue is a boundary guard
  # for input coming straight off the network.
  defp safe_decrypt(key, iv, ciphertext)
       when byte_size(ciphertext) > 0 and rem(byte_size(ciphertext), 16) == 0 do
    {:ok, :crypto.crypto_one_time(:aes_256_cbc, key, iv, ciphertext, false)}
  rescue
    _ in [ArgumentError, ErlangError] -> {:error, :decrypt}
  end

  defp safe_decrypt(_key, _iv, _ciphertext), do: {:error, :decrypt}

  #
  # Pads a message using the PKCS #7 cryptographic message syntax.
  #
  # from: https://github.com/izelnakri/aes256/blob/master/lib/aes256.ex
  #
  # See: https://tools.ietf.org/html/rfc2315
  # See: `pkcs7_unpad/1`
  defp pkcs7_pad(message) do
    # Always adds 1..16 bytes; an already aligned message gets a full block.
    padding_size = 16 - rem(byte_size(message), 16)
    message <> :binary.copy(<<padding_size>>, padding_size)
  end

  #
  # Unpads a message using the PKCS #7 cryptographic message syntax.
  #
  # from: https://github.com/izelnakri/aes256/blob/master/lib/aes256.ex
  #
  # See: https://tools.ietf.org/html/rfc2315
  # See: `pkcs7_pad/1`
  defp pkcs7_unpad(<<>>), do: {:error, :unpadding}

  defp pkcs7_unpad(message) do
    message_size = byte_size(message)
    padding_size = :binary.last(message)

    # Valid padding is 1..16 bytes and can never be longer than the message.
    if padding_size in 1..16 and padding_size <= message_size do
      body_size = message_size - padding_size

      case message do
        <<body::binary-size(body_size), padding::binary>> ->
          if padding === :binary.copy(<<padding_size>>, padding_size) do
            {:ok, body}
          else
            {:error, :unpadding}
          end
      end
    else
      {:error, :unpadding}
    end
  end
end
``` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment