Skip to content

Instantly share code, notes, and snippets.

@ps2
Created February 16, 2018 16:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ps2/9736c15c772e6e5458c46af3a1ecd480 to your computer and use it in GitHub Desktop.
Save ps2/9736c15c772e6e5458c46af3a1ecd480 to your computer and use it in GitHub Desktop.
from datetime import datetime
import os
class TimestampedDiskQueue(object):
    """Directory-backed queue whose entries are files named by timestamp.

    Every entry lives in ``path`` as
    ``<filename_prefix>_<timestamp>.<filename_extension>``, where the
    timestamp is rendered with ``date_format``. Entries are dequeued oldest
    first, ordered by the timestamp encoded in the file name.
    """

    def __init__(self, path, filename_prefix, filename_extension='json', date_format="%Y-%m-%d-%H-%M-%S"):
        """Configure the queue.

        Args:
            path: Existing directory that holds the queue files.
            filename_prefix: Leading part of every entry's file name.
            filename_extension: Extension (without the dot) of entry files.
            date_format: ``strftime``/``strptime`` pattern used to encode the
                timestamp in the file name.
        """
        self.path = path
        self.filename_prefix = filename_prefix
        self.filename_extension = filename_extension
        # Honour the caller's format instead of always using the default.
        self.date_format = date_format

    def enqueue(self, data, timestamp):
        """Write ``data`` to a new entry named after ``timestamp``.

        Args:
            data: Text to store in the entry file.
            timestamp: ``datetime`` used to build the file name.
        """
        filename = "%s_%s.%s" % (
            self.filename_prefix,
            timestamp.strftime(self.date_format),
            self.filename_extension,
        )
        # Write under a dot-prefixed name first: it does not start with the
        # queue prefix, so dequeue() never sees a partially written entry.
        temp_path = os.path.join(self.path, "." + filename)
        final_path = os.path.join(self.path, filename)
        with open(temp_path, 'w') as output:
            output.write(data)
        os.rename(temp_path, final_path)

    def _entry_timestamp(self, filename):
        """Return the datetime encoded in ``filename``, or None if it is not an entry."""
        head = self.filename_prefix + '_'
        tail = '.' + self.filename_extension
        if not (filename.startswith(head) and filename.endswith(tail)):
            return None
        try:
            return datetime.strptime(filename[len(head):-len(tail)], self.date_format)
        except ValueError:
            # Same prefix/extension but not our naming scheme (e.g. a sibling
            # queue whose prefix extends this one): not an entry of this queue.
            return None

    # Handler takes two arguments: (data, timestamp)
    # If handler returns true, then file is removed from queue.
    def dequeue(self, handler):
        """Pass the oldest entry to ``handler`` and remove it on success.

        Args:
            handler: Callable taking ``(data, timestamp)``. A truthy return
                value means the entry was consumed and its file is deleted.

        Returns:
            True if an entry was handled and removed; False if the queue is
            empty or the handler declined the entry (the file is then kept).
        """
        entries = []
        for name in os.listdir(self.path):
            stamp = self._entry_timestamp(name)
            if stamp is not None:
                entries.append((stamp, name))
        if not entries:
            return False

        # Order by the parsed timestamp so custom date formats that do not
        # sort lexicographically are still dequeued chronologically.
        timestamp, next_file = min(entries)
        full_path = os.path.join(self.path, next_file)
        with open(full_path, 'r') as f:
            data = f.read()

        if handler(data, timestamp):
            os.remove(full_path)
            return True
        return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment