Skip to content

Instantly share code, notes, and snippets.



Last active Aug 29, 2015
What would you like to do?
import os, shutil
import luigi
import sparkey
import random
class SparkeyTarget(luigi.Target):
    """Luigi target made of a Sparkey index (``.spi``) / log (``.spl``) pair.

    Two layouts are supported:

    * ``path`` given: both files live inside the directory ``path``; the
      target exists once that directory exists.
    * ``path`` omitted: ``spi`` and ``spl`` are used as the file paths
      themselves; the target exists once both files exist.

    Writers always write to temporary locations and only move the result to
    its final place when ``close()`` is called, so an interrupted write never
    leaves a target that looks complete.
    """

    def __init__(self, path=None, spi='data.spi', spl='data.spl',
                 writer_cls=sparkey.HashWriter, reader_cls=sparkey.HashReader):
        """Store the target layout and the reader/writer classes to use.

        :param path: optional directory holding both files.
        :param spi: index file name (relative to ``path`` when it is set).
        :param spl: log file name (relative to ``path`` when it is set).
        :param writer_cls: class instantiated as ``writer_cls(spi, spl)``.
        :param reader_cls: class instantiated as ``reader_cls(spi, spl)``.
        """
        self.path = path
        self.spi_path = spi
        self.spl_path = spl
        self.reader_cls = reader_cls
        self.writer_cls = writer_cls

    def _final_paths(self):
        """Return the ``(spi, spl)`` paths of the published target."""
        if self.path:
            return (os.path.join(self.path, self.spi_path),
                    os.path.join(self.path, self.spl_path))
        return self.spi_path, self.spl_path

    def open(self, mode='r'):
        """Open the target for reading (``'r'``) or writing (``'w'``).

        :returns: a ``reader_cls`` instance for ``'r'``; for ``'w'`` an
            instance of a ``writer_cls`` subclass whose ``close()`` moves the
            temporary output to its final location.
        :raises ValueError: if ``mode`` is neither ``'r'`` nor ``'w'``.
        """
        spi_path, spl_path = self._final_paths()
        if mode == 'r':
            # Read from where the writer publishes, i.e. inside ``path``
            # when a directory is configured.
            return self.reader_cls(spi_path, spl_path)
        if mode != 'w':
            raise ValueError("mode must be 'r' or 'w', got %r" % (mode,))

        dir_path = self.path
        # randrange() needs integer bounds; float bounds such as 1e10 are
        # rejected by recent Python versions.
        suffix = '-luigi-tmp-%09d' % random.randrange(0, 10 ** 10)
        if dir_path:
            assert not os.path.exists(dir_path), \
                'target directory already exists: %r' % (dir_path,)
            dir_tmp_path = dir_path + suffix
            spi_tmp_path = os.path.join(dir_tmp_path, self.spi_path)
            spl_tmp_path = os.path.join(dir_tmp_path, self.spl_path)
        else:
            # Always bound so the writer's finalizer can test it safely.
            dir_tmp_path = None
            spi_tmp_path = self.spi_path + suffix
            spl_tmp_path = self.spl_path + suffix

        # Make sure the directories that will hold the temp files exist.
        for tmp_path in (spi_tmp_path, spl_tmp_path):
            parent = os.path.dirname(os.path.normpath(tmp_path))
            if parent and not os.path.exists(parent):
                os.makedirs(parent)

        class MyWriter(self.writer_cls):  # TODO: inherit __doc__ etc
            def close(self):
                """Close the underlying writer, then publish the output."""
                super(MyWriter, self).close()
                # Publish at most once: a second close() (or one triggered
                # from a finalizer) must not retry the rename.
                if getattr(self, '_luigi_finished', False):
                    return
                self._luigi_finished = True
                # move to final destination
                if dir_path:
                    # Rename directory
                    os.rename(dir_tmp_path, dir_path)
                else:
                    # Note: not fully atomic
                    os.rename(spi_tmp_path, spi_path)
                    os.rename(spl_tmp_path, spl_path)

            def __del__(self):
                """Discard temporary output that was never published."""
                # A finalizer must never publish a half-written target, so
                # flag the writer first in case the parent finalizer ends
                # up calling close().
                self._luigi_finished = True
                try:
                    parent_del = getattr(super(MyWriter, self), '__del__', None)
                    if parent_del is not None:
                        parent_del()
                finally:
                    for tmp_path in (spi_tmp_path, spl_tmp_path):
                        if os.path.exists(tmp_path):
                            os.remove(tmp_path)
                    if dir_tmp_path and os.path.exists(dir_tmp_path):
                        shutil.rmtree(dir_tmp_path, ignore_errors=True)

        return MyWriter(spi_tmp_path, spl_tmp_path)

    def exists(self):
        """Return True when the published target is present on disk."""
        if self.path:
            return os.path.exists(self.path)
        # Without a directory the target is the file pair itself.
        spi_path, spl_path = self._final_paths()
        return os.path.exists(spi_path) and os.path.exists(spl_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment