Skip to content

Instantly share code, notes, and snippets.

@arnaudcordier
Last active August 29, 2015 14:08
Show Gist options
  • Save arnaudcordier/385a86e283194f7ef2bd to your computer and use it in GitHub Desktop.
Save arnaudcordier/385a86e283194f7ef2bd to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
Manage files and the underlying tree of directories
You can use it to store a large number of files while keeping the number of files per directory evenly «distributed».
Usage :
# import
from hiddenFileSystem import HiddenFileSystem
# Instantiate with a directory name:
base_directory = "myDocs"
fs = HiddenFileSystem(base_directory)
# Write
file_name = "myFile.txt"
content = "Oh my lord, it works\n"
fs.write(file_name, content)
print "Path of", file_name, fs.get_path(file_name)
# read
content = fs.read(file_name)
print "content of", file_name, content,
# move an existing file
newFileName = 'test.txt'
with open(newFileName, 'w', encoding='utf-8') as newFile:
newFile.write(content)
print "Path of", newFileName, fs.move(newFileName)
# iterate over the files
for name in fs.files():
print name
# delete
fs.delete(file_name)
fs.delete(newFileName)
You can also get a file handle with fs.open(), but then you have to close it yourself!
Written by ArnAud, licence MIT
"""
import hashlib
import os
import errno
from codecs import open
class HiddenFileSystem(object):
    """Store files under a hash-derived tree of single-character directories.

    Each file lives in ``base_dir/c1/c2/.../cN/file_name`` where ``c1..cN``
    are the first ``depth`` hexadecimal characters of the MD5 digest of the
    file name, so files spread over up to ``16 ** depth`` leaf directories.
    """

    def __init__(self, base_dir, depth=3):
        """Bind the file system to an existing directory.

        base_dir -- existing directory that holds the tree
        depth    -- number of directory levels between base_dir and the files

        Raises OSError (after printing a message) if base_dir is not a
        directory.
        """
        if not os.path.isdir(base_dir):
            message = "%s is not a valid directory" % (base_dir)
            print(message)
            raise OSError(message)
        self.base_dir = base_dir
        self.depth = depth

    # returns a file handle of the file
    def open(self, file_name, mode='r', encoding='utf8', create_path=False):
        """Open file_name inside the tree and return the file handle.

        The caller is responsible for closing the returned handle.
        When create_path is true the missing directories are created first.
        """
        path = self._get_path(file_name, create_path=create_path)
        return open(os.path.join(path, file_name), mode, encoding=encoding)

    # returns the content of the file
    def read(self, file_name, encoding='utf8'):
        """Return the whole decoded content of file_name."""
        # "with" guarantees the handle is closed even if read() fails
        with self.open(file_name, encoding=encoding) as fh:
            return fh.read()

    # write content to the file
    def write(self, file_name, content, encoding='utf8'):
        """Write content to file_name, creating directories as needed.

        content may be text or bytes; bytes are decoded with ``encoding``
        before being written. Returns True.
        """
        # decode before opening so a decoding error does not leave an
        # empty file (and an unclosed handle) behind
        if isinstance(content, bytes):
            content = content.decode(encoding)
        with self.open(file_name, 'w', encoding=encoding, create_path=True) as fh:
            fh.write(content)
        return True

    # delete the file
    def delete(self, file_name):
        """Remove file_name and prune the directories left empty.

        Does nothing when the file is not present in the tree.
        """
        path = self._get_path(file_name, only_existing=True)
        if not path:
            return
        os.remove(os.path.join(path, file_name))
        # remove the empty directories, walking up towards base_dir;
        # compare normalised paths so that a base_dir given with a trailing
        # separator is still recognised and never removed itself
        base = os.path.normpath(self.base_dir)
        delpath = path
        while os.path.normpath(delpath) != base:
            try:
                os.rmdir(delpath)
            except OSError:
                # directory not empty (or not removable): stop pruning
                break
            delpath = os.path.dirname(delpath)

    # move an existing file into the file system
    def move(self, file_path):
        """Move the existing file at file_path into the tree.

        Returns the new path. Raises OSError if file_path is not a file.
        """
        if not os.path.isfile(file_path):
            raise OSError("%s is not an existing file" % (file_path))
        file_name = os.path.basename(file_path)
        target_dir = self._get_path(file_name, create_path=True)
        new_path = os.path.join(target_dir, file_name)
        os.rename(file_path, new_path)
        return new_path

    # returns file path
    def get_path(self, file_name):
        """Return the full path of file_name, or None if it is not stored."""
        path = self._get_path(file_name, True)
        if not path:
            return None
        return os.path.join(path, file_name)

    # iterator over the files names
    def files(self, full_path=False):
        """Yield every file name (or full path) found under base_dir."""
        for path, _, names in os.walk(self.base_dir):
            for name in names:
                yield os.path.join(path, name) if full_path else name

    # iterator over the files names (recursive version)
    def filesr(self, full_path=False, dirname=None):
        """Yield every file name (or full path), descending recursively.

        dirname is the directory to list; it defaults to base_dir.
        """
        if dirname is None:
            dirname = self.base_dir
        for name in os.listdir(dirname):
            pathname = os.path.join(dirname, name)
            if os.path.isdir(pathname):
                # recurse with this method: files() takes no directory argument
                for found in self.filesr(full_path, pathname):
                    yield found
            elif full_path:
                yield pathname
            else:
                yield name

    # iterator over the files names (iterative version)
    def filesi(self, full_path=False):
        """Yield every file name (or full path) using an explicit stack."""
        pending = [self.base_dir]
        while pending:
            current = pending.pop()
            if os.path.isdir(current):
                pending.extend(os.path.join(current, n) for n in os.listdir(current))
            elif full_path:
                yield current
            else:
                yield os.path.basename(current)

    # returns the path to the file
    def _get_path(self, file_name, only_existing=False, create_path=False):
        """Return the directory in which file_name is (or would be) stored.

        only_existing -- return None unless the file is actually there
        create_path   -- create the directory if it does not exist
        """
        # md5 works on bytes: encode text names so the hash can be computed
        # for both byte strings and unicode strings
        raw_name = file_name if isinstance(file_name, bytes) else file_name.encode('utf-8')
        digest = hashlib.md5(raw_name).hexdigest()[0:self.depth]
        # one directory level per hexadecimal character of the digest prefix
        path = os.path.join(self.base_dir, *digest)
        if only_existing and not os.path.isfile(os.path.join(path, file_name)):
            return None
        if create_path:
            self._create_path(path)
        return path

    # create the directory structure
    def _create_path(self, path):
        """Create path and its parents; an already existing directory is fine."""
        try:
            os.makedirs(path)
        except OSError as exc:  # Python >2.5
            if not (exc.errno == errno.EEXIST and os.path.isdir(path)):
                raise

    # find the depth of a given directory
    @staticmethod
    def find_depth(base_dir):
        """Return the number of directory levels below base_dir.

        The depth is measured by following the first entry of each directory
        until a file or an empty directory is reached. Raises OSError (after
        printing a message) when the resulting depth is lower than 1, which
        includes a missing or empty base_dir.
        """
        current = base_dir
        depth = -1
        while os.path.isdir(current):
            entries = os.listdir(current)
            if not entries:
                # empty base directory: nothing to measure
                break
            current = os.path.join(current, entries[0])
            depth += 1
            if os.path.isdir(current) and not os.listdir(current):
                # an empty leaf directory still counts as one level
                depth += 1
                break
        if depth < 1:
            message = "%s is not a valid directory" % (base_dir)
            print(message)
            raise OSError(message)
        return depth

    # migrate to fs of another depth
    @staticmethod
    def migrate(base_dir, depth):
        """Move every file of base_dir into a new tree of the given depth.

        The new tree is created next to base_dir and named
        ``<base_dir>-<depth>``. Returns the new directory, or None when
        base_dir already has the requested depth.
        """
        old_depth = HiddenFileSystem.find_depth(base_dir)
        if old_depth == depth:
            return
        base_dir = os.path.dirname(base_dir + os.sep)  # strip /
        new_rep = base_dir + "-" + str(depth)
        old_fs = HiddenFileSystem(base_dir, old_depth)
        old_fs._create_path(new_rep)
        new_fs = HiddenFileSystem(new_rep, depth)
        for name in old_fs.files(True):
            new_fs.move(name)
        return new_rep

    # show stats about depth and number of files
    @staticmethod
    def statistic(total_files=19, not_enough=100, too_mutch=20000):
        """Print, for growing numbers of files, the depths worth using.

        For each power of ten from 10**4 up to 10**(total_files - 1), print
        the depths for which the average number of files per directory is
        between not_enough and too_mutch.
        """
        # column widths are derived from the arguments; build the formats once
        header = "Number of files :{: %d,}" % (total_files + 1 + (total_files // 3))
        row = "\tDepth:{: 3}, number of dir:{: %d}, files per dir: {: %d}" % (
            total_files - 2, len(str(too_mutch)) + 1)
        for nfiles in [10 ** j for j in range(4, total_files)]:
            print(header.format(nfiles))
            depth = 0
            while True:
                depth += 1
                ndir = 16 ** depth
                file_per_dir = nfiles // ndir
                if file_per_dir > too_mutch:
                    # still too crowded: try a deeper tree
                    continue
                if file_per_dir < not_enough:
                    break
                print(row.format(depth, ndir, file_per_dir))

    @staticmethod
    def monitor(base_dir):
        """Print how many directories hold how many files under base_dir."""
        print('********** Hidden File System monitoring **********')
        total_file = 0
        total_dir = 0
        # maps "number of files in a directory" -> "number of such directories"
        nb_dir_of_nFiles = {}
        for _, _, files in os.walk(base_dir):
            nFiles = len(files)
            nb_dir_of_nFiles[nFiles] = nb_dir_of_nFiles.get(nFiles, 0) + 1
            total_file += nFiles
            total_dir += 1
        print('%s directories, %s files, depth %d in %s' % (
            total_dir, total_file, HiddenFileSystem.find_depth(base_dir), base_dir))
        print('\n'.join(['%s directories of %s files' % (nb_dir_of_nFiles[nfile], nfile)
                         for nfile in sorted(nb_dir_of_nFiles)]))
        print('********** ***************************** **********')
# Command-line entry point: depth advice, monitoring, migration and lookup.
if __name__ == '__main__':
    import sys
    import getopt

    show_usage = False
    try:
        optlist, args = getopt.getopt(sys.argv[1:], 'mMs')
    except getopt.GetoptError as exc:
        # unknown option: report it and fall through to the usage text
        print(str(exc))
        optlist, args = [], []
    opt = [o[0] for o in optlist]

    # make sure argument is a directory
    is_dir = False
    if len(args) > 0:
        base_dir = args.pop(0)
        is_dir = os.path.isdir(base_dir)

    # show help to choose depth
    if '-s' in opt:
        print("What Depth should you use:")
        HiddenFileSystem.statistic()
    # monitor a given directory
    elif '-m' in opt and is_dir:
        HiddenFileSystem.monitor(base_dir)
    # migrate to another depth
    elif '-M' in opt and is_dir and len(args) > 0:
        depth = args[0]
        if not depth.isdigit():
            print("Migration: depth should be an integer")
            show_usage = True
        else:
            new_dir = HiddenFileSystem.migrate(base_dir, int(depth))
            # migrate() returns None when base_dir already has the requested
            # depth; monitor the untouched directory in that case
            HiddenFileSystem.monitor(new_dir or base_dir)
    # information about a file
    elif is_dir and len(args) > 0:
        file_name = args[0]
        fs = HiddenFileSystem(base_dir, HiddenFileSystem.find_depth(base_dir))
        file_path = fs.get_path(file_name)
        if not file_path:
            print("Your file does not exist.")
            path = fs._get_path(file_name)
            file_path = os.path.join(path, file_name)
            print("But it would be %s" % (file_path))
        else:
            print(file_path)
    # help
    else:
        show_usage = True

    if show_usage:
        print("Usages:")
        print(" HiddenFileSystem -s => show help to choose depth")
        print(" HiddenFileSystem -m base_dir => monitor an existing HFS")
        print(" HiddenFileSystem -M base_dir newdepth => migrate base_dir HFS to another depth")
        print(" HiddenFileSystem base_dir file_name => show information about file_name in base_dir HFS")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment