Skip to content

Instantly share code, notes, and snippets.

@dmoney
Last active August 29, 2015 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dmoney/a1cae71ffc791821c925 to your computer and use it in GitHub Desktop.
Save dmoney/a1cae71ffc791821c925 to your computer and use it in GitHub Desktop.
'''Streaming pickle implementation for efficiently serializing and
de-serializing an iterable (e.g., list)
Created on 2010-06-19 by Philip Guo
Mostly rewritten 2015-01-16 by Dustin King for Python 3.4
Not backwards compatible.
2.7 version here: https://archive.is/5RZ24
'''
from pickle import dumps, loads
def writeByteArray(byteArray, binaryFile):
    r'''Write one bytes-like record to a binary file, escaped and terminated.

    The record is framed so that several records can be written back to
    back and split apart again when reading:

        file contents     ->  record contents
        \\                ->  \
        \<newline>        ->  <newline>
        <newline>         ->  end of record

    Args:
        byteArray: a ``bytes`` or ``bytearray`` object (or any iterable of
            ints in ``range(256)``) holding the record payload.
        binaryFile: an object with a ``write`` method that accepts
            ``bytes``, e.g. a file opened in ``'wb'`` mode.
    '''
    # Backslashes must be doubled first; otherwise the backslash inserted
    # in front of each newline would itself be escaped a second time.
    escaped = bytes(byteArray).replace(b'\\', b'\\\\').replace(b'\n', b'\\\n')
    # A single write per record instead of one write call per byte.
    binaryFile.write(escaped + b'\n')
def writeByteArrayStream(byteArrays, binaryFile):
    '''Write every record from an iterable to a binary file, in order.

    Each item of byteArrays is handed to writeByteArray together with
    binaryFile.  The iterable is consumed one item at a time, so a
    generator can be passed without materializing it first.
    '''
    records = iter(byteArrays)
    for record in records:
        writeByteArray(record, binaryFile)
def readByteArrayStream(binaryFile):
    r'''Yield the records stored in a binary file in escaped, newline-terminated form.

    Undoes the escaping used when writing: ``\\`` becomes a single
    backslash, a backslash followed by a newline becomes a newline, and a
    bare newline ends the current record.

    The file is read one byte at a time, so at the moment a record is
    yielded nothing past its terminating newline has been requested from
    binaryFile.  Bytes that follow the last newline (an unterminated
    record) are consumed but never yielded.

    Args:
        binaryFile: an object whose ``read(1)`` returns a ``bytes`` object
            of length 1, or ``b''`` at end of stream.

    Yields:
        bytes: the unescaped contents of each record.

    Raises:
        ValueError: if a backslash is followed by anything other than a
            backslash or a newline, including end of stream.
    '''
    backslash = b'\\'
    newline = b'\n'
    read = binaryFile.read
    buf = bytearray()
    # iter(callable, sentinel) keeps calling read(1) until it returns b''.
    for byte in iter(lambda: read(1), b''):
        if byte == backslash:
            # An escape always spans two bytes; fetch the second one now.
            escaped = read(1)
            if escaped == backslash or escaped == newline:
                buf += escaped
            elif escaped == b'':
                raise ValueError('unexpected end of stream after escape byte')
            else:
                raise ValueError('unexpected byte: ' + str(escaped))
        elif byte == newline:
            yield bytes(buf)
            buf = bytearray()
        else:
            buf += byte
def pickleIterable(iterable):
    '''Lazily pickle the items of iterable, yielding one bytes object each.'''
    yield from map(dumps, iterable)
def s_dump(iterable_to_pickle, file_obj):
    '''Pickle each element of iterable_to_pickle and write it to file_obj.

    Elements are pickled one at a time by pickleIterable and the resulting
    byte strings are passed to writeByteArrayStream.  file_obj is a file
    opened for writing in binary mode.
    '''
    pickled_items = pickleIterable(iterable_to_pickle)
    writeByteArrayStream(pickled_items, file_obj)
def s_dump_elt(elt, file_obj):
    '''Pickle the single object elt and write it to file_obj as one record.'''
    pickled = dumps(elt)
    writeByteArray(pickled, file_obj)
def s_load(file_obj):
    '''Return a generator yielding the objects stored in file_obj, one at a time.

    Each record produced by readByteArrayStream(file_obj) is unpickled with
    pickle.loads just before it is yielded.

    NOTE: unpickling can execute arbitrary code, so only load files that
    come from a trusted source.
    '''
    yield from map(loads, readByteArrayStream(file_obj))
import unittest
import sPickle
import os
import datetime
class TestSPickle(unittest.TestCase):
    '''Round-trip and framing tests for the sPickle module.

    Every test works on a scratch file named by self.testfn, relative to
    the current working directory; tearDown removes it if it exists.
    '''

    def setUp(self):
        self.testfn = 'sPickle.testfile'

    def tearDown(self):
        if os.path.exists(self.testfn):
            os.remove(self.testfn)

    # ---- helpers ---------------------------------------------------------

    def _assertArray(self, expected, actual):
        '''Assert both sequences have the same length and equal elements.'''
        self.assertEqual(len(expected), len(actual))
        for want, got in zip(expected, actual):
            self.assertEqual(want, got)

    def _assertArrayFloats(self, expected, actual, delta):
        '''Assert same length and element-wise equality to within delta.'''
        self.assertEqual(len(expected), len(actual))
        for want, got in zip(expected, actual):
            # assertEqual's third positional argument is the failure
            # message, not a tolerance; assertAlmostEqual is required for
            # delta to take part in the comparison.
            self.assertAlmostEqual(want, got, delta=delta)

    def _dump(self, iterable):
        '''Write iterable to the scratch file with sPickle.s_dump.'''
        with open(self.testfn, 'wb') as f:
            sPickle.s_dump(iterable, f)

    def _load(self):
        '''Return everything sPickle.s_load reads from the scratch file.'''
        with open(self.testfn, 'rb') as f:
            return list(sPickle.s_load(f))

    # ---- pickle round trips ----------------------------------------------

    def test_empty(self):
        # Dumping an empty iterable must load back as no elements at all.
        with open(self.testfn, 'wb') as f:
            sPickle.s_dump([], f)
        with open(self.testfn, 'rb') as f:
            for elt in sPickle.s_load(f):
                self.fail('found element for stream that should be empty: ' + str(elt))

    def test_ints(self):
        ints = [-4, -3, -2, -1, 0, 1, 2, 3, 4]
        self._dump(ints)
        self._assertArray(ints, self._load())

    def test_floats(self):
        floats = [-3.0, -2.5, -1.0, 0.0, 1.0, 2.5, 3.0]
        self._dump(floats)
        self._assertArrayFloats(floats, self._load(), .001)

    def test_strings(self):
        strings = ['a', 'bCD', 'EF', 'G']
        self._dump(strings)
        self._assertArray(strings, self._load())

    def test_stringsWithNewlines(self):
        # Newlines inside the payload exercise the record-escaping logic.
        strings = ['\n', 'a\n', 'b\nCD', '\nEF', '\nG\n']
        self._dump(strings)
        self._assertArray(strings, self._load())

    def test_dict(self):
        d = {'int_field': 1,
             'float_field': 2.3,
             'string_field': 'hello',
             'date_field': datetime.datetime(2015, 1, 16, 19, 44, 0)}
        self._dump([d])
        self._assertArray([d], self._load())

    # ---- raw byte-array framing ------------------------------------------

    def test_writeByteArrayStream(self):
        # Payload "abc", backslash, newline -> each special byte gains a
        # leading backslash, then a bare newline terminates the record.
        with open(self.testfn, 'wb') as f:
            sPickle.writeByteArrayStream([b'abc\\\n'], f)
        with open(self.testfn, 'rb') as f:
            b = f.read()
        self.assertEqual(b'abc\\\\\\\n\n', b)

    def test_readByteArrayStream(self):
        with open(self.testfn, 'wb') as f:
            f.write(b'abc\\\\\\\n\n')
        with open(self.testfn, 'rb') as f:
            lst = list(sPickle.readByteArrayStream(f))
        self._assertArray([b'abc\\\n'], lst)

    def test_readByteArrayStream_empty(self):
        # Opening in 'wb' mode and writing nothing leaves an empty file.
        with open(self.testfn, 'wb') as f:
            pass
        with open(self.testfn, 'rb') as f:
            lst = list(sPickle.readByteArrayStream(f))
        self._assertArray([], lst)

    def test_readByteArrayStream_oneElement(self):
        with open(self.testfn, 'wb') as f:
            f.write(b'a\n')
        with open(self.testfn, 'rb') as f:
            lst = list(sPickle.readByteArrayStream(f))
        self._assertArray([b'a'], lst)
# Run the whole test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment