Skip to content

Instantly share code, notes, and snippets.

@dvic
Created March 29, 2023 16:49
Show Gist options
  • Save dvic/a2177efd0d95fc6ff755b2650928cbca to your computer and use it in GitHub Desktop.
Save dvic/a2177efd0d95fc6ff755b2650928cbca to your computer and use it in GitHub Desktop.
Script demonstrating spear read/write issue
# Setup
Mix.install([
{:jason, "~> 1.4"},
{:spear, "~> 1.3"}
])
connection_string = "esdb://localhost:2113"
stream_name = "tha-stream"
separate_clients = false
# The symptoms that caused us to investigate this issue is a slowdown in the
# system, taking 10 seconds. These 10 seconds are, we believe, related to the
# time EventStore waits until it retries sending a message.
#
# Generally speaking 128 kB quickly causes issues, while 64 kB tends to work
# most of the time. The HTTP2 window size is 64 kB. We don't know if this is
# relevant or not.
#
# Also, when you have `num_events` rather high, you always trigger the slowdown.
# One might start to believe that the writes fill up some kind of buffer, so an
# acknowledgement of a received event (also a kind of write) might end up very
# far down the buffer, hence it not getting sent. This seems not to be the case,
# because if you keep the number of sent events very low, it sometimes happens
# as well. So the problem is *not* necessarily some kind of buffer somewhere
# that is filling up such that the Ack is delayed.
event_bytes = 1024 * 128
num_events = 25
defmodule Server do
use GenServer
def init(opts) do
# Fetch from opts
client_pid = Keyword.fetch!(opts, :client)
stream_name = Keyword.fetch!(opts, :stream_name)
# Create persistent subscription and subscribe
filter = Spear.Filter.exclude_system_events()
settings = %Spear.PersistentSubscription.Settings{
max_subscriber_count: 1,
message_timeout: 10_000,
resolve_links?: true
}
case Spear.create_persistent_subscription(client_pid, stream_name, "hammertime", settings,
filter: filter
) do
:ok -> :ok
{:error, %Spear.Grpc.Response{status: :already_exists}} -> :ok
end
{:ok, subscription} =
Spear.connect_to_persistent_subscription(client_pid, self(), stream_name, "hammertime")
# Return initial state
{:ok,
%{
client_pid: client_pid,
subscription: subscription,
count: 0,
ids: MapSet.new(),
start_time: System.os_time(:millisecond)
}}
end
def handle_info(%Spear.Event{} = message, state) do
# Do some formatting on the event to print a fancy message
count_string =
state.count
|> Integer.to_string()
|> String.pad_leading(8)
content_string =
(is_map(message.body) and Map.has_key?(message.body, "data") &&
String.length(message.body["data"])) || 0
seen_string = (MapSet.member?(state.ids, message.id) && "OLD") || "NEW"
IO.puts(
"[Serv] #{seen_string} #{count_string} :: Acking event #{message.id} of size #{content_string} ... "
)
# Acknowledge and update state
Spear.ack(state.client_pid, state.subscription, [message.id])
{:noreply, %{state | count: state.count + 1, ids: MapSet.put(state.ids, message.id)}}
end
def handle_info(%Spear.Filter.Checkpoint{} = message, state) do
IO.puts(
"[Serv] Checkpoint, commit = #{message.commit_position}, prepare = #{message.prepare_position}"
)
{:noreply, state}
end
def handle_call(:shutdown, _from, state) do
delta = System.os_time(:millisecond) - state.start_time
IO.puts(
"[Serv] Shutting down, got #{state.count} messages in #{delta} ms, so #{delta / state.count} ms per message"
)
{:stop, :normal, :shutdown, state}
end
end
# Start client(s). Change "separate_clients" at the top to switch between one
# client and two clients
{:ok, write_client_pid} = Spear.Connection.start_link(connection_string: connection_string)
read_client_pid =
if separate_clients do
{:ok, read_client_pid} = Spear.Connection.start_link(connection_string: connection_string)
read_client_pid
else
write_client_pid
end
# Start GenServer that listens to events
{:ok, pid} =
GenServer.start(Server, [client: read_client_pid, stream_name: stream_name], id: Server)
IO.puts("[Sys] Started GenServer on #{inspect(pid)}")
# Write a lot of events
write_start = System.os_time(:millisecond)
for _ <- 0..(num_events - 1) do
event = Spear.Event.new("grpc-client", %{"data" => String.duplicate("!", event_bytes)})
IO.puts("[Sys] Sending message #{event.id}")
:ok = Spear.append([event], write_client_pid, stream_name)
end
write_delta = System.os_time(:millisecond) - write_start
IO.puts(
"[Sys] Writing #{num_events} took #{write_delta} ms, so #{write_delta / num_events} ms per event"
)
# Wait for a line of input, then shut down
IO.puts("[Sys] Press any key to exit")
IO.read(:line)
GenServer.call(pid, :shutdown)
IO.puts("[Sys] Script finished :)")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment