Skip to content

Instantly share code, notes, and snippets.

def handle_failed(messages, _context) do
IO.puts("Messages in failed stage: #{inspect(messages)}")
Enum.map(messages, fn %{status: {:failed, "invalid-data"}} = message ->
IO.puts("ACK invalid message and log error: #{inspect(message.data)}")
Broadway.Message.configure_ack(message, on_failure: :ack)
message -> message
end)
end
def handle_message(_, %Message{data: %{event: {:error, err}}} = message, _) do
IO.puts("#{inspect(self())} Handling parsing error: #{inspect(err)}")
Broadway.Message.failed(message,"invalid-data")
end
def handle_message(_, %Message{data: %{event: {:ok, taxidata}}} = message, _) do
IO.puts("#{inspect(self())} Handling first step: #{inspect(taxidata)}")
message = Broadway.Message.update_data(message, fn _data -> taxidata end)
case taxidata["ride_status"] do
"enroute" ->
message
|> Broadway.Message.put_batcher(:enroute)
"pickup" ->
message
|> Broadway.Message.put_batcher(:pickup)
def prepare_messages(messages, _context) do
IO.puts("Messages in prepare stage: #{inspect(messages)}")
messages = Enum.map(messages, fn message ->
Broadway.Message.update_data(message, fn data ->
%{event: Jason.decode(data)}
end)
end)
messages
end
%{
"latitude" => 40.71922,
"longitude" => -73.97506,
"meter_increment" => 0.021794872,
"meter_reading" => 10.962821,
"passenger_count" => 1,
"point_idx" => 503,
"ride_id" => "732c74e0-d54c-439c-b950-340f30c201c3",
"ride_status" => "enroute",
"timestamp" => "2023-03-21T04:48:31.07775-04:00"
defmodule MyBroadway do
# behaviour impl, aliases and imports ...
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
# producer configuration
],
processors: [
defmodule MyBroadway do
use Broadway
alias Broadway.Message
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
# producer configuration
plug OauthTutorial.Plugs.RequireAuth when action in [:index, :new, :create, :show, :edit, :update, :delete]
defmodule OauthTutorial.Plugs.RequireAuth do
import Plug.Conn
import Phoenix.Controller
alias OauthTutorialWeb.Router.Helpers, as: Routes
def init(_params) do
end