Skip to content

Instantly share code, notes, and snippets.

@azmeuk
Created September 1, 2016 08:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save azmeuk/7884a2f983a8ad64d3d8c08f21354a83 to your computer and use it in GitHub Desktop.
Save azmeuk/7884a2f983a8ad64d3d8c08f21354a83 to your computer and use it in GitHub Desktop.
Firebase Cloud Messaging aioxmpp script
import asyncio, datetime, uuid, json, aioxmpp.xso, aioxmpp.utils, aioxmpp.node, aioxmpp.security_layer, aioxmpp.structs, aioxmpp.stanza, colorlog
class FCMPayload(aioxmpp.xso.XSO):
TAG = ("google:mobile:data", "gcm")
text = aioxmpp.xso.Text(default=None)
async def main(jid, password, recipient):
@asyncio.coroutine
def get_password(client_jid, nattempt):
return None if nattempt > 1 else password
aioxmpp.stanza.Message.fcm_payload = aioxmpp.xso.Child([FCMPayload])
client = aioxmpp.node.PresenceManagedClient(
aioxmpp.structs.JID.fromstr(jid),
aioxmpp.security_layer.tls_with_password_based_authentication(get_password),
override_peer = [(FCM_SERVER_URL, FCM_SERVER_PORT, aioxmpp.connector.XMPPOverTLSConnector())],
)
payload = FCMPayload()
payload.text = json.dumps({
"message_id": str(uuid.uuid4()),
"to": recipient,
"notification": {
"title": "Hello", "text": "World",
}
})
async with aioxmpp.node.UseConnected(client, timeout=datetime.timedelta(seconds=30)) as stream:
msg = aioxmpp.stanza.Message(type_="normal", id_="")
msg.fcm_payload = payload
await stream.send_and_wait_for_sent(msg)
asyncio.get_event_loop().run_until_complete(main(FCM_JID, FCM_API_KEY, RECIPIENT))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment