Skip to content

Instantly share code, notes, and snippets.

Created Sep 1, 2016
What would you like to do?
Firebase Cloud Messaging aioxmpp script
import asyncio, datetime, uuid, json, aioxmpp.xso, aioxmpp.utils, aioxmpp.node, aioxmpp.security_layer, aioxmpp.structs, aioxmpp.stanza, colorlog
class FCMPayload(aioxmpp.xso.XSO):
TAG = ("google:mobile:data", "gcm")
text = aioxmpp.xso.Text(default=None)
async def main(jid, password, recipient):
def get_password(client_jid, nattempt):
return None if nattempt > 1 else password
aioxmpp.stanza.Message.fcm_payload = aioxmpp.xso.Child([FCMPayload])
client = aioxmpp.node.PresenceManagedClient(
override_peer = [(FCM_SERVER_URL, FCM_SERVER_PORT, aioxmpp.connector.XMPPOverTLSConnector())],
payload = FCMPayload()
payload.text = json.dumps({
"message_id": str(uuid.uuid4()),
"to": recipient,
"notification": {
"title": "Hello", "text": "World",
async with aioxmpp.node.UseConnected(client, timeout=datetime.timedelta(seconds=30)) as stream:
msg = aioxmpp.stanza.Message(type_="normal", id_="")
msg.fcm_payload = payload
await stream.send_and_wait_for_sent(msg)
asyncio.get_event_loop().run_until_complete(main(FCM_JID, FCM_API_KEY, RECIPIENT))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment