Skip to content

Instantly share code, notes, and snippets.

@henter
Last active August 29, 2015 13:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save henter/8748092 to your computer and use it in GitHub Desktop.
Save henter/8748092 to your computer and use it in GitHub Desktop.
https://github.com/henter/PythonRabbitMQAPN Python结合RabbitMQ实现多线程APN推送
import socket, ssl, json, struct
import binascii
def Payload(alert='', badge=1, data=None):
    """Build the notification payload dictionary.

    Args:
        alert: Text placed under ``aps.alert``.
        badge: Number placed under ``aps.badge``.
        data: Optional custom data stored under the ``'acme'`` key. When
            omitted, a fresh empty dict is created for every call (the former
            ``data={}`` default was one dict shared by all calls).

    Returns:
        dict with an ``'aps'`` section (alert, sound, badge) and the custom
        ``'acme'`` section.
    """
    if data is None:
        data = {}
    return {
        'aps': {
            'alert': alert,
            'sound': 'k1DiveAlarm.caf',
            'badge': badge,
        },
        'acme': data,
    }
class APN(object):
    """Client that packs a payload into a binary frame and writes it to APNs.

    The connection object is created lazily on first use and cached on the
    instance.
    """

    # Number of write attempts before send() gives up.
    RETRY_LIMIT = 3
    # Largest byte count reported by write() that still counts as success:
    # 37 header bytes (1 + 2 + 32 + 2) plus 256 payload bytes.
    # NOTE(review): presumably the 256-byte payload limit of the binary API.
    MAX_FRAME_SIZE = 293

    def __init__(self, cert_file=None, dev=False):
        """Remember the certificate path and whether to use the dev gateway."""
        super(APN, self).__init__()
        self.cert_file = cert_file
        self.dev = dev
        self.connection = None

    def get_connection(self):
        """Return the cached connection, creating it on first use."""
        if not self.connection:
            self.connection = APNConnection(cert_file=self.cert_file, dev=self.dev)
        return self.connection

    def send(self, token=None, payload=None):
        """Send ``payload`` to the device identified by ``token``.

        Args:
            token: Hex device token; embedded spaces are ignored.
            payload: JSON-serialisable object, e.g. the result of Payload().

        Returns:
            True when the frame was written and its size is within
            MAX_FRAME_SIZE, False when every attempt failed or the frame is
            larger than that.

        Raises:
            TypeError / binascii.Error: if ``token`` is not valid hex.
        """
        data = json.dumps(payload, ensure_ascii=False)
        # Under Python 2 json.dumps may already hand back a byte string;
        # encoding that again would trigger an implicit ASCII decode.
        if not isinstance(data, bytes):
            data = data.encode('utf-8')

        # Clear out spaces in the device token and convert from hex. The
        # cleaned value must be the one that is decoded.
        device_token = token.replace(' ', '')
        byte_token = binascii.unhexlify(device_token)

        # Frame layout: command byte 0, 2-byte token length (32), the
        # 32-byte token, 2-byte payload length, then the payload itself.
        frame_format = '!BH32sH%ds' % len(data)
        frame = struct.pack(frame_format, 0, 32, byte_token, len(data), data)

        written = None
        for attempt in range(1, self.RETRY_LIMIT + 1):
            try:
                written = self.get_connection().write(frame)
                break
            except Exception:
                # Drop the cached connection so the next attempt reconnects
                # instead of writing to the same broken socket again.
                self.connection = None
                print("apn try " + str(attempt) + " time failed, time out.")
        else:
            # Every attempt raised.
            return False
        return int(written) <= self.MAX_FRAME_SIZE
class APNConnection(object):
    """Lazily opened TLS socket to the Apple push gateway on port 2195."""

    def __init__(self, cert_file=None, dev=False):
        """Pick the gateway host; nothing is connected yet.

        Args:
            cert_file: Path of the certificate file handed to the TLS layer.
            dev: When equal to True the sandbox gateway is used instead of
                the production one.
        """
        super(APNConnection, self).__init__()
        self.cert_file = cert_file
        self.ssl_sock = None
        self.server = 'gateway.push.apple.com'
        # Equality test kept as in the original so that only values equal to
        # True (True, 1) select the sandbox host.
        if dev == True:
            self.server = 'gateway.sandbox.push.apple.com'
        self.port = 2195

    def connect(self):
        """Open the TLS connection and cache it on ``self.ssl_sock``.

        The socket is stored only after the connect succeeded; on failure it
        is closed and the exception re-raised, so a later get_connection()
        retries instead of returning a half-open socket.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock = ssl.wrap_socket(sock, certfile=self.cert_file)
            sock.connect((self.server, self.port))
        except Exception:
            sock.close()
            raise
        self.ssl_sock = sock
        return self.ssl_sock

    def get_connection(self):
        """Return the cached socket, connecting first if necessary."""
        if not self.ssl_sock:
            self.connect()
        return self.ssl_sock

    def read(self, n=None):
        """Read from the TLS socket, passing ``n`` through unchanged."""
        return self.get_connection().read(n)

    def write(self, string):
        """Write ``string`` to the TLS socket and return the socket's result."""
        return self.get_connection().write(string)

    def __del__(self):
        """Close the socket, if one was opened, when the object is collected."""
        if self.ssl_sock:
            self.ssl_sock.close()
#!/usr/bin/env python
#coding=utf-8
import sys
from api.apns import APN,Payload
def push(msg, config):
    """Send one queued message as an iOS push notification.

    Args:
        msg: Mapping with the keys 'udid' (device token), 'data', 'content',
            'count' and 'app'.
        config: Mapping with a 'dev' flag and one entry per app name, each
            holding the certificate path under 'pem'.

    Returns:
        'udid invalid' when the token is shorter than 50 characters,
        'error app' when the message names no app or the app has no
        certificate configured, otherwise the result of APN.send().
    """
    token = msg['udid']
    data = msg['data']
    if len(token) < 50:
        return 'udid invalid'

    # Each app uses its own certificate. A missing 'app' key, an unknown app
    # name or an entry without 'pem' all yield the same error result instead
    # of an uncaught KeyError.
    try:
        pem = config[msg['app']]['pem']
    except (KeyError, TypeError):
        return 'error app'

    payload = Payload(msg['content'], msg['count'], data)
    apn = APN(pem, config['dev'])
    return apn.send(token, payload)
if __name__ == '__main__':
    # Manual smoke test: push one hard-coded message using the environment
    # configuration.
    from config import get_env_config

    msg = {
        'app': 'test',
        'data': {'type':'feed', 'id': 123},
        'count': 1,
        'udid': 'bf7124b455c46395d619046df2b1fe68aed9cb8c2dd6b5a9f8531exxxxxxxxxx',
        # at most 49 Chinese characters
        'content':u'44445萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德'
    }
    config = get_env_config()
    if not config:
        # Stop here: continuing would call push() with an unusable config.
        print('config error')
        sys.exit(1)
    print(push(msg, config))
#!/usr/bin/env python
#coding=utf-8
import sys
# Python 2 idiom: switch the process-wide default string encoding to UTF-8.
# reload(sys) comes first because setdefaultencoding is not normally reachable
# on the sys module after interpreter start-up.
reload(sys)
sys.setdefaultencoding("utf-8")
import threading
import time
import pika
import json
from thread_manager import thread_manager
from config import get_config
from push_ios import push
# Worker entry point; thread_manager passes it as the Thread target with the
# thread index as its only argument. Opens its own RabbitMQ connection using the
# module-level `config`, declares the durable queue and registers `callback`.
# NOTE(review): indentation was lost in this copy, so the nesting of the
# trailing statements in `callback` and at the end of `main` cannot be
# confirmed from here - check against the original file.
def main(thread_id):
# Prefix used in every log line printed by this worker.
thread_str = 'Thread %s : ' % str(thread_id)
user = config['user']
password = config['password']
queue = config['queue']
# Opens a separate connection just to read the current message count.
queue_size = get_queue_size()
try:
credentials = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials))
channel = connection.channel()
channel.queue_declare(queue=queue,durable=True)
except:
# Any failure while connecting/declaring ends this worker. sys.exit() raises
# SystemExit, which inside a non-main thread only terminates that thread.
print 'rabbitmq conneciton fail'
sys.exit()
print thread_str + ' [*] Waiting for messages. To exit press CTRL+C'
# Consumer callback: decodes the JSON body, hands it to push() and logs the
# outcome. push() returns True on success and False or an error string
# otherwise, hence the explicit comparison with True.
def callback(ch, method, properties, body):
#print " [x] Received json "+ (body)
print thread_str + " [x] Received... "
ret = push(json.loads(body), config)
if ret == True:
print thread_str + "push success "+ body
else:
print thread_str + 'push fail: '+body
# Acknowledge the delivery, then ask the channel to stop consuming.
ch.basic_ack(delivery_tag = method.delivery_tag)
ch.stop_consuming()
# Allow up to 10 unacknowledged deliveries on this channel.
channel.basic_qos(prefetch_count=10)
# NOTE(review): callback-first argument order looks like the pika < 1.0
# signature - confirm against the installed pika version.
channel.basic_consume(callback, queue=queue)
if queue_size > 0:
#print thread_str + 'start consuming...'
# enabling consuming here would block
#channel.start_consuming()
# With start_consuming() commented out, nothing is done in this branch.
pass
else:
print thread_str + 'queue is empty, stop consuming'
channel.stop_consuming()
connection.close()
def get_queue_size():
    """Return the number of messages currently waiting in the configured queue.

    Opens a short-lived RabbitMQ connection with the credentials from the
    module-level ``config`` and passively declares the queue to read its
    message count. The connection is always closed again, because this
    function is polled repeatedly.

    Returns:
        The message count, or 0 when connecting or declaring fails.
    """
    user = config['user']
    password = config['password']
    queue = config['queue']
    connection = None
    try:
        credentials = pika.PlainCredentials(user, password)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost', 5672, '/', credentials))
        channel = connection.channel()
        # passive=True only inspects the queue; it does not create it.
        declare_ok = channel.queue_declare(queue=queue, durable=True, passive=True)
        return declare_ok.method.message_count
    except Exception:
        print('rabbitmq conneciton fail while get queue size')
        return 0
    finally:
        if connection is not None:
            try:
                connection.close()
            except Exception:
                # Best-effort cleanup: a failing close must not replace the
                # result computed above.
                pass
def get_thread_num(queue_size):
    """Choose the worker thread count for the given backlog.

    Also stores the matching polling interval (seconds) in the module-level
    ``_sleep``.

    Returns:
        100 for more than 100 queued messages, 5 for more than 10,
        otherwise 2.
    """
    global _sleep
    # (backlog must exceed, thread count, polling interval), largest first.
    tiers = (
        (100, 100, 0.1),
        (10, 5, 0.1),
    )
    for floor, workers, pause in tiers:
        if queue_size > floor:
            _sleep = pause
            return workers
    _sleep = 2
    return 2
if __name__ == '__main__':
    # `config` and `_sleep` are module globals: main() and get_queue_size()
    # read `config`, get_thread_num() overwrites `_sleep`.
    config = get_config()
    if not config:
        print('config error')
        sys.exit()

    _sleep = 2
    thread_num = get_thread_num(get_queue_size())

    # Avoid an error on KeyboardInterrupt here; the loop below is where
    # KeyboardInterrupt is actually handled.
    try:
        thread_manager(thread_num, main)
    except:
        pass

    # Keep polling the queue and start a new batch of workers whenever
    # messages are waiting.
    try:
        while True:
            time.sleep(_sleep)
            print('all threads exit. loop checking...')
            queue_size = get_queue_size()
            if queue_size <= 0:
                continue
            thread_manager(get_thread_num(queue_size), main)
    except KeyboardInterrupt:
        print("\nthread manager stoped")
#coding=utf-8
import threading
class thread_manager:
    """Starts ``thread_num`` daemon threads as soon as it is constructed.

    Each thread runs ``target`` with its zero-based index as the only
    argument. The threads are started but not joined.
    """

    def __init__(self, thread_num=1, target=None):
        print('start thread manager ...')
        for index in range(thread_num):
            worker = threading.Thread(target=target, args=(index,))
            # Daemon threads do not keep the process alive on exit.
            worker.daemon = True
            worker.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment