Skip to content

Instantly share code, notes, and snippets.

@pluser
Last active October 5, 2015 06:25
Show Gist options
  • Save pluser/4f50d7d13f4d03415ce0 to your computer and use it in GitHub Desktop.
Save pluser/4f50d7d13f4d03415ce0 to your computer and use it in GitHub Desktop.
A FUSE mount script which provides read access to a huge number of zip files.
#!/usr/bin/env python
# The MIT License (MIT)
#
# Copyright (c) 2015 Kaoru Esashika
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import argparse
import errno
import logging
import os
import re
import shutil
import stat as filestat
import tempfile
import zipfile

import fuse
class Zip(object):
    """Read-only index of the members of a single zip archive.

    The archive is opened once in ``__init__`` to snapshot its member table
    into ``self.info`` (member name -> ``zipfile.ZipInfo``) and is reopened
    only when a member has to be extracted by :meth:`open`.
    """
    LOGGER = logging.getLogger('Zip')

    def __init__(self, filename):
        """Index the archive stored at *filename*.

        Raises:
            FileNotFoundError: *filename* does not exist (``errno`` is set
                to ``ENOENT`` and ``filename`` to the offending path).
            Whatever ``zipfile.ZipFile`` raises for an archive it cannot
            read is propagated unchanged.
        """
        self.zipfilename = filename
        if not os.path.exists(self.zipfilename):
            raise FileNotFoundError(
                errno.ENOENT, os.strerror(errno.ENOENT), self.zipfilename)
        with zipfile.ZipFile(self.zipfilename) as zarc:
            self.info = {info.filename: info for info in zarc.infolist()}

    def _dirset(self):
        """Return the set of directory names (each ending in '/').

        Every ancestor directory of every member is included, so archives
        that carry no explicit directory entries still expose their
        directories.  The set is computed on first use and cached, because
        ``self.info`` is only assigned in ``__init__``.
        """
        cached = getattr(self, '_dirs', None)
        if cached is not None:
            return cached
        dirs = set()
        for name in self.info:
            parent, sep, _ = name.rpartition('/')
            if not sep:
                # Member sits at the archive root: it has no parent dir.
                continue
            current = str()
            for frag in parent.split('/'):
                current = os.path.join(current, frag)
                dirs.add(current + '/')
        self._dirs = dirs
        return dirs

    def exists(self, path):
        """Return True if *path* is literally a member name of the archive."""
        return path in self.info

    def isdir(self, path):
        """Return True if *path* (with or without trailing '/') is a directory."""
        LOGGER = self.LOGGER.getChild('isdir')
        dirs = self._dirset()
        result = path in dirs or path + '/' in dirs
        LOGGER.debug('{} is directory? {}'.format(path, str(result)))
        return result

    def isfile(self, path):
        """Return True if *path* is a member whose name does not end in '/'."""
        LOGGER = self.LOGGER.getChild('isfile')
        result = path in self.info and not path.endswith('/')
        LOGGER.debug('{} is file? {}'.format(path, str(result)))
        return result

    def dirlist(self):
        """Return every directory name of the archive, each ending in '/'."""
        return list(self._dirset())

    def filelist(self):
        """Return the member names that do not end in '/'."""
        return [name for name in self.info if not name.endswith('/')]

    def entries(self):
        """Return all member names exactly as stored in the archive."""
        return list(self.info)

    def open(self, path):
        """Extract member *path* into an anonymous temporary file.

        Returns:
            The temporary file object, positioned at offset 0.  The caller
            owns it and must close it.

        Raises:
            KeyError: propagated from ``ZipFile.open`` when *path* is not a
                member of the archive.
        """
        LOGGER = self.LOGGER.getChild('open')
        tmp = tempfile.TemporaryFile()
        try:
            with zipfile.ZipFile(self.zipfilename) as zarc:
                with zarc.open(path) as member:
                    LOGGER.debug('Extracting zip file... path: {}'.format(path))
                    # Stream in chunks instead of loading the whole member
                    # into memory at once.
                    shutil.copyfileobj(member, tmp)
            tmp.seek(0)
        except BaseException:
            # Do not leak the temporary file when extraction fails.
            tmp.close()
            raise
        return tmp
class Arcfuse(fuse.Operations):
    """Read-only FUSE operations that expose zip archives as directories.

    Every ``*.zip`` file found under the source tree at construction time is
    indexed with :class:`Zip`.  Such a file is reported as a directory and
    its members are reported as files/directories below it; everything else
    is passed through to the real file system.
    """
    LOGGER = logging.getLogger('Arcfuse')

    # Keys copied from an ``os.stat_result`` into a getattr() reply.
    _STAT_KEYS = ('st_atime', 'st_ctime', 'st_gid', 'st_mtime',
                  'st_mode', 'st_nlink', 'st_size', 'st_uid')

    def __init__(self, root):
        """Scan *root* recursively and index every readable zip archive."""
        LOGGER = self.LOGGER.getChild('init')
        self.real_root = os.path.realpath(root)
        self.zipfiles = dict()
        # File descriptor -> temporary file holding an extracted zip member.
        # Filled by open(), consumed by read(), emptied by release().
        self._extracted = dict()
        for dirpath, dirnames, filenames in os.walk(self.real_root):
            for filename in filenames:
                if os.path.splitext(filename)[1] != '.zip':
                    continue
                zip_path = os.path.join(dirpath, filename)
                try:
                    self.zipfiles[zip_path] = Zip(zip_path)
                except (zipfile.BadZipFile, zipfile.LargeZipFile):
                    LOGGER.error('Bad zipfile found: {}'.format(zip_path))
                except OSError as exc:
                    # One unreadable or vanished file must not abort the
                    # whole mount.
                    LOGGER.error('Unreadable zipfile skipped: {} ({})'.format(zip_path, exc))
        LOGGER.info('Completed initialize.')

    def __call__(self, op, path, *args):
        """Trace the requested operation, then dispatch it via the base class."""
        LOGGER = self.LOGGER.getChild('call')
        LOGGER.log(3, 'Arcfuse object is called. op: {}, path: {}, args: {}'.format(op, path, args))
        return super().__call__(op, path, *args)

    def fullpath(self, path):
        """Map a mount-relative *path* to the path below the source root."""
        return os.path.join(self.real_root, path[1:] if path.startswith('/') else path)

    def relpath(self, path):
        """Return *path* without its leading '/', if it has one."""
        return path[1:] if path.startswith('/') else path

    def stat_to_entry(self, st):
        """Copy the relevant fields of stat result *st* into a plain dict."""
        return {key: getattr(st, key) for key in self._STAT_KEYS}

    @staticmethod
    def _not_found(path):
        """Build a FileNotFoundError carrying ENOENT for *path*.

        The errno is set explicitly; presumably the FUSE binding uses it to
        pick the error code returned to the kernel -- see the fusepy
        reference.
        """
        return FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)

    def _locate_member(self, path):
        """Split *path* into (indexed zip file path, path inside the archive).

        Each '.zip/' boundary is tried from left to right and the first
        prefix that is an indexed archive wins.  Returns None when no prefix
        of *path* is an indexed archive, e.g. for a real directory whose
        name merely ends in '.zip'.
        """
        rel = self.relpath(path)
        marker = '.zip/'
        search_from = 0
        while True:
            hit = rel.find(marker, search_from)
            if hit < 0:
                return None
            slash = hit + len(marker) - 1
            archive_path = self.fullpath(rel[:slash])
            if archive_path in self.zipfiles:
                return archive_path, rel[slash + 1:]
            search_from = hit + 1

    def getattr(self, path, fh=None):
        """Return the stat dict for *path*.

        An indexed zip file is reported as a directory.  A member of an
        archive inherits the archive's own stat fields, with size and mode
        adjusted.

        Raises:
            FileNotFoundError: (errno ENOENT) *path* exists neither on disk
                nor inside an indexed archive.
        """
        LOGGER = self.LOGGER.getChild('getattr')
        full = self.fullpath(path)
        if os.path.exists(full):
            LOGGER.debug('File exists. file: {}'.format(full))
            entry = self.stat_to_entry(os.lstat(full))
            if full in self.zipfiles:
                LOGGER.debug('This is exactly a zip file. file: {}'.format(full))
                entry['st_mode'] = filestat.S_IFDIR | 0o755
            return entry

        member = self._locate_member(path)
        if member is None:
            LOGGER.error('File or directory not found. file: {}'.format(full))
            raise self._not_found(full)

        archive_path, inner = member
        archive = self.zipfiles[archive_path]
        entry = self.stat_to_entry(os.lstat(archive_path))
        if archive.isdir(inner):
            LOGGER.debug('This is a directory in zip file. dir: {}'.format(full))
            entry['st_size'] = 0
            entry['st_mode'] = filestat.S_IFDIR | 0o755
        elif archive.isfile(inner):
            LOGGER.debug('This is a regular file in zip file. file: {}'.format(full))
            entry['st_size'] = archive.info[inner].file_size
        else:
            LOGGER.error('File or directory in zipfile not found. file: {}'.format(full))
            raise self._not_found(full)
        return entry

    def readdir(self, path, fh=None):
        """List the names directly below *path* (always including '.' and '..')."""
        LOGGER = self.LOGGER.getChild('readdir')
        full = self.fullpath(path)
        if full in self.zipfiles:
            LOGGER.debug('This is a zip file. file: {}'.format(full))
            names = {entry.split('/')[0] for entry in self.zipfiles[full].entries()}
            names.discard('')
            return ['.', '..'] + sorted(names)

        member = self._locate_member(path)
        if member is not None:
            LOGGER.debug('This is a directory in a zip file. dir: {}'.format(full))
            archive_path, inner = member
            # Match on a whole path component: 'a/' must not match 'ab/...'.
            prefix = inner.rstrip('/') + '/'
            names = set()
            for entry in self.zipfiles[archive_path].entries():
                if entry.startswith(prefix):
                    names.add(entry[len(prefix):].split('/')[0])
            names.discard('')
            return ['.', '..'] + sorted(names)

        LOGGER.debug('This is a directory in a normal file system. file: {}'.format(full))
        return ['.', '..'] + os.listdir(full)

    def open(self, path, flags):
        """Open *path* and return a file descriptor usable by read().

        A zip member is extracted once into a temporary file whose
        descriptor is returned (*flags* are ignored for it); the temporary
        file is kept until release().  A normal file is opened with
        ``os.open``.

        Raises:
            FileNotFoundError: (errno ENOENT) *path* does not exist.
        """
        LOGGER = self.LOGGER.getChild('open')
        full = self.fullpath(path)
        member = self._locate_member(path)
        if member is not None:
            LOGGER.debug('Opening a file in a zip file. file: {}'.format(full))
            archive_path, inner = member
            try:
                tmp = self.zipfiles[archive_path].open(inner)
            except KeyError:
                raise self._not_found(full) from None
            fd = tmp.fileno()
            self._extracted[fd] = tmp
            return fd
        if os.path.exists(full):
            LOGGER.debug('Opening a normal file in a normal file system. file: {}'.format(full))
            return os.open(full, flags)
        LOGGER.warning('Opening a file which is not exists. file: {}'.format(full))
        raise self._not_found(full)

    def read(self, path, size, offset, fh):
        """Return up to *size* bytes of *path* starting at *offset*.

        ``os.pread`` is used so the shared descriptor's file position is
        never moved, which keeps concurrent reads independent.
        """
        LOGGER = self.LOGGER.getChild('read')
        if fh in self._extracted:
            LOGGER.debug('Reading a file in zip file. file: {}'.format(self.fullpath(path)))
            return os.pread(fh, size, offset)

        member = self._locate_member(path)
        if member is not None:
            # Handle did not come from open() above: extract on the fly.
            LOGGER.debug('Reading a file in zip file. file: {}'.format(self.fullpath(path)))
            archive_path, inner = member
            with self.zipfiles[archive_path].open(inner) as tmp:
                return os.pread(tmp.fileno(), size, offset)

        LOGGER.debug('Reading a normal file in normal filesytem. file: {}'.format(self.fullpath(path)))
        return os.pread(fh, size, offset)

    def release(self, path, fh):
        """Close the descriptor handed out by open() for *path*."""
        tmp = self._extracted.pop(fh, None)
        if tmp is not None:
            tmp.close()
        elif self._locate_member(path) is None:
            # Descriptor of a real file obtained through os.open().
            os.close(fh)
        return 0
class Arcfuse_discotinue(fuse.Operations, fuse.LoggingMixIn):
    """Earlier prototype of the zip-browsing file system.

    Nothing in this file instantiates it.  Unlike ``Arcfuse`` it rewrites the
    path in ``__call__``, so every operation method receives the path already
    prefixed with the real source root.
    """
    LOGGER = logging.getLogger('Arcfuse')

    def __init__(self, root):
        """Remember the resolved source directory *root*."""
        self.real_root = os.path.realpath(root)

    def __call__(self, op, path, *args):
        """Log the call and dispatch it with *path* mapped below the source root."""
        self.LOGGER.debug('called:' + op + ':' + path)
        return super().__call__(op, self.real_root + path, *args)

    def _getvpath(self, fakepath):
        """Return *fakepath* without its first character (the leading '/')."""
        return fakepath[1:]

    def _getrpath(self, fakepath):
        """Return the real path that corresponds to mount-relative *fakepath*."""
        return os.path.join(self.real_root, self._getvpath(fakepath))

    def _zipfile(self, path):
        """Return True if *path* names a zip file (by its '.zip' suffix)."""
        result = path.endswith('.zip')
        self.LOGGER.debug('zipfile?:{} -> {}'.format(path, result))
        return result

    def _zipped(self, path):
        """Return True if '.zip' occurs anywhere in *path*."""
        result = '.zip' in path
        self.LOGGER.debug('zipped?:{} -> {}'.format(path, result))
        return result

    def _zipcontainer(self, path):
        """Split *path* into (archive path, inner path or None), else None."""
        match = re.search(r'^(.*\.zip)(?:/(.*))?$', path)
        return match.groups() if match else None

    def _split_path_zipfile(self, path):
        """Find the closest ancestor of *path* that is an existing zip file.

        Returns ``(zip file path, path relative to it)``, or None when no
        ancestor qualifies.
        """
        body = os.path.dirname(path)
        # dirname('/') is '/' again: stop there instead of recursing forever.
        if not body or body == path:
            return None
        if os.path.exists(body) and os.path.isfile(body) and self._zipfile(body):
            return (body, os.path.basename(path))
        res = self._split_path_zipfile(body)
        if res is not None:
            return (res[0], os.path.join(res[1], os.path.basename(path)))
        return None

    def getattr(self, path, fh=None):
        """Return the stat dict of *path*.

        Existing paths are stat'ed, and a zip file is reported as a
        directory.  For a path inside an archive the member is only looked
        up and logged; the returned dict stays empty in that case (the
        prototype never filled it in).
        """
        self.LOGGER.debug('getattr:' + path)
        entry = dict()
        if os.path.exists(path):
            st = os.lstat(path)
            for key in ('st_atime', 'st_ctime', 'st_gid', 'st_mtime',
                        'st_mode', 'st_nlink', 'st_size', 'st_uid'):
                entry[key] = getattr(st, key)
            if self._zipfile(path):
                entry['st_mode'] = filestat.S_IFDIR | 0o755
        elif self._zipped(path):
            zpath = self._zipcontainer(path)
            self.LOGGER.debug('zipcontainer:' + zpath[0])
            # Close the archive again instead of leaking the handle.
            with zipfile.ZipFile(zpath[0]) as zfd:
                zinfo = zfd.getinfo(zpath[1])
            self.LOGGER.debug('zipinfo attributes: {}'.format(dir(zinfo)))
        return entry

    def readdir(self, path, fh):
        """List *path*: member names for a zip file, ``os.listdir`` otherwise."""
        if self._zipfile(path):
            with zipfile.ZipFile(path) as zfd:
                return ['.', '..'] + zfd.namelist()
        # Ordinary directories used to fall through and return None.
        return ['.', '..'] + os.listdir(path)
def normalize_options(options):
    """Convert a list of mount option strings into a keyword dict.

    ``'ro'`` is dropped, ``'key=value'`` becomes ``{'key': 'value'}`` (split
    at the first '=') and any other flag becomes ``{flag: True}``.

    Raises:
        NotImplementedError: the ``'rw'`` option was requested.
    """
    normalized = dict()
    for opt in options:
        if opt == 'rw':
            raise NotImplementedError('Writing is not supported. Sorry.')
        if opt == 'ro':
            continue
        name, sep, value = opt.partition('=')
        normalized[name] = value if sep else True
    return normalized
def parse_args():
    """Parse the command line and return the resulting namespace.

    Besides the raw arguments (``source``, ``mountpoint``, ``debug``, ``o``)
    the namespace gets an ``options`` dict: the ``-o`` values run through
    ``normalize_options``, or an empty dict when none were given.
    """
    parser = argparse.ArgumentParser()
    for positional in ('source', 'mountpoint'):
        parser.add_argument(positional, type=str)
    parser.add_argument('--debug', action='store_true')
    parser.add_argument('-o', type=str, nargs='*')
    parsed = parser.parse_args()
    if parsed.o:
        parsed.options = normalize_options(parsed.o)
    else:
        parsed.options = dict()
    return parsed
if __name__ == '__main__':
    # Script entry point: parse the command line, index the source tree and
    # mount it read-only.  The mount stays in the foreground in debug mode.
    cmdargs = parse_args()
    if cmdargs.debug:
        logging.basicConfig(level=logging.DEBUG)
    arcfuse = Arcfuse(cmdargs.source)
    fuse.FUSE(
        arcfuse,
        cmdargs.mountpoint,
        foreground=cmdargs.debug,
        ro=True,
        **cmdargs.options
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment