Skip to content

Instantly share code, notes, and snippets.

@imranismail
Last active January 18, 2019 12:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save imranismail/feb1272b98ea00d658f3f5ff34431ddf to your computer and use it in GitHub Desktop.
Save imranismail/feb1272b98ea00d658f3f5ff34431ddf to your computer and use it in GitHub Desktop.
BEAM cluster formation in EC2
defmodule Cluster do
  @moduledoc """
  Keeps this BEAM node connected to the other nodes of its EC2 instance group.

  The server polls the EC2 API on a fixed interval, looks up every running
  instance that carries the same `spotinst:aws:ec2:group:name` tag as the
  current instance, and derives a node name (`fave@<private-ip>`) for each.
  Nodes that appeared since the previous poll are connected; nodes that
  disappeared are disconnected.

  The server state is a `MapSet` holding the nodes this process has
  successfully connected to and that are still part of the group. Nodes whose
  connection attempt failed are left out of the state, so they are retried on
  the next poll.
  """

  use GenServer
  require Logger
  import SweetXml, only: [sigil_x: 2]

  # Delay between two consecutive membership polls.
  @poll_interval :timer.seconds(5)

  @doc """
  Returns the child specification used to run this server under a supervisor.

  The argument is ignored. The spec is built with `Supervisor.Spec.worker/2`
  and starts the server through `start_link/0`.
  """
  # NOTE(review): `Supervisor.Spec` is deprecated in current Elixir releases.
  # The call is kept so supervisors receive exactly the same spec as before.
  def child_spec(_) do
    Supervisor.Spec.worker(__MODULE__, [])
  end

  @doc """
  Starts the cluster server, registered locally under the module name.

  The initial state is an empty set: no node is considered connected yet.
  """
  @spec start_link() :: GenServer.on_start()
  def start_link do
    GenServer.start_link(__MODULE__, MapSet.new(), name: __MODULE__)
  end

  @impl true
  def init(connected_node_set) do
    # A timeout of 0 delivers `:timeout` right away, so the first (slow,
    # network-bound) poll happens outside of `init/1`.
    {:ok, connected_node_set, 0}
  end

  @impl true
  def handle_info(:timeout, connected_node_set) do
    Logger.info(fn -> "Forming cluster with Proxy.Cluster" end)
    handle_info(:load, connected_node_set)
  end

  def handle_info(:load, connected_node_set) do
    attached_node_set = fetch_attached_node_set()

    # Nodes we were connected to that are no longer part of the group.
    connected_node_set
    |> MapSet.difference(attached_node_set)
    |> MapSet.to_list()
    |> disconnect_nodes()

    # Nodes in the group that we have not (successfully) connected to yet.
    newly_connected =
      attached_node_set
      |> MapSet.difference(connected_node_set)
      |> MapSet.to_list()
      |> connect_nodes()

    still_connected = MapSet.intersection(connected_node_set, attached_node_set)

    form_cluster_after(@poll_interval)

    # Only nodes that really connected are remembered; a node whose connection
    # attempt failed stays out of the state and is retried on the next poll.
    {:noreply, MapSet.union(still_connected, MapSet.new(newly_connected))}
  end

  # Tries to connect to every node in `nodes` and returns the list of nodes
  # for which `Node.connect/1` returned `true`.
  defp connect_nodes(nodes) do
    Enum.filter(nodes, fn peer ->
      case Node.connect(peer) do
        true ->
          Logger.debug(fn -> "Connected to #{inspect(peer)}" end)
          true

        reason ->
          # `reason` is `false` (connection failed) or `:ignored` (the local
          # node is not alive); `inspect/1` keeps the two distinguishable.
          Logger.error(fn ->
            "Attempted to connect to node (#{inspect(peer)}) but failed with reason: #{inspect(reason)}."
          end)

          false
      end
    end)
  end

  # Disconnects from every node in `nodes`. Always returns `:ok`.
  defp disconnect_nodes(nodes) do
    Enum.each(nodes, fn peer ->
      case Node.disconnect(peer) do
        true ->
          Logger.debug(fn -> "Disconnected from #{inspect(peer)}" end)

        result ->
          # Expected when the remote instance is already gone: the connection
          # dropped on its own before we got to it.
          Logger.debug(fn ->
            "No active connection to #{inspect(peer)} to disconnect (#{inspect(result)})"
          end)
      end
    end)
  end

  # Reads this instance's id from the EC2 instance metadata endpoint.
  defp fetch_instance_id do
    ExAws.InstanceMeta.request(
      ExAws.Config.new(:ec2),
      "http://169.254.169.254/latest/meta-data/instance-id"
    )
  end

  # Reads this instance's availability zone from the EC2 instance metadata
  # endpoint. Presumably returns the response body as a string, since the
  # result is fed to `String.slice/2` — confirm against the ExAws version used.
  defp fetch_availability_zone do
    ExAws.InstanceMeta.request(
      ExAws.Config.new(:ec2),
      "http://169.254.169.254/latest/meta-data/placement/availability-zone"
    )
  end

  # Derives the region by dropping the last character of the availability
  # zone (e.g. the trailing zone letter of "us-east-1a").
  defp fetch_region do
    fetch_availability_zone()
    |> String.slice(0..-2)
  end

  # Returns the value of the `spotinst:aws:ec2:group:name` tag attached to
  # this instance. Raises when the request fails or the tag is absent.
  defp fetch_instance_group(region) do
    ExAws.EC2.describe_tags(
      filters: [
        "resource-id": fetch_instance_id(),
        key: "spotinst:aws:ec2:group:name"
      ]
    )
    |> ExAws.request!(region: region)
    |> Map.fetch!(:body)
    |> SweetXml.xpath(
      ~x"//DescribeTagsResponse/tagSet/item",
      key: ~x"./key/text()"s,
      value: ~x"./value/text()"s
    )
    |> Map.fetch!(:value)
  end

  # Returns the private IP addresses (as strings) of all running instances
  # that belong to the same instance group as this instance.
  defp fetch_ips do
    # Resolve the region once per poll; both API calls below need it and each
    # lookup is a round trip to the metadata endpoint.
    region = fetch_region()
    instance_group = fetch_instance_group(region)

    ExAws.EC2.describe_instances(
      filters: [
        "tag:spotinst:aws:ec2:group:name": instance_group,
        "instance-state-name": "running"
      ]
    )
    |> ExAws.request!(region: region)
    |> Map.fetch!(:body)
    |> SweetXml.xpath(
      ~x"//DescribeInstancesResponse/reservationSet/item/instancesSet/item/privateIpAddress/text()"ls
    )
  end

  # Builds the set of node names that should currently be part of the cluster.
  defp fetch_attached_node_set do
    # Node names have to be atoms. They are created dynamically here, one per
    # distinct private IP returned by the EC2 API.
    fetch_ips()
    |> Enum.map(fn ip_address -> :"fave@#{ip_address}" end)
    |> MapSet.new()
  end

  # Schedules the next membership poll `duration` milliseconds from now.
  defp form_cluster_after(duration) do
    Logger.debug(fn -> "Forming cluster in #{duration}ms" end)
    # Always called from inside the server process, so `self()` is the
    # process registered as `__MODULE__`.
    Process.send_after(self(), :load, duration)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment