Skip to content

Instantly share code, notes, and snippets.

@bbengfort
Created May 23, 2013 19:14
Show Gist options
  • Save bbengfort/5638664 to your computer and use it in GitHub Desktop.
Save bbengfort/5638664 to your computer and use it in GitHub Desktop.
A chunker that appends data to file buckets in a specified directory such that no file exceeds the specified size. It appends the data to the first file that has enough space available; if none does, it appends the data to a new file. Note that an exception is raised if the data is bigger than the maximum file size. To test it with rando…
import os
import sys
DEFAULT_SIZE = 128 * 1024 * 1024  # 128 MB


class Bucket(object):
    """
    Handles the writing of a stream into a bucket.

    Pass a path to a directory which holds the bucket, and
    the chunk size required for each file in the bucket.

    Chunk files are named ``data1``, ``data2``, ... inside the bucket
    directory. Any other entry in that directory is ignored, so stray
    files are never appended to and never break slice numbering.
    """

    # File name prefix shared by every chunk file.
    PREFIX = "data"

    def __init__(self, path, size=DEFAULT_SIZE):
        """
        Resolve `path` and create the bucket directory when it is missing.

        Environment variables and ``~`` in `path` are expanded and the
        result is made absolute. `size` is the maximum number of bytes
        allowed in a single chunk file.

        Raises TypeError if `path` points at an existing regular file.
        """
        path = os.path.expandvars(path)
        path = os.path.expanduser(path)
        path = os.path.abspath(path)

        if not os.path.exists(path):
            # 0o755 is valid on Python 2.6+ and Python 3 (the bare 0755
            # spelling is a syntax error on Python 3).
            os.makedirs(path, 0o755)
        elif os.path.isfile(path):
            raise TypeError("Please specify the path to a directory, not a file")

        self.path = path
        self.size = size

    @classmethod
    def _chunk_index(cls, name):
        """
        Return the integer N for a file name of the form ``dataN``,
        or None when `name` is not a chunk file name.
        """
        if not name.startswith(cls.PREFIX):
            return None
        suffix = name[len(cls.PREFIX):]
        if not suffix.isdigit():
            return None
        try:
            return int(suffix)
        except ValueError:
            # isdigit() accepts a few characters that int() rejects.
            return None

    def append(self, data, debug=False):
        """
        Append `data` to the first chunk with enough room, or to a new one.

        Text is encoded as UTF-8 before writing so that the size check is
        made against the number of bytes that actually reach the disk.
        When `debug` is true a one-line message is printed before writing.

        Raises ValueError if `data` alone is larger than the chunk size.
        """
        if not isinstance(data, (bytes, bytearray)):
            data = data.encode("utf-8")

        # len() is the payload size; sys.getsizeof() would also count the
        # interpreter's per-object overhead and overstate every write.
        dsize = len(data)
        if dsize > self.size:
            raise ValueError("Size of data is greater than bucket size.")

        outpath = None
        for path in self:
            if os.path.getsize(path) + dsize <= self.size:
                outpath = path
                break

        if outpath is None:
            outpath = self.new_slice()

        # Binary append: no newline translation, so the bytes on disk
        # match the size that was just checked.
        with open(outpath, 'ab') as outfile:
            if debug:
                print("Appending %i bytes of data to %s" % (dsize, outpath))
            outfile.write(data)

    def new_slice(self):
        """
        Return the path for the next chunk: one past the highest ``dataN``
        name currently present in the bucket directory.
        """
        current = 0
        for name in os.listdir(self.path):
            index = self._chunk_index(name)
            if index is not None and index > current:
                current = index
        return os.path.join(self.path, "%s%i" % (self.PREFIX, current + 1))

    def __iter__(self):
        """
        Yield the full path of every chunk file, ordered by chunk number.
        """
        chunks = []
        for name in os.listdir(self.path):
            index = self._chunk_index(name)
            if index is None:
                continue
            fpath = os.path.join(self.path, name)
            if os.path.isfile(fpath):
                chunks.append((index, name, fpath))

        # os.listdir() order is arbitrary; sorting makes "first chunk with
        # room" deterministic.
        for _, _, fpath in sorted(chunks):
            yield fpath

    def __len__(self):
        """
        Return the number of chunk files in the bucket.
        """
        return sum(1 for _ in self)

    def __str__(self):
        return "<Bucket at %s with %i chunks of less than %i bytes>" % (self.path, len(self), self.size)
import random
import string
import bucket
import time
def generate(length=300):
    """
    Return a random string of `length` characters drawn from ASCII
    letters, digits and punctuation.
    """
    # string.ascii_letters exists on Python 2 and 3; string.letters is
    # locale-dependent on Python 2 and was removed in Python 3.
    chars = string.ascii_letters + string.digits + string.punctuation
    return ''.join(random.choice(chars) for _ in range(length))
def random_length(low=1024, high=4096):
    """
    Pick a random integer between `low` and `high`, both ends included.
    """
    # randrange() excludes its upper bound, so go one past `high`.
    return random.randrange(low, high + 1)
if __name__ == "__main__":
    # Demo driver: keep appending random strings to a small bucket on the
    # Desktop, one every half second, until interrupted with Ctrl-C.
    manager = bucket.Bucket(path="~/Desktop/RandomBucket", size=10240)
    try:
        while True:
            manager.append(generate(random_length()), debug=True)
            time.sleep(0.5)
    except KeyboardInterrupt:
        # Single-argument print(...) is valid on both Python 2 and 3.
        print("Finished!")
        print(str(manager))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment