Skip to content

Instantly share code, notes, and snippets.

@theanti9
Created December 31, 2014 05:29
Show Gist options
  • Save theanti9/344b878aeab3e0953699 to your computer and use it in GitHub Desktop.
Save theanti9/344b878aeab3e0953699 to your computer and use it in GitHub Desktop.
A Durable (non thread safe) Queue class backed by a commit log.
import os
import struct
import Queue
class QueueItem(object):
def __init__(self, msg_id, data, removed=False):
self.msg_id = msg_id
self.data = data
self.removed = removed
def serialize(self):
data_len = len(self.data) if self.data is not None else 0
fmt = "<2i?"
if data_len > 0:
fmt += "%ds" % data_len
return struct.pack(fmt, data_len, self.msg_id, self.removed, self.data)
else:
return struct.pack(fmt, data_len, self.msg_id, self.removed)
class CommitLog(object):
def __init__(self, file_path):
self.file_path = file_path
self.log_handle = None
def open(self):
if self.log_handle is not None:
raise Exception("Commit Log already open or improperly closed.")
self.log_handle = open(self.file_path, "ab")
def close(self):
self.log_handle.flush()
self.log_handle.close()
self.log_handle = None
def commit(self, queue_item):
try:
self.log_handle.write(queue_item.serialize())
self.log_handle.flush()
return True
except Exception as e:
print e
return False
def replay(self):
if os.path.isfile(self.file_path):
with open(self.file_path, "rb") as read_only:
while True:
header = read_only.read(9)
if header == "" or len(header) < 9:
break
data_len, msg_id, removed = struct.unpack("<2i?", header)
data = None
if data_len > 0:
data = struct.unpack("<%ds" % data_len, read_only.read(data_len))[0]
yield QueueItem(msg_id, data, removed)
class DurableQueue(object):
def __init__(self, q_name, commit_log_path="."):
self.q_name = q_name
self.commit_log_path = os.path.join(commit_log_path, q_name)
self.commit_log = None
self.q = Queue.Queue()
self.next_id = 1
def _construct(self):
self.commit_log = CommitLog(self.commit_log_path)
items = {}
for item in self.commit_log.replay():
if item.removed:
if item.msg_id in items.keys():
del items[item.msg_id]
else:
# Need to log this probably?
pass
else:
items[item.msg_id] = item
ordered_keys = list(items.keys())
ordered_keys.sort()
for key in ordered_keys:
self.next_id = key
self.q.put_nowait(items[key])
def put(self, data):
msg_id = self.next_id
self.next_id += 1
item = QueueItem(msg_id, data)
try:
if not self.commit_log.commit(item):
return False
except Exception as e:
return False
try:
self.q.put_nowait(item)
return True
except Exception as e:
# Failed to put it in the queue.
# Strike it from the commit log
item.removed = False
return False
def pop(self):
if self.q.empty():
return None
return self.q.get()
def done(self, msg_id):
try:
if not self.commit_log.commit(QueueItem(msg_id, None, True)):
return False
return True
except Exception as e:
return False
def __enter__(self):
self._construct()
self.commit_log.open()
return self
def __exit__(self, *args, **kwargs):
self.commit_log.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment