Skip to content

Instantly share code, notes, and snippets.

@FZambia
Last active December 25, 2015 16:49
Show Gist options
  • Save FZambia/7008721 to your computer and use it in GitHub Desktop.
Save FZambia/7008721 to your computer and use it in GitHub Desktop.
tarantool queue wrapper
# -*- coding: utf-8 -*-
# client for tarantool's queue
# https://github.com/tarantool/queue
# usage:
# queue = Queue("127.0.0.1", 12013)
# queue.put(data={"foo":"bar"})
# task = queue.take()
import tarantool
from tarantool.error import DatabaseError, NetworkError
from threading import Lock
import struct
try:
import simplejson as json
except ImportError:
import json
def unpack_long_long(value):
    """Decode *value* as one little-endian signed 64-bit integer.

    *value* must be exactly 8 bytes long; ``struct.error`` is raised
    otherwise.
    """
    (number,) = struct.unpack("<q", value)
    return number
def unpack_long(value):
    """Decode *value* as one little-endian signed 32-bit integer.

    *value* must be exactly 4 bytes long; ``struct.error`` is raised
    otherwise.
    """
    (number,) = struct.unpack("<l", value)
    return number
class Task(object):
    """
    Handle for a single queue task.

    A task keeps a reference to the queue object that produced it and
    forwards every state-changing operation to that queue, passing its
    own ``task_id``.
    """

    def __init__(self, queue, space=0, task_id=0, tube="", status="", raw_data=None, ):
        """
        Store the task fields.

        :param queue: object the task delegates to (``ack``, ``release``,
            ``from_json`` and so on are called on it)
        :param space: space number the task belongs to
        :param task_id: identifier of the task
        :param tube: name of the tube holding the task
        :param status: task status as reported by the server
        :param raw_data: still-encoded payload; decoded lazily by ``data``
        """
        self.task_id = task_id
        self.tube = tube
        self.status = status
        self.raw_data = raw_data
        self.space = space
        self.queue = queue

    def ack(self):
        """Forward to ``queue.ack`` for this task."""
        return self.queue.ack(self.task_id)

    def release(self, **kwargs):
        """Forward to ``queue.release`` for this task, passing *kwargs* on."""
        return self.queue.release(self.task_id, **kwargs)

    def delete(self):
        """Forward to ``queue.delete`` for this task."""
        return self.queue.delete(self.task_id)

    def requeue(self):
        """Forward to ``queue.requeue`` for this task."""
        return self.queue.requeue(self.task_id)

    def done(self, data=None):
        """
        Forward to ``queue.done`` for this task.

        :param data: value handed to ``queue.done`` as the replacement
            payload; defaults to ``None`` as before
        """
        return self.queue.done(self.task_id, data)

    def bury(self):
        """Forward to ``queue.bury`` for this task."""
        return self.queue.bury(self.task_id)

    def dig(self):
        """Forward to ``queue.dig`` for this task."""
        return self.queue.dig(self.task_id)

    def meta(self):
        """Forward to ``queue.meta`` for this task."""
        return self.queue.meta(self.task_id)

    def _data(self):
        """
        Return the decoded payload, or ``None`` when ``raw_data`` is empty.

        Decoding goes through ``queue.from_json`` and happens once; the
        result is cached on the instance.
        """
        if not self.raw_data:
            return None
        # hasattr (not a None check) so a payload that decodes to None is
        # still cached instead of being decoded again on every access.
        if not hasattr(self, '_decoded_data'):
            self._decoded_data = self.queue.from_json(self.raw_data)
        return self._decoded_data

    data = property(_data)

    def __str__(self):
        return "Task (id: {0}, tube:{1}, status: {2}, space:{3})".format(
            self.task_id, self.tube, self.status, self.space
        )

    @classmethod
    def from_tuple(cls, queue, the_tuple):
        """
        Build a task from a server response.

        :param queue: queue object the new task will delegate to; its
            ``space`` attribute becomes the task's space
        :param the_tuple: response exposing ``rowcount`` and indexing; only
            the first row is used (id, tube, status, raw data)
        :return: a new task, or ``None`` when *the_tuple* is ``None``
        :raises Queue.ZeroTupleException: when the response has no rows
        """
        if the_tuple is None:
            return None
        if the_tuple.rowcount < 1:
            raise Queue.ZeroTupleException('error creating task')
        row = the_tuple[0]
        return cls(
            queue,
            space=queue.space,
            task_id=row[0],
            tube=row[1],
            status=row[2],
            raw_data=row[3],
        )
class Queue(object):
    """
    Tarantool queue wrapper.

    Each public method calls one ``queue.*`` stored procedure through a
    lazily created ``tarantool`` connection. Task payloads are sent as
    compact JSON strings.
    """

    # Connector exceptions re-exported so callers can catch them via Queue.
    DataBaseError = DatabaseError
    NetworkError = NetworkError

    class BadConfigException(Exception):
        """Raised by the constructor when an argument fails validation."""

    class ZeroTupleException(Exception):
        """Raised when a response that must describe a task has no rows."""

    # Field names of a ``queue.meta`` row, in positional order.
    _META_KEYS = (
        'task_id', 'tube', 'status', 'event', 'ipri',
        'pri', 'cid', 'created', 'ttl', 'ttr', 'cbury',
        'ctaken', 'now'
    )
    # Positions in a ``queue.meta`` row decoded with unpack_long_long.
    _META_LONG_LONG_FIELDS = (3, 7, 8, 9, 10, 11, 12)
    # Positions in a ``queue.meta`` row decoded with unpack_long.
    _META_LONG_FIELDS = (6,)

    def to_json(self, data):
        """Serialize *data* to a JSON string without extra whitespace."""
        return json.dumps(data, separators=(',', ':'))

    def from_json(self, data):
        """Deserialize a JSON string produced by ``to_json``."""
        return json.loads(data)

    def __init__(self, host="localhost", port=33013, tube="queue", space=0, schema=None):
        """
        Validate and store connection settings; no connection is opened here.

        :param host: server host, must be non-empty
        :param port: server port, must be a non-zero int
        :param tube: default tube name, must be non-empty
        :param space: space number, must be an int
        :param schema: passed through to ``tarantool.connect``
        :raises Queue.BadConfigException: when any check above fails
        """
        if not (host and port):
            raise Queue.BadConfigException("host and port params must be not empty")
        if not isinstance(port, int):
            raise Queue.BadConfigException("port must be int")
        if not tube:
            raise Queue.BadConfigException("tube param must be not empty")
        if not isinstance(space, int):
            raise Queue.BadConfigException("space must be int")
        self.host = host
        self.port = port
        self.tube = tube
        self.space = space
        self.schema = schema
        self._tnt = None
        self._instance_lock = Lock()

    def tnt_instance(self):
        """Return the connection, creating it on first use under a lock."""
        with self._instance_lock:
            if getattr(self, '_tnt', None) is None:
                self._tnt = tarantool.connect(self.host, self.port, schema=self.schema)
            return self._tnt

    tnt = property(tnt_instance)

    def _task_args(self, task_id):
        """Build the ``(space, task_id)`` string pair used by per-task calls."""
        # Both values are stringified so every per-task procedure receives
        # the id in the same form that release() and done() already send.
        return (str(self.space), str(task_id))

    def _call_ok(self, method, args):
        """Call *method* with *args* and return True when return_code is 0."""
        return self.tnt.call(method, args).return_code == 0

    def put(self, data=None, urgent=False, **kwargs):
        """
        Enqueue a task and return a Task object representing it.

        *data* is JSON-encoded before sending. Recognised keyword options
        are ``tube``, ``pri``, ``delay``, ``ttl`` and ``ttr``; each defaults
        to 0 except ``tube``, which defaults to the queue's tube.
        If urgent is set to True, ``queue.urgent`` is called instead of
        ``queue.put``.
        """
        opt = {
            'tube': self.tube,
            'pri': 0,  # priority
            'delay': 0,  # delay for task
            'ttl': 0,  # time to live
            'ttr': 0,  # time to read
        }
        opt.update(kwargs)
        method = "queue.urgent" if urgent else "queue.put"
        the_tuple = self.tnt.call(method, (
            str(self.space),
            str(opt["tube"]),
            str(opt["delay"]),
            str(opt["ttl"]),
            str(opt["ttr"]),
            str(opt["pri"]),
            self.to_json(data),
        ))
        return Task.from_tuple(self, the_tuple)

    def take(self, tube=None, timeout=0):
        """
        If there are tasks in the queue ready for execution,
        take the highest-priority task. Otherwise, wait for a
        ready task to appear in the queue, and, as soon as it appears,
        mark it as taken and return to the consumer. If there is a
        timeout, and the task doesn't appear until the timeout expires,
        return None. If timeout is None, the timeout argument is not
        sent at all (wait indefinitely until a task appears).
        """
        tube = tube or self.tube
        args = [str(self.space), str(tube)]
        if timeout is not None:
            args.append(str(timeout))
        the_tuple = self.tnt.call("queue.take", tuple(args))
        if the_tuple.rowcount == 0:
            return None
        return Task.from_tuple(self, the_tuple)

    def ack(self, task_id):
        """
        Confirm completion of a task.
        Returns True when the server reports return code 0.
        """
        return self._call_ok("queue.ack", self._task_args(task_id))

    def release(self, task_id, delay=0, ttl=0):
        """
        Return a task back to the queue: the task is not executed.
        Returns a Task object built from the response.
        """
        the_tuple = self.tnt.call(
            "queue.release",
            self._task_args(task_id) + (str(delay), str(ttl)),
        )
        return Task.from_tuple(self, the_tuple)

    def requeue(self, task_id):
        """
        Return a task to the queue, the task is not executed.
        Puts the task at the end of the queue, so that it's
        executed only after all existing tasks in the queue are
        executed.
        """
        return self._call_ok("queue.requeue", self._task_args(task_id))

    def bury(self, task_id):
        """
        Mark a task as buried. This special status excludes the
        task from the active list, until it's dug up. This function
        is useful when several attempts to execute a task lead to a
        failure. Buried tasks can be monitored by the queue owner,
        and treated specially.
        """
        return self._call_ok("queue.bury", self._task_args(task_id))

    def done(self, task_id, data=None):
        """
        Mark a task as complete (done), but don't delete it.
        Replaces task data with the supplied data (JSON-encoded).
        """
        return self._call_ok(
            "queue.done",
            self._task_args(task_id) + (self.to_json(data),),
        )

    def delete(self, task_id):
        """
        Delete a task from the queue (regardless of task state or status).
        """
        return self._call_ok("queue.delete", self._task_args(task_id))

    def meta(self, task_id):
        """
        Return unpacked task metadata as a dict, or None when the
        response has no rows.
        """
        the_tuple = self.tnt.call("queue.meta", self._task_args(task_id))
        if not the_tuple.rowcount:
            return None
        row = list(the_tuple[0])
        # Numeric fields arrive packed; the remaining fields are kept as
        # returned by the connector.
        for index in self._META_LONG_LONG_FIELDS:
            row[index] = unpack_long_long(row[index])
        for index in self._META_LONG_FIELDS:
            row[index] = unpack_long(row[index])
        return dict(zip(self._META_KEYS, row))

    def peek(self, task_id):
        """
        Return a task by task id.
        """
        the_tuple = self.tnt.call("queue.peek", self._task_args(task_id))
        return Task.from_tuple(self, the_tuple)

    def dig(self, task_id):
        """
        'Dig up' a buried task, after checking that the task is buried.
        The task status is changed to ready.
        """
        return self._call_ok("queue.dig", self._task_args(task_id))

    def kick(self, tube=None, count=None):
        """
        'Dig up' count tasks in a queue. If count is not given (or is
        falsy), the count argument is not sent, which digs up just one
        buried task.
        """
        tube = tube or self.tube
        args = [str(self.space), str(tube)]
        if count:
            args.append(str(count))
        return self._call_ok("queue.kick", tuple(args))

    def statistics(self):
        """
        Return queue module statistics accumulated since server start.

        The first response row is read as alternating key/value fields;
        an empty dict is returned when there are no rows.
        """
        stat = self.tnt.call("queue.statistics", ())
        if stat.rowcount > 0:
            return dict(zip(stat[0][0::2], stat[0][1::2]))
        return dict()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment