Skip to content

Instantly share code, notes, and snippets.

@paveltyk
Last active January 6, 2021 11:25
Show Gist options
  • Save paveltyk/6246f6c0d3a63fe73ec5fe60fd21888d to your computer and use it in GitHub Desktop.
Save paveltyk/6246f6c0d3a63fe73ec5fe60fd21888d to your computer and use it in GitHub Desktop.
defmodule GoogleStorage.Pipeline do
  @moduledoc """
  Broadway pipeline that JSON-decodes the payload of each incoming message.

  Every message is expected to carry an `:attributes` map in its metadata and
  a JSON object as its data. The `"eventType"`, `"objectId"` and `"md5Hash"`
  keys are logged for each processed message (presumably these are Google
  Cloud Storage notifications delivered through Pub/Sub; confirm against the
  configured producer).
  """

  use Broadway
  require Logger
  alias Broadway.Message

  # Number of processor stages used when the caller does not pass `:concurrency`.
  @default_concurrency 10

  # Failure reason attached to rejected messages. Kept as a fixed string so
  # whatever consumes failed messages keeps seeing the same value.
  @failure_message "Failed to process PubSub message"

  @doc """
  Starts the pipeline, registered under the name `#{inspect(__MODULE__)}`.

  ## Options

    * `:producer_module` - required; passed to Broadway as the producer
      `:module` option.
    * `:concurrency` - optional; number of processor stages. Defaults to
      `#{@default_concurrency}`.

  Raises `KeyError` when `:producer_module` is missing.
  """
  def start_link(opts) do
    producer_module = Keyword.fetch!(opts, :producer_module)
    concurrency = Keyword.get(opts, :concurrency, @default_concurrency)

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [module: producer_module],
      processors: [
        default: [concurrency: concurrency]
      ]
    )
  end

  @doc """
  Decodes the JSON payload of `message` and replaces the message data with the
  decoded map.

  The message is marked as failed (and the cause is logged) when its metadata
  has no `:attributes` map, when the payload is not valid JSON, or when the
  payload decodes to something other than a JSON object.
  """
  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    with {:ok, meta} <- fetch_attributes(message.metadata),
         {:ok, data} <- decode_object(message.data) do
      Logger.info("#{meta["eventType"]}: #{meta["objectId"]} (#{data["md5Hash"]})")
      Message.update_data(message, fn _ -> data end)
    else
      {:error, reason} ->
        # Log the specific cause, but keep the failure reason on the message
        # itself unchanged.
        Logger.error("#{@failure_message}: #{reason}")
        Message.failed(message, @failure_message)
    end
  end

  # Extracts the attributes map from the message metadata.
  defp fetch_attributes(%{attributes: %{} = attributes}), do: {:ok, attributes}
  defp fetch_attributes(_metadata), do: {:error, "metadata has no attributes map"}

  # Decodes the payload and insists on a JSON object, because the caller reads
  # string keys from the result.
  defp decode_object(payload) do
    case Jason.decode(payload) do
      {:ok, %{} = object} -> {:ok, object}
      {:ok, _other} -> {:error, "JSON payload is not an object"}
      {:error, error} -> {:error, "invalid JSON payload: " <> Exception.message(error)}
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment