Skip to content

Instantly share code, notes, and snippets.

@korc
Last active October 28, 2015 17:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save korc/36ebfddf4e60a55a083a to your computer and use it in GitHub Desktop.
Save korc/36ebfddf4e60a55a083a to your computer and use it in GitHub Desktop.
#!/usr/bin/python
import fuse
import os
import stat
import errno
import re
import time
import warnings
# Declare the fuse-python API revision this script is written against.
# NOTE(review): presumably this has to be set before any fuse.Fuse subclass
# is instantiated -- confirm against the fuse-python documentation.
fuse.fuse_python_api=(0, 2)
class NoMatchException(ValueError):
    """Raised when an input line matches none of the patterns allowed next."""
class PatternIterSM(object):
    """Line-oriented state machine driven by regular expressions.

    ``states`` maps a state name to a tuple ``(next_states, patterns)``:

    * ``next_states`` -- list of state names that may follow this state, or
      ``None`` to keep the currently allowed list unchanged.
    * ``patterns`` -- list of compiled regexes; a line moves the machine into
      this state when any of them matches at the start of the line.  The
      initial state is never matched against, so it may omit ``patterns``.

    Iterating the machine consumes ``data_source`` item by item and yields
    the match object of every item that caused a transition.

    Class attributes meant to be overridden:

    * ``rstrip`` -- characters stripped from the right of every item before
      matching (``None`` disables stripping).
    * ``history_size`` -- number of previous states kept in
      ``state_history`` (0 disables the history).
    """
    rstrip = None
    history_size = 0

    def __init__(self, data_source, state="start", end_state=None, states=None):
        """Prepare the machine.

        data_source -- any iterable of strings (for example an open file).
        state       -- name of the initial state.
        end_state   -- optional state name; iteration stops after entering it.
        states      -- state table; overrides a class-level ``states``.

        Raises NotImplementedError when no state table is available.
        """
        if states is not None:
            self.states = states
        if not hasattr(self, "states"):
            raise NotImplementedError("Need to define some states")
        self.data_iter = iter(data_source)
        self.state_history = []
        self.state = state
        self.end_state = end_state
        self.next_states = self.states[state][0]

    def on_nomatch(self, data):
        """Hook called for an item that matches no allowed state.

        The default raises NoMatchException.  An override that returns
        normally makes the machine skip the item.
        """
        raise NoMatchException("No match (%s>%s -> %s): %r"%(">".join(self.state_history), self.state, ",".join(self.next_states), data))

    def on_excess_data(self, excess_data):
        """Hook called with the text left over after the matched part."""
        warnings.warn("Excess data after processing %r[%d] state: %r"%(self.state, self.match_index, excess_data))

    def _find_transition(self, data):
        """Return ``(state, pattern_index, match)`` for the first hit, or None.

        Candidate states are tried in ``next_states`` order and, within a
        state, patterns are tried in list order.
        """
        for candidate in self.next_states:
            for pat_nr, pattern in enumerate(self.states[candidate][1]):
                found = pattern.match(data)
                if found is not None:
                    return candidate, pat_nr, found
        return None

    def __iter__(self):
        """Consume the data source, yielding one match object per transition."""
        # A plain for loop instead of the Python 2-only iterator.next() call,
        # so the class runs unchanged on Python 2 and 3.
        for raw in self.data_iter:
            self.data = raw  # the unstripped item stays available to hooks
            data = raw
            if self.rstrip is not None:
                data = data.rstrip(self.rstrip)
            self.match_index = None
            hit = self._find_transition(data)
            if hit is None:
                self.on_nomatch(data)
                continue
            new_state, self.match_index, match = hit
            self.match = match
            if self.history_size:
                history = self.state_history
                # Drop the oldest entries first so that the history never
                # holds more than history_size states after the append.
                overflow = len(history) - self.history_size + 1
                if overflow > 0:
                    del history[:overflow]
                history.append(self.state)
            self.state = new_state
            allowed = self.states[new_state][0]
            if allowed is not None:
                self.next_states = allowed
            if match.end() < len(data):
                self.on_excess_data(data[match.end():])
            yield self.match
            if self.end_state is not None and self.state == self.end_state:
                break
# TODO: support output of: find -ls, unsquashfs -ll
class ListFileSM(PatternIterSM):
    """State machine recognising lines of saved file listings.

    Listing formats it is written for (produced with env LANG=C.UTF-8):
      tar tvf <tarfile> (--numeric-owner recommended)
      (adb shell) ls -Rasiln
      (linux) ls -Rasiln --time-style=long-iso
    """
    # An empty line; shared by the "crnl" and "end_of_ls" states.
    crnl_re = re.compile(r'^$')
    history_size = 10
    rstrip = "\r\n"
    states = {
        # Initial state: only lists what may come first.
        "start": (["tart_line", "ls_start", "crnl"], ),
        # One file entry, in either "tar tv" or "ls -l" layout.
        "tart_line": (None, [
            re.compile(r'^(?: *(?P<inode>\d+) +(?:(?P<blocks>\d+) +)?)?'
                       r'(?P<type>[hdbcpls-])(?P<mode>[TStsrwx-]{9}) +'
                       r'(?:(?P<links>\d+) +)?'
                       r'(?P<user>[^/ ]+)(?:/| +)(?P<group>\S+) +'
                       r'(?:(?P<node>\d+, *\d+)|(?P<size>\d+))? +'
                       r'(?P<mtime>\d+-\d+-\d+ \d+:\d+) +'
                       r'(?:\./)?(?P<fname>.*)$'),
        ]),
        # Blank line outside of an "ls -R" section.
        "crnl": (None, [crnl_re]),
        # "<directory>:" header that opens an "ls -R" section.
        "ls_start": (
            ["ls_total", "tart_line", "end_of_ls", "ls_noperm", "ls_nostat"],
            [re.compile(r'^(?:\./)?(?P<prefix>.+):$')],
        ),
        # "total N" summary line following a section header.
        "ls_total": (
            ["tart_line", "end_of_ls"],
            [re.compile(r'total (?P<total_count>\d+)$')],
        ),
        # Blank line that closes an "ls -R" section.
        "end_of_ls": (["ls_start"], [crnl_re]),
        # Error lines printed instead of, or in between, entries.
        "ls_noperm": (
            ["end_of_ls", "tart_noperm"],
            [re.compile(r'^opendir failed, Permission denied$')],
        ),
        "ls_nostat": (
            ["tart_line", "end_of_ls", "tart_noperm"],
            [re.compile(r'^stat failed on .*: Permission denied$')],
        ),
        "tart_noperm": (None, [re.compile(r'.*: Permission denied')]),
    }
class ListFS(fuse.Fuse):
    """FUSE filesystem that exposes the tree described by a saved listing.

    The listing file (attribute ``listfile``, set by the caller before the
    first filesystem request) is parsed with ListFileSM on first use.  Only
    metadata is served: getattr, readdir and readlink.
    """

    @property
    def dirtree(self):
        """Parsed listing; built by get_dirtree() on first access, then cached."""
        try:
            return self._dirtree
        except AttributeError:
            pass
        tree = self._dirtree = self.get_dirtree()
        return tree

    def get_dirtree(self):
        """Parse ``self.listfile`` into a dict describing every known path.

        Keys are paths relative to the root without leading or trailing
        slashes (the root itself is ``""``).  Values are dicts holding the
        named groups of the matching ListFileSM pattern, plus ``children``
        (list of entry names) for directories and ``target`` for symlinks
        whose listing line contains `` -> ``.
        """
        # The root always exists, even when the listing is empty.
        tree = {"": {"type": "d", "children": []}}
        # Paths already registered in their parent's "children" list; keeps
        # duplicate detection O(1) instead of scanning the list every time.
        linked = set()

        def link(path):
            """Register path in its parent, creating missing ancestors."""
            while path and path not in linked:
                linked.add(path)
                parent, _, leaf = path.rpartition("/")
                children = tree.setdefault(parent, {"type": "d"}).setdefault("children", [])
                if leaf:
                    children.append(leaf)
                # Keep walking up so that directories only implied by a
                # longer path become reachable from the root as well.
                path = parent

        with open(self.listfile) as listing:
            fname_prefix = ""
            for match in ListFileSM(listing):
                inf = match.groupdict()
                if "prefix" in inf:
                    # "<directory>:" header of an "ls -R" section.
                    fname_prefix = inf["prefix"].strip("/")
                    if fname_prefix == ".":
                        fname_prefix = ""
                    tree.setdefault(fname_prefix, {"type": "d"}).setdefault("children", [])
                    link(fname_prefix)
                if "fname" not in inf:
                    continue
                name = inf.pop("fname")
                is_dir = inf["type"] == "d"
                self_or_parent = name.rstrip("/") if is_dir else None
                if self_or_parent == "..":
                    # "ls -a" row for the parent directory: not a real child.
                    continue
                if self_or_parent == ".":
                    # "ls -a" row for the directory itself: its attributes
                    # belong to the section's own entry.
                    fname = fname_prefix
                else:
                    fname = "%s%s%s" % (fname_prefix, "/" if fname_prefix else "", name)
                if is_dir:
                    fname = fname.rstrip("/")
                elif inf["type"] == "l":
                    fname, arrow, target = fname.partition(" -> ")
                    if arrow:
                        inf["target"] = target
                elif inf["type"] == "h":
                    # Hard link row: "<name> link to <target>".
                    fname, sep, linkto = fname.partition(" link to ")
                    if linkto.startswith("./"):
                        # Keys are stored without the leading "./".
                        linkto = linkto[2:]
                    target_inf = tree.get(linkto) if sep else None
                    if target_inf is not None:
                        # Present the link with the attributes of its target.
                        inf = dict(target_inf)
                    else:
                        # Unknown target: fall back to a regular file so the
                        # entry is still usable ("h" has no stat type).
                        inf["type"] = "-"
                tree.setdefault(fname, {}).update(inf)
                if inf["type"] == "d":
                    tree[fname].setdefault("children", [])
                link(fname)
        return tree

    # Listing type character -> file type bits for st_mode.
    stat_types = {"d": stat.S_IFDIR, "b": stat.S_IFBLK, "c": stat.S_IFCHR, "p": stat.S_IFIFO, "l": stat.S_IFLNK,
                  "-": stat.S_IFREG, "s": stat.S_IFSOCK}
    # Every st_* field name of an os.lstat() result, defaulting to 0.
    def_stats = dict((name, 0) for name in dir(os.lstat("/")) if name.startswith("st_"))

    @staticmethod
    def mode2int(mode):
        """Convert a 9-character permission string to permission bits.

        mode -- text such as "rwxr-sr-t".  A lower-case s/t in an execute
                position means the special bit and the execute bit are both
                set, upper-case S/T means the special bit only.
        """
        ret = 0
        for idx, xbit in enumerate((stat.S_ISUID, stat.S_ISGID, stat.S_ISVTX)):
            char_pos = idx*3 + 2  # execute position of user / group / other
            flag = mode[char_pos]
            if flag.lower() in ("s", "t"):
                ret |= xbit
                # Replace the special marker by the plain execute state so
                # that the binary conversion below sees only r, w, x and -.
                mode = "%s%s%s" % (mode[:char_pos], "x" if flag.islower() else "-", mode[char_pos+1:])
        ret |= int(mode.replace("-", "0").replace("r", "1").replace("w", "1").replace("x", "1"), 2)
        return ret

    def inf2stat(self, inf):
        """Build a stat-like object from one dirtree entry.

        Fields the entry does not provide keep the default 0.  User and
        group are only used when they are numeric.
        """
        st = type("InfStat", (object,), self.def_stats.copy())()
        st.st_mode = self.stat_types[inf["type"]]
        if inf.get("mode"):
            st.st_mode |= self.mode2int(inf["mode"])
        if inf.get("size") is not None:
            st.st_size = int(inf["size"])
        if inf.get("mtime"):
            st.st_mtime = time.mktime(time.strptime(inf["mtime"], "%Y-%m-%d %H:%M"))
        uid = inf.get("user")
        if uid and uid.isdigit():
            st.st_uid = int(uid)
        gid = inf.get("group")
        if gid and gid.isdigit():
            st.st_gid = int(gid)
        if inf.get("links"):
            st.st_nlink = int(inf["links"])
        if inf.get("inode"):
            st.st_ino = int(inf["inode"])
        if inf.get("blocks"):
            # NOTE(review): presumably converts 1 KiB listing blocks to the
            # 512-byte units of st_blocks -- confirm for the listings used.
            st.st_blocks = int(inf["blocks"])*2
        return st

    def getattr(self, path):
        """Return the stat object for path; raises OSError(ENOENT) if unknown."""
        try:
            inf = self.dirtree[path.lstrip("/")]
        except KeyError:
            raise OSError(errno.ENOENT, "No such file or directory: %r"%(path,))
        return self.inf2stat(inf)

    def readdir(self, path, offset):
        """Yield a fuse.Direntry for ".", ".." and every child of path."""
        dir_entry = self.dirtree[path.lstrip("/")]
        yield fuse.Direntry(".")
        yield fuse.Direntry("..")
        for child in dir_entry["children"]:
            yield fuse.Direntry(child)

    def readlink(self, path):
        """Return the symlink target recorded for path.

        Raises OSError(ENOENT) for an unknown path and OSError(EINVAL) when
        the entry has no recorded target, mirroring getattr's error style.
        """
        try:
            inf = self.dirtree[path.lstrip("/")]
        except KeyError:
            raise OSError(errno.ENOENT, "No such file or directory: %r"%(path,))
        try:
            return inf["target"]
        except KeyError:
            raise OSError(errno.EINVAL, "Not a symbolic link with a known target: %r"%(path,))
if __name__ == '__main__':
    # Create the filesystem object and register the extra --listfile option
    # on its parser before the command line is parsed.
    server = ListFS()
    server.parser.add_option("--listfile", help="file containing output of 'env LANG=C.UTF-8 tar tv'")
    server.parse()
    help_requested = server.parser.fuse_args.modifiers["showhelp"]
    if not help_requested:
        # A listing is mandatory for a real mount; store its resolved path
        # on the server object, where get_dirtree() reads it.
        listfile = server.parser.values.listfile
        if not listfile:
            raise RuntimeError("--listfile option required, see -h for help")
        server.listfile = os.path.realpath(listfile)
    server.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment