Skip to content

Instantly share code, notes, and snippets.

@noahsark769
Created December 11, 2018 19:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save noahsark769/66a1c49e7d1396e4db1c687f116a5b65 to your computer and use it in GitHub Desktop.
Save noahsark769/66a1c49e7d1396e4db1c687f116a5b65 to your computer and use it in GitHub Desktop.
APNSService for calling APNS
"""Apple Push Notification Service Library.
Documentation is available on the iOS Developer Library:
https://developer.apple.com/library/ios/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/Chapters/ApplePushService.html
This is from django_push_notifications, but I cleaned it up and edited it to
work with custom settings.
"""
import json
import ssl
import struct
import socket
import time
from contextlib import closing
from binascii import unhexlify
from django.core.exceptions import ImproperlyConfigured
from django.conf import settings
# TCP ports for the push gateway and for the feedback service.
_APNS_PORT = 2195
_APNS_FEEDBACK_PORT = 2196
# Production endpoints.
_PROD_APNS_HOST = "gateway.push.apple.com"
_PROD_APNS_FEEDBACK_HOST = "feedback.push.apple.com"
# Sandbox endpoints, selected by _host_feedback_host() when debug is truthy.
_DEV_APNS_HOST = "gateway.sandbox.push.apple.com"
_DEV_APNS_FEEDBACK_HOST = "feedback.sandbox.push.apple.com"
def _host_feedback_host(debug):
    """Return the ``(gateway_host, feedback_host)`` pair for an environment.

    Args:
        debug: Truthy selects the sandbox hosts, falsy the production hosts.

    Returns:
        A 2-tuple of host names: push gateway first, feedback service second.
    """
    if not debug:
        return _PROD_APNS_HOST, _PROD_APNS_FEEDBACK_HOST
    return _DEV_APNS_HOST, _DEV_APNS_FEEDBACK_HOST
# Human-readable descriptions keyed by numeric APNS error code.
# NOTE(review): presumably these keys match the ``status`` byte unpacked in
# _apns_check_errors(); nothing in this module looks the mapping up, so
# confirm against the callers that import it.
APNS_ERROR_MESSAGES = {
1: "Processing error",
2: "Missing device token",
3: "Missing topic",
4: "Missing payload",
5: "Invalid token size",
6: "Invalid topic size",
7: "Invalid payload size",
8: "Invalid token",
10: "Shutdown",
128: "Protocol error (APNS could not parse notification)",
255: "Unknown APNS error",
}
class NotificationError(Exception):
    """Root of the exception hierarchy raised by this notification module."""
class APNSError(NotificationError):
    """Base class for every APNS-specific failure."""
class APNSServerError(APNSError):
    """Raised when the APNS server reports a non-zero status.

    Both values are also available positionally through ``args``.
    """

    def __init__(self, status, identifier):
        """Record the ``status`` code and notification ``identifier``."""
        # status: numeric status reported by the server.
        self.status = status
        # identifier: identifier of the notification the status refers to.
        self.identifier = identifier
        super(APNSServerError, self).__init__(status, identifier)
class APNSDataOverflow(APNSError):
    """Raised when a notification payload exceeds the allowed size."""
def _check_certificate(ss):
mode = "start"
for s in ss.split("\n"):
if mode == "start":
if "BEGIN RSA PRIVATE KEY" in s or "BEGIN PRIVATE KEY" in s:
mode = "key"
elif mode == "key":
if "END RSA PRIVATE KEY" in s or "END PRIVATE KEY" in s:
mode = "end"
break
elif s.startswith("Proc-Type") and "ENCRYPTED" in s:
raise ImproperlyConfigured(
"Encrypted APNS private keys are not supported"
)
if mode != "end":
raise ImproperlyConfigured(
"The APNS certificate doesn't contain a private key"
)
def _apns_create_socket(address_tuple, certfile, ca_certs=None):
    """Open a TLS connection to APNS authenticated with ``certfile``.

    Args:
        address_tuple: ``(host, port)`` pair handed to ``connect()``.
        certfile: Path to the PEM file holding the client certificate and
            its unencrypted private key.
        ca_certs: Optional CA bundle forwarded to ``ssl.wrap_socket``. Any
            falsy value (including the legacy empty list) means "none".

    Returns:
        The connected SSL socket. The caller owns it and must close it.

    Raises:
        ImproperlyConfigured: If ``certfile`` cannot be read, or if
            ``_check_certificate`` rejects its content.
    """
    try:
        with open(certfile, "r") as f:
            content = f.read()
    except Exception as e:
        raise ImproperlyConfigured(
            "The APNS certificate file at %r is not readable: %s" % (certfile, e)  # noqa
        )
    _check_certificate(content)

    sock = socket.socket()
    try:
        # NOTE: ssl.wrap_socket() is deprecated since Python 3.7 and removed
        # in 3.12. cert_reqs is left at its default here, so the server
        # certificate is NOT verified; review before relying on this.
        sock = ssl.wrap_socket(
            sock,
            ssl_version=ssl.PROTOCOL_TLSv1,
            certfile=certfile,
            ca_certs=ca_certs or None
        )
        sock.connect(address_tuple)
    except Exception:
        # Do not leak the file descriptor when wrapping or connecting fails.
        sock.close()
        raise
    return sock
def _apns_create_socket_to_push(debug, certfile):
    """Connect to the push gateway of the selected environment.

    Args:
        debug: Truthy targets the sandbox gateway, falsy the production one.
        certfile: Path to the client certificate passed to
            ``_apns_create_socket``.

    Returns:
        Whatever ``_apns_create_socket`` returns for the gateway address.
    """
    gateway_host = _host_feedback_host(debug)[0]
    gateway_address = (gateway_host, _APNS_PORT)
    return _apns_create_socket(gateway_address, certfile)
def _apns_pack_frame(token_hex, payload, identifier, expiration, priority):
token = unhexlify(token_hex)
# |COMMAND|FRAME-LEN|{token}|{payload}|{id:4}|{expiration:4}|{priority:1}
# 5 items, each 3 bytes prefix, then each item length
frame_len = 3 * 5 + len(token) + len(payload) + 4 + 4 + 1
frame_fmt = "!BIBH%ssBH%ssBHIBHIBHB" % (len(token), len(payload))
frame = struct.pack(
frame_fmt,
2, frame_len,
1, len(token), token,
2, len(payload), payload,
3, 4, identifier,
4, 4, expiration,
5, 1, priority
)
return frame
def _apns_check_errors(sock):
    """Wait briefly for an error response from APNS and raise if one arrives.

    Nothing is read unless the ``APNS_ERROR_TIMEOUT`` setting is defined. If
    it is, the socket timeout is temporarily set to that value and a 6-byte
    response (command, status, identifier) is read. A read timeout is taken
    to mean that no error was reported.

    Args:
        sock: Connected socket the notification was written to.

    Raises:
        APNSServerError: If the response carries a non-zero status.
        AssertionError: If the response command byte is not 8.
    """
    timeout = getattr(settings, "APNS_ERROR_TIMEOUT", None)
    if timeout is None:
        return  # assume everything went fine!

    saved_timeout = sock.gettimeout()
    try:
        sock.settimeout(timeout)
        data = sock.recv(6)
        if data:
            command, status, identifier = struct.unpack("!BBI", data)
            # apple protocol says command is always 8. See http://goo.gl/ENUjXg
            # Raised explicitly (rather than via ``assert``) so that the
            # check is not stripped when Python runs with -O.
            if command != 8:
                raise AssertionError("Command must be 8!")
            if status != 0:
                raise APNSServerError(status, identifier)
    except socket.timeout:  # py3, see http://bugs.python.org/issue10272
        pass
    except ssl.SSLError as e:  # py2
        # ``e.message`` only exists on Python 2; ``str(e)`` works on both and
        # keeps unrelated SSL errors from being masked by an AttributeError.
        if "timed out" not in str(e):
            raise
    finally:
        sock.settimeout(saved_timeout)
def _apns_send(
    token, alert, debug, certfile, badge=None, sound=None, category=None,
    url=None, content_available=False, action_loc_key=None, loc_key=None,
    loc_args=None, extra=None, identifier=0, expiration=None, priority=10,
    socket=None, mutable_content=False
):
    """Build an APNS payload for ``token`` and write it to a push socket.

    Args:
        token: Hexadecimal device token.
        alert: Alert text, or ``None`` to omit the ``alert`` key. When any
            localization argument is given, the text becomes the ``body`` of
            an alert dictionary.
        debug: Truthy sends through the sandbox gateway.
        certfile: Path to the client certificate used when a connection has
            to be opened.
        badge: Badge value, or a callable receiving ``token`` and returning
            the value.
        sound, url, category: Optional values copied into the ``aps``
            dictionary under the same key.
        content_available: Truthy adds ``"content-available": 1``.
        action_loc_key, loc_key, loc_args: Optional localization values
            stored in the alert dictionary.
        extra: Optional mapping merged into the top level of the payload.
        identifier: Notification identifier placed in the frame.
        expiration: Expiration value for the frame; defaults to 30 days
            (2592000 seconds) from now.
        priority: Priority byte placed in the frame.
        socket: Already-open socket to write to. When given, it is neither
            closed nor checked for an error response.
        mutable_content: Truthy adds ``"mutable-content": 1``.

    Returns:
        ``token``, unchanged.

    Raises:
        APNSDataOverflow: If the encoded payload exceeds the
            ``APNS_MAX_NOTIFICATION_SIZE`` setting (2048 by default).
    """
    aps_data = {}

    if action_loc_key or loc_key or loc_args:
        alert = {"body": alert} if alert else {}
        if action_loc_key:
            alert["action-loc-key"] = action_loc_key
        if loc_key:
            alert["loc-key"] = loc_key
        if loc_args:
            alert["loc-args"] = loc_args

    if alert is not None:
        aps_data["alert"] = alert

    if badge is not None:
        if callable(badge):
            badge = badge(token)
        aps_data["badge"] = badge

    if sound is not None:
        aps_data["sound"] = sound

    if url is not None:
        aps_data["url"] = url

    if category is not None:
        aps_data["category"] = category

    if content_available:
        aps_data["content-available"] = 1

    if mutable_content:
        aps_data["mutable-content"] = 1

    data = {"aps": aps_data}
    if extra:
        data.update(extra)

    # convert to json, avoiding unnecessary whitespace with separators (keys
    # sorted for tests)
    json_data = json.dumps(
        data,
        separators=(",", ":"),
        sort_keys=True
    ).encode("utf-8")

    max_size = getattr(settings, "APNS_MAX_NOTIFICATION_SIZE", 2048)
    if len(json_data) > max_size:
        raise APNSDataOverflow(
            "Notification body cannot exceed %i bytes" % (max_size)
        )

    # if expiration isn't specified use 1 month from now
    if expiration is None:
        expiration = int(time.time()) + 2592000

    frame = _apns_pack_frame(
        token,
        json_data,
        identifier,
        expiration,
        priority
    )

    if socket:
        socket.write(frame)
    else:
        # A dedicated local name keeps the ``socket`` parameter untouched.
        with closing(
            _apns_create_socket_to_push(debug=debug, certfile=certfile)
        ) as push_socket:
            push_socket.write(frame)
            _apns_check_errors(push_socket)

    return token
def _apns_read_and_unpack(socket, data_format):
length = struct.calcsize(data_format)
data = socket.recv(length)
if data:
return struct.unpack_from(data_format, data, 0)
else:
return None
def _apns_receive_feedback(socket):
    """Read ``(timestamp, device_token)`` records until the stream ends.

    Each record is a header made of a 4-byte timestamp and a 2-byte token
    length, followed by the token bytes. Reading stops once no header can
    be read. A read timeout is ignored and the read is retried.

    Args:
        socket: Connected socket to read from.

    Returns:
        A list of ``(timestamp, device_token)`` tuples in arrival order;
        ``device_token`` is the raw byte string read from the socket.
    """
    # The ``socket`` parameter shadows the ``socket`` module inside this
    # function, so ``socket.timeout`` would resolve to an attribute of the
    # socket object rather than the exception class. Import the class under
    # its own name instead.
    from socket import timeout as socket_timeout

    expired_token_list = []

    # read a timestamp (4 bytes) and device token length (2 bytes)
    header_format = "!LH"
    has_data = True
    while has_data:
        try:
            # read the header tuple
            header_data = _apns_read_and_unpack(socket, header_format)
            if header_data is not None:
                timestamp, token_length = header_data
                # Unpack format for a single value of length bytes
                token_format = "%ss" % token_length
                device_token = _apns_read_and_unpack(socket, token_format)
                if device_token is not None:
                    # _apns_read_and_unpack returns a tuple, but
                    # it's just one item, so get the first.
                    expired_token_list.append((timestamp, device_token[0]))
            else:
                has_data = False
        except socket_timeout:  # py3, see http://bugs.python.org/issue10272
            pass
        except ssl.SSLError as e:  # py2
            # ``e.message`` only exists on Python 2; ``str(e)`` works on both.
            if "timed out" not in str(e):
                raise

    return expired_token_list
class APNSService(object):
    """Service object exposing APNS sending as a method."""

    def send_message(self, registration_id, certfile, alert, debug=False, **kwargs):  # noqa
        """Send one APNS notification to the device ``registration_id``.

        When set, ``alert`` should always be a string. Pass ``None`` to leave
        the alert out of the notification, as needed for silent
        notifications.

        Args:
            registration_id: Device token of the recipient.
            certfile: Path to the APNS client certificate.
            alert: Alert text or ``None``.
            debug: Truthy sends through the sandbox gateway.
            **kwargs: Forwarded unchanged to ``_apns_send``.

        Returns:
            Whatever ``_apns_send`` returns.
        """
        return _apns_send(
            token=registration_id,
            alert=alert,
            debug=debug,
            certfile=certfile,
            **kwargs
        )
# Module-level APNSService instance, created once at import time.
apns_service = APNSService()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment