Created
March 29, 2023 16:49
-
-
Save dvic/a2177efd0d95fc6ff755b2650928cbca to your computer and use it in GitHub Desktop.
Script demonstrating spear read/write issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Setup | |
Mix.install([ | |
{:jason, "~> 1.4"}, | |
{:spear, "~> 1.3"} | |
]) | |
connection_string = "esdb://localhost:2113" | |
stream_name = "tha-stream" | |
separate_clients = false | |
# The symptoms that caused us to investigate this issue is a slowdown in the | |
# system, taking 10 seconds. These 10 seconds are, we believe, related to the | |
# time EventStore waits until it retries sending a message. | |
# | |
# Generally speaking 128 kB quickly causes issues, while 64 kB tends to work | |
# most of the time. The HTTP2 window size is 64 kB. We don't know if this is | |
# relevant or not. | |
# | |
# Also, when you have `num_events` rather high, you always trigger the slowdown. | |
# One might start to believe that the writes fill up some kind of buffer, so an | |
# acknowledgement of a received event (also a kind of write) might end up very | |
# far down the buffer, hence it not getting sent. This seems not to be the case, | |
# because if you keep the number of sent events very low, it sometimes happens | |
# as well. So the problem is *not* necessarily some kind of buffer somewhere | |
# that is filling up such that the Ack is delayed. | |
event_bytes = 1024 * 128 | |
num_events = 25 | |
defmodule Server do | |
use GenServer | |
def init(opts) do | |
# Fetch from opts | |
client_pid = Keyword.fetch!(opts, :client) | |
stream_name = Keyword.fetch!(opts, :stream_name) | |
# Create persistent subscription and subscribe | |
filter = Spear.Filter.exclude_system_events() | |
settings = %Spear.PersistentSubscription.Settings{ | |
max_subscriber_count: 1, | |
message_timeout: 10_000, | |
resolve_links?: true | |
} | |
case Spear.create_persistent_subscription(client_pid, stream_name, "hammertime", settings, | |
filter: filter | |
) do | |
:ok -> :ok | |
{:error, %Spear.Grpc.Response{status: :already_exists}} -> :ok | |
end | |
{:ok, subscription} = | |
Spear.connect_to_persistent_subscription(client_pid, self(), stream_name, "hammertime") | |
# Return initial state | |
{:ok, | |
%{ | |
client_pid: client_pid, | |
subscription: subscription, | |
count: 0, | |
ids: MapSet.new(), | |
start_time: System.os_time(:millisecond) | |
}} | |
end | |
def handle_info(%Spear.Event{} = message, state) do | |
# Do some formatting on the event to print a fancy message | |
count_string = | |
state.count | |
|> Integer.to_string() | |
|> String.pad_leading(8) | |
content_string = | |
(is_map(message.body) and Map.has_key?(message.body, "data") && | |
String.length(message.body["data"])) || 0 | |
seen_string = (MapSet.member?(state.ids, message.id) && "OLD") || "NEW" | |
IO.puts( | |
"[Serv] #{seen_string} #{count_string} :: Acking event #{message.id} of size #{content_string} ... " | |
) | |
# Acknowledge and update state | |
Spear.ack(state.client_pid, state.subscription, [message.id]) | |
{:noreply, %{state | count: state.count + 1, ids: MapSet.put(state.ids, message.id)}} | |
end | |
def handle_info(%Spear.Filter.Checkpoint{} = message, state) do | |
IO.puts( | |
"[Serv] Checkpoint, commit = #{message.commit_position}, prepare = #{message.prepare_position}" | |
) | |
{:noreply, state} | |
end | |
def handle_call(:shutdown, _from, state) do | |
delta = System.os_time(:millisecond) - state.start_time | |
IO.puts( | |
"[Serv] Shutting down, got #{state.count} messages in #{delta} ms, so #{delta / state.count} ms per message" | |
) | |
{:stop, :normal, :shutdown, state} | |
end | |
end | |
# Start client(s). Change "separate_clients" at the top to switch between one | |
# client and two clients | |
{:ok, write_client_pid} = Spear.Connection.start_link(connection_string: connection_string) | |
read_client_pid = | |
if separate_clients do | |
{:ok, read_client_pid} = Spear.Connection.start_link(connection_string: connection_string) | |
read_client_pid | |
else | |
write_client_pid | |
end | |
# Start GenServer that listens to events | |
{:ok, pid} = | |
GenServer.start(Server, [client: read_client_pid, stream_name: stream_name], id: Server) | |
IO.puts("[Sys] Started GenServer on #{inspect(pid)}") | |
# Write a lot of events | |
write_start = System.os_time(:millisecond) | |
for _ <- 0..(num_events - 1) do | |
event = Spear.Event.new("grpc-client", %{"data" => String.duplicate("!", event_bytes)}) | |
IO.puts("[Sys] Sending message #{event.id}") | |
:ok = Spear.append([event], write_client_pid, stream_name) | |
end | |
write_delta = System.os_time(:millisecond) - write_start | |
IO.puts( | |
"[Sys] Writing #{num_events} took #{write_delta} ms, so #{write_delta / num_events} ms per event" | |
) | |
# Wait for a line of input, then shut down | |
IO.puts("[Sys] Press any key to exit") | |
IO.read(:line) | |
GenServer.call(pid, :shutdown) | |
IO.puts("[Sys] Script finished :)") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment