Skip to content

Instantly share code, notes, and snippets.

@themusicman
Created January 3, 2024 00:34
Show Gist options
  • Save themusicman/33e32ab58c27e7145938feb50a9863eb to your computer and use it in GitHub Desktop.
Save themusicman/33e32ab58c27e7145938feb50a9863eb to your computer and use it in GitHub Desktop.
Broadway logs
[debug] Elixir.ER.Destinations.Manager.Server.start_destination_pipeline(%ER.Destinations.Destination{__meta__: #Ecto.Schema.Metadata<:loaded, "destinations">, id: "a23e555e-460d-4e32-9eb6-13da92897a22", name: "s3_users", offset: nil, ordered: false, destination_type: :s3, paused: false, config: %{"s3_bucket" => "eventrelay-dev", "s3_region" => "us-east-2"}, config_json: nil, topic_identifier: nil, group_key: nil, signing_secret: "X1KFbu42hJ2GOf6jcbKrRHIBGWjJ0djHMiffeD4X3fr5cWsxYP_bH7cF6wr464vs", query: nil, topic_name: "users", topic: #Ecto.Association.NotLoaded<association :topic is not loaded>, inserted_at: ~U[2024-01-02 21:14:01Z], updated_at: ~U[2024-01-02 21:14:01Z]} starting pipeline. pipeline=ER.Destinations.Pipeline.S3
[debug] Elixir.ER.Destinations.Pipeline.S3.child_spec with child_spec=%{id: "Elixir.ER.Destinations.Pipeline.S3:base:destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22", shutdown: :infinity, start: {ER.Destinations.Pipeline.S3, :start_link, [[destination: %ER.Destinations.Destination{__meta__: #Ecto.Schema.Metadata<:loaded, "destinations">, id: "a23e555e-460d-4e32-9eb6-13da92897a22", name: "s3_users", offset: nil, ordered: false, destination_type: :s3, paused: false, config: %{"s3_bucket" => "eventrelay-dev", "s3_region" => "us-east-2"}, config_json: nil, topic_identifier: nil, group_key: nil, signing_secret: "X1KFbu42hJ2GOf6jcbKrRHIBGWjJ0djHMiffeD4X3fr5cWsxYP_bH7cF6wr464vs", query: nil, topic_name: "users", topic: #Ecto.Association.NotLoaded<association :topic is not loaded>, inserted_at: ~U[2024-01-02 21:14:01Z], updated_at: ~U[2024-01-02 21:14:01Z]}]]}}
[debug] Elixir.ER.Destinations.Pipeline.S3.start_link opts=[destination: %ER.Destinations.Destination{__meta__: #Ecto.Schema.Metadata<:loaded, "destinations">, id: "a23e555e-460d-4e32-9eb6-13da92897a22", name: "s3_users", offset: nil, ordered: false, destination_type: :s3, paused: false, config: %{"s3_bucket" => "eventrelay-dev", "s3_region" => "us-east-2"}, config_json: nil, topic_identifier: nil, group_key: nil, signing_secret: "X1KFbu42hJ2GOf6jcbKrRHIBGWjJ0djHMiffeD4X3fr5cWsxYP_bH7cF6wr464vs", query: nil, topic_name: "users", topic: #Ecto.Association.NotLoaded<association :topic is not loaded>, inserted_at: ~U[2024-01-02 21:14:01Z], updated_at: ~U[2024-01-02 21:14:01Z]}] and broadway_config=%ER.Destinations.Pipeline.BroadwayConfig{processor_concurrency: 10, batcher_concurrency: 1, batch_size: 50, batch_timeout: 1000, name: {:via, Registry, {ER.Registry, "destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}, destination: %ER.Destinations.Destination{__meta__: #Ecto.Schema.Metadata<:loaded, "destinations">, id: "a23e555e-460d-4e32-9eb6-13da92897a22", name: "s3_users", offset: nil, ordered: false, destination_type: :s3, paused: false, config: %{"s3_bucket" => "eventrelay-dev", "s3_region" => "us-east-2"}, config_json: nil, topic_identifier: nil, group_key: nil, signing_secret: "X1KFbu42hJ2GOf6jcbKrRHIBGWjJ0djHMiffeD4X3fr5cWsxYP_bH7cF6wr464vs", query: nil, topic_name: "users", topic: #Ecto.Association.NotLoaded<association :topic is not loaded>, inserted_at: ~U[2024-01-02 21:14:01Z], updated_at: ~U[2024-01-02 21:14:01Z]}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Terminator.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Producer_0.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_0.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_1.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_2.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_3.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_4.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_5.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_6.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_7.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_8.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default_9.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "RateLimiter.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "ProducerSupervisor.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "ProcessorSupervisor.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Batcher_s3.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "BatchProcessor_s3_0.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "BatchProcessorSupervisor_s3.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "BatcherSupervisor_s3.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "BatchersSupervisor.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Supervisor.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] -starting supervior-------------------------------
[debug] Elixir.ER.Destinations.Pipeline.Client.receive_messages(20, %{client: ER.Destinations.Pipeline.Client, demand: 20, receive_timer: nil, force_interval: false, receive_interval: 5000, ack_ref: {:via, Registry, {ER.Registry, "destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}, client_options: [destination: %ER.Destinations.Destination{__meta__: #Ecto.Schema.Metadata<:loaded, "destinations">, id: "a23e555e-460d-4e32-9eb6-13da92897a22", name: "s3_users", offset: nil, ordered: false, destination_type: :s3, paused: false, config: %{"s3_bucket" => "eventrelay-dev", "s3_region" => "us-east-2"}, config_json: nil, topic_identifier: nil, group_key: nil, signing_secret: "X1KFbu42hJ2GOf6jcbKrRHIBGWjJ0djHMiffeD4X3fr5cWsxYP_bH7cF6wr464vs", query: nil, topic_name: "users", topic: #Ecto.Association.NotLoaded<association :topic is not loaded>, inserted_at: ~U[2024-01-02 21:14:01Z], updated_at: ~U[2024-01-02 21:14:01Z]}]}
[debug] Elixir.ER.Destinations.QueuedEvents.Server.factory("a23e555e-460d-4e32-9eb6-13da92897a22", %{}) with module=ER.Destinations.QueuedEvents.Server and name="queued_events:a23e555e-460d-4e32-9eb6-13da92897a22"
[debug] -result-------------------> {:ok, #PID<0.849.0>} <-----------
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Producer.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Processor_default.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Batcher_s3.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "BatchProcessor_s3.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Producer_0.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.process_name with name={:via, Registry, {ER.Registry, "Batcher_s3.Broadway.destination:pipeline:s3:a23e555e-460d-4e32-9eb6-13da92897a22"}}
[debug] Elixir.ER.Destinations.Pipeline.S3.start_link with result={:ok, #PID<0.848.0>}
[debug] Elixir.ER.Destinations.QueuedEvents.Server.via_tuple("queued_events:a23e555e-460d-4e32-9eb6-13da92897a22")
[debug] Elixir.ER.Destinations.Manager.Server.start_destination_pipeline with result={:ok, #PID<0.848.0>}
[debug] Elixir.ER.Destinations.QueuedEvents.Server.via_tuple("queued_events:a23e555e-460d-4e32-9eb6-13da92897a22")
[info] Elixir.ER.Destinations.QueuedEvents.Server.start_link here: starting {:via, Registry, {ER.Registry, "queued_events:a23e555e-460d-4e32-9eb6-13da92897a22"}} on node=:nonode@nohost
[debug] Elixir.ER.Destinations.QueuedEvents.Server.factory("a23e555e-460d-4e32-9eb6-13da92897a22", %{}) with result={:ok, #PID<0.869.0>}
[debug] Elixir.ER.Destinations.QueuedEvents.Server.via_tuple("queued_events:a23e555e-460d-4e32-9eb6-13da92897a22")
[debug] QUERY OK source="destinations" db=0.3ms queue=0.4ms idle=1087.5ms
SELECT d0."id", d0."name", d0."offset", d0."ordered", d0."destination_type", d0."paused", d0."config", d0."topic_identifier", d0."group_key", d0."signing_secret", d0."query", d0."topic_name", d0."inserted_at", d0."updated_at" FROM "destinations" AS d0 WHERE (d0."id" = $1) ["a23e555e-460d-4e32-9eb6-13da92897a22"]
↳ ER.Destinations.QueuedEvents.Server.handle_continue/2, at: lib/event_relay/destinations/queued_events/server.ex:11
[debug] Queued Events server started for destination="a23e555e-460d-4e32-9eb6-13da92897a22"
[debug] QUERY OK source="users_events" db=0.4ms queue=0.7ms idle=1090.9ms
SELECT u0."id", u0."errors", u0."context", u0."data", u0."name", u0."topic_identifier", u0."user_key", u0."anonymous_key", u0."occurred_at", u0."offset", u0."source", u0."verified", u0."group_key", u0."reference_key", u0."trace_key", u0."destination_locks", u0."data_schema", u0."prev_id", u0."topic_name", u0."inserted_at", u0."updated_at" FROM "users_events" AS u0 WHERE (u0."topic_name" = $1) AND (NOT (u0."occurred_at" IS NULL)) AND (NOT ($2 = ANY(u0."destination_locks"))) ORDER BY u0."offset" LIMIT $3 ["users", "a23e555e-460d-4e32-9eb6-13da92897a22", 20]
↳ ER.Destinations.QueuedEvents.Server.handle_call/3, at: lib/event_relay/destinations/queued_events/server.ex:42
[debug] Elixir.ER.Destinations.Manager.Server.start_destination_pipeline(%ER.Destinations.Destination{__meta__: #Ecto.Schema.Metadata<:loaded, "destinations">, id: "9a98d548-4113-43ff-97ff-40733657c0b5", name: "topic_users", offset: nil, ordered: false, destination_type: :topic, paused: false, config: %{"topic_name" => "actions"}, config_json: nil, topic_identifier: nil, group_key: nil, signing_secret: "wJ5J2ArRBnlqCKALP_1hbstWFC0Ec00e_ApRCWimQcSeiRyTrelEfvoef5fF5Nqu", query: nil, topic_name: "users", topic: #Ecto.Association.NotLoaded<association :topic is not loaded>, inserted_at: ~U[2024-01-03 00:32:12Z], updated_at: ~U[2024-01-03 00:32:12Z]} starting pipeline. pipeline=ER.Destinations.Pipeline.Topic
[debug] Elixir.ER.Destinations.Pipeline.Topic.child_spec with child_spec=%{id: "Elixir.ER.Destinations.Pipeline.Topic:base:destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5", shutdown: :infinity, start: {ER.Destinations.Pipeline.Topic, :start_link, [[destination: %ER.Destinations.Destination{__meta__: #Ecto.Schema.Metadata<:loaded, "destinations">, id: "9a98d548-4113-43ff-97ff-40733657c0b5", name: "topic_users", offset: nil, ordered: false, destination_type: :topic, paused: false, config: %{"topic_name" => "actions"}, config_json: nil, topic_identifier: nil, group_key: nil, signing_secret: "wJ5J2ArRBnlqCKALP_1hbstWFC0Ec00e_ApRCWimQcSeiRyTrelEfvoef5fF5Nqu", query: nil, topic_name: "users", topic: #Ecto.Association.NotLoaded<association :topic is not loaded>, inserted_at: ~U[2024-01-03 00:32:12Z], updated_at: ~U[2024-01-03 00:32:12Z]}]]}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.start_link opts=[destination: %ER.Destinations.Destination{__meta__: #Ecto.Schema.Metadata<:loaded, "destinations">, id: "9a98d548-4113-43ff-97ff-40733657c0b5", name: "topic_users", offset: nil, ordered: false, destination_type: :topic, paused: false, config: %{"topic_name" => "actions"}, config_json: nil, topic_identifier: nil, group_key: nil, signing_secret: "wJ5J2ArRBnlqCKALP_1hbstWFC0Ec00e_ApRCWimQcSeiRyTrelEfvoef5fF5Nqu", query: nil, topic_name: "users", topic: #Ecto.Association.NotLoaded<association :topic is not loaded>, inserted_at: ~U[2024-01-03 00:32:12Z], updated_at: ~U[2024-01-03 00:32:12Z]}] and broadway_config=%ER.Destinations.Pipeline.BroadwayConfig{processor_concurrency: 10, batcher_concurrency: 1, batch_size: 50, batch_timeout: 1000, name: {:via, Registry, {ER.Registry, "destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}, destination: %ER.Destinations.Destination{__meta__: #Ecto.Schema.Metadata<:loaded, "destinations">, id: "9a98d548-4113-43ff-97ff-40733657c0b5", name: "topic_users", offset: nil, ordered: false, destination_type: :topic, paused: false, config: %{"topic_name" => "actions"}, config_json: nil, topic_identifier: nil, group_key: nil, signing_secret: "wJ5J2ArRBnlqCKALP_1hbstWFC0Ec00e_ApRCWimQcSeiRyTrelEfvoef5fF5Nqu", query: nil, topic_name: "users", topic: #Ecto.Association.NotLoaded<association :topic is not loaded>, inserted_at: ~U[2024-01-03 00:32:12Z], updated_at: ~U[2024-01-03 00:32:12Z]}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Terminator.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Producer_0.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_0.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_1.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_2.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_3.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_4.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_5.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_6.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_7.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_8.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Processor_default_9.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "RateLimiter.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "ProducerSupervisor.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "ProcessorSupervisor.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] Elixir.ER.Destinations.Pipeline.Topic.process_name with name={:via, Registry, {ER.Registry, "Supervisor.Broadway.destination:pipeline:topic:9a98d548-4113-43ff-97ff-40733657c0b5"}}
[debug] -starting supervior-------------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment