Skip to content

Instantly share code, notes, and snippets.

@lau-jay
Last active May 23, 2018 02:23
Show Gist options
  • Save lau-jay/07563132c780b1febea05214c35dd515 to your computer and use it in GitHub Desktop.
Save lau-jay/07563132c780b1febea05214c35dd515 to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta
from tornado import httpclient, gen, queues
from tornado import escape
import os
# test
from tornado.ioloop import IOLoop
from apscheduler.schedulers.tornado import TornadoScheduler
# Module-level task queue shared by the coroutines in this file: `producer`
# puts task dicts on it and `send_msg` takes them off and marks them done.
q = queues.Queue()
@gen.coroutine
def async_send(item):
    """Send a single task; in this file that just means printing ``item``."""
    print(item)
@gen.coroutine
def producer(conn=None):
    """Build one send task per recipient and put it on the shared queue ``q``.

    Recipients and tip records are hard-coded sample data. Every recipient is
    tagged with the tip number ``hello_world``; that tip number is then
    resolved to its WeChat template number and the resulting task dict
    (``openid`` + ``wechat_template_no``) is queued.

    ``conn`` is accepted but not used by this function.
    """
    recipients = [{'openid': 'wx1234555'}, {'openid': 'wx1234556'}]

    # Attach the tip number to each recipient.
    pending = [
        {'openid': entry.get('openid'), 'tips_no': 'hello_world'}
        for entry in recipients
    ]

    # Map tip number -> WeChat template number.
    templates = [{'tips_no': 'hello_world', 'wechat_template_no': 'pyclearl'}]
    template_by_tip = {}
    for row in templates:
        template_by_tip[row.get('tips_no')] = row.get('wechat_template_no')

    for entry in pending:
        template_no = template_by_tip.get(entry.get('tips_no'))
        yield q.put({
            'openid': entry.get('openid'),
            'wechat_template_no': template_no,
        })
@gen.coroutine
def send_msg():
    """Deliver every task currently waiting in the shared queue ``q``.

    Each task is taken off the queue, passed to ``async_send`` and then
    marked done. The coroutine finishes once the queue has been emptied and
    every task taken from it has been marked done.
    """
    # Drain only what is queued right now instead of looping forever on a
    # blocking ``q.get()``: waiting on a worker that never exits would keep
    # this coroutine from ever reaching ``q.join()`` or finishing, and each
    # scheduled run would start one more such worker.
    while not q.empty():
        task = q.get_nowait()
        try:
            yield async_send(task)
        finally:
            # Always balance the get so ``q.join()`` cannot hang on a task
            # whose delivery raised.
            q.task_done()

    # Wait for any tasks that an overlapping run may still be delivering.
    yield q.join()
if __name__ == '__main__':
    # Register both coroutines as jobs that fire every 3 seconds.
    scheduler = TornadoScheduler()
    for job in (producer, send_msg):
        scheduler.add_job(job, 'interval', seconds=3)
    scheduler.start()

    exit_key = 'Break' if os.name == 'nt' else 'C'
    print('Press Ctrl+{0} to exit'.format(exit_key))

    # The IOLoop blocks here until Ctrl+C (Ctrl+Break on Windows) is pressed;
    # the resulting interrupt is treated as a normal exit.
    try:
        IOLoop.instance().start()
    except (KeyboardInterrupt, SystemExit):
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment