Skip to content

Instantly share code, notes, and snippets.

@arjan
Last active June 14, 2023 08:35
Show Gist options
  • Save arjan/6702b4b9fd6b97e9b1a355313ed1620b to your computer and use it in GitHub Desktop.
Save arjan/6702b4b9fd6b97e9b1a355313ed1620b to your computer and use it in GitHub Desktop.
Test script for phoenix_pubsub issue #175
#!/usr/bin/env elixir
# Install the script's dependencies; phoenix_pubsub is pinned to exactly 2.1.2.
Mix.install([{:phoenix_pubsub, "== 2.1.2"}, :phoenix])

# Starts a process directly from a child spec by applying the spec's
# `start` {module, function, args} tuple.
start_spec = fn child_spec ->
  {mod, fun, args} = child_spec.start
  apply(mod, fun, args)
end

{:ok, _} = Application.ensure_all_started(:phoenix_pubsub)

# Bring up the PubSub server registered as MyPubSub.
{:ok, _} = start_spec.(Phoenix.PubSub.child_spec(name: MyPubSub))
defmodule MyPresence do
  @moduledoc """
  Presence module for this script, configured to use the `MyPubSub` server.
  """
  use Phoenix.Presence, otp_app: :hi, pubsub_server: MyPubSub
end
# Start the presence process from its child spec; the match asserts it came up.
{:ok, _} = start_spec.(MyPresence.child_spec(name: MyPresence))
###
defmodule LocalProcessCounter do
  @moduledoc """
  Single integer counter stored in an `:atomics` array whose reference is
  kept in `:persistent_term` under this module's name.

  `init/0` must be called before any of the other functions.
  """

  @count __MODULE__

  @doc "Creates a fresh one-slot atomics array and publishes its reference."
  def init do
    counter = :atomics.new(1, [])
    :persistent_term.put(@count, counter)
  end

  @doc "Adds one to the counter."
  def inc, do: :atomics.add(counter(), 1, 1)

  @doc "Subtracts one from the counter."
  def dec, do: :atomics.add(counter(), 1, -1)

  @doc "Returns the current counter value."
  def get, do: :atomics.get(counter(), 1)

  # Looks up the atomics reference stored by init/0.
  defp counter, do: :persistent_term.get(@count)
end
# Create the counter up front: inc/dec/get read the reference that init/0 stores.
LocalProcessCounter.init()
###
defmodule ExampleServer do
  @moduledoc """
  Short-lived GenServer that tracks itself in `MyPresence` on the
  `"presence"` topic and stops normally once its idle timeout fires.

  The init argument is used both as the server state and as the presence key.
  """
  use GenServer
  require Logger

  # Picks a lifetime of 500..10_500 ms, bumps the local process counter and
  # tracks this process in presence with its id and timeout as metadata.
  @impl true
  def init(s) do
    timeout = 500 + Enum.random(0..10_000)
    LocalProcessCounter.inc()
    # Logger.debug ">> starting: #{s}"
    MyPresence.track(self(), "presence", s, %{id: s, timeout: timeout})
    parent = self()

    # Halfway through the lifetime, update this server's presence entry from a
    # separate task, passing an empty map as the new metadata.
    Task.start(fn ->
      Process.sleep(div(timeout, 2))
      MyPresence.update(parent, "presence", s, %{})
    end)

    # The third element is the GenServer timeout: a :timeout message is
    # delivered if no other message arrives within that many milliseconds.
    {:ok, s, timeout}
  end

  # Timeout reached: decrement the local counter and stop normally.
  @impl true
  def handle_info(:timeout, state) do
    # Logger.debug " stopping: #{state}"
    LocalProcessCounter.dec()
    {:stop, :normal, state}
  end
end
###
defmodule Spawner do
  @moduledoc """
  Endlessly starts `ExampleServer` processes named `"server-0"`,
  `"server-1"`, ... with a random 0..100 ms pause before each one.
  """

  @doc "Spawns the spawning loop in its own process, starting at index 0."
  def start, do: spawn(__MODULE__, :loop, [0])

  @doc "Sleeps for a random delay, starts server number `n`, then recurses with `n + 1`."
  def loop(n) do
    pause_ms = Enum.random(0..100)
    Process.sleep(pause_ms)

    server_name = "server-#{n}"
    GenServer.start_link(ExampleServer, server_name)

    loop(n + 1)
  end
end
# Kick off the background process that keeps starting ExampleServer instances.
Spawner.start()
###
defmodule Clusterer do
  @moduledoc """
  Repeatedly pings a fixed set of node names on 127.0.0.1 and logs whenever
  the list of connected nodes differs from the previous poll.
  """
  require Logger

  # Candidate peers a@127.0.0.1 .. e@127.0.0.1, built once at compile time.
  @nodes ~w(a b c d e) |> Enum.map(&:"#{&1}@127.0.0.1")

  @doc "Spawns the polling loop in a new process with an empty previous node list."
  def start() do
    spawn(__MODULE__, :loop, [[]])
  end

  @doc """
  Pings every candidate node, waits 500 ms, and logs the current node list
  when it is not equal to `prev_nodes`. Recurses forever with the new list.
  """
  def loop(prev_nodes) do
    Enum.each(@nodes, &Node.ping/1)
    Process.sleep(500)
    nodes = Node.list()

    if nodes != prev_nodes do
      # Logger.warning/1 replaces the deprecated Logger.warn/1.
      Logger.warning("Node list now: #{inspect(nodes)}")
    end

    loop(nodes)
  end
end
# Start the background process that pings the candidate nodes and logs changes.
Clusterer.start()
###
defmodule Reporter do
  @moduledoc """
  Logs, every 500 ms, the local process counter next to the presence count
  and a per-node breakdown of the rows in the `MyPresence_shard0` ETS table.
  """
  require Logger

  @doc """
  Sleeps 500 ms, gathers the counts, logs them, and recurses. Never returns.
  """
  def loop() do
    Process.sleep(500)
    presence_count = MyPresence.list("presence") |> Enum.count()

    # Assumes each row's first element is a tuple whose second element is the
    # tracked pid — TODO confirm against the tracker's shard table layout.
    node_counts =
      MyPresence_shard0
      |> :ets.tab2list()
      |> Enum.map(fn row -> row |> elem(0) |> elem(1) |> node() end)
      |> Enum.frequencies()
      # Convert back to a list of {node, count} pairs so the logged form is
      # the same as before.
      |> Map.to_list()

    atomic_count = LocalProcessCounter.get()

    # Logger.warning/1 replaces the deprecated Logger.warn/1.
    Logger.warning(
      "Local processes: #{atomic_count} - Presence count: #{presence_count} - Per node counts: #{inspect(node_counts)}"
    )

    loop()
  end
end
# Run the reporter in the script's own process. loop/0 recurses unconditionally,
# so this call never returns and must remain the last statement.
Reporter.loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment