Last active
August 29, 2015 14:23
-
-
Save dmoney/a1cae71ffc791821c925 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''Streaming pickle implementation for efficiently serializing and | |
de-serializing an iterable (e.g., list) | |
Created on 2010-06-19 by Philip Guo | |
Mostly rewritten 2015-01-16 by Dustin King for Python 3.4 | |
Not backwards compatible. | |
2.7 version here: https://archive.is/5RZ24 | |
''' | |
from pickle import dumps, loads | |
def writeByteArray(byteArray, binaryFile):
    '''Write one bytes or bytearray record to binaryFile, escaped and
    terminated by a newline.

    Escaping lets several records share one file; an unescaped newline
    marks the end of a record:

        file contents           ->  record contents
        backslash, backslash    ->  backslash
        backslash, newline      ->  newline
        newline                 ->  end of record

    binaryFile must accept bytes (i.e. be opened in binary write mode).
    '''
    # Escape backslashes first, so the backslashes introduced while escaping
    # newlines are not themselves escaped a second time.
    escaped = bytes(byteArray).replace(b'\\', b'\\\\').replace(b'\n', b'\\\n')
    # One write per record rather than one write call per byte.
    binaryFile.write(escaped + b'\n')
def writeByteArrayStream(byteArrays, binaryFile):
    '''Write every record from the iterable byteArrays to binaryFile,
    in order, using writeByteArray for each one.'''
    for record in byteArrays:
        writeByteArray(record, binaryFile)
def readByteArrayStream(binaryFile):
    '''Yield, as bytes objects, the records stored in binaryFile.

    This reverses the escaping applied by writeByteArray: a backslash
    followed by a backslash or a newline stands for that second byte, and
    an unescaped newline ends the current record.

    The file is read one byte at a time, so when a record is yielded the
    file position is immediately after that record's terminating newline.
    Bytes that follow the last unescaped newline (an unterminated final
    record) are not yielded.

    Raises ValueError if a backslash is followed by anything other than a
    backslash or a newline, including end of file.
    '''
    record = bytearray()
    while True:
        byte = binaryFile.read(1)
        if byte == b'':
            # End of file: any unterminated partial record is discarded.
            return
        if byte == b'\n':
            yield bytes(record)
            record = bytearray()
        elif byte == b'\\':
            escaped = binaryFile.read(1)
            if escaped != b'\\' and escaped != b'\n':
                # ValueError is an Exception subclass, so callers that
                # catch Exception keep working.
                raise ValueError('unexpected byte: ' + str(escaped))
            record += escaped
        else:
            record += byte
def pickleIterable(iterable):
    '''Lazily yield the pickled (dumps) form of each item in iterable.'''
    yield from map(dumps, iterable)
def s_dump(iterable_to_pickle, file_obj):
    '''Pickle each element of iterable_to_pickle and write it to file_obj
    as one escaped record per element.

    file_obj is a file opened for writing; the records are written as
    bytes.'''
    pickled_records = pickleIterable(iterable_to_pickle)
    writeByteArrayStream(pickled_records, file_obj)
def s_dump_elt(elt, file_obj):
    '''Pickle the single object elt and write it to file_obj as one
    escaped record.'''
    pickled = dumps(elt)
    writeByteArray(pickled, file_obj)
def s_load(file_obj):
    '''Return a generator that yields, one at a time, the objects stored
    in file_obj.

    Each record read by readByteArrayStream is passed to pickle's loads.
    NOTE: unpickling can execute arbitrary code, so only load files that
    come from a trusted source.
    '''
    yield from (loads(raw) for raw in readByteArrayStream(file_obj))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
import sPickle | |
import os | |
import datetime | |
class TestSPickle(unittest.TestCase):
    '''Round-trip and on-disk format tests for the sPickle module.'''

    def setUp(self):
        # Scratch file used by every test; tearDown removes it again.
        self.testfn = 'sPickle.testfile'

    def tearDown(self):
        if os.path.exists(self.testfn):
            os.remove(self.testfn)

    def test_empty(self):
        with open(self.testfn, 'wb') as f:
            sPickle.s_dump([], f)
        with open(self.testfn, 'rb') as f:
            for elt in sPickle.s_load(f):
                self.fail('found element for stream that should be empty: ' + str(elt))

    def _assertArray(self, expected, actual):
        '''Assert both sequences have the same length and equal elements.'''
        self.assertEqual(len(expected), len(actual))
        for want, got in zip(expected, actual):
            self.assertEqual(want, got)

    def _assertArrayFloats(self, expected, actual, delta):
        '''Assert both sequences have the same length and that each pair of
        elements differs by no more than delta.'''
        self.assertEqual(len(expected), len(actual))
        for want, got in zip(expected, actual):
            # assertEqual's third positional argument is the failure
            # message, so the tolerance must go through assertAlmostEqual.
            self.assertAlmostEqual(want, got, delta=delta)

    def _dump(self, iterable):
        '''Write iterable to the scratch file with sPickle.s_dump.'''
        with open(self.testfn, 'wb') as f:
            sPickle.s_dump(iterable, f)

    def _load(self):
        '''Read the scratch file back into a list with sPickle.s_load.'''
        with open(self.testfn, 'rb') as f:
            return list(sPickle.s_load(f))

    def test_ints(self):
        ints = [-4, -3, -2, -1, 0, 1, 2, 3, 4]
        self._dump(ints)
        self._assertArray(ints, self._load())

    def test_floats(self):
        floats = [-3.0, -2.5, -1.0, 0.0, 1.0, 2.5, 3.0]
        self._dump(floats)
        self._assertArrayFloats(floats, self._load(), .001)

    def test_strings(self):
        strings = ['a', 'bCD', 'EF', 'G']
        self._dump(strings)
        self._assertArray(strings, self._load())

    def test_stringsWithNewlines(self):
        strings = ['\n', 'a\n', 'b\nCD', '\nEF', '\nG\n']
        self._dump(strings)
        self._assertArray(strings, self._load())

    def test_dict(self):
        d = {'int_field': 1,
             'float_field': 2.3,
             'string_field': 'hello',
             'date_field': datetime.datetime(2015, 1, 16, 19, 44, 0)}
        self._dump([d])
        self._assertArray([d], self._load())

    def test_writeByteArrayStream(self):
        with open(self.testfn, 'wb') as f:
            sPickle.writeByteArrayStream([b'abc\\\n'], f)
        with open(self.testfn, 'rb') as f:
            b = f.read()
        self.assertEqual(b'abc\\\\\\\n\n', b)

    def test_readByteArrayStream(self):
        with open(self.testfn, 'wb') as f:
            f.write(b'abc\\\\\\\n\n')
        with open(self.testfn, 'rb') as f:
            lst = list(sPickle.readByteArrayStream(f))
        self._assertArray([b'abc\\\n'], lst)

    def test_readByteArrayStream_empty(self):
        # Opening in 'wb' and writing nothing leaves an empty file.
        with open(self.testfn, 'wb') as f:
            pass
        with open(self.testfn, 'rb') as f:
            lst = list(sPickle.readByteArrayStream(f))
        self._assertArray([], lst)

    def test_readByteArrayStream_oneElement(self):
        with open(self.testfn, 'wb') as f:
            f.write(b'a\n')
        with open(self.testfn, 'rb') as f:
            lst = list(sPickle.readByteArrayStream(f))
        self._assertArray([b'a'], lst)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment