# Feed refresh daemon.
# Gist by @ask (https://gist.github.com/ask/99830), created April 22, 2009.
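# ``setup_environ`` configures the Django settings module, and must run
# before anything from ``djangofeeds`` is imported.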
import settings
from django.core.management import setup_environ
setup_environ(settings)
from djangofeeds.messaging import consumer as feed_consumer
from djangofeeds.importers import FeedImporter
from UserList import UserList
import multiprocessing
import simplejson
import logging
import time
# If the queue is empty, this is the time *in seconds* the daemon sleeps
# until it wakes up to check if there are any new messages on the queue.
QUEUE_WAKEUP_AFTER = 0.3

# As long as the queue is empty, the daemon logs a "Waiting for queue."
# message every ``EMPTY_MSG_EMIT_EVERY`` *seconds*.
EMPTY_MSG_EMIT_EVERY = 5
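# With the defaults above, the daemon polls the queue roughly three times
# per second while idle, but logs at most one "Waiting for queue." line
# every five seconds.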
class EmptyQueue(Exception):
    """The message queue is currently empty."""


class UnknownAction(Exception):
    """Got an unknown action in the queue. The message is requeued and
    ignored."""
class ProcessQueue(UserList):
    """Queue of running child processes, which starts waiting for the
    processes to finish when the queue limit is reached."""

    def __init__(self, limit, done_msg="Got %s"):
        self.limit = limit
        self.done_msg = done_msg
        self.data = []

    def add(self, result):
        self.data.append(result)
        if len(self.data) >= self.limit:
            for result in self.data:
                # Block until each child process has finished.
                value = result.get()
                multiprocessing.get_logger().info(self.done_msg % value)
            self.data = []
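# Example usage: with ``limit=2`` the second ``add()`` call drains the two
# pending ``AsyncResult``s (blocking on ``result.get()``) before accepting
# more work. ``pool`` and ``urls`` below are assumed to exist:
#
#   queue = ProcessQueue(2, "Refreshed %s")
#   for url in urls:
#       queue.add(pool.apply_async(refresh_feed, [url]))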
def refresh_feed(feed_url, loglevel=logging.WARNING, logfile=None):
    """Refresh a djangofeeds feed. Supports multiprocessing.

    Runs in a pool worker process, so it sets up its own logger.
    """
    logger = setup_logger(loglevel, logfile)
    importer = FeedImporter(update_on_import=True, logger=logger)
    importer.import_feed(feed_url)
    return feed_url
def setup_logger(loglevel=None, logfile=None):
    """Set up the ``multiprocessing`` logger. If ``logfile`` is not
    specified, ``stderr`` is used.

    Returns the logger object.
    """
    logger = multiprocessing.get_logger()
    if logfile:
        log_file_handler = logging.FileHandler(logfile)
        logger.addHandler(log_file_handler)
    else:
        multiprocessing.log_to_stderr()
    if loglevel is not None:
        logger.setLevel(loglevel)
    return logger
class RefreshFeedDaemon(object):
    """Refreshes feed URLs in the queue using a process pool.

    ``concurrency`` is the number of simultaneous processes.
    """
    # Class-level defaults, overridable via the constructor arguments.
    concurrency = multiprocessing.cpu_count()
    loglevel = logging.WARNING
    logfile = None

    def __init__(self, concurrency=None, logfile=None, loglevel=None):
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        self.logger = setup_logger(self.loglevel, self.logfile)
        # The worker pool feed imports are dispatched to.
        self.pool = multiprocessing.Pool(self.concurrency)
    def fetch_next_feed(self):
        message = feed_consumer.fetch()
        if message is None:  # No messages waiting.
            raise EmptyQueue()
        data = simplejson.loads(message.body)
        action = data.get("action", "")
        if action != "import_feed":
            message.reject()
            raise UnknownAction(action)
        feed_url = data["feed_url"]
        self.logger.info(">>> Importing feed: %s" % feed_url)
        try:
            result = self.pool.apply_async(refresh_feed, [
                feed_url, self.loglevel, self.logfile])
        except Exception:
            # Reject the message so it isn't lost if dispatch fails.
            message.reject()
            raise
        message.ack()
        return result
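    # A well-formed message body for this consumer looks like (the URL is
    # illustrative):
    #
    #   {"action": "import_feed",
    #    "feed_url": "http://example.com/feed.xml"}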
    def run(self):
        results = ProcessQueue(self.concurrency,
                               "Feed %s successfully refreshed")
        last_empty_emit = None
        while True:
            try:
                result = self.fetch_next_feed()
            except EmptyQueue:
                if not last_empty_emit or \
                        time.time() > last_empty_emit + EMPTY_MSG_EMIT_EVERY:
                    self.logger.info("Waiting for queue.")
                    last_empty_emit = time.time()
                time.sleep(QUEUE_WAKEUP_AFTER)
                continue
            except UnknownAction, e:
                self.logger.info(
                    "Unknown action %s requeued and ignored." % e)
                continue
            results.add(result)
def main(concurrency=5, loglevel=logging.DEBUG, logfile="rdaemon.log"):
    daemon = RefreshFeedDaemon(concurrency=concurrency,
                               loglevel=loglevel,
                               logfile=logfile)
    daemon.run()


if __name__ == "__main__":
    main()
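
# To feed work to this daemon, publish JSON messages of the shape shown in
# ``fetch_next_feed`` to the queue ``feed_consumer`` reads from. The producer
# side is not part of this gist; ``publisher`` and ``send`` below are a
# hypothetical sketch, not a confirmed djangofeeds API:
#
#   from djangofeeds.messaging import publisher as feed_publisher
#   feed_publisher.send(simplejson.dumps(
#       {"action": "import_feed",
#        "feed_url": "http://example.com/feed.xml"}))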