Skip to content

Instantly share code, notes, and snippets.

@RafGb
Created July 13, 2016 07:53
Show Gist options
  • Save RafGb/84d71a3bc6e083a7e09bd5dbb73895cd to your computer and use it in GitHub Desktop.
Save RafGb/84d71a3bc6e083a7e09bd5dbb73895cd to your computer and use it in GitHub Desktop.
Delayed task queue for django-channels

Delayed task queue for django-channels.

Works only with a Redis-backed channel layer!

Usage:

  1. Run worker
python manage.py channels_delayed_tasks_worker
  2. Start creating delayed tasks
import channels

channels.Channel('channels.delay').send({
    'channel': 'target_channel',
    'content': {},
    'delay': 10,
})
import channels
import time
import redis
import Queue
import select
from redis.exceptions import (
ConnectionError,
TimeoutError,
)
from channels.log import setup_logger
from django.core.management.base import BaseCommand
# Module-wide logger, created through channels' own logging helper.
logger = setup_logger('django.channels')
# Channel that clients send delayed-task messages to; the worker appends it
# to the channel layer's prefix to build the Redis key it pops from.
CHANNEL_NAME = 'channels.delay'
# Timeout (seconds) for select() on the worker's socket when no task is queued.
SELECT_TIMEOUT = 5
# Timeout argument passed to the Redis BLPOP command.
BLPOP_TIMEOUT = 5
class Task(object):
    """A message that must be sent to a channel once its delay has elapsed.

    Tasks order by ``scheduled_time`` so that a priority queue always yields
    the one that falls due first.

    :param channel: name of the channel the content is sent to.
    :param delay: number of seconds to wait, counted from construction.
    :param content: message payload to send.
    """

    def __init__(self, channel, delay, content):
        self.channel = channel
        self.delay = delay
        self.content = content
        # Absolute wall-clock time (time.time() based) at which the task is due.
        self.scheduled_time = time.time() + delay

    def is_time(self):
        """Return True once the scheduled time has been reached."""
        return time.time() >= self.scheduled_time

    def timeout(self):
        """Return the seconds left until the task is due, never below 0."""
        return max(self.scheduled_time - time.time(), 0)

    def __lt__(self, other):
        # Rich comparison is what heapq / PriorityQueue rely on under
        # Python 3, where __cmp__ is ignored.
        return self.scheduled_time < other.scheduled_time

    def __cmp__(self, other):
        # Kept for Python 2 callers; written without the cmp() builtin,
        # which no longer exists in Python 3.
        mine, theirs = self.scheduled_time, other.scheduled_time
        return (mine > theirs) - (mine < theirs)
class Worker(object):
    """Read delayed-task messages from Redis and dispatch them when due.

    Entries pushed onto the ``CHANNEL_NAME`` list are popped with ``BLPOP``
    on a dedicated connection, turned into ``Task`` objects and kept in a
    priority queue.  ``run()`` waits on the connection's socket with
    ``select`` so it can wake up either for a new message or for the next
    task falling due.

    Only the first entry of ``channel_layer.hosts`` is used.

    :param channel_layer: channel layer exposing ``hosts``, ``prefix`` and
        ``deserialize``.
    """

    def __init__(self, channel_layer):
        self.channel_layer = channel_layer
        host = channel_layer.hosts[0]
        prefix = channel_layer.prefix
        # Redis list that holds the keys of pending delayed-task messages.
        self.channel_key = prefix + CHANNEL_NAME
        self.redis = redis.Redis.from_url(host)
        # Dedicated connection for the blocking pop; regular commands keep
        # going through self.redis.
        self.blpop_connection = self.redis.connection_pool.get_connection('BLPOP')
        self.queue = Queue.PriorityQueue()
        self.select_timeout = SELECT_TIMEOUT
        self.blpop_timeout = BLPOP_TIMEOUT

    def process_blpop_response(self, channel, task_key):
        """Load the message stored under ``task_key`` and queue it as a Task.

        :param channel: first element of the BLPOP reply (unused).
        :param task_key: Redis key whose value is the serialized message.
        :raises TypeError: if the deserialized fields do not match the
            ``Task`` constructor.
        """
        content = self.redis.get(task_key)
        # Nothing stored under the key (presumably the message expired).
        if content:
            task_data = self.channel_layer.deserialize(content)
            task = Task(**task_data)
            self.queue.put(task)

    def run(self):
        """Run the worker loop forever.

        At most one ``BLPOP`` is outstanding at any time.  Issuing a new one
        on every iteration would pipeline commands faster than their replies
        are consumed, so unanswered commands and unread replies would pile
        up on the connection without bound.
        """
        connection = self.blpop_connection
        awaiting_reply = False
        reply_deadline = 0
        while True:
            if awaiting_reply and time.time() > reply_deadline:
                # BLPOP has to answer within its own timeout; a reply this
                # late means the connection is most likely dead, so drop it
                # and let the next send reconnect.
                logger.error("No reply to BLPOP in time, reconnecting")
                connection.disconnect()
                awaiting_reply = False

            if not awaiting_reply:
                try:
                    connection.send_command(
                        'BLPOP', self.channel_key, self.blpop_timeout)
                except (ConnectionError, TimeoutError) as e:
                    connection.disconnect()
                    logger.error(e)
                    # Back off before trying to reach Redis again.
                    time.sleep(5)
                    continue
                awaiting_reply = True
                reply_deadline = (
                    time.time() + self.blpop_timeout + self.select_timeout)

            # Wait for a reply, but no longer than until the next task is due.
            if self.queue.empty():
                next_task = None
                timeout = self.select_timeout
            else:
                next_task = self.queue.get()
                timeout = next_task.timeout()

            readable, _, _ = select.select([connection._sock], [], [], timeout)
            if readable:
                # Whatever happens below, the outstanding command is finished:
                # either its reply gets consumed or the connection is dropped.
                awaiting_reply = False
                try:
                    resp = connection.read_response()
                    # A BLPOP that timed out on the server yields no payload.
                    if resp:
                        self.process_blpop_response(*resp)
                except (ConnectionError, TimeoutError) as e:
                    connection.disconnect()
                    logger.error(e)
                except TypeError:
                    logger.error("Invalid task format")
                except Exception as e:
                    logger.error(e)

            if next_task is not None:
                if next_task.is_time():
                    channels.Channel(next_task.channel).send(next_task.content)
                else:
                    # Woken up early by a message; put the task back so the
                    # queue can re-evaluate which one is due first.
                    self.queue.put(next_task)
class Command(BaseCommand):
    """Management command that starts the delayed-task worker."""

    def handle(self, *args, **options):
        """Create a Worker for the default channel layer and run it."""
        layers = channels.asgi.channel_layers
        default_layer = layers[channels.DEFAULT_CHANNEL_LAYER]
        Worker(default_layer).run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment