Created
December 11, 2018 19:40
-
-
Save noahsark769/66a1c49e7d1396e4db1c687f116a5b65 to your computer and use it in GitHub Desktop.
APNSService for calling APNS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Apple Push Notification Service Library. | |
Documentation is available on the iOS Developer Library: | |
https://developer.apple.com/library/ios/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/Chapters/ApplePushService.html | |
This is from django_push_notifications, but I cleaned it up and edited it to | |
work with custom settings. | |
""" | |
import json | |
import ssl | |
import struct | |
import socket | |
import time | |
from contextlib import closing | |
from binascii import unhexlify | |
from django.core.exceptions import ImproperlyConfigured | |
from django.conf import settings | |
_APNS_PORT = 2195 | |
_APNS_FEEDBACK_PORT = 2196 | |
_PROD_APNS_HOST = "gateway.push.apple.com" | |
_PROD_APNS_FEEDBACK_HOST = "feedback.push.apple.com" | |
_DEV_APNS_HOST = "gateway.sandbox.push.apple.com" | |
_DEV_APNS_FEEDBACK_HOST = "feedback.sandbox.push.apple.com" | |
def _host_feedback_host(debug): | |
if debug: | |
return _DEV_APNS_HOST, _DEV_APNS_FEEDBACK_HOST | |
else: | |
return _PROD_APNS_HOST, _PROD_APNS_FEEDBACK_HOST | |
APNS_ERROR_MESSAGES = { | |
1: "Processing error", | |
2: "Missing device token", | |
3: "Missing topic", | |
4: "Missing payload", | |
5: "Invalid token size", | |
6: "Invalid topic size", | |
7: "Invalid payload size", | |
8: "Invalid token", | |
10: "Shutdown", | |
128: "Protocol error (APNS could not parse notification)", | |
255: "Unknown APNS error", | |
} | |
class NotificationError(Exception): | |
"""Generic notifications exception.""" | |
pass | |
class APNSError(NotificationError): | |
"""Generic APNS error.""" | |
pass | |
class APNSServerError(APNSError): | |
"""Error from APNS server.""" | |
def __init__(self, status, identifier): | |
"""Init.""" | |
super(APNSServerError, self).__init__(status, identifier) | |
self.status = status | |
self.identifier = identifier | |
class APNSDataOverflow(APNSError): | |
"""Error representing overflow in APNS message.""" | |
pass | |
def _check_certificate(ss): | |
mode = "start" | |
for s in ss.split("\n"): | |
if mode == "start": | |
if "BEGIN RSA PRIVATE KEY" in s or "BEGIN PRIVATE KEY" in s: | |
mode = "key" | |
elif mode == "key": | |
if "END RSA PRIVATE KEY" in s or "END PRIVATE KEY" in s: | |
mode = "end" | |
break | |
elif s.startswith("Proc-Type") and "ENCRYPTED" in s: | |
raise ImproperlyConfigured( | |
"Encrypted APNS private keys are not supported" | |
) | |
if mode != "end": | |
raise ImproperlyConfigured( | |
"The APNS certificate doesn't contain a private key" | |
) | |
def _apns_create_socket(address_tuple, certfile, ca_certs=None): | |
if ca_certs is None: | |
ca_certs = [] | |
try: | |
with open(certfile, "r") as f: | |
content = f.read() | |
except Exception as e: | |
raise ImproperlyConfigured( | |
"The APNS certificate file at %r is not readable: %s" % (certfile, e) # noqa | |
) | |
_check_certificate(content) | |
sock = socket.socket() | |
sock = ssl.wrap_socket( | |
sock, | |
ssl_version=ssl.PROTOCOL_TLSv1, | |
certfile=certfile, | |
ca_certs=ca_certs | |
) | |
sock.connect(address_tuple) | |
return sock | |
def _apns_create_socket_to_push(debug, certfile): | |
host, _ = _host_feedback_host(debug) | |
return _apns_create_socket((host, _APNS_PORT), certfile) | |
def _apns_pack_frame(token_hex, payload, identifier, expiration, priority): | |
token = unhexlify(token_hex) | |
# |COMMAND|FRAME-LEN|{token}|{payload}|{id:4}|{expiration:4}|{priority:1} | |
# 5 items, each 3 bytes prefix, then each item length | |
frame_len = 3 * 5 + len(token) + len(payload) + 4 + 4 + 1 | |
frame_fmt = "!BIBH%ssBH%ssBHIBHIBHB" % (len(token), len(payload)) | |
frame = struct.pack( | |
frame_fmt, | |
2, frame_len, | |
1, len(token), token, | |
2, len(payload), payload, | |
3, 4, identifier, | |
4, 4, expiration, | |
5, 1, priority | |
) | |
return frame | |
def _apns_check_errors(sock): | |
timeout = getattr(settings, "APNS_ERROR_TIMEOUT", None) | |
if timeout is None: | |
return # assume everything went fine! | |
saved_timeout = sock.gettimeout() | |
try: | |
sock.settimeout(timeout) | |
data = sock.recv(6) | |
if data: | |
command, status, identifier = struct.unpack("!BBI", data) | |
# apple protocol says command is always 8. See http://goo.gl/ENUjXg | |
assert command == 8, "Command must be 8!" | |
if status != 0: | |
raise APNSServerError(status, identifier) | |
except socket.timeout: # py3, see http://bugs.python.org/issue10272 | |
pass | |
except ssl.SSLError as e: # py2 | |
if "timed out" not in e.message: | |
raise | |
finally: | |
sock.settimeout(saved_timeout) | |
def _apns_send( | |
token, alert, debug, certfile, badge=None, sound=None, category=None, | |
url=None, content_available=False, action_loc_key=None, loc_key=None, | |
loc_args=[], extra={}, identifier=0, expiration=None, priority=10, | |
socket=None, mutable_content=False | |
): | |
data = {} | |
aps_data = {} | |
if action_loc_key or loc_key or loc_args: | |
alert = {"body": alert} if alert else {} | |
if action_loc_key: | |
alert["action-loc-key"] = action_loc_key | |
if loc_key: | |
alert["loc-key"] = loc_key | |
if loc_args: | |
alert["loc-args"] = loc_args | |
if alert is not None: | |
aps_data["alert"] = alert | |
if badge is not None: | |
if callable(badge): | |
badge = badge(token) | |
aps_data["badge"] = badge | |
if sound is not None: | |
aps_data["sound"] = sound | |
if url is not None: | |
aps_data["url"] = url | |
if category is not None: | |
aps_data["category"] = category | |
if content_available: | |
aps_data["content-available"] = 1 | |
if mutable_content: | |
aps_data["mutable-content"] = 1 | |
data["aps"] = aps_data | |
data.update(extra) | |
# convert to json, avoiding unnecessary whitespace with separators (keys | |
# sorted for tests) | |
json_data = json.dumps( | |
data, | |
separators=(",", ":"), | |
sort_keys=True | |
).encode("utf-8") | |
max_size = getattr(settings, "APNS_MAX_NOTIFICATION_SIZE", 2048) | |
if len(json_data) > max_size: | |
raise APNSDataOverflow( | |
"Notification body cannot exceed %i bytes" % (max_size) | |
) | |
# if expiration isn't specified use 1 month from now | |
expiration_time = expiration if expiration is not None else int(time.time()) + 2592000 # noqa | |
frame = _apns_pack_frame( | |
token, | |
json_data, | |
identifier, | |
expiration_time, | |
priority | |
) | |
if socket: | |
socket.write(frame) | |
else: | |
with closing( | |
_apns_create_socket_to_push(debug=debug, certfile=certfile) | |
) as socket: | |
socket.write(frame) | |
_apns_check_errors(socket) | |
return token | |
def _apns_read_and_unpack(socket, data_format): | |
length = struct.calcsize(data_format) | |
data = socket.recv(length) | |
if data: | |
return struct.unpack_from(data_format, data, 0) | |
else: | |
return None | |
def _apns_receive_feedback(socket): | |
expired_token_list = [] | |
# read a timestamp (4 bytes) and device token length (2 bytes) | |
header_format = "!LH" | |
has_data = True | |
while has_data: | |
try: | |
# read the header tuple | |
header_data = _apns_read_and_unpack(socket, header_format) | |
if header_data is not None: | |
timestamp, token_length = header_data | |
# Unpack format for a single value of length bytes | |
token_format = "%ss" % token_length | |
device_token = _apns_read_and_unpack(socket, token_format) | |
if device_token is not None: | |
# _apns_read_and_unpack returns a tuple, but | |
# it's just one item, so get the first. | |
expired_token_list.append((timestamp, device_token[0])) | |
else: | |
has_data = False | |
except socket.timeout: # py3, see http://bugs.python.org/issue10272 | |
pass | |
except ssl.SSLError as e: # py2 | |
if "timed out" not in e.message: | |
raise | |
return expired_token_list | |
class APNSService(object): | |
"""Service object for APNS.""" | |
def send_message(self, registration_id, certfile, alert, debug=False, **kwargs): # noqa | |
"""Send an APNS notification to a single registration_id. | |
This will send the notification as form data. | |
If sending multiple notifications, it is more efficient to use | |
apns_send_bulk_message() | |
Note that if set alert should always be a string. If it is not set, | |
it won't be included in the notification. You will need to pass None | |
to this for silent notifications. | |
""" | |
return _apns_send(registration_id, alert, certfile=certfile, debug=debug, **kwargs) # noqa | |
apns_service = APNSService() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment