Skip to content

Instantly share code, notes, and snippets.

{:broadway, github: "dashbitco/broadway", override: true},
{:broadway_dashboard, github: "dashbitco/broadway_dashboard", override: true},
{:broadway_cloud_pub_sub, github: "dashbitco/broadway_cloud_pub_sub", override: true},
{:goth, "~> 1.0"}
mix phx.new broadway_demo --no-html --no-ecto
#PID<0.610.0> Handling first step: %{"latitude" => 40.71922, "longitude" => -73.97506, "meter_increment" => 0.021794872, "meter_reading" => 10.962821, "passenger_count" => 1, "point_idx" => 503, "ride_id" => "732c74e0-d54c-439c-b950-340f30c201c3", "ride_status" => "enroute", "timestamp" => "2023-03-21T04:48:31.07775-04:00"}
#PID<0.624.0> Enroute Batch default
Got enroute batch: : [
%{
"latitude" => 40.71922,
"longitude" => -73.97506,
"meter_increment" => 0.021794872,
"meter_reading" => 10.962821,
"passenger_count" => 1,
"point_idx" => 503,
export GOOGLE_APPLICATION_CREDENTIALS=/Users/kris/Documents/your-gcp-pub-sub-creds-file.json
mix phx.server
# Child specs for the supervisor; the list is closed outside this excerpt.
children = [
# Start the Telemetry supervisor
BroadwayDemoWeb.Telemetry,
# Start the PubSub system
{Phoenix.PubSub, name: BroadwayDemo.PubSub},
# Start the Endpoint (http/https)
BroadwayDemoWeb.Endpoint,
# Start a worker by calling: BroadwayDemo.Worker.start_link(arg)
# {BroadwayDemo.Worker, arg}
# Start BroadwayPubSubSimple with an empty argument list. This is the same
# module the dashboard route lists under `pipelines:`.
# NOTE(review): presumably its start_link/1 ignores the argument — confirm.
{BroadwayPubSubSimple, []}
# Previous form of the route, kept for reference:
# live_dashboard "/dashboard", metrics: BroadwayDemoWeb.Telemetry
#
# Same route and metrics, plus an extra `broadway` page served by
# BroadwayDashboard for the BroadwayPubSubSimple pipeline.
live_dashboard "/dashboard", metrics: BroadwayDemoWeb.Telemetry,
additional_pages: [
broadway: {BroadwayDashboard, pipelines: [BroadwayPubSubSimple]}
]
# Partition used for messages that cannot be routed by ride id.
@default_partition 0

# Picks the processor partition for a message (used as `partition_by`).
#
# `msg.data` is decoded as JSON. When the payload is an object with a
# "ride_id" key, the id is hashed so that every message of the same ride is
# given the same partition value. Anything else (invalid JSON, a non-object
# payload, or a missing "ride_id") falls back to `@default_partition`.
#
# Always returns a non-negative integer, as Broadway requires of a
# `partition_by` function. The previous fallback returned the atom
# `:default_partition`, which is not a valid partition value.
defp partition(msg) do
  case Jason.decode(msg.data) do
    {:ok, %{"ride_id" => ride_id}} ->
      :erlang.phash2(ride_id)

    _ ->
      @default_partition
  end
end
processors: [
# valid options are: [:concurrency, :min_demand, :max_demand, :partition_by, :spawn_opt, :hibernate_after]
default: [concurrency: 6, partition_by: &partition/1]
],
batchers: [
# valid options are: [:concurrency, :batch_size, :max_demand, :batch_timeout, :partition_by, :spawn_opt, :hibernate_after]
pickup: [
concurrency: 1,
batch_size: 5,
batch_timeout: 2_000
producer: [
module:
{BroadwayCloudPubSub.Producer,
subscription: "projects/your-account-name/subscriptions/nyc-taxi-sub"},
concurrency: 1
],
processors: [
...
],
batchers: [
# Batch callback clause for the :enroute batcher.
#
# Prints the handling process and the batch key, then inspects the `data`
# of every message in the batch. The messages are returned unchanged.
def handle_batch(:enroute, messages, batch_info, _) do
  IO.puts("#{inspect(self())} Enroute Batch #{batch_info.batch_key}")

  payloads = Enum.map(messages, & &1.data)
  IO.inspect(payloads, label: "Got enroute batch: ")

  messages
end