Skip to content

Instantly share code, notes, and snippets.

@film42
Last active October 8, 2015 03:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save film42/a4bacd261896aa97399f to your computer and use it in GitHub Desktop.
Save film42/a4bacd261896aa97399f to your computer and use it in GitHub Desktop.
defmodule Rpc do
defmodule ServiceDirectory do
defmodule Instance do
  # One reachable server offering a service: where to connect (address/port),
  # when the listing stops being valid (expires_at) and the server's uuid.
  # Every field defaults to nil.
  defstruct [:address, :port, :expires_at, :uuid]
end
##
# USAGE:
#
# Rpc.ServiceDirectory.start_link(port: 53023)
#
# Rpc.ServiceDirectory.sample("Atlas::Amigo::User")
# #=> %Instance{address: "amigo.mx.com", port: 3240, ...}
#
# Rpc.ServiceDirectory.find("Atlas::Amigo::User")
# #=> [%Instance{address: "amigo.mx.com", port: 3240, ...}, ...]
#
# Rpc.ServiceDirectory.all
# #=> %{"Atlas::Amigo::User" => [...],
# "Atlas::Abacus::Transaction" => [...], ...}
#
##
# State stored in the :service_directory Agent.
#
# connection: the socket returned by :gen_udp.open/2 (nil until connected)
# services: map of service name to the list of known instances,
#           e.g. %{"Atlas::Amigo::User" => [%Instance{}]}
defstruct connection: nil, services: %{}
##
# Public Methods
#
# Returns the whole directory: a map of service name => list of instances.
def all, do: Map.fetch!(service_directory(), :services)
# Returns every known instance of `service_name`, or [] when the service
# is not present in the directory.
def find(service_name) do
  registered = Map.get(service_directory().services, service_name)
  if registered, do: registered, else: []
end
# Picks one instance of `service_name` at random.
#
# Returns nil when no instance is currently known. The previous
# implementation called :random.uniform(0) in that case, which raises.
# Enum.random/1 also avoids the unseeded :random generator, which yields
# the same sequence in every fresh process.
def sample(service_name) do
  case find(service_name) do
    [] -> nil
    instances -> Enum.random(instances)
  end
end
# Starts the service directory.
#
# Options (keyword list, any order, all optional):
#   port:    UDP port to listen on for beacons (default 53000)
#   options: socket options handed to :gen_udp.open/2
#            (default: default_options/0)
#
# Returns the result of Task.start_link/1. The task opens the socket,
# starts the :service_directory Agent holding the directory state and then
# loops forever listening for beacons.
#
# Reading the options with Keyword.get/3 instead of matching literal keyword
# lists in the function head means `start_link(options: o, port: p)` and
# `start_link([])` no longer raise FunctionClauseError.
def start_link(opts \\ []) do
  port = Keyword.get(opts, :port, 53000)
  options = Keyword.get(opts, :options, default_options())

  Task.start_link(fn ->
    {:ok, connection} = connect(port, options)
    directory = %ServiceDirectory{connection: connection}
    # Start the service directory daemon
    Agent.start_link(fn -> directory end, name: :service_directory)
    # Start listening for beacons
    listen_for_beacons()
  end)
end
##
# Private Methods
#
# Registers the beaconing server under every service name it advertises.
#
# If an instance with the same uuid is already listed for a service it is
# replaced in place, so its expires_at is refreshed by each new beacon;
# otherwise the instance is appended.
#
# The list is built from the return value of upsert_instance/2 rather than
# by rebinding a variable inside `unless`: such a rebinding does not escape
# the block on current Elixir versions, which left new instances unadded.
defp add_or_update_service(beacon) do
  directory = service_directory()
  # Built once so every service lists the same instance with the same expiry
  new_instance = create_instance_from_beacon(beacon)

  services =
    Enum.reduce(beacon.server.services, directory.services, fn service_name, acc ->
      known = Map.get(acc, service_name) || []
      Map.put(acc, service_name, upsert_instance(known, new_instance))
    end)

  updated = %{directory | services: services}
  Agent.update(:service_directory, fn _ -> updated end)
end

# Replaces the instance sharing new_instance's uuid, or appends new_instance
# when no such instance exists.
defp upsert_instance(instances, new_instance) do
  if Enum.any?(instances, &(&1.uuid == new_instance.uuid)) do
    Enum.map(instances, fn instance ->
      if instance.uuid == new_instance.uuid, do: new_instance, else: instance
    end)
  else
    instances ++ [new_instance]
  end
end
# Opens the UDP socket used to receive beacons and returns whatever
# :gen_udp.open/2 returns; the caller matches on {:ok, socket} and does not
# handle the error case.
defp connect(port, options), do: :gen_udp.open(port, options)
# Builds an %Instance{} from the server section of a decoded beacon.
# The port arrives as a string and is converted to an integer; the expiry
# is computed from the server's ttl via time_in_n_seconds/1.
defp create_instance_from_beacon(beacon) do
  origin = beacon.server
  address = origin.address
  port_number = String.to_integer(origin.port)
  expiry = time_in_n_seconds(origin.ttl)

  %Instance{address: address, port: port_number, expires_at: expiry, uuid: origin.uuid}
end
# Default socket options for the beacon listener: passive, binary, IPv4
# socket with address reuse, plus a raw option enabling SO_REUSEPORT.
#
# The raw option needs the platform's numeric SOL_SOCKET / SO_REUSEPORT
# values; see reuseport_constants/1.
defp default_options do
  {sol_socket, so_reuseport} = reuseport_constants(:os.type())

  [
    {:raw, sol_socket, so_reuseport, <<1, 0, 0, 0>>},
    {:active, false},
    {:reuseaddr, true},
    :inet,
    :binary
  ]
end

# Returns {SOL_SOCKET, SO_REUSEPORT} for the given :os.type/0 result.
# Linux uses 1 and 15 on the common architectures (NOTE(review): a few
# architectures such as MIPS/SPARC differ; confirm if those are targets).
# Every other platform keeps the BSD/macOS values (0xFFFF, 0x0200) that were
# previously hard-coded.
defp reuseport_constants({:unix, :linux}), do: {1, 15}
defp reuseport_constants(_os_type), do: {65535, 512}
# Entry point of the listen loop: looks up the socket stored in the
# directory state and starts listening on it.
defp listen_for_beacons do
  socket = service_directory().connection
  listen_for_beacons(socket)
end
# Receive loop for beacons arriving on `socket`. Never returns.
#
# Each iteration re-arms the socket for a single message, then either
# handles one datagram or, after 5 seconds of silence, just prunes expired
# listings.
defp listen_for_beacons(socket) do
  # {:active, :once} delivers one datagram at a time so the mailbox cannot
  # be flooded faster than beacons are processed
  :inet.setopts(socket, [{:active, :once}])

  receive do
    # ^socket pins the match to our own socket; unpinned, the pattern would
    # rebind `socket` to whatever socket the message came from
    {:udp, ^socket, _host, _port, data} ->
      beacon = process_beacon(data)
      # Remove any expired listings
      remove_expired_services()

      # Add unless it's a flatline beacon
      if not Protobuf.DynamicDiscovery.flatline?(beacon) do
        add_or_update_service(beacon)
      end

      listen_for_beacons(socket)
  after
    5000 ->
      remove_expired_services()
      listen_for_beacons(socket)
  end
end
# Decodes the raw datagram payload into a Beacon message.
defp process_beacon(proto), do: Protobuf.DynamicDiscovery.Beacon.decode(proto)
# Returns only the instances whose expires_at has not yet passed, comparing
# against a single timestamp taken at the start of the call.
defp remove_expired_instances_from_service(instances) do
  now = Timex.Time.now()
  # Drop everything that expired strictly before `now`
  Enum.reject(instances, &(&1.expires_at < now))
end
# Prunes expired instances from every service and drops services left with
# no instances, then stores the result in the :service_directory Agent.
defp remove_expired_services do
  directory = service_directory()

  # A service is kept only if at least one instance survives the pruning
  live_services =
    for {service_name, instances} <- directory.services,
        remaining = remove_expired_instances_from_service(instances),
        remaining != [],
        into: %{},
        do: {service_name, remaining}

  # Save changes to service directory
  pruned = %ServiceDirectory{directory | services: live_services}
  Agent.update(:service_directory, fn _previous -> pruned end)
end
# Fetches the current directory state from the :service_directory Agent.
defp service_directory, do: Agent.get(:service_directory, & &1)
# Returns the timestamp `n` seconds from now as a 3-tuple in the same format
# as Timex.Time.now (presumably {megaseconds, seconds, microseconds}, as in
# Erlang timestamps; confirm against the Timex version in use).
#
# The seconds element is normalised back below 1_000_000 with the overflow
# carried into the first element. Without the carry, an expiry computed near
# a megasecond boundary compared as earlier than a later "now", so instances
# looked expired too soon. Assumes n is a non-negative integer.
defp time_in_n_seconds(n) do
  {megaseconds, seconds, microseconds} = Timex.Time.now()
  total_seconds = seconds + n

  {
    megaseconds + div(total_seconds, 1_000_000),
    rem(total_seconds, 1_000_000),
    microseconds
  }
end
end
end
defmodule Protobuf.DynamicDiscovery do
  # Message modules are generated from the dynamic discovery .proto file,
  # located relative to this source file.
  use Protobuf, from: Path.expand("../../protobuf/dynamic_discovery.proto", __DIR__)

  # True when the beacon's type is :HEARTBEAT.
  def heartbeat?(beacon), do: beacon_type?(beacon, :HEARTBEAT)

  # True when the beacon's type is :FLATLINE.
  def flatline?(beacon), do: beacon_type?(beacon, :FLATLINE)

  # Compares the beacon's beacon_type field against the expected type.
  defp beacon_type?(beacon, expected), do: beacon.beacon_type == expected
end
// Copyright (c) 2013 MoneyDesktop, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// Authors: Devin Christensen
//
// Protobufs needed for dynamic discovery zmq server and client.
package dynamicDiscovery;
// Kind of beacon carried by a Beacon message.
// The Elixir listener adds the sending server to its directory for every
// beacon that is not a FLATLINE (presumably FLATLINE announces that the
// server is going away; confirm against the sender).
enum BeaconType {
HEARTBEAT = 0;
FLATLINE = 1;
}
// Description of the server that sent a beacon.
message Server {
// Identifier the directory uses to tell whether an instance is already listed.
optional string uuid = 1;
// Address at which the server can be reached.
optional string address = 2;
// Port, sent as a string; the Elixir consumer converts it to an integer.
optional string port = 3;
// Lifetime of the listing; the consumer adds it to the seconds part of the
// current time to compute the expiry.
optional int32 ttl = 4;
// Names of the services this server offers, e.g. "Atlas::Amigo::User".
repeated string services = 5;
}
// Top-level message: the beacon's type plus the description of the server
// that sent it.
message Beacon {
// HEARTBEAT or FLATLINE.
optional BeaconType beacon_type = 1;
// The server this beacon is about.
optional Server server = 2;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment