Skip to content

Instantly share code, notes, and snippets.

@vishnevskiy
Last active June 28, 2023 10:01
Show Gist options
  • Save vishnevskiy/312d6f3180ff0122b9fb to your computer and use it in GitHub Desktop.
Save vishnevskiy/312d6f3180ff0122b9fb to your computer and use it in GitHub Desktop.
defmodule Snowflake do
use Application
def start(_type, _args) do
import Supervisor.Spec
children = [
worker(:riak_core_vnode_master, [Snowflake.VNode]),
]
case Supervisor.start_link(children, strategy: :one_for_one) do
{:ok, pid} ->
# HACK: riak_core assume APP_sup for the name of the supervisor.
Process.register(pid, :snowflake_sup)
:ok = :riak_core.register(:snowflake, [vnode_module: Snowflake.VNode])
:ok = :riak_core_ring_events.add_guarded_handler(Snowflake.RingEventHandler, [])
:ok = :riak_core_node_watcher_events.add_guarded_handler(Snowflake.NodeEventHandler, [])
:ok = :riak_core_node_watcher.service_up(:snowflake, pid)
{:ok, pid}
{:error, reason} ->
{:error, reason}
end
end
def next_id, do: command(:next_id)
def ping, do: command(:ping)
defp command(cmd) do
doc_idx = :riak_core_util.chash_key({Atom.to_string(cmd), :erlang.term_to_binary(:erlang.now())})
pref_list = :riak_core_apl.get_primary_apl(doc_idx, 1, :snowflake)
[{index_node, _type}] = pref_list
# HACK: riak_core automatically adds _master for the VNode master name.
:riak_core_vnode_master.sync_spawn_command(index_node, cmd, Snowflake.VNode_master)
end
end
defmodule Snowflake.Console do
def join(node_str) do
try do
case :riak_core.join(node_str) do
:ok ->
IO.puts "Sent join request to #{node_str}"
:ok
{:error, :not_reachable} ->
IO.puts "Node #{node_str} is not reachable!"
:error
{:error, :different_ring_sizes} ->
IO.puts "Failed! #{node_str} has a different ring_creation_size"
:error
end
catch
reason ->
IO.puts "Join failed #{reason}"
:error
end
end
def leave, do: :riak_core.leave()
def remove(node), do: remove_node(:erlang.list_to_atom(node))
def remove_node(node) when is_atom(node) do
# TODO: copy properly
:riak_core.remove_from_cluster(node)
end
def ringready do
:riak_core_status.ringready()
end
end
defmodule Snowflake.RingEventHandler do
use GenEvent
def handle_event({:ring_update, _ring}, state) do
{:ok, state}
end
end
defmodule Snowflake.NodeEventHandler do
use GenEvent
def handle_event({:service_update, _services}, state) do
{:ok, state}
end
end
defmodule Snowflake.VNode do
@behaviour :riak_core_vnode
def start_vnode(i) do
:riak_core_vnode_master.get_vnode_pid(i, __MODULE__)
end
def init([index]) do
ts = :erlang.now()
# This could get ugly if you expect them to be unique across data
# centers, or if you have more than 1024 partitions
<<machine_id :: size(10), _rest :: bits>> = <<index :: size(160)>>
{:ok, {index, machine_id, 0, ts}}
end
def handle_command(:next_id, sender, {index, machine_id, seq, ts}=state) do
case get_next_seq(ts, seq) do
:backwards_clock ->
{:reply, {:fail, :backwards_clock}, state}
:exhausted ->
# Retry after a millisecond
:erlang.sleep(1)
handle_command(:next_id, sender, state)
{:ok, new_ts, new_seq} ->
{:reply, construct_id(new_ts, machine_id, new_seq), {index, machine_id, new_seq, new_ts}}
end
end
def handle_command(:ping, _sender, state) do
{:reply, :pong, state}
end
def handle_command(message, _sender, state) do
{:noreply, state}
end
def handle_exit(_pid, reason, state) do
{:stop, reason, state}
end
def handle_handoff_command({:riak_core_fold_req_v2, _foldfun, acc0, _forwardable, _opts}, _sender, state) do
{:reply, acc0, state}
end
def handle_handoff_command(_message, _sender, state) do
{:forward, state}
end
def handoff_starting(_target_node, state) do
{true, state}
end
def handoff_cancelled(state) do
{:ok, state}
end
def handoff_finished(_target_node, state) do
{:ok, state}
end
def handle_handoff_data(_data, state) do
{:reply, :ok, state}
end
def encode_handoff_item(_object_name, _object_value) do
""
end
def is_empty(state) do
{false, state}
end
def delete(state) do
{:ok, state}
end
def handle_coverage(_req, _key_spaces, _sender, state) do
{:stop, :not_implemented, state}
end
def terminate(_reason, _state) do
:ok
end
defp get_next_seq({megas, secs, micros} = ts, seq) do
now = :erlang.now()
{now_megas, now_secs, now_micros} = now
cond do
# Time is essentially equal at the millisecond
megas == now_megas and secs == now_secs and div(micros, 1000) == div(now_micros, 1000) ->
case rem(seq + 1, 4096) do
0 -> :exhausted
new_seq -> {:ok, now, new_seq}
end
# Woops, clock was moved backwards by NTP
now < ts ->
:backwards_clock
# New millisecond
true ->
{:ok, now, 0}
end
end
@twitter_epoch 1142974214000
defp construct_id({megas, secs, micros}, machine_id, seq) do
combined = div(((megas * 1000000 + secs) * 1000000 + micros), 1000) - @twitter_epoch
<<integer :: [size(64), unsigned, integer]>> = <<
combined :: [size(42), unsigned, integer],
machine_id :: size(10),
seq :: [size(12), integer, unsigned]>>
integer
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment