Skip to content

Instantly share code, notes, and snippets.

@zhangchunlin
Created May 23, 2014 14:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save zhangchunlin/f6cedbbcb6f7a4319ad4 to your computer and use it in GitHub Desktop.
Save zhangchunlin/f6cedbbcb6f7a4319ad4 to your computer and use it in GitHub Desktop.
rucopy
#! /usr/bin/env python
#coding=utf-8
import re
import os
import sys
import shutil
import types
import time
import socket
import json
from optparse import OptionParser
ERR_PATH_NOT_FOUND = 1
ERR_SRCDP_NOT_FOUND = 2
ERR_RULES_FILE_NOT_FOUND = 3
ERR_JSON_FORMAT_ERROR = 4
ERR_RULES_ERR = 5
ERR_UNKNOWN_ERR = 6
class infodict(object):
def __init__(self):
self.d = {}
def merge_misc_info(self):
#time
tsecs = time.time()
self.d["time_secs"] = tsecs
self.d["time_local"] = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(tsecs))
#user
self.d["build_user"] = os.environ["USER"]
#hostname
self.d["build_hostname"] = socket.gethostname()
def merge_kv(self,k,v):
self.d[k]=v
def merge_dict(self,d):
self.d = dict(self.d,**d)
def merge_dict_json(self,fp,kprefix=""):
try:
f = open(fp)
d = json.load(f)
f.close()
if kprefix:
pd = {}
for k in d:
pd["%s%s"%(kprefix,k)]=d[k]
d = pd
self.d = dict(self.d,**d)
print "merge %s"%(fp)
except IOError,e:
#print >>sys.stderr,"warning: %s"%(e)
print "%s not found"%(fp)
def dump(self,fp):
f = open(fp,"w")
json.dump(self.d,f,indent=1,sort_keys=True)
f.close()
print "dump to %s"%(fp)
def _check_p7zip():
p7zip_fp = "/usr/bin/7za"
if not os.path.exists(p7zip_fp):
print >> sys.stderr,"error:7zip not install,pls install it: sudo apt-get install p7zip-full"
sys.exit(ERR_PATH_NOT_FOUND)
def _p7zip_it(dp,fn=None,no_compress=False,remove_src = True):
'''p7zip_it(dp,fn=None,no_compress=False,remove_src = True)
7zip a directory
example: p7zip_it(options.dstbase,no_compress=True)'''
_check_p7zip()
srcdp,srcfn = os.path.split(dp)
if fn == None:
fn = "%s.7z"%(srcfn)
fp7z = os.path.join(srcdp,fn)
if os.path.exists(fp7z):
print "remove old %s"%(fp7z)
os.remove(fp7z)
old_cwd = os.getcwd()
os.chdir(srcdp)
print "change current dir to %s"%(os.getcwd())
if os.path.exists(fn):
os.remove(fn)
if no_compress:
switch = " -mx0 "
else:
switch = " "
cmd = "7za a %s%s%s"%(fn,switch,srcfn)
print cmd
sys.stdout.flush()
os.system(cmd)
if remove_src:
print "remove %s ..."%(srcfn)
shutil.rmtree(srcfn)
os.chdir(old_cwd)
print "change current dir back to %s"%(os.getcwd())
return os.path.join(srcdp,fn)
def _cp_prepare(dstfp):
dstdp = os.path.split(dstfp)[0]
if not os.path.exists(dstdp):
print "make dir: %s"%(dstdp)
os.makedirs(dstdp)
def _copyfiles_with_rules(srcdp,dstdp,frules,drules):
def get_cobj(rs):
if type(rs)==type(()):
rs,flags = rs
else:
flags = None
if rs:
if flags:
cobj = re.compile(rs,flags)
else:
cobj = re.compile(rs)
return cobj
def path_match(path,rs,drs):
match = False
cobj = get_cobj(rs)
if cobj:
mobj = cobj.search(path)
match = bool(mobj)
else:
match = True
cobj = get_cobj(drs)
if cobj:
mobj = cobj.search(path)
if mobj:
match = False
return match
for root,dirs,files in os.walk(srcdp):
if root.startswith(dstdp):
print "%s in dstdp,ignore it"%(root)
dirs = []
files = []
if frules:
for fn in files:
fp = os.path.join(root,fn)
fp_rel = os.path.relpath(fp,srcdp)
for rs,drs,dstfn in frules:
match = path_match(fp_rel,rs,drs)
if match:
if type(dstfn)==types.FunctionType:
dstfn = dstfn(fp_rel)
if not dstfn:
dstfn = fp_rel
else:
rdict = {}
rdict["$ALL$"] = fp_rel
rdict["$HEAD$"],rdict["$TAIL$"] = os.path.split(fp_rel)
rdict["$ROOT$"],rdict["$EXT$"] = os.path.splitext(fp_rel)
for k in rdict:
if dstfn.find(k)!=-1:
dstfn = dstfn.replace(k,rdict[k])
if dstfn[-3:]==".7z" and (fp_rel[-3:]!=".7z"):
srcfp = os.path.join(srcdp,fp_rel)
fn7z = dstfn
_p7zip_it(srcfp,fn=fn7z,remove_src=False)
srcfp = os.path.join(srcdp,fn7z)
dstfp = os.path.join(dstdp,fn7z)
print "move %s -> %s"%(srcfp,dstfp)
_cp_prepare(dstfp)
shutil.move(srcfp,dstfp)
print "remove %s"%(srcfp)
else:
srcfp = os.path.join(srcdp,fp_rel)
dstfp = os.path.join(dstdp,dstfn)
print "copy %s -> %s"%(srcfp,dstfp)
_cp_prepare(dstfp)
shutil.copy2(srcfp,dstfp)
break
if drules:
for dn in dirs:
dp = os.path.join(root,dn)
dp_rel = os.path.relpath(dp,srcdp)
for rs,drs,dstdn in drules:
match = path_match(dp_rel,rs,drs)
if match:
if not dstdn:
dstdn = dp_rel
else:
rdict = {}
rdict["$ALL$"] = dp_rel
rdict["$HEAD$"],rdict["$TAIL$"] = os.path.split(dp_rel)
rdict["$ROOT$"],rdict["$EXT$"] = os.path.splitext(dp_rel)
for k in rdict:
if dstdn.find(k)!=-1:
dstdn = dstdn.replace(k,rdict[k])
if dstdn[-3:]==".7z" and (dp_rel[-3:]!=".7z"):
srcdp2 = os.path.join(srcdp,dp_rel)
fn7z = os.path.split(dstdn)[1]
_p7zip_it(srcdp2,fn=fn7z,remove_src=False)
srcfp = os.path.join(os.path.split(srcdp2)[0],fn7z)
dstfp = os.path.join(dstdp,dstdn)
print "move %s -> %s"%(srcfp,dstfp)
_cp_prepare(dstfp)
shutil.move(srcfp,dstfp)
else:
srcdp2 = os.path.join(srcdp,dp_rel)
dstdp2 = os.path.join(dstdp,dstdn)
print "rsync %s -> %s"%(srcdp2,dstdp2)
_cp_prepare(dstdp2)
sys.stdout.flush()
os.system("rsync -a %s/ %s/"%(srcdp2,dstdp2))
dirs.remove(dn)
break
def _samepath(path1, path2):
"Return True if both pathname arguments refer to the same"
return os.path.realpath(path1)==os.path.realpath(path2)
def _copydirs_with_rules(rules,srcbase=".",dstbase="."):
'''copydirs_with_rules(rules,srcbase=".",dstbase=".")
example:
rules = {
"apks": {
"srcdp":"out",
"files": [ #file rules
["\.apk$",None,None],#[SOURCE MATCH RE,IGNORE MATCH RE,DEST PATH]
],
"dirs": [ #directory rules
["proguard$",None,"$ALL$.7z"],
]
}
}
copydirs_with_rules(rules,options.srcbase,options.dstbase)
args:
rules: copy rules
srcbase: copy source base directory
dstbase: copy destination base directory
destination path $VAR$:
$ALL$: whole path
path can be split to $HEAD$,$TAIL$
example: for "/tmp/test.txt" ,$HEAD$="/tmp" $TAIL$="test.txt"
path can be split to $ROOT$,$EXT$
example: for "/tmp/test.txt" ,$ROOT$="/tmp/test" $EXT$=".txt"'''
for k in rules:
srcdp = rules[k].get('srcdp','.')
dstdp = rules[k].get('dstdp','.')
frules = rules[k].get('files')
drules = rules[k].get('dirs')
if type(srcdp)==type([]):
fodp = None
for dp in srcdp:
if os.path.exists(dp):
fodp = dp
break
if not fodp:
print "%s's source dir can not be found in %s"%(k,srcdp)
sys.exit(ERR_SRCDP_NOT_FOUND)
srcdp = fodp
srcdp = os.path.normpath(os.path.join(srcbase,srcdp))
dstdp = os.path.normpath(os.path.join(dstbase,dstdp))
if _samepath(srcdp,dstdp):
print >>sys.stderr,"error: source path same as destination path (%s)"%(srcdp)
else:
print "%s from %s to %s"%(k,srcdp,dstdp)
_copyfiles_with_rules(srcdp,dstdp,frules,drules)
def _copyfiles2subdirs(srcfp,dstdp,srcbase=".",dstbase="."):
'''copyfiles2subdirs(srcfp,dstdp,srcbase=".",dstbase=".")
copy files to every subdir of dest dir
example: copyfiles2subdirs("Android.mk",".",options.srcbase,options.dstbase)'''
if type(srcfp)==types.StringType:
srcfp = [srcfp]
for fp in srcfp:
srcfp2 = os.path.normpath(os.path.join(srcbase,fp))
if not os.path.exists(srcfp2):
print >>sys.stderr,"%s not exists"%(srcfp2)
continue
dstdp2 = os.path.normpath(os.path.join(dstbase,dstdp))
for n in os.listdir(dstdp2):
dstdp3 = os.path.join(dstdp2,n)
if os.path.isdir(dstdp3):
print "%s -> %s"%(srcfp2,dstdp3)
shutil.copy(srcfp2,dstdp3)
def _copy(src,dst,srcbase=".",dstbase="."):
'''copy(src,dst,srcbase=".",dstbase=".")
Copy the file src to the file or directory dst
example: copy("./copyrules",".",options.srcbase,options.dstbase)'''
srcfp = os.path.normpath(os.path.join(srcbase,src))
dstp = os.path.normpath(os.path.join(dstbase,dst))
if os.path.exists(srcfp):
if _samepath(srcfp,dstp):
print >>sys.stderr,"error: source path(%s) same as destination path"%(srcfp)
else:
print "%s -> %s"%(srcfp,dstp)
shutil.copy(srcfp,dstp)
else:
print >>sys.stderr,"%s not exists"%(srcfp)
def _print_target_help(runlocals):
print "rucopy targets help:"
count = 1
for k in runlocals:
if k[:7]=='target_':
f = runlocals[k]
if f and type(f)==types.FunctionType and f.__doc__:
if count!=1:
print "-----"
print "%d. %s:"%(count,k[7:])
print f.__doc__
count+=1
def _print_func_help(runlocals):
print "rucopy functions help:"
count = 1
for k in runlocals:
if k[:7]!='target_':
f = runlocals[k]
if f and type(f)==types.FunctionType and f.__doc__:
if count!=1:
print "-----"
print "%d. %s:"%(count,k)
print f.__doc__
count+=1
def _print_rucopy_help(parser,runlocals):
print "---------------------"
parser.print_help()
print "---------------------"
_print_target_help(runlocals)
print "---------------------"
_print_func_help(runlocals)
class OptionParserTry(OptionParser):
def exit(self, status=0, msg=None):
pass
def error(self, msg):
pass
def main():
def parser_add_common_options(parser):
parser.add_option('--srcbase', dest='srcbase', default='.', help='source base directory path')
parser.add_option('--dstbase', dest='dstbase', default='.', help='destination base directory path')
parser.add_option('--cwd', dest='cwd', help='set current working directory')
parser.add_option('--crules', dest='crules', default='./copyrules', help='copyrules file path for customization')
parser.add_option('--funchelp', dest='funchelp', action='store_true', help='copyrules functions help,these functions can be used in target')
parser = OptionParserTry(add_help_option=False)
parser_add_common_options(parser)
options, args = parser.parse_args()
#export functions
copydirs_with_rules = _copydirs_with_rules
copyfiles2subdirs = _copyfiles2subdirs
copy = _copy
p7zip_it = _p7zip_it
#export targets
def target_dstclean():
'''clean remove destination diretory'''
if os.path.exists(options.dstbase):
print "remove tree %s"%(options.dstbase)
shutil.rmtree(options.dstbase)
def target_apkarch():
"""copy apks to apk archive"""
rules = {
"apks archive": {
"srcdp":"out",
"files": [ #file rules
["\.apk$|\.apk\.txt",None,None],#[SOURCE MATCH RE,IGNORE MATCH RE,DEST PATH]
["buildinfo\.json|gitlog\.txt|gitlog\.json",None,None],
],
"dirs": [ #directory rules
["proguard$",None,"$ALL$.7z"],
]
}
}
copydirs_with_rules(rules,options.srcbase,options.dstbase)
if os.path.exists(os.path.join(options.srcbase,options.crules)):
copy(options.crules,".",options.srcbase,options.dstbase)
def target_dsttree():
'''tree view of destination directory'''
sys.stdout.flush()
os.system("tree %s"%(options.dstbase))
def target_dst7z():
'''7z destination directory'''
p7zip_it(options.dstbase,no_compress=True)
runlocals = locals()
if os.path.exists(options.crules):
print "apply local copyrules:%s"%(options.crules)
execfile(options.crules,runlocals,runlocals)
usage = "usage: %prog [options] target1 target2 ..."
parser = OptionParser(usage=usage)
parser_add_common_options(parser)
for arg in args:
k = "option_%s"%(arg)
f = runlocals.get(k)
if f and type(f)==types.FunctionType:
f(parser)
options, args = parser.parse_args()
if options.funchelp:
_print_func_help(runlocals)
return
if _samepath(options.srcbase,options.dstbase):
print >> sys.stderr, "error: srcbase and dstbase should not be the same"
print "---------------------"
parser.print_help()
print "---------------------"
_print_target_help(runlocals)
return
#keep old cwd
if options.cwd and options.cwd!=".":
oldcwd = os.getcwd()
print "change current directory to %s"%(options.cwd)
os.chdir(options.cwd)
else:
oldcwd = None
#run
run = False
ignorelist = []
count=1
for arg in args:
if arg:
arg = "target_%s"%(arg)
if arg and (arg not in ignorelist):
f = runlocals.get(arg)
if f and type(f)==types.FunctionType:
print "%d. rucopy running %s ..."%(count,arg);count+=1
f()
run = True
if not run:
print "nothing happen..."
print "---------------------"
_print_target_help(runlocals)
#return old cwd
if oldcwd:
print "change current directory back to %s"%(oldcwd)
os.chdir(oldcwd)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment