Last active
August 29, 2015 13:55
-
-
Save henter/8748092 to your computer and use it in GitHub Desktop.
https://github.com/henter/PythonRabbitMQAPN Python结合RabbitMQ实现多线程APN推送
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket, ssl, json, struct | |
import binascii | |
def Payload(alert='', badge=1, data={}): | |
payload = { | |
'aps': { | |
'alert':alert, | |
'sound':'k1DiveAlarm.caf', | |
'badge':badge, | |
}, | |
'acme': data, | |
} | |
return payload | |
class APN(object): | |
def __init__(self, cert_file=None, dev=False): | |
super(APN, self).__init__() | |
self.cert_file = cert_file | |
self.dev = dev | |
self.connection = None | |
def get_connection(self): | |
if not self.connection: | |
self.connection = APNConnection(cert_file = self.cert_file, dev = self.dev) | |
return self.connection | |
def send(self, token=None, payload=None): | |
data = json.dumps( payload , ensure_ascii=False) | |
#data = json.dumps( payload ) | |
data = data.encode('utf-8') | |
# Clear out spaces in the device token and convert to hex | |
deviceToken = token.replace(' ','') | |
byteToken = binascii.unhexlify(token) | |
theFormat = '!BH32sH%ds' % len(data) | |
theNotification = struct.pack( theFormat, 0, 32, byteToken, len(data), data ) | |
retry_time_limit = 3 | |
isFail = True | |
tryTime = 0 | |
while isFail and tryTime < retry_time_limit: | |
try: | |
ret = self.get_connection().write(theNotification) | |
isFail = False | |
except : | |
isFail = True | |
tryTime += 1 | |
print("apn try " + str(tryTime) + " time failed, time out.") | |
if isFail: | |
return False | |
return True if int(ret)<=293 else False | |
class APNConnection(object): | |
def __init__(self, cert_file=None, dev=False): | |
super(APNConnection, self).__init__() | |
self.cert_file = cert_file | |
self.ssl_sock = None | |
self.server = 'gateway.push.apple.com' | |
if dev == True: | |
self.server = 'gateway.sandbox.push.apple.com' | |
self.port = 2195 | |
def connect(self): | |
self.ssl_sock = ssl.wrap_socket( | |
socket.socket( socket.AF_INET, socket.SOCK_STREAM ), | |
certfile = self.cert_file | |
) | |
self.ssl_sock.connect( (self.server, self.port) ) | |
return self.ssl_sock | |
def get_connection(self): | |
if not self.ssl_sock: | |
self.connect() | |
return self.ssl_sock | |
def read(self, n=None): | |
return self.get_connection().read(n) | |
def write(self, string): | |
return self.get_connection().write(string) | |
def __del__(self): | |
if self.ssl_sock: | |
self.ssl_sock.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#coding=utf-8 | |
import sys | |
from api.apns import APN,Payload | |
def push(msg, config): | |
token = msg['udid'] | |
data = msg['data'] | |
if len(token) < 50: | |
return 'udid invalid' | |
payload = Payload(msg['content'], msg['count'], data) | |
#不同的app用不同证书 | |
if 'app' in msg: | |
pem = config[msg['app']]['pem'] | |
else: | |
return 'error app' | |
apn = APN(pem, config['dev']) | |
return apn.send(token, payload) | |
if __name__ == '__main__': | |
from config import get_env_config | |
msg = { | |
'app': 'test', | |
'data': {'type':'feed', 'id': 123}, | |
'count': 1, | |
'udid': 'bf7124b455c46395d619046df2b1fe68aed9cb8c2dd6b5a9f8531exxxxxxxxxx', | |
#最多49个汉字 | |
'content':u'44445萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德萨范德' | |
} | |
config = get_env_config() | |
if not config: | |
print 'config error' | |
print push(msg, config) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#coding=utf-8 | |
import sys | |
reload(sys) | |
sys.setdefaultencoding("utf-8") | |
import threading | |
import time | |
import pika | |
import json | |
from thread_manager import thread_manager | |
from config import get_config | |
from push_ios import push | |
def main(thread_id): | |
thread_str = 'Thread %s : ' % str(thread_id) | |
user = config['user'] | |
password = config['password'] | |
queue = config['queue'] | |
queue_size = get_queue_size() | |
try: | |
credentials = pika.PlainCredentials(user, password) | |
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials)) | |
channel = connection.channel() | |
channel.queue_declare(queue=queue,durable=True) | |
except: | |
print 'rabbitmq conneciton fail' | |
sys.exit() | |
print thread_str + ' [*] Waiting for messages. To exit press CTRL+C' | |
def callback(ch, method, properties, body): | |
#print " [x] Received json "+ (body) | |
print thread_str + " [x] Received... " | |
ret = push(json.loads(body), config) | |
if ret == True: | |
print thread_str + "push success "+ body | |
else: | |
print thread_str + 'push fail: '+body | |
ch.basic_ack(delivery_tag = method.delivery_tag) | |
ch.stop_consuming() | |
channel.basic_qos(prefetch_count=10) | |
channel.basic_consume(callback, queue=queue) | |
if queue_size > 0: | |
#print thread_str + 'start consuming...' | |
#这里如果开启consuming会阻塞 | |
#channel.start_consuming() | |
pass | |
else: | |
print thread_str + 'queue is empty, stop consuming' | |
channel.stop_consuming() | |
connection.close() | |
def get_queue_size(): | |
user = config['user'] | |
password = config['password'] | |
queue = config['queue'] | |
try: | |
credentials = pika.PlainCredentials(user, password) | |
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials)) | |
channel = connection.channel() | |
declare_ok = channel.queue_declare(queue=queue,durable=True,passive=True) | |
return declare_ok.method.message_count | |
except: | |
print 'rabbitmq conneciton fail while get queue size' | |
return 0 | |
def get_thread_num(queue_size): | |
global _sleep | |
if queue_size > 100: | |
thread_num = 100 | |
_sleep = 0.1 | |
elif queue_size > 10: | |
thread_num = 5 | |
_sleep = 0.1 | |
else: | |
thread_num = 2 | |
_sleep = 2 | |
return thread_num | |
if __name__ == '__main__': | |
config = get_config() | |
if not config: | |
print 'config error' | |
sys.exit() | |
_sleep = 2 | |
thread_num = get_thread_num(get_queue_size()) | |
# 防止KeyboardInterrupt时报错,程序会在下一步捕获KeyboardInterrupt | |
try: | |
thread_manager(thread_num, main) | |
except: | |
pass | |
# 所有线程执行完退出后 循环检查 | |
try: | |
while True: | |
time.sleep(_sleep) | |
print 'all threads exit. loop checking...' | |
queue_size = get_queue_size(); | |
if queue_size > 0: | |
thread_manager(get_thread_num(queue_size), main) | |
except KeyboardInterrupt: | |
print "\nthread manager stoped" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding=utf-8 | |
import threading | |
class thread_manager: | |
def __init__(self, thread_num=1, target=None): | |
print 'start thread manager ...' | |
for i in range(thread_num): | |
th = threading.Thread(target=target, args=(i,)) | |
th.setDaemon(True) | |
th.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment