Skip to content

Instantly share code, notes, and snippets.

@kaka19ace
Last active June 17, 2020 04:06
Show Gist options
  • Save kaka19ace/c5f20613414888de2dc4 to your computer and use it in GitHub Desktop.
Save kaka19ace/c5f20613414888de2dc4 to your computer and use it in GitHub Desktop.
gevent with zmq and redis tasks
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @file gevent_zmq_redis_demo.py
# @author kaka_ace <xiang.ace@gmail.com>
# @date Tue Oct 14 2014
"""
features: there are two concurrent tasks
1. Pop the notify msg from a Redis list with a blocking BRPOP,
then send the msg through the zmq dealer
2. The zmq dealer receives a msg from the zmq router, then stores the value (taken from the received msg) in Redis
based on gevent, redis-py, pyzmq
Reference:
[ gevent with redis-py ](http://gehrcke.de/2013/01/highly-concurrent-connections-to-redis-with-gevent-and-redis-py/)
[ gevent with zmq ](https://github.com/zeromq/pyzmq/blob/master/examples/gevent/poll.py)
Python 2.7.8
"""
import sys
# Python 2-only idiom: reloading sys makes sys.setdefaultencoding available
# again so the process-wide default string encoding can be switched to UTF-8.
# `reload` is not a builtin on Python 3, so this file targets Python 2.
reload(sys)
sys.setdefaultencoding("utf8")
import os
import signal
# Test endpoints: the Redis server and the zmq ROUTER peer the dealer connects to.
REDIS_IP = "127.0.0.1"
REDIS_PORT = 6379
ROUTER_IP = "127.0.0.1"
ROUTER_PORT = 59144 # 44944 would work as well; both are just numbers that sound interesting in Chinese :)
# gevent drives all I/O tasks in this demo (every socket used below is a
# cooperative one).
import gevent
# Since gevent 1.0.1 the package's __all__ no longer lists 'socket',
# so the submodule is imported explicitly with "from ... import ...".
from gevent import socket as gevent_socket
# redis
import redis.connection
# Replace the socket module referenced by redis.connection with gevent's
# cooperative socket module. This must happen before any connection is made.
redis.connection.socket = gevent_socket
# Instantiate two Redis clients that share one connection pool:
#   redis_pop_client       pops from the list
#   redis_setvalue_client  sets the value
from redis import RedisError
from redis import ConnectionPool
from redis import Redis
REDIS_CONNECTION_POOL = ConnectionPool(max_connections = 8, host = REDIS_IP, port = REDIS_PORT)
redis_pop_client = Redis(connection_pool=REDIS_CONNECTION_POOL)
redis_setvalue_client = Redis(connection_pool=REDIS_CONNECTION_POOL)
# Redis keys: the list the pop task reads from, and the key the zmq task writes to.
TEST_REDIS_LIST_KEY = "kaka:test:list"
TEST_REDIS_SETVALUE_KEY = "kaka:test:key"
# zmq, using the gevent-compatible ("green") variant of pyzmq
from zmq import green as zmq
context = zmq.Context()
dealer = context.socket(zmq.DEALER)
dealer.set_hwm(1000) # zmq's default high water mark is 1000; set explicitly here for clarity
# Connect the dealer to the router endpoint configured above.
router_addr = "tcp://" + ROUTER_IP + ":" + str(ROUTER_PORT)
dealer.connect(router_addr)
# Register the dealer with a zmq poller;
# each poll() call then reports whether the dealer has a ZMQ_POLLIN event.
poller = zmq.Poller()
poller.register(dealer, zmq.POLLIN)
# process functions
def process_redis_pop_task():
    """
    Forward messages from the Redis list to the zmq dealer, forever.

    Each iteration blocks in BRPOP on TEST_REDIS_LIST_KEY and sends the popped
    value through ``dealer`` as a single-frame multipart message.

    The loop never returns and never lets an exception escape; the greenlet
    running it is stopped from outside (see process_shutdown).
    """
    # Pause between retries after a Redis failure.  Without it a refused
    # connection makes brpop fail immediately and the loop spins at full speed.
    retry_delay = 1.0

    while True:
        try:
            popped = redis_pop_client.brpop(TEST_REDIS_LIST_KEY)
            if popped is None:
                # Only possible when brpop is given a timeout and the list
                # stayed empty; checked explicitly instead of relying on the
                # TypeError that unpacking None would raise.
                continue
            _, content = popped
            dealer.send_multipart([content], copy=False)
        except RedisError as e:
            print("%s %s" % ("RedisError: ", str(e)))
            gevent.sleep(retry_delay)
        except Exception as e:
            # Last-resort guard so one bad message cannot kill the task.
            print("%s %s" % ("Exception: ", str(e)))
def process_dealer_zmq_pollin():
    """
    Store messages received on the zmq dealer into Redis, forever.

    Each iteration waits on ``poller``; when the dealer is readable, one
    multipart message is received and its first frame is written to
    TEST_REDIS_SETVALUE_KEY.  Remaining frames are ignored by this demo.

    A RedisError while writing is reported and the message is dropped, so a
    transient Redis failure does not terminate the task.
    """
    while True:
        events = dict(poller.poll())
        # Test the POLLIN bit rather than comparing the whole mask, so the
        # check stays correct if more event flags are ever registered.
        if not events.get(dealer, 0) & zmq.POLLIN:
            continue

        frames = dealer.recv_multipart(copy=True)
        if not frames:
            continue

        try:
            # the demo just uses the first msg frame
            redis_setvalue_client.set(TEST_REDIS_SETVALUE_KEY, frames[0])
        except RedisError as e:
            print("%s %s" % ("RedisError: ", str(e)))
def process_shutdown(signum, greenlets):
    """
    Signal callback: kill every greenlet in ``greenlets`` via gevent.

    ``signum`` is part of the callback signature but is not used here.
    """
    gevent.killall(greenlets)
def register_sys_exit_handler(greenlets):
    """
    Install process_shutdown as the handler for the termination signals.

    SIGQUIT, SIGINT and SIGTERM are registered.  SIGKILL is deliberately not
    registered: the operating system never delivers it to a handler, so a
    registration for it can only fail or be ignored.

    Each handler is called with the signal number it was registered for and
    with ``greenlets``, the tasks to kill on shutdown.
    """
    # NOTE(review): newer gevent releases expose the registration function as
    # gevent.signal_handler, older ones only as the callable gevent.signal;
    # confirm against the gevent version actually deployed.
    install = getattr(gevent, "signal_handler", None)
    if install is None:
        install = gevent.signal

    for signum in (signal.SIGQUIT, signal.SIGINT, signal.SIGTERM):
        install(signum, process_shutdown, signum, greenlets)
# Start one greenlet per worker loop: the Redis pop task first, then the zmq
# poll task.
tasks = list(map(gevent.spawn, (process_redis_pop_task, process_dealer_zmq_pollin)))
register_sys_exit_handler(tasks)
# gevent loop: block here until both worker greenlets have finished
gevent.joinall(tasks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment