Last active
October 5, 2015 06:25
-
-
Save pluser/4f50d7d13f4d03415ce0 to your computer and use it in GitHub Desktop.
A Fuse mount script whitch provide readability to huge number of zip files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# The MIT License (MIT) | |
# | |
# Copyright (c) 2015 Kaoru Esashika | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
# THE SOFTWARE. | |
import os | |
import re | |
import logging | |
import argparse | |
import stat as filestat | |
import zipfile | |
import tempfile | |
import fuse | |
class Zip(object): | |
LOGGER = logging.getLogger('Zip') | |
def __init__(self, filename): | |
self.zipfilename = filename | |
if not os.path.exists(self.zipfilename): | |
raise FileNotFoundError | |
with zipfile.ZipFile(self.zipfilename) as zarc: | |
self.info = {info.filename: info for info in zarc.infolist()} | |
def exists(self, path): | |
return path in self.info.keys() | |
def isdir(self, path): | |
LOGGER = self.LOGGER.getChild('isdir') | |
result = path in self.dirlist() or path + '/' in self.dirlist() | |
LOGGER.debug('{} is directory? {}'.format(path, str(result))) | |
return result | |
def isfile(self, path): | |
LOGGER = self.LOGGER.getChild('isfile') | |
result = path in self.filelist() | |
LOGGER.debug('{} is file? {}'.format(path, str(result))) | |
return result | |
def dirlist(self): | |
dirs = set() | |
for name in self.info.keys(): | |
path = name.rsplit('/', maxsplit=1) | |
if len(path) == 1: | |
continue | |
parent_dir = str() | |
for frag in path[0].split('/'): | |
jdir = os.path.join(parent_dir, frag) | |
dirs.add(jdir + '/') | |
parent_dir = jdir | |
return list(dirs) | |
def filelist(self): | |
return list((name for name in self.info.keys() if not name.endswith('/'))) | |
def entries(self): | |
return list((name for name in self.info.keys())) | |
def open(self, path): | |
LOGGER = self.LOGGER.getChild('open') | |
tmp = tempfile.TemporaryFile() | |
with zipfile.ZipFile(self.zipfilename) as zarc: | |
with zarc.open(path) as f: | |
LOGGER.debug('Extracting zip file... path: {}'.format(path)) | |
tmp.write(f.read()) | |
tmp.seek(0) | |
return tmp | |
class Arcfuse(fuse.Operations): | |
LOGGER = logging.getLogger('Arcfuse') | |
def __init__(self, root): | |
LOGGER = self.LOGGER.getChild('init') | |
self.real_root = os.path.realpath(root) | |
self.zipfiles = dict() | |
for dirpath, dirnames, filenames in os.walk(self.real_root): | |
for filename in filenames: | |
if os.path.splitext(filename)[1] == '.zip': | |
try: | |
self.zipfiles[os.path.join(dirpath, filename)] = Zip(os.path.join(dirpath, filename)) | |
except (zipfile.BadZipFile, zipfile.LargeZipFile): | |
LOGGER.error('Bad zipfile found: {}'.format(os.path.join(dirpath, filename))) | |
continue | |
LOGGER.info('Completed initialize.') | |
def __call__(self, op, path, *args): | |
LOGGER = self.LOGGER.getChild('call') | |
LOGGER.log(3, 'Arcfuse object is called. op: {}, path: {}, args: {}'.format(op, path, args)) | |
return super().__call__(op, path, *args) | |
def fullpath(self, path): | |
return os.path.join(self.real_root, path[1:] if path.startswith('/') else path) | |
def relpath(self, path): | |
return path[1:] | |
def stat_to_entry(self, st): | |
entry = dict() | |
entry['st_atime'] = st.st_atime | |
entry['st_ctime'] = st.st_ctime | |
entry['st_gid'] = st.st_gid | |
entry['st_mtime'] = st.st_mtime | |
entry['st_mode'] = st.st_mode | |
entry['st_nlink'] = st.st_nlink | |
entry['st_size'] = st.st_size | |
entry['st_uid'] = st.st_uid | |
return entry | |
def getattr(self, path, fh=None): | |
LOGGER = self.LOGGER.getChild('getattr') | |
if os.path.exists(self.fullpath(path)): | |
LOGGER.debug('File exists. file: {}'.format(self.fullpath(path))) | |
entry = self.stat_to_entry(os.lstat(self.fullpath(path))) | |
if self.fullpath(path) in self.zipfiles.keys(): | |
LOGGER.debug('This is exactly a zip file. file: {}'.format(self.fullpath(path))) | |
entry['st_mode'] = filestat.S_IFDIR | 0o755 | |
elif '.zip/' in path: | |
match = re.search(r'(.*\.zip)/(.*)', self.relpath(path)) | |
entry = self.stat_to_entry(os.lstat(self.fullpath(match.group(1)))) | |
if self.zipfiles[self.fullpath(match.group(1))].isdir(match.group(2)): | |
LOGGER.debug('This is a directory in zip file. dir: {}'.format(self.fullpath(path))) | |
# zinfo = self.zipfiles[self.fullpath(match.group(1))].info[match.group(2) + '/'] | |
entry['st_size'] = 0 | |
entry['st_mode'] = filestat.S_IFDIR | 0o755 | |
elif self.zipfiles[self.fullpath(match.group(1))].isfile(match.group(2)): | |
LOGGER.debug('This is a regular file in zip file. file: {}'.format(self.fullpath(path))) | |
zinfo = self.zipfiles[self.fullpath(match.group(1))].info[match.group(2)] | |
entry['st_size'] = zinfo.file_size | |
else: | |
LOGGER.error('File or directory in zipfile not found. file: {}'.format(self.fullpath(path))) | |
raise FileNotFoundError | |
else: | |
LOGGER.error('File or directory not found. file: {}'.format(self.fullpath(path))) | |
raise FileNotFoundError | |
return entry | |
def readdir(self, path, fh=None): | |
LOGGER = self.LOGGER.getChild('readdir') | |
if self.fullpath(path) in self.zipfiles.keys(): | |
LOGGER.debug('This is a zip file. file: {}'.format(self.fullpath(path))) | |
res = set() | |
for entry in self.zipfiles[self.fullpath(path)].entries(): | |
res.add(entry.split('/')[0]) | |
return ['.', '..'] + list(res) | |
elif '.zip/' in path: | |
LOGGER.debug('This is a directory in a zip file. dir: {}'.format(self.fullpath(path))) | |
match = re.search(r'(.*\.zip)/(.*)', self.relpath(path)) | |
res = set() | |
for entry in self.zipfiles[self.fullpath(match.group(1))].entries(): | |
if entry.startswith(match.group(2)): | |
res.add(entry[len(match.group(2)):].split('/')[1]) | |
res.discard('') | |
return ['.', '..'] + list(res) | |
else: | |
LOGGER.debug('This is a directory in a normal file system. file: {}'.format(self.fullpath(path))) | |
return ['.', '..'] + os.listdir(self.fullpath(path)) | |
def open(self, path, flags): | |
LOGGER = self.LOGGER.getChild('open') | |
if '.zip/' in path: | |
LOGGER.debug('Opening a file in a zip file. file: {}'.format(self.fullpath(path))) | |
return -1 | |
elif os.path.exists(self.fullpath(path)): | |
LOGGER.debug('Opening a normal file in a normal file system. file: {}'.format(self.fullpath(path))) | |
return os.open(self.fullpath(path), flags) | |
else: | |
LOGGER.warning('Opening a file which is not exists. file: {}'.format(self.fullpath(path))) | |
raise FileNotFoundError | |
def read(self, path, size, offset, fh): | |
LOGGER = self.LOGGER.getChild('read') | |
if '.zip/' in path: | |
LOGGER.debug('Reading a file in zip file. file: {}'.format(self.fullpath(path))) | |
match = re.search(r'(.*\.zip)/(.*)', self.relpath(path)) | |
f = self.zipfiles[self.fullpath(match.group(1))].open(match.group(2)) # reopen beause of c impl | |
fh = f.fileno() | |
else: | |
LOGGER.debug('Reading a normal file in normal filesytem. file: {}'.format(self.fullpath(path))) | |
os.lseek(fh, offset, 0) | |
return os.read(fh, size) | |
class Arcfuse_discotinue(fuse.Operations, fuse.LoggingMixIn): | |
LOGGER = logging.getLogger('Arcfuse') | |
def __init__(self, root): | |
self.real_root = os.path.realpath(root) | |
def __call__(self, op, path, *args): | |
self.LOGGER.debug('called:' + op + ':' + path) | |
return super().__call__(op, self.real_root + path, *args) | |
def _getvpath(self, fakepath): | |
return fakepath[1:] | |
def _getrpath(self, fakepath): | |
return os.path.join(self.real_root, self._getvpath(fakepath)) | |
def _zipfile(self, path): | |
print('zipfile?:' + path) | |
print(path.endswith('.zip')) | |
return True if path.endswith('.zip') else False | |
def _zipped(self, path): | |
print('zipped?:' + path) | |
print('.zip' in path) | |
return True if '.zip' in path else False | |
def _zipcontainer(self, path): | |
match = re.search(r'^(.*\.zip)(?:/(.*))?$', path) | |
if match: | |
return match.groups() | |
else: | |
return None | |
def _split_path_zipfile(self, path): | |
body = os.path.dirname(path) | |
if not body: | |
return None | |
if os.path.exists(body) and os.path.isfile(body) and self._iszipfile(body): | |
return (body, os.path.basename(path)) | |
else: | |
res = self._split_path_zipfile(body) | |
if res is not None: | |
return (res[0], res[1] + os.path.basename(path)) | |
def getattr(self, path, fh=None): | |
self.LOGGER.debug('getattr:' + path) | |
entry = dict() | |
if os.path.exists(path): | |
st = os.lstat(path) | |
entry['st_atime'] = st.st_atime | |
entry['st_ctime'] = st.st_ctime | |
entry['st_gid'] = st.st_gid | |
entry['st_mtime'] = st.st_mtime | |
entry['st_mode'] = st.st_mode | |
entry['st_nlink'] = st.st_nlink | |
entry['st_size'] = st.st_size | |
entry['st_uid'] = st.st_uid | |
if self._zipfile(path): | |
entry['st_mode'] = filestat.S_IFDIR | 0o755 | |
elif self._zipped(path): | |
zpath = self._zipcontainer(path) | |
self.LOGGER.debug('zipcontainer:' + zpath[0]) | |
zfd = zipfile.ZipFile(zpath[0]) | |
zinfo = zfd.getinfo(zpath[1]) | |
print(dir(zinfo)) | |
return entry | |
def readdir(self, path, fh): | |
if self._zipfile(path): | |
print('reached1') | |
zfd = zipfile.ZipFile(path) | |
return ['.', '..'] + zfd.namelist() | |
elif self._zipped(path): | |
print('reached2') | |
return ['.', '..'] + os.listdir(path) | |
def normalize_options(options): | |
d = dict() | |
for o in options: | |
if o == 'rw': | |
raise NotImplementedError('Writing is not supported. Sorry.') | |
elif o == 'ro': | |
pass | |
elif '=' in o: | |
k, v = o.split('=', maxsplit=1) | |
d[k] = v | |
else: | |
d[o] = True | |
return d | |
def parse_args(): | |
argparser = argparse.ArgumentParser() | |
argparser.add_argument('source', type=str) | |
argparser.add_argument('mountpoint', type=str) | |
argparser.add_argument('--debug', action='store_true') | |
argparser.add_argument('-o', type=str, nargs='*') | |
cmdargs = argparser.parse_args() | |
cmdargs.options = normalize_options(cmdargs.o) if cmdargs.o else dict() | |
return cmdargs | |
if __name__ == '__main__': | |
cmdargs = parse_args() | |
if cmdargs.debug: | |
logging.basicConfig(level=logging.DEBUG) | |
arcfuse = Arcfuse(cmdargs.source) | |
fuse.FUSE(arcfuse, cmdargs.mountpoint, foreground=cmdargs.debug, ro=True, **cmdargs.options) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment