Skip to content

Instantly share code, notes, and snippets.

@te223
Last active December 28, 2023 19:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save te223/7611227 to your computer and use it in GitHub Desktop.
Save te223/7611227 to your computer and use it in GitHub Desktop.
jzgrep.py is a grep-like command-line utility. It can search recursively in compressed archives such as zip and tar.gz .
#!/usr/bin/env python
# -*- coding: utf-8 -*-
r'''
jzgrep.py is a grep-like command-line utility.
It can search recursively inside compressed archives such as zip and tar.gz,
and it supports several Japanese encodings.
Requirements: Python 2.6 or newer (Python 3 is not supported).
For more info: $ python jzgrep.py --help | less
'''
# Program metadata. `_program` and `_version` are printed by the -V option.
_version = "0.7.7"
_program="jzgrep.py"
_copyright = "Copyright (c) 2013 Tetsu. All rights reserved."
_license = "BSD"
import sys
import os, re, StringIO, tempfile
import tarfile, gzip, bz2, zipfile
# Debug switch: enables extra tracing and tracebacks (turned on by -D).
_isdebug = False
# Candidate encodings, tried in this order by decode_any() and matchtext().
# The first entry must be a 7-bit coding such as "iso-2022-jp".
# Previous setting, kept for reference:
#_encodings = ("iso-2022-jp", "ascii", "utf-8", "euc-jp", "cp932")
_encodings = (
    # "iso-2022-jp",
    # "iso2022_jp_1",
    "iso2022_jp_2",
    # "iso2022_jp_3",
    # "iso2022_jp_ext",
    #"ascii", # maybe needed for ASCII text that contains iso-2022-jp ESC sequences ???
    "utf-8",
    "euc-jp",
    "shift_jis",
    "cp932",
)
# Encoding used for text written to stdout/stderr and for decoding byte
# strings in U(); overridden by the --osenc option.
_ioenc = "utf-8"
_iswarout = True # whether "[Warning]" messages are printed (cleared by -s)
# Short alias for isinstance.
isa = isinstance
def join_t(lst, term=""):
    """Join the truthy items of *lst* with *term*, skipping false/empty ones."""
    kept = [item for item in lst if item]
    return term.join(kept)
def trunc(s, lng):
    """Shorten *s* for display when it is longer than *lng* characters.

    A non-positive *lng* disables truncation.  A truncated result keeps the
    first ``max(lng - 4, 0)`` characters and appends ``" ..."``.
    """
    if lng <= 0 or len(s) <= lng:
        return s
    keep = max(lng - 4, 0)
    return s[:keep] + " ..."
def outnl(string, maxlng=0):
    """Write *string* to stdout as exactly one line.

    The text is converted with U(), trailing CR/LF characters are removed,
    the result is optionally shortened by trunc(), then encoded with
    `_ioenc` (unencodable characters replaced) and terminated by "\\n".
    """
    text = U(string)
    text = re.sub(r'[\n\r]+\Z', "", text)
    text = trunc(text, maxlng)
    encoded = text.encode(_ioenc, "replace")
    sys.stdout.write(encoded + "\n")
def eoutnl(string):
    """Write *string* to stderr as exactly one line.

    Same normalisation as outnl(), but without truncation: convert with
    U(), drop trailing CR/LF, encode with `_ioenc` and append "\\n".
    """
    text = U(string)
    text = re.sub(r'[\n\r]+\Z', "", text)
    encoded = text.encode(_ioenc, "replace")
    sys.stderr.write(encoded + "\n")
def U(s):
    """Return *s* as a unicode object.

    Byte strings are decoded with `_ioenc` (undecodable bytes replaced);
    anything else is passed through unicode().
    """
    if not isinstance(s, str):
        return unicode(s)
    return s.decode(_ioenc, "replace")
def unvnl_reader(fobj, bufsize=1024):
    """Return a readline() function with universal-newline behaviour.

    The returned callable reads *fobj* in chunks of *bufsize* via
    ``fobj.read`` and returns one line per call.  Lines ending in "\\r\\n"
    or a lone "\\r" are returned with a "\\n" terminator instead; an empty
    string is returned once the input is exhausted, so the callable can be
    used as ``iter(unvnl_reader(f), "")``.
    """
    # Matches a line terminator plus (if present) the following character,
    # so that "\r\n" can be told apart from a lone "\r".
    rxnl = re.compile(r'[\n\r].?', re.S)
    # Mutable holder for the state shared by the two closures below.
    class _ctx (object): pass
    x = _ctx()
    # unread: current chunk; srchid: scan position inside unread;
    # lbuf: fragments of the line being assembled across chunks.
    (x.unread, x.srchid, x.lbuf) = ("", 0, [])
    x.iseof = False
    isterminal = hasattr(fobj, "isatty") and fobj.isatty()
    def readinto():
        # Return the next chunk, or "" (and set x.iseof) when nothing is left.
        if not x.iseof:
            s = fobj.read(bufsize)
            if s:
                if isterminal and (len(s) < bufsize):
                    # A short read from a terminal is treated as end of input
                    # (original author's note: "mybug? or pybug?").
                    x.iseof = True
                return s
            x.iseof = True
        return ""
    def readline():
        while not x.iseof:
            if x.srchid >= len(x.unread):
                # Current chunk fully consumed: fetch the next one.
                (x.unread, x.srchid) = (readinto(), 0)
                continue
            #print "UNREAD=", x.unread[x.srchid:]
            m = rxnl.search(x.unread, x.srchid)
            if m:
                line = None; nxt = 0
                tm = m.group(0)
                if tm[0] == "\n":
                    # Plain "\n": keep the terminator as is.
                    nxt = m.start() + 1
                    line = "".join(x.lbuf) + x.unread[x.srchid: nxt ]
                elif tm == "\r\n":
                    # "\r\n": replace the pair with a single "\n".
                    line = "".join(x.lbuf) + x.unread[x.srchid: m.start()] + "\n"
                    nxt = m.end()
                elif tm == "\r": # "\r" is the last character of x.unread
                    # Cannot tell yet whether a "\n" follows: stash the text
                    # before it and retry with "\r" glued to the next chunk.
                    x.lbuf.append(x.unread[x.srchid: m.start()])
                    (x.unread, x.srchid) = ("\r" + readinto(), 0)
                    continue # line stays None, so this is not strictly needed
                else: # "\rX": lone "\r" followed by another character
                    line = "".join(x.lbuf) + x.unread[x.srchid: m.start()] + "\n"
                    nxt = m.start() + 1
                if line is not None:
                    (x.srchid, x.lbuf) = (nxt, [])
                    return line
            else: # no "\n" or "\r" in the rest of the chunk
                # Keep the partial line and force a refill on the next pass.
                x.lbuf.append( x.unread[x.srchid:] )
                x.unread = ""
        # end of while
        # End of input: flush whatever is still pending as a final line.
        if x.lbuf or x.unread[x.srchid: ] :
            # print "FLUSH"
            line = "".join(x.lbuf) + x.unread[x.srchid: ]
            # A trailing lone "\r" is converted to "\n".
            line = re.sub(r'\r\Z', "\n", line)
            (x.unread, x.srchid, x.lbuf) = ("", 0, [])
            return line
        #print "x=",vars(x)
        return ""
    return readline
# for tar.open(), gzip.open() and bz2.BZ2File() in python2.6
class FileCtx(object):
    """Context-manager adapter for objects that only provide close().

    Needed for tarfile.open(), gzip.open() and bz2.BZ2File() on Python 2.6.
    Entering yields the wrapped object; leaving closes it, except when the
    wrapped object is sys.stdin.  Exceptions are never suppressed.
    """

    def __init__(self, fobj):
        self.fobj = fobj

    def __enter__(self):
        return self.fobj

    def __exit__(self, exc_type, exc_value, traceback):
        # Standard input is left open for the rest of the program.
        if self.fobj == sys.stdin:
            return False
        self.fobj.close()
        return False  # let any exception propagate
# py2.6's zipfile module is not support `is_zipfile(file-object)'
# Select the `iszip(f)` predicate used to test file objects.
# Probe whether zipfile.is_zipfile() accepts a file object; if the probe
# call works (and correctly reports "not a zip"), use it directly.
iszip = None
try:
    if not zipfile.is_zipfile(StringIO.StringIO("BadZip")):
        if _isdebug: print "Can use is_zipfile(file-object)"
        iszip = zipfile.is_zipfile
except StandardError: pass # e.g. TypeError when file objects are rejected
if not iszip:
    if _isdebug: print "Cannot use is_zipfile(file-object)"
    def iszip(f):
        """Fallback: return True if *f* (path or file object) opens as a zip."""
        # Paths can always be handled by the standard helper.
        if isinstance(f, basestring): return zipfile.is_zipfile(f)
        try:
            # For file objects, try to open the archive and list its entries.
            z = zipfile.ZipFile(f,"r")
            try: z.infolist()
            finally: z.close()
            return True
        except (zipfile.BadZipfile, zipfile.LargeZipFile, StandardError): pass
        return False
def ftype(filename):
    """Classify the file at path *filename*.

    Returns "tar" (including compressed tar), "gzip", "bz2", "zip", or
    None when none of the probes recognises the file.  The probes run in
    that order.
    """
    if tarfile.is_tarfile(filename):
        return "tar"
    # gzip and bz2 are detected by trying to read a few bytes through the
    # corresponding decompressor.
    probes = (("gzip", gzip.open), ("bz2", bz2.BZ2File))
    for kind, opener in probes:
        try:
            with FileCtx(opener(filename)) as stream:
                stream.read(128)
            return kind
        except (StandardError, tarfile.TarError):
            pass
    if zipfile.is_zipfile(filename):
        return "zip"
    return None
# test functions , open each archive types with fileobject
def testtar(e):
    """Return "tar" if file object *e* opens as a tar archive.

    Any exception raised by tarfile.open() propagates to the caller.
    """
    archive = tarfile.open(fileobj=e, mode='r')
    with FileCtx(archive):
        pass  # opening succeeded; just close the archive again
    return "tar"
def testgzip(e):
    """Return "gzip" if up to 1024 bytes can be read from *e* through gzip.

    Errors raised while reading propagate to the caller.
    """
    gz = gzip.GzipFile(fileobj=e, mode="rb")
    with FileCtx(gz) as fz:
        fz.read(1024)
    return "gzip"
def testbz2(e):
b = e.read(2048)
bz2.BZ2Decompressor().decompress(b)
return "bz2"
def testzip(e):
    """Return "zip" if iszip() accepts file object *e*, otherwise None."""
    return "zip" if iszip(e) else None
# entryname is basestring or tarfile.TarInfo"
def ftypetar(tar, entryname):
    """Classify one member of an open tar archive.

    *entryname* is a member name or a tarfile.TarInfo.  The member is
    re-extracted for every probe so each probe starts at the beginning.
    Returns "tar", "gzip", "bz2", "zip", or None if no probe matches.
    """
    for probe in (testtar, testgzip, testbz2, testzip):
        with FileCtx(tar.extractfile(entryname)) as entry:
            try:
                kind = probe(entry)
            except (StandardError, tarfile.TarError):
                continue  # this probe rejected the data; try the next one
            if kind:
                return kind
    return None
# Wrapper that accesses ZipFile.open() by tempfile
# ZipFile.open() has not `seek', ZOpenWithTemp() has `seek'
class ZOpenWithTemp(object):
    """Context manager giving seekable access to a zip member.

    The object returned by ZipFile.open() cannot seek, so on entry the whole
    member is read and copied into a SpooledTemporaryFile (kept in memory up
    to 1 MiB), which is rewound and returned.  The temporary file is closed
    on exit; exceptions are not suppressed.
    """

    def __init__(self, zipobj, zinfo, mode="r"):
        self.zp = zipobj
        self.zinfo = zinfo
        self.mode = mode
        self.tmp = None

    def __enter__(self):
        with FileCtx(self.zp.open(self.zinfo, self.mode)) as member:
            payload = member.read()
        self.tmp = tempfile.SpooledTemporaryFile(max_size=1024 * 1024)
        self.tmp.write(payload)
        self.tmp.seek(0, 0)
        return self.tmp

    def __exit__(self, exc_type, exc_value, traceback):
        if self.tmp:
            self.tmp.close()
            self.tmp = None
        return False  # let any exception propagate
def ftypezip(zp, entryname):
    """Classify one member of an open zip archive.

    The member is copied to a seekable temporary file once, and that file
    is rewound after every unsuccessful probe.  Returns "tar", "gzip",
    "bz2", "zip", or None if no probe matches.
    """
    with ZOpenWithTemp(zp, entryname) as tmp:
        for probe in (testtar, testgzip, testbz2, testzip):
            try:
                kind = probe(tmp)
            except (StandardError, tarfile.TarError):
                kind = None
            if kind:
                return kind
            tmp.seek(0, 0)
    return None
class PlainPort(object):
    """Line source for an ordinary text file on disk."""

    def __init__(self, name):
        self._name = name

    def filename(self):
        """Return the file name as a unicode string."""
        return decode_any(self._name)

    def entryname(self):
        """Return the archive entry name; always empty for a plain file."""
        return u""

    def eachline(self):
        """Yield the file's lines, opened in universal-newline mode."""
        with open(self._name, "rU") as stream:
            for line in stream:
                yield line
class StdinPort(object):
    """Line source that reads from standard input."""

    def filename(self):
        """Return the display name used for standard input."""
        return u"<stdin>"

    def entryname(self):
        """Return the archive entry name; always empty for stdin."""
        return u""

    def eachline(self):
        """Yield lines from sys.stdin until readline() returns ""."""
        readline = sys.stdin.readline
        while True:
            line = readline()
            if line == "":
                break
            yield line
class GzipPort(PlainPort):
    """Line source for a gzip-compressed file on disk."""

    def eachline(self):
        """Yield decompressed lines with universal-newline handling."""
        with FileCtx(gzip.open(self._name)) as stream:
            readline = unvnl_reader(stream)
            while True:
                line = readline()
                if line == "":
                    break
                yield line
class BZ2Port(PlainPort):
    """Line source for a bzip2-compressed file on disk."""

    def eachline(self):
        """Yield decompressed lines, opened in universal-newline mode."""
        stream = bz2.BZ2File(self._name, "rU")
        with FileCtx(stream) as lines:
            for line in lines:
                yield line
class PlainPortInTar(object):
    """Line source for an uncompressed member of a tar archive.

    *entry* is a member name or a tarfile.TarInfo.  *tarnames* lists the
    enclosing archives as [parent-entryname ... top-tarname]; its last item
    is the top-level file name.
    """

    def __init__(self, tarobj, entry, tarnames=None):
        self.tar = tarobj
        # A fresh list per instance instead of a shared mutable default.
        self.tarnames = [] if tarnames is None else tarnames
        self.entry = entry
        raw_name = entry.name if isinstance(entry, tarfile.TarInfo) else entry
        self._ename = decode_any(raw_name)

    def filename(self):
        """Return the top-level archive name as a unicode string.

        Falls back to the tar object's own name, or u"??" when the archive
        has no name (for example when it was opened from a file object).
        """
        top = self.tarnames[-1] if self.tarnames else None
        if top:
            return top
        archive_name = getattr(self.tar, "name", None)
        if archive_name:
            return decode_any(archive_name)
        return u"??"

    def entryname(self):
        """Return the "#"-joined path from the outermost nested archive to this entry."""
        chain = [self._ename] + self.tarnames[0:-1]
        return u"#".join(reversed(chain))

    def eachline(self):
        """Yield the member's lines with universal-newline handling."""
        with FileCtx(self.tar.extractfile(self.entry)) as e:
            for text in iter(unvnl_reader(e), ""):
                yield text
class GzipPortInTar(PlainPortInTar):
    """Line source for a gzip-compressed member of a tar archive."""

    def eachline(self):
        """Yield decompressed lines with universal-newline handling."""
        with FileCtx(self.tar.extractfile(self.entry)) as member:
            with FileCtx(gzip.GzipFile(fileobj=member)) as stream:
                readline = unvnl_reader(stream)
                while True:
                    line = readline()
                    if line == "":
                        break
                    yield line
class BZ2PortInTar(PlainPortInTar):
    """Line source for a bzip2-compressed member of a tar archive."""

    def eachline(self):
        """Decompress the whole member in memory, then yield its lines."""
        with FileCtx(self.tar.extractfile(self.entry)) as member:
            plain = bz2.decompress(member.read())
        with FileCtx(StringIO.StringIO(plain)) as stream:
            readline = unvnl_reader(stream)
            while True:
                line = readline()
                if line == "":
                    break
                yield line
class PlainPortInZip(object):
    """Line source for an uncompressed member of a zip archive.

    *entry* is a member name or a zipfile.ZipInfo.  *zipnames* lists the
    enclosing archives as [parent-entryname ... top-zipname]; its last item
    is the top-level file name.
    """

    def __init__(self, zipobj, entry, zipnames=None):
        self.zip = zipobj
        self.entry = entry
        # A fresh list per instance instead of a shared mutable default.
        self.zipnames = [] if zipnames is None else zipnames
        raw_name = entry.filename if isinstance(entry, zipfile.ZipInfo) else entry
        self._ename = decode_any(raw_name)

    def filename(self):
        """Return the top-level archive name as a unicode string.

        Falls back to the zip object's own file name, or u"??" when neither
        is available.
        """
        top = self.zipnames[-1] if self.zipnames else None
        if top:
            return top
        archive_name = getattr(self.zip, "filename", None)
        if archive_name:
            return decode_any(archive_name) or u"??"
        return u"??"

    def entryname(self):
        """Return the "#"-joined path from the outermost nested archive to this entry."""
        chain = [self._ename] + self.zipnames[0:-1]
        return u"#".join(reversed(chain))

    def eachline(self):
        """Yield the member's lines, opened in universal-newline mode."""
        with FileCtx(self.zip.open(self.entry, "rU")) as e:
            for text in e:
                yield text
class GzipPortInZip(PlainPortInZip):
    """Line source for a gzip-compressed member of a zip archive."""

    def eachline(self):
        """Yield decompressed lines with universal-newline handling.

        The member is first copied to a seekable temporary file because
        GzipFile is given that copy rather than the raw zip stream.
        """
        with ZOpenWithTemp(self.zip, self.entry) as tmp:
            with FileCtx(gzip.GzipFile(fileobj=tmp, mode='rb')) as stream:
                readline = unvnl_reader(stream)
                while True:
                    line = readline()
                    if line == "":
                        break
                    yield line
class BZ2PortInZip(PlainPortInZip):
    """Line source for a bzip2-compressed member of a zip archive."""

    def eachline(self):
        """Decompress the whole member in memory, then yield its lines."""
        with FileCtx(self.zip.open(self.entry, "r")) as member:
            plain = bz2.decompress(member.read())
        with FileCtx(StringIO.StringIO(plain)) as stream:
            readline = unvnl_reader(stream)
            while True:
                line = readline()
                if line == "":
                    break
                yield line
def decode_any(bstr):
    """Return *bstr* as unicode, trying each encoding in `_encodings`.

    Unicode input is returned unchanged.  If no candidate encoding decodes
    the bytes cleanly, they are decoded as ASCII with replacement of
    undecodable bytes.
    """
    if isinstance(bstr, unicode):
        return bstr
    for codec in _encodings:
        try:
            decoded = bstr.decode(codec)
        except UnicodeError:
            continue
        return decoded
    return bstr.decode("ascii", "replace")
def matchtext(pattern, text, enclist):
    """Search byte string *text* for *pattern* under each encoding in turn.

    Returns ``(match, encoding)`` for the first encoding that both decodes
    *text* and yields a match.  With no match, returns ``(None, enc)`` where
    *enc* is the last encoding that decoded cleanly, or None if none did.
    """
    last_decoded = None
    for codec in enclist:
        try:
            found = re.search(pattern, text.decode(codec))
        except UnicodeError:
            continue  # not valid in this encoding; try the next one
        last_decoded = codec
        if found:
            return (found, codec)
    return (None, last_decoded)
def matchport(pattern, port, enclist, output):
    """Search every line of *port* and report matches through *output*.

    *output* is called with keyword arguments once per matching line
    (``endcall=False``) and once more after the last line (``endcall=True``,
    ``match`` set to whether anything matched).  The first line that cannot
    be decoded with any encoding triggers a single warning, unless warnings
    are disabled.  Returns True if at least one line matched.
    """
    if _isdebug:
        outnl("---%s#%s---" % (port.filename(), port.entryname()))
    found_any = False
    warned = False
    lineno = 0
    for lineno, text in enumerate(port.eachline(), 1):
        (m, enc) = matchtext(pattern, text, enclist)
        if m:
            found_any = True
            output(filename=port.filename(), entryname=port.entryname(),
                   text=text, lineno=lineno,
                   match=m, encoding=enc, endcall=False)
            continue
        if enc is not None or warned:
            continue
        # First undecodable line of this port: warn once.
        if _iswarout:
            location = join_t((port.filename(), port.entryname()), u"#")
            eoutnl(u"[Warning]: Cannot decode %s line %d" % (location, lineno))
        warned = True
    output(filename=port.filename(), entryname=port.entryname(),
           text=None, lineno=-1,
           match=found_any, encoding=None, endcall=True)
    return found_any
def match_files(files, pattern, enclist, output):
    """Search every item of *files* and return True if anything matched.

    Each item is a path or the sys.stdin object.  Regular files are
    classified with ftype(): gzip/bz2/plain files are searched directly,
    tar and zip archives are searched recursively.  Problems with one item
    are reported on stderr and do not stop the remaining items.
    """
    found = False
    for fname in files:
        port = None
        try:
            if fname == sys.stdin:
                port = StdinPort()
            elif os.path.isfile(fname):
                kind = ftype(fname)
                if kind is None:
                    port = PlainPort(fname)
                elif kind == "gzip":
                    port = GzipPort(fname)
                elif kind == "bz2":
                    port = BZ2Port(fname)
                elif kind == "tar":
                    with FileCtx(tarfile.open(name=fname)) as tar:
                        if match_tarobj(tar, [decode_any(fname)],
                                        pattern, enclist, output):
                            found = True
                    continue
                elif kind == "zip":
                    with FileCtx(zipfile.ZipFile(fname, "r")) as zp:
                        if match_zipobj(zp, [decode_any(fname)],
                                        pattern, enclist, output):
                            found = True
                    continue
                else:
                    eoutnl(u"[Error]: %s file type `%s' is not supported" % (U(fname), kind))
            elif not os.path.exists(fname):
                eoutnl(u"[Error]: `%s': No such file" % U(fname))
            elif os.path.isdir(fname):
                eoutnl(u"[Error]: directory entries are not supported yet - `%s'" % U(fname))
            else:
                eoutnl(u"[Error]: `%s' is not a plain file." % U(fname))
            if port and matchport(pattern, port, enclist, output):
                found = True
        except Exception as ex:
            # Report the failure and carry on with the next file.
            if _isdebug:
                import traceback
                sys.stderr.writelines(traceback.format_tb(sys.exc_info()[2]))
            sys.stderr.write("[Error]: " + str(ex) + "\n")
    return found
def match_tarobj(tarobj, tarnames, pattern, enclist, output):
    """Search all regular-file members of an open tar archive.

    *tarnames* is the chain of enclosing archive names, innermost first.
    Members that are themselves tar or zip archives are searched
    recursively.  Errors on one member are reported on stderr and the scan
    continues.  Returns True if anything matched.
    """
    found = False
    for info in tarobj:
        port = None
        try:
            if info.isfile():
                kind = ftypetar(tarobj, info)
                if kind is None:
                    port = PlainPortInTar(tarobj, info, tarnames)
                elif kind == "gzip":
                    port = GzipPortInTar(tarobj, info, tarnames)
                elif kind == "bz2":
                    port = BZ2PortInTar(tarobj, info, tarnames)
                elif kind == "tar":
                    with FileCtx(tarobj.extractfile(info)) as fe:
                        with FileCtx(tarfile.open(fileobj=fe)) as ftar:
                            nested = [decode_any(info.name)] + tarnames
                            if match_tarobj(ftar, nested, pattern, enclist, output):
                                found = True
                    continue
                elif kind == "zip":
                    with FileCtx(tarobj.extractfile(info)) as fe:
                        with FileCtx(zipfile.ZipFile(fe, "r")) as fzip:
                            nested = [decode_any(info.name)] + tarnames
                            if match_zipobj(fzip, nested, pattern, enclist, output):
                                found = True
                    continue
            if port and matchport(pattern, port, enclist, output):
                found = True
        except Exception as ex:
            # Report the failure and carry on with the next member.
            if _isdebug:
                import traceback
                sys.stderr.writelines(traceback.format_tb(sys.exc_info()[2]))
            sys.stderr.write("[Error]: " + str(ex) + "\n")
    return found
def match_zipobj(zipobj, zipnames, pattern, enclist, output):
    """Search all non-empty file members of an open zip archive.

    *zipnames* is the chain of enclosing archive names, innermost first.
    Empty members and names ending in "/" are skipped.  Members that are
    themselves tar or zip archives are searched recursively.  Errors on one
    member are reported on stderr and the scan continues.  Returns True if
    anything matched.
    """
    rxdir = re.compile(r'/\Z')
    found = False
    for info in zipobj.infolist():
        if (info.file_size == 0) or rxdir.search(info.filename):
            continue
        port = None
        try:
            kind = ftypezip(zipobj, info)
            if kind is None:
                port = PlainPortInZip(zipobj, info, zipnames)
            elif kind == "gzip":
                port = GzipPortInZip(zipobj, info, zipnames)
            elif kind == "bz2":
                port = BZ2PortInZip(zipobj, info, zipnames)
            elif kind == "tar":
                with ZOpenWithTemp(zipobj, info) as tmp:
                    with FileCtx(tarfile.open(fileobj=tmp, mode='r')) as ftar:
                        nested = [decode_any(info.filename)] + zipnames
                        if match_tarobj(ftar, nested, pattern, enclist, output):
                            found = True
                continue
            elif kind == "zip":
                with ZOpenWithTemp(zipobj, info) as tmp:
                    with FileCtx(zipfile.ZipFile(tmp, "r")) as fzip:
                        nested = [decode_any(info.filename)] + zipnames
                        if match_zipobj(fzip, nested, pattern, enclist, output):
                            found = True
                continue
            if port and matchport(pattern, port, enclist, output):
                found = True
        except Exception as ex:
            # Report the failure and carry on with the next member.
            if _isdebug:
                import traceback
                sys.stderr.writelines(traceback.format_tb(sys.exc_info()[2]))
            sys.stderr.write("[Error]: " + str(ex) + "\n")
    return found
def make_output(print_text=True, print_lineno=False,
                print_file=False, print_entry=True):
    """Build the per-match output callback used by matchport().

    The returned function ignores the final ``endcall=True`` call.  For a
    matching line it prints, joined by ":", the "file#entry" path (parts
    selected by *print_file* / *print_entry*), the line number and the
    decoded line text, each only if the corresponding flag is set.
    """
    def output(**args):
        if args.get("endcall"):
            return
        name_parts = [(print_file and args.get("filename")),
                      (print_entry and args.get("entryname"))]
        fields = []
        path = join_t(name_parts, u"#")
        if path:
            fields.append(path)
        if print_lineno:
            fields.append(u"%4.0d" % args["lineno"])
        if print_text:
            codec = args["encoding"]
            fields.append(args["text"].decode(codec, "replace"))
        outnl(u":".join(fields))
    return output
# Command-line entry point: parse options, build the output callback,
# search the given files (or stdin) and exit with 0 if anything matched,
# 1 otherwise.
if __name__ == '__main__':
    # Exceptions used to leave option parsing and print something instead.
    class DisplayException(Exception):pass
    class UsageError(DisplayException):
        # Optional error message shown on stderr before the usage text.
        def __init__(self, msg=None): self.msg = msg
        def display(self):
            if self.msg: eoutnl(self.msg)
            usage()
    class DisplayVersion(DisplayException):
        def display(self):
            outnl(u"%s %s" % (_program, _version))
    def output_filename_only(**args):
        # Output callback for -l: print only the name of each matching input,
        # once, on the final (endcall) call.
        if args.get("endcall") and args.get("match"):
            outnl( join_t( [args.get("filename"), args.get("entryname")], u"#") )
    def usage():
        # Print the Japanese help when the default locale is ja_JP, otherwise
        # the English help, then exit with status 1.
        import locale
        isjapanese = False
        try: isjapanese = (locale.getdefaultlocale()[0] == "ja_JP")
        except Exception:pass
        if isjapanese: usage_jp()
        else: usage_en()
        exit(1)
    def usage_en():
        # English help text, written to stdout.
        sys.stdout.write( (r'''
jzgrep.py is a grep-like command-line utility.
It can also search recursively in compressed archives such as zip
and tar.gz as well as in plain-text file. (except for STDIN)
And, supports various Japanese encodings.
Requirements:
python2.6 or newer - not including python3
Platforms:
It's only tested on Linux and os-x10.6.
Supported file-types:
gzip, bz2, zip, tar, tar.gz, tar.bz2, plain-text
Supported encodings:
utf-8, cp932, euc-jp, iso2022_jp
Restrictions:
Very slow performance
Usage: %s [OPTIONS] PATTERN [FILE ...]
PATTERN -- regular expression pattern (using Python's `re' module)
FILE... -- input FILEs (default is STDIN)
OPTIONS
--help -- Print this help message, and exit
-? -- Print this help message, and exit
-n -- Output with the line number
-l -- Suppress normal output; instead print the name of each
input file
-H -- Print the filename for each match.
-h -- Suppress the filenames on output when multiple files are
searched.
-i -- Ignore case (re.IGNORECASE)
-s -- Suppress Warning messages. (e.g [Warning]: Cannot decode ...)
-V -- Print version number, and exit
''' % os.path.basename(sys.argv[0]) ))
    def usage_jp():
        # Japanese help text, encoded with _ioenc and written to stdout.
        sys.stdout.write( (ur'''
日本語ファイル対応の grep like なコマンドです。
指定したファイルの、日本語エンコーディングがそれぞれ異なっていても、行毎に、
iso2022_jp, utf-8, euc-jp, cp932
の順でエンコードを試し、適宜照合させることができます。
また、通常の テキストファイル以外に、gzip, bz2 の圧縮ファイルや、tar.gz, zip
のアーカイブについても、自動認識してその中を再帰的に検索します。
(但し、入力が stdinの場合を除く)
対応しているファイル・タイプ:
gzip, bz2, zip, tar, tar.gz, tar.bz2, plain-text
制限:
Python2.6以上が必要です(2.7推奨)。
Python3では動きません。
遅いです。しかも、たまに間違って余計なものにヒットすることもあります。
動作確認は、linux と os-x10.6 でしか行なっておりません。
Usage: %s [OPTIONS] PATTERN [FILE ...]
PATTERN -- 正規表現パターン (Python の `re' モジュールを使っています)
FILE... -- PATTERN で検索するファイル。省略時は stdin。
OPTIONS
--help -- このメッセージを表示して終了する
-? -- --helpと同じ
-n -- 行番号を表示
-l -- PATTERN に一致する行を含むファイル名だけを出力
-H -- 出力する行の先頭にファイル名を付ける。
(検索するファイルが複数の場合は、これがデフォルト)
-h -- 出力する行の先頭にファイル名を付けない
(検索するファイルが一つの場合は、これがデフォルト)
-i -- 大文字小文字を区別しない (re.IGNORECASE)
-u -- \w,\W,\b,\B,\d,\D,\s,\Sを pythonのunicodeデータベースに依存
させます。(re.UNICODE)
-s -- 警告メッセージを出さない (e.g [Warning]: Cannot decode ...)
-V -- バージョン番号を表示して終了する
--osenc ENC -- Operating System の入出力エンコード (defaultは utf-8)
''' % os.path.basename(sys.argv[0])).encode(_ioenc) )
    # Parsed command-line state.  pr_file stays None until -h/-H is given so
    # that the default can depend on the number of files.
    files = []; pattern = None
    pr_lineno = False
    pr_text = True
    pr_file = output = None
    reflag = 0
    arglist = sys.argv[1:]
    try:
        while arglist:
            arg = arglist.pop(0)
            # Anything of the form "-x..." is treated as an option.
            m = re.match(r'-(.+)', arg, re.I)
            if m:
                option = m.group(1)
                if option[0] == "-":
                    # --option
                    opt = option[1:]
                    if opt == "osenc" and arglist:
                        # The next argument is the I/O encoding name.
                        _ioenc = arglist.pop(0)
                    elif opt == "help": raise UsageError()
                    else: raise UsageError(u"[Error]: Illegal option, --%s" % opt)
                else:
                    # -options: each character is a separate flag
                    for opt in list(option):
                        if opt == "n": pr_lineno = True
                        elif opt == "l": output = output_filename_only
                        elif opt == "h": pr_file = False
                        elif opt == "H": pr_file = True
                        elif opt == "i": reflag |= re.I
                        elif opt == "u": reflag |= re.U
                        elif opt == "M": reflag |= re.M
                        elif opt == "S": reflag |= re.S
                        elif opt == "X": reflag |= re.X
                        elif opt == "s": _iswarout = False
                        elif opt == "D": _isdebug = True
                        elif opt == "?": raise UsageError()
                        elif opt == "V": raise DisplayVersion()
                        else:
                            raise UsageError(u"[Error]: Illegal option -- %s" % opt)
            else:
                # First non-option argument is the pattern, the rest are files.
                if pattern is None: pattern = arg
                else:
                    files.append(arg)
        # end of while
        if pattern is None: raise UsageError()
    except DisplayException as dspex:
        # Help, version and usage errors all end the program with status 1.
        dspex.display()
        exit(1)
    # Make port list: with no FILE arguments, read standard input.
    if not files:
        files = [ sys.stdin ]
        pr_file = False
    if pr_file is None:
        # Default: show file names only when several files are searched.
        pr_file = len(files) > 1
    pattern = re.compile(U(pattern), reflag)
    if not output :
        output = make_output(print_text=pr_text, print_lineno=pr_lineno,
                             print_file=pr_file)
    # import pdb; pdb.set_trace()
    ret = match_files(files, pattern, _encodings, output)
    exit(0 if ret else 1 )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment