Skip to content

Instantly share code, notes, and snippets.

@jhowarth
Last active January 4, 2019 12:34
Show Gist options
  • Save jhowarth/4e8c37ad61c197e68d2cb456ac28797d to your computer and use it in GitHub Desktop.
Save jhowarth/4e8c37ad61c197e68d2cb456ac28797d to your computer and use it in GitHub Desktop.
defmodule GCM.Pusher do
  @moduledoc """
  GenStage consumer that takes push requests from a producer and forwards them
  to FCM through a connection process.

  Demand is managed manually: `@max_demand` events are requested when the
  subscription is established, and one more event is requested each time a
  pending request is completed by `handle_response/2`.
  """

  use GenStage

  # The maximum number of requests Firebase allows at once per XMPP connection
  @max_demand 100

  # `next_id` is the counter used to derive the message id of the next request;
  # `pending_requests` maps message id => push request awaiting a response.
  defstruct [
    :producer,
    :producer_from,
    :fcm_conn_pid,
    :pending_requests,
    :next_id
  ]

  @doc """
  Starts the pusher stage.

  `producer` is the stage to subscribe to, `fcm_conn_pid` is handed to
  `FCM.Connection.send/2` for every request, and `opts` are passed through to
  `GenStage.start_link/3`.
  """
  def start_link(producer, fcm_conn_pid, opts \\ []) do
    GenStage.start_link(__MODULE__, {producer, fcm_conn_pid}, opts)
  end

  @impl true
  def init({producer, fcm_conn_pid}) do
    state = %__MODULE__{
      next_id: 1,
      pending_requests: Map.new(),
      producer: producer,
      fcm_conn_pid: fcm_conn_pid
    }

    # Defer the subscription until after init/1 has returned.
    send(self(), :init)

    # Run as consumer
    {:consumer, state}
  end

  @impl true
  def handle_info(:init, %__MODULE__{producer: producer} = state) do
    # Subscribe to the Push Collector
    GenStage.async_subscribe(self(), to: producer, cancel: :temporary)
    {:noreply, [], state}
  end

  @impl true
  def handle_subscribe(:producer, _opts, from, state) do
    # Start demanding requests now that we are subscribed
    GenStage.ask(from, @max_demand)

    # :manual - demand is only ever sent explicitly via GenStage.ask/2.
    {:manual, %{state | producer_from: from}}
  end

  @impl true
  def handle_events(push_requests, _from, state) do
    # We got some push requests from the Push Collector.
    # Let's send them.
    state = Enum.reduce(push_requests, state, &do_send/2)
    {:noreply, [], state}
  end

  # Send the message to FCM, track as a pending request.
  # Returns the state with the id counter advanced and the request recorded
  # under its message id.
  defp do_send(push_request, %__MODULE__{} = state) do
    {message_id, state} = generate_id(state)
    xml = PushRequest.to_xml(push_request, message_id)

    # Assert on :ok so a failed send crashes the stage instead of leaving a
    # request tracked as pending that was never sent.
    :ok = FCM.Connection.send(state.fcm_conn_pid, xml)

    %{state | pending_requests: Map.put(state.pending_requests, message_id, push_request)}
  end

  # FCM response handling.
  # Removes the request identified by the response's `message_id` from the
  # pending set and asks the producer for one replacement event.
  # NOTE(review): nothing in this module calls this function; the handler that
  # receives FCM replies is not shown here and needs to invoke it.
  defp handle_response(%{message_id: message_id}, %__MODULE__{} = state) do
    if Map.has_key?(state.pending_requests, message_id) do
      # Since we finished a request, ask the Push Collector for more.
      GenStage.ask(state.producer_from, 1)
      %{state | pending_requests: Map.delete(state.pending_requests, message_id)}
    else
      # Unknown or duplicate message id: no slot was freed, so asking for more
      # here would push outstanding demand above @max_demand.
      state
    end
  end

  # Returns `{id_string, state}` where the id is the current counter rendered
  # as a string and the returned state has the counter incremented.
  defp generate_id(%__MODULE__{next_id: next_id} = state) do
    {to_string(next_id), %{state | next_id: next_id + 1}}
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment