Skip to content

Instantly share code, notes, and snippets.

@Arachnid
Created April 7, 2009 14:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Arachnid/91254 to your computer and use it in GitHub Desktop.
import logging
import os
import StringIO
from google.appengine.ext import db
# Key-name format for DataBlock entities: '#' followed by the block number
# as a zero-padded 8-digit uppercase hex string (e.g. 0 -> '#00000000').
BLOCK_NAME_FMT = '#%.8X'
class Inode(db.Model):
    """Metadata entity for a file whose contents live in DataBlock children.

    Tracks the logical file size, the block size used to chunk the data,
    and creation/modification timestamps.
    """

    DEFAULT_BLOCK_SIZE = 524288  # 512 KiB per DataBlock.

    size = db.IntegerProperty(required=True, default=0)
    block_size = db.IntegerProperty(required=True, default=DEFAULT_BLOCK_SIZE)
    reference_count = db.IntegerProperty(required=True, default=0)
    created = db.DateTimeProperty(required=True, auto_now_add=True)
    modified = db.DateTimeProperty(required=True, auto_now=True)

    @classmethod
    def kind(cls):
        return "DatastoreFile.Inode"

    def get_block_key(self, block_num):
        """Returns the db.Key of this file's block_num'th DataBlock."""
        block_name = BLOCK_NAME_FMT % block_num
        return db.Key.from_path(DataBlock.kind(), block_name,
                                parent=self.key())

    def get_block(self, block_num):
        """Fetches the block_num'th DataBlock entity, or None if absent."""
        return DataBlock.get(self.get_block_key(block_num))

    def open(self, *args, **kwargs):
        """Returns a DatastoreFile over this inode.

        Raises:
          db.NotSavedError: If the inode has not been put() yet (blocks
            are child entities, so the inode needs a key first).
        """
        if not self.is_saved():
            raise db.NotSavedError()
        return DatastoreFile(self, *args, **kwargs)
class DataBlock(db.Model):
    """One chunk of file content, stored as a child entity of its Inode.

    The key name encodes the block number (see BLOCK_NAME_FMT), so a block
    can be looked up directly via Inode.get_block_key().
    """
    # Raw block contents; at most Inode.block_size bytes.
    data = db.BlobProperty()

    @classmethod
    def kind(cls):
        return "DatastoreFile.DataBlock"
class DatastoreFile(object):
    """File-like object over an Inode and its DataBlock entities.

    Data is buffered in memory one block at a time (plus, during
    sequential writes, the just-finished previous block so two blocks can
    be committed in one transaction).  Buffers are persisted by flush(),
    seek() and close().  Not thread-safe.
    """

    def __init__(self, inode, mode="r", max_uncommitted=None):
        """Constructor.

        Args:
          inode: A saved Inode entity describing the file.
          mode: File mode string; recorded but not currently enforced.
          max_uncommitted: Maximum bytes to buffer before a commit.
            Defaults to the inode's block size, which is also its cap.

        Raises:
          ValueError: If max_uncommitted exceeds the inode's block size.
        """
        self._inode = inode
        self._mode = mode
        self._pos = 0
        if max_uncommitted is None:
            self._max_uncommitted = self._inode.block_size
        elif max_uncommitted > self._inode.block_size:
            raise ValueError("max_uncommitted must be <= block size")
        else:
            # Fix: the original never stored an explicitly supplied valid
            # value, leaving _max_uncommitted unset on this branch.
            self._max_uncommitted = max_uncommitted
        self._current = None    # StringIO for the block under the cursor.
        self._dirty = False     # True if _current holds unpersisted data.
        self._previous = None   # Dirty predecessor block awaiting flush.

    def close(self):
        """Flushes buffered data and detaches from the inode.  Idempotent."""
        if self._inode is None:
            return  # Already closed (close() is re-run by __del__).
        self.flush()
        self._inode = None

    def _flush_blocks(self, blocks):
        """Transactionally writes blocks and updates the inode metadata.

        Args:
          blocks: List of (block_num, StringIO) pairs to persist.
        """
        blocknums, blockdata = zip(*blocks)
        keys = [self._inode.get_block_key(num) for num in blocknums]
        keys.append(self._inode.key())

        def _do_flush():
            entities = db.get(keys)
            inode = entities.pop()  # The inode key was appended last.
            for i in range(len(entities)):
                if entities[i] is None:
                    # Block entity doesn't exist yet; create it as a child
                    # of the inode so it lives in the same entity group.
                    block_name = BLOCK_NAME_FMT % blocknums[i]
                    entities[i] = DataBlock(key_name=block_name, parent=inode)
                entities[i].data = blockdata[i].getvalue()
            if self._pos > inode.size:
                inode.size = self._pos
            entities.append(inode)
            db.put(entities)
            return inode

        new_inode = db.run_in_transaction(_do_flush)
        # Reflect the committed metadata on our (possibly stale) copy.
        self._inode.size = new_inode.size
        self._inode.modified = new_inode.modified

    def flush(self):
        """Writes any dirty buffered blocks to the datastore."""
        if self._current:
            # When the cursor sits exactly on a block boundary, the buffered
            # data belongs to the block *before* that boundary, so index
            # from _pos - 1.  (The original used _pos // block_size, which
            # flushed a block written up to an exact boundary under the
            # wrong block number.)
            block_num = max(self._pos - 1, 0) // self._inode.block_size
            to_flush = []
            if self._previous:
                to_flush.append((block_num - 1, self._previous))
            if self._dirty:
                to_flush.append((block_num, self._current))
            if to_flush:
                self._flush_blocks(to_flush)
            self._previous = None
            self._dirty = False

    def _load_current(self):
        """Ensures _current holds the block containing the current position.

        Invariant on exit: the StringIO cursor equals
        _pos - block_start, matching _pos.
        """
        if not self._current:
            block_num = self._pos // self._inode.block_size
            block = self._inode.get_block(block_num)
            if block:
                self._current = StringIO.StringIO(block.data)
                self._current.seek(self._pos - block_num * self._inode.block_size)
            else:
                # Block doesn't exist yet: start a zero-filled buffer
                # covering the span up to the current position (supports
                # sparse writes past EOF).
                this_block_size = min(self._pos - block_num * self._inode.block_size,
                                      self._inode.block_size)
                self._current = StringIO.StringIO('\0' * this_block_size)
                # Fix: position the cursor at the offset corresponding to
                # _pos; the original left it at 0, so a sparse write would
                # land at the start of the block instead of at _pos.
                self._current.seek(this_block_size)

    def _advance(self):
        """Retires the block just completed; only valid at a block boundary."""
        if self._previous:
            # Flush previous block, and current if it's dirty.
            # We don't simply call flush() because at this point, the state of
            # _current and _previous isn't yet updated to reflect _pos.
            block_num = (self._pos - 1) // self._inode.block_size
            to_flush = [(block_num - 1, self._previous)]
            if self._dirty:
                to_flush.append((block_num, self._current))
            self._flush_blocks(to_flush)
            self._previous = None
            self._dirty = False
        elif self._dirty:
            # Shelve the finished dirty block so it can be committed together
            # with the next one in a single transaction.
            self._previous = self._current
            self._dirty = False
        assert self._pos % self._inode.block_size == 0
        self._current = None

    def next(self):
        """Iterator protocol: returns the next line.

        Raises:
          StopIteration: At end of file.  (The original returned '' forever,
            so iterating over the file never terminated.)
        """
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    __next__ = next  # Python 3 iterator protocol alias.

    def read(self, size=None):
        """Reads up to size bytes, or to end of file if size is None."""
        self._load_current()
        ret = part = self._current.read(size)
        self._pos += len(part)
        if size:
            size -= len(part)
        while self._pos < self._inode.size and (size is None or size > 0):
            if part == "":
                # Current block exhausted; move to the next one.
                self._advance()
                self._load_current()
            part = self._current.read(size)
            self._pos += len(part)
            ret += part
            if size:
                size -= len(part)
        return ret

    def readline(self, size=None):
        """Reads one line (up to size bytes if given), spanning blocks."""
        self._load_current()
        ret = part = self._current.readline(size)
        self._pos += len(part)
        if size:
            size -= len(part)
        # endswith() instead of ret[-1] also handles an empty first read
        # safely (the original raised IndexError on ret == "").
        while (self._pos < self._inode.size
               and (size is None or size > 0)
               and not ret.endswith("\n")):
            if part == "":
                self._advance()
                self._load_current()
            part = self._current.readline(size)
            self._pos += len(part)
            ret += part
            if size:
                size -= len(part)
        return ret

    def readlines(self, sizehint=None):
        """Reads lines until EOF, or roughly sizehint bytes if given."""
        line = self.readline(sizehint)
        ret = [line]
        if sizehint:
            sizehint -= len(line)
        while line and (sizehint is None or sizehint > 0):
            line = self.readline()
            ret.append(line)
            if sizehint:
                sizehint -= len(line)
        return ret

    def xreadlines(self):
        return self

    def seek(self, offset, whence=0):
        """Moves the file pointer, flushing buffered writes first.

        Args:
          offset: Byte offset interpreted according to whence.
          whence: os.SEEK_SET (start), os.SEEK_CUR (current position) or
            os.SEEK_END (relative to end of file).
        """
        self.flush()
        block_size = self._inode.block_size
        old_pos = self._pos
        if whence == os.SEEK_SET:
            self._pos = offset
        elif whence == os.SEEK_CUR:
            self._pos += offset
        elif whence == os.SEEK_END:
            # Standard file semantics: the (normally negative) offset is
            # added to the size.  The original subtracted it, inverting
            # the direction of SEEK_END seeks.
            self._pos = self._inode.size + offset
        new_block_num = self._pos // block_size
        # Reuse the loaded buffer only when the old position was strictly
        # inside the same block (at an exact boundary _current may belong
        # to the *previous* block, so reusing it would corrupt data).
        if (self._current and old_pos // block_size == new_block_num
                and (old_pos % block_size != 0 or old_pos == 0)):
            self._current.seek(self._pos - new_block_num * block_size, 0)
        else:
            self._current = None

    def tell(self):
        """Returns the current file position."""
        return self._pos

    def truncate(self, size=None):
        raise NotImplementedError()

    def write(self, str):
        """Writes str at the current position, extending the file if needed.

        NOTE(review): the parameter shadows the str builtin; kept for
        call-signature compatibility with the original.
        """
        if not str:
            return  # Avoid marking a block dirty for a zero-byte write.
        block_size = self._inode.block_size
        block_num = self._pos // block_size
        block_end = (block_num + 1) * block_size
        bytes_left = block_end - self._pos
        part, str = str[:bytes_left], str[bytes_left:]
        self._load_current()
        self._current.write(part)
        self._dirty = True
        self._pos += len(part)
        while str:
            # Subsequent chunks are whole blocks.  (The original reused the
            # first chunk's size here, which tripped _advance's alignment
            # assert whenever the starting position wasn't block-aligned
            # and more than one additional block was written.)
            part, str = str[:block_size], str[block_size:]
            self._advance()
            self._load_current()
            self._current.write(part)
            self._dirty = True
            self._pos += len(part)

    def writelines(self, sequence):
        """Writes each string in sequence; no line separators are added."""
        for line in sequence:
            self.write(line)

    def __iter__(self):
        return self

    def __del__(self):
        # Guard: __init__ may have raised before _inode was assigned, and
        # close() may already have run; either way there's nothing to do.
        if getattr(self, "_inode", None) is not None:
            self.close()

    @property
    def closed(self):
        return self._inode is None

    @property
    def mode(self):
        return self._mode
import os
import datastorefile
import unittest
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import datastore_file_stub
from google.appengine.ext import db
class DatastoreFileTest(unittest.TestCase):
    """Exercises DatastoreFile against an in-memory datastore stub.

    Uses assertEqual/assertNotEqual rather than the deprecated
    failUnlessEqual/failIfEqual aliases, and avoids shadowing the `file`
    builtin.
    """

    def setUp(self):
        """Installs a fresh datastore stub for each test."""
        os.environ['APPLICATION_ID'] = 'test'
        apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
        datastore = datastore_file_stub.DatastoreFileStub('test', None, None)
        apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', datastore)

    def testCreateFile(self):
        """Writes and reads back a small single-block file."""
        test_string = "Hello, world!"
        inode = datastorefile.Inode()
        inode.put()
        fh = inode.open()
        fh.write(test_string)
        self.assertEqual(fh._pos, len(test_string))
        fh.seek(0)
        self.assertEqual(fh.read(len(test_string)), test_string)
        self.assertEqual(fh._pos, len(test_string))
        fh.close()
        self.assertEqual(inode.size, len(test_string))
        self.assertNotEqual(inode.created, inode.modified)
        blocks = datastorefile.DataBlock.all().ancestor(inode).fetch(100)
        self.assertEqual(len(blocks), 1)
        self.assertEqual(blocks[0].key().name(), '#00000000')
        self.assertEqual(blocks[0].data, test_string)
        # Reopen the file and check it still has the same contents
        fh = inode.open()
        fh.seek(1)
        self.assertEqual(fh.read(len(test_string)), test_string[1:])
        fh.close()

    def testMultipleBlocks(self):
        """Checks buffering and flushing of writes spanning several blocks."""
        inode = datastorefile.Inode(block_size=32)
        inode.put()
        fh = inode.open()
        fh.write('x' * 16)
        fh.flush()
        self.assertEqual(fh._dirty, False)
        fh.write('y' * 32)
        self.assertNotEqual(fh._previous, None)
        self.assertEqual(fh._previous.getvalue(), 'x' * 16 + 'y' * 16)
        self.assertEqual(fh._dirty, True)
        self.assertEqual(fh._pos, 48)
        fh.write('z' * 32)
        self.assertEqual(fh._previous, None)
        self.assertEqual(fh._dirty, True)
        self.assertEqual(fh._pos, 80)
        fh.close()
        self.assertEqual(fh._previous, None)
        self.assertEqual(fh._dirty, False)
        blocks = datastorefile.DataBlock.all().ancestor(inode).fetch(100)
        self.assertEqual(len(blocks), 3)
        self.assertEqual(blocks[0].key().name(), '#00000000')
        self.assertEqual(blocks[1].key().name(), '#00000001')
        self.assertEqual(blocks[2].key().name(), '#00000002')
        self.assertEqual(blocks[0].data, 'x' * 16 + 'y' * 16)
        self.assertEqual(blocks[1].data, 'y' * 16 + 'z' * 16)
        self.assertEqual(blocks[2].data, 'z' * 16)
        # Reopen and read the whole file
        fh = inode.open()
        self.assertEqual(fh.read(), 'x' * 16 + 'y' * 32 + 'z' * 32)

    def testReadlines(self):
        """Checks line-oriented reads across block boundaries."""
        inode = datastorefile.Inode(block_size=32)
        inode.put()
        fh = inode.open()
        fh.write("This is a test!!\n" * 10)
        fh.close()
        fh = inode.open()
        self.assertEqual(fh.readlines(90), ["This is a test!!\n"] * 6)
        self.assertEqual(fh.readlines(), ["This is a test!!\n"] * 4 + [''])

    def testLongRead(self):
        """Checks reads and readline spanning more than two blocks."""
        inode = datastorefile.Inode(block_size=32)
        inode.put()
        fh = inode.open()
        fh.write('x' * 80 + '\n')
        fh.close()
        fh = inode.open()
        self.assertEqual(fh.read(65), 'x' * 65)
        self.assertEqual(fh._pos, 65)
        fh = inode.open()
        self.assertEqual(fh.readline(81), 'x' * 80 + '\n')
# Allow running the tests directly as a script.
if __name__ == '__main__':
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment