Skip to content

Instantly share code, notes, and snippets.

@gaoconghui
Last active January 5, 2018 03:29
Show Gist options
  • Save gaoconghui/1cc2fc93686430e17917398413a328d8 to your computer and use it in GitHub Desktop.
Save gaoconghui/1cc2fc93686430e17917398413a328d8 to your computer and use it in GitHub Desktop.
延迟任务队列
# -*- coding: utf-8 -*-
import heapq
import time
class DelayJobQueue(object):
    """Queue of tasks that only become available once their due time passes.

    Tasks are stored in a min-heap keyed by due time, so the earliest task is
    always at the front. Tasks that share the same due time are returned in
    the order they were added.

    Example::

        q = DelayJobQueue()
        q.add(task, at=time.time() + 10)
        q.pop_ready()  # [] -- nothing is due yet
        # ... 10 seconds later ...
        q.pop_ready()  # [task]
    """

    def __init__(self):
        # Min-heap of (at, sequence, task) entries.
        self._tasks = []
        # Insertion counter used as a tie-breaker: when two entries share the
        # same due time the heap compares the counter and never falls through
        # to comparing the task objects, which may not be orderable.
        self._sequence = 0

    def add(self, task, at=None):
        """Schedule a task.

        :param task: the task object to store; it is returned unchanged by
            ``pop_ready`` once due
        :param at: due time as a ``time.time()`` style timestamp; defaults to
            the current time when omitted
        :return: None
        """
        # Compare against None explicitly so that a timestamp of 0 is kept
        # instead of being silently replaced by the current time.
        if at is None:
            at = time.time()
        heapq.heappush(self._tasks, (at, self._sequence, task))
        self._sequence += 1

    def pop_ready(self):
        """Remove and return every task whose due time has been reached.

        :return: list of due tasks ordered by due time, then by insertion
            order; an empty list when nothing is due
        """
        # Read the clock once so a single call uses one consistent cut-off.
        now = time.time()
        ready_tasks = []
        # "<=" so that a task due exactly now (e.g. added with the default
        # time on a coarse clock) is not skipped.
        while self._tasks and self._tasks[0][0] <= now:
            ready_tasks.append(self._pop_next())
        return ready_tasks

    def _pop_next(self):
        """Remove and return the earliest task regardless of its due time.

        :return: the task with the smallest due time
        :raises KeyError: if the queue is empty
        """
        if not self._tasks:
            raise KeyError('pop from an empty DelayJobQueue')
        # Entries are (at, sequence, task); only the task is handed back.
        return heapq.heappop(self._tasks)[2]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment