Last active
August 1, 2020 19:32
-
-
Save 0ex-d/b77e992e23f0b8f64daab7e8a34b6c93 to your computer and use it in GitHub Desktop.
Serverside push notifications done using Firebase SDK in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
A server-side implementation module for push notifications
utilizing the Firebase SDK.
Reference: https://firebase.google.com/docs/reference/admin/python/
By Precious Akin"""
from app import app | |
import firebase_admin | |
from firebase_admin import credentials, messaging | |
# Initialise the default Firebase app only if it does not exist yet.
# firebase_admin.initialize_app() raises ValueError when the default app has
# already been created (e.g. this module is imported twice, or another module
# initialised it first), while get_app() raises ValueError when it is missing.
try:
    firebase_admin.get_app()
except ValueError:
    firebase_admin.initialize_app()

# Convenience aliases onto the application object imported from ``app``.
config = app.config
log = app.logger
class PushNotifService:
    """Send push notifications through the Firebase Admin ``messaging`` API.

    Uses the module-level ``messaging`` import and the module-level ``log``
    logger. Both public methods require ``title`` and ``body`` keyword
    arguments; when either is missing they log a warning and return ``False``
    without contacting Firebase.
    """

    def __init__(self):
        pass

    @staticmethod
    def _has_title_and_body(opts: dict) -> bool:
        """Return True when ``opts`` has both keys; otherwise log and return False."""
        if 'title' not in opts or 'body' not in opts:
            log.warning("Provide Title and body parameters.")
            return False
        return True

    def send_notification(self, _receiver: str, **opts):
        """Send one notification to one device, e.g. `chats, meetings`.

        `_receiver`: receiver's push token
        `title`: title
        `body`: body of push notif
        `Note`: icon, sounds and badge are provided by client(web, mobile)

        Returns whatever ``messaging.send`` returns, or ``False`` when
        ``title`` or ``body`` is missing.
        """
        if not self._has_title_and_body(opts):
            return False

        message = messaging.Message(
            notification=messaging.Notification(
                title=opts['title'],
                body=opts['body'],
            ),
            # The same title/body are duplicated in the data payload.
            data={
                'title': opts['title'],
                'body': opts['body'],
            },
            token=_receiver,
        )
        return messaging.send(message)

    def send_batch_notifications(self, _receivers: list, **opts):
        """Send one notification to many devices, e.g. `Groups`.

        `_receivers`: list of receivers' push tokens
        `title`: title
        `body`: body of push notif

        Returns the batch response object from the messaging API, or
        ``False`` when ``title`` or ``body`` is missing (same contract as
        ``send_notification``). Tokens whose delivery failed are logged as a
        warning.
        """
        if not self._has_title_and_body(opts):
            return False

        message = messaging.MulticastMessage(
            notification=messaging.Notification(
                title=opts['title'],
                body=opts['body'],
            ),
            data={
                'title': opts['title'],
                'body': opts['body'],
            },
            tokens=_receivers,
        )

        # NOTE(review): newer firebase-admin releases deprecate (and later
        # drop) send_multicast in favour of send_each_for_multicast; prefer
        # the new call when the installed SDK provides it. `or` short-circuits
        # so the legacy attribute is only looked up when actually needed.
        send = (getattr(messaging, 'send_each_for_multicast', None)
                or messaging.send_multicast)
        response = send(message)

        # In multi-recipient messaging we need to know who got the message
        # and who didn't.
        if response.failure_count > 0:
            # The order of responses corresponds to the order of the
            # registration tokens, so the two can be paired positionally.
            failed_tokens = [
                token
                for token, resp in zip(_receivers, response.responses)
                if not resp.success
            ]
            log.warning('Failed recepient tokens: %s', failed_tokens)
        return response
# Module-level PushNotifService instance; _test() below sends through it.
service = PushNotifService()
def _test():
    """Manual smoke test: send one sample notification and print the result.

    Replace the placeholder token with a real one before running.
    """
    # MAX_ALLOWED => 100 device registration tokens
    # This registration token comes from the client FCM SDKs.
    registration_token = "YOUR_FIREBASE_REG_TOKEN"

    outcome = service.send_notification(
        registration_token,
        title="Your supervisor just set a meeting !",
        body="A meeting for 12:30 has been set at Conference room A",
    )
    print('Successfully sent message:', outcome)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment