Skip to content

Instantly share code, notes, and snippets.

@TylerPachal
Last active January 6, 2022 16:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TylerPachal/744311b685cf59d3fd2621c7eacf10d4 to your computer and use it in GitHub Desktop.
Save TylerPachal/744311b685cf59d3fd2621c7eacf10d4 to your computer and use it in GitHub Desktop.
@doc """
Asserts that Kafka contains a matching message on the given topic.

For every partition of `topic`, messages are fetched starting `:offset_lookback`
offsets before the partition's latest offset (never below offset 0). The body of
each fetched message is passed to `assert_body_fn`; the assertion passes as soon
as the function returns a truthy value, and fails the test with
`ExUnit.Assertions.flunk/1` if no message matches.

## Options

  * `:endpoints` - broker endpoints handed to `:brod` (default: `[localhost: 9092]`)
  * `:offset_lookback` - how far before each partition's latest offset to start
    fetching (default: `5`)

Options are evaluated at runtime, so they may be given as a literal keyword list
or as any expression (for example a variable) that yields one.

## Example

    assert_kafka_contains "my_topic", fn message_body ->
      Jason.decode!(message_body)["name"] == "tyler"
    end
"""
defmacro assert_kafka_contains(topic, opts \\ [], assert_body_fn) do
  # `bind_quoted` evaluates each caller-supplied expression exactly once and
  # binds it to a hygienic variable. Reading `opts` inside the quote (at
  # runtime) rather than on the AST lets callers pass non-literal options.
  quote bind_quoted: [topic: topic, opts: opts, assert_body_fn: assert_body_fn] do
    endpoints = opts[:endpoints] || [localhost: 9092]
    offset_lookback = opts[:offset_lookback] || 5

    {:ok, %{topic_metadata: [%{partition_metadata: partition_metadata}]}} =
      :brod.get_metadata(endpoints, [topic])

    # Lazy stream: partitions are only queried while Enum.any?/2 below is still
    # looking for a match, so fetching stops at the first matching message.
    messages =
      Stream.flat_map(partition_metadata, fn %{partition: partition} ->
        {:ok, latest_offset} = :brod.resolve_offset(endpoints, topic, partition)
        # Clamp at 0 so short partitions are read from their beginning.
        fetch_offset = max(0, latest_offset - offset_lookback)
        {:ok, {_offset, batch}} = :brod.fetch(endpoints, topic, partition, fetch_offset)
        batch
      end)

    matched? =
      Enum.any?(messages, fn message ->
        # Do not pin the timestamp type: messages stamped with log-append time
        # carry a different value than :create and must still be inspected.
        {:kafka_message, _offset, _key, body, _ts_type, _timestamp, _headers} = message
        assert_body_fn.(body)
      end)

    # Fully qualified so the macro does not depend on the caller's imports.
    if matched? do
      true
    else
      ExUnit.Assertions.flunk("no matching kafka message received")
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment