Skip to content

Instantly share code, notes, and snippets.

@0ex-d
Last active August 1, 2020 19:32
Show Gist options
  • Save 0ex-d/b77e992e23f0b8f64daab7e8a34b6c93 to your computer and use it in GitHub Desktop.
Save 0ex-d/b77e992e23f0b8f64daab7e8a34b6c93 to your computer and use it in GitHub Desktop.
Serverside push notifications done using Firebase SDK in Python
"""
A server-side implementation module for push notifications
utilizing firebase SDK
Reference https://firebase.google.com/docs/reference/admin/python/
By Precious Akin"""
from app import app
import firebase_admin
from firebase_admin import credentials, messaging
firebase_admin.initialize_app()
config = app.config
log = app.logger
class PushNotifService:
def __init__(self):
pass
def send_notification(self, _receiver: str, **opts):
"""for single notif e.g `chats, meetings`
`_receiver`: receiver's push token
`title`: title
`body`: body of push notif
`Note`: icon, sounds and badge are provided by client(web, mobile)
"""
if not 'title' in opts or not 'body' in opts:
log.warn("Provide Title and body parameters.")
return False
message = messaging.Message(
notification=messaging.Notification(
title=opts['title'],
body=opts['body'],
),
data={
'title': opts['title'],
'body': opts['body'],
},
token=_receiver,
)
_response = messaging.send(message)
return _response
def send_batch_notifications(self, _receivers: list, **opts):
"""for multi participants notif e.g `Groups`"""
message = messaging.MulticastMessage(
notification=messaging.Notification(
title=opts['title'],
body=opts['body'],
),
data={
'title': opts['title'],
'body': opts['body']
},
tokens=_receivers,
)
_response = messaging.send_multicast(message)
# check for lib errors
# in multi-recepient messaging, we need to know who got the message
# and who didn't
if _response.failure_count > 0:
responses = _response.responses
failed_tokens = []
for idx, resp in enumerate(responses):
if not resp.success:
# The order of responses corresponds to the order of the registration tokens.
failed_tokens.append(_receivers[idx])
log.warn('Failed recepient tokens: %s'%(failed_tokens))
return _response
service = PushNotifService()
def _test():
# MAX_ALLOWED => 100 device registration tokens
# This registration token comes from the client FCM SDKs.
registration_token = "YOUR_FIREBASE_REG_TOKEN"
print('Successfully sent message:', service.send_notification(
registration_token, title="Your supervisor just set a meeting !", body="A meeting for 12:30 has been set at Conference room A")
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment