Skip to content

Instantly share code, notes, and snippets.

@darkjh
Last active August 29, 2015 14:05
Show Gist options
  • Save darkjh/6c40bd1aa1c876bbba86 to your computer and use it in GitHub Desktop.
Save darkjh/6c40bd1aa1c876bbba86 to your computer and use it in GitHub Desktop.
import json
import struct
import tempfile

from cdf.utils.kvstore import LevelDB
class LevelDBExternalSort(object):
    """Sort a stream of JSON-serializable items using LevelDB as on-disk storage.

    Each item is written to a LevelDB database under a byte key derived from
    ``key(item)``; iterating the database then yields the items back in key
    order.  This assumes the ``LevelDB`` wrapper iterates keys in LevelDB's
    default bytewise order -- confirm against ``cdf.utils.kvstore``.

    Instances are single-use: ``external_sort`` destroys the database when
    its result generator finishes.
    """

    # Separator between components of a tuple/list sort key.  As the smallest
    # byte value it makes ('a', ...) sort before ('ab', ...); this assumes
    # string components never contain NUL themselves.
    SEP = b'\0'
    # Big-endian unsigned 32-bit.  Ints are shifted by INT_BIAS before packing
    # so that bytewise order matches numeric order, including negatives.
    FMT = '>I'
    INT_BIAS = 1 << 31
    # Big-endian unsigned 64-bit insertion counter appended to every key.
    SEQ_FMT = '>Q'

    def __init__(self, tmp_dir=None, **configs):
        """Create and open the backing LevelDB database.

        :param tmp_dir: directory holding the database; a fresh directory is
            created with ``tempfile.mkdtemp()`` when omitted.
        :param configs: keyword options forwarded to ``LevelDB.open``.
        """
        self.tmp_dir = tempfile.mkdtemp() if tmp_dir is None else tmp_dir
        self.db = LevelDB(self.tmp_dir)
        self.db.open(**configs)

    @classmethod
    def _pack_elem(cls, elem):
        """Encode one sort-key component as bytes whose byte order matches value order.

        Supported types: ``int`` (signed 32-bit range), ``bytes``, and text
        (encoded as UTF-8, whose byte order equals code-point order).

        :raises TypeError: for any other type.
        :raises struct.error: for ints outside the signed 32-bit range.
        """
        if isinstance(elem, int):
            # Map [-2**31, 2**31) onto [0, 2**32) so negative numbers no
            # longer sort after positive ones when compared bytewise.
            return struct.pack(cls.FMT, elem + cls.INT_BIAS)
        if isinstance(elem, bytes):
            return elem
        if isinstance(elem, type(u'')):
            return elem.encode('utf-8')
        raise TypeError("Not supported type: {}".format(type(elem)))

    @classmethod
    def _encode_key(cls, sort_keys):
        """Encode a scalar, tuple or list sort key to bytes.

        Tuple/list components are packed individually and joined with ``SEP``.
        """
        if isinstance(sort_keys, (tuple, list)):
            return cls.SEP.join([cls._pack_elem(elem) for elem in sort_keys])
        return cls._pack_elem(sort_keys)

    def _finalize(self):
        """Destroy the backing database.

        NOTE(review): whether this also removes ``tmp_dir`` depends on
        ``LevelDB.destroy`` -- confirm.
        """
        self.db.destroy()

    def _generate_kv_stream(self, stream, key_func):
        """Yield ``(key_bytes, json_value)`` pairs for each item of ``stream``.

        A big-endian insertion counter is appended to every key so that items
        with equal sort keys get distinct LevelDB keys (instead of overwriting
        one another) and come back in input order, i.e. the sort is stable.
        Values must be JSON serializable.
        """
        for seq, line in enumerate(stream):
            sort_key = self._encode_key(key_func(line))
            unique_key = sort_key + self.SEP + struct.pack(self.SEQ_FMT, seq)
            yield unique_key, json.dumps(line)

    def external_sort(self, stream, key):
        """Lazily yield the items of ``stream`` ordered by ``key(item)``.

        This is a generator: nothing is written until the first item is
        requested.  Once started, the database is destroyed when iteration
        completes, raises (including a failure while writing), or the
        generator is closed.  Items round-trip through JSON, so e.g. tuples
        come back as lists.

        :param stream: iterable of JSON-serializable items.
        :param key: callable returning an int, string, or tuple/list of those.
        """
        try:
            self.db.batch_write(self._generate_kv_stream(stream, key))
            for _, value in self.db.iterator():
                yield json.loads(value)
        finally:
            self._finalize()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment