Skip to content

Instantly share code, notes, and snippets.

@dbr
Created January 16, 2010 17:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dbr/278916 to your computer and use it in GitHub Desktop.
Save dbr/278916 to your computer and use it in GitHub Desktop.
fxphd.com class download processor
#!/usr/bin/env python2.6
"""Processes fxphd.com class ZIP files, by copying the archive to a specified
location, then extracting this into a specified path.
Directory structure is:
/example/fxphd/10jan_term/
archives/
preview_classes/
my_classes/
Courses listed (by the "abc123" format course name) are put in my_classes,
others are put in "preview_classes"
New classes are labelled green using Finder's labels, to mark then as
unwatched.
The script was written on OS X. It is by no means perfect, and you should
verify files are copied/extracted correctly. "Works on my machine". If it
works on yours, great. Completely unsupported script. Consider it public domain
"""
import re
import os
import sys
import shutil
import subprocess
from zipfile import ZipFile
from AppKit import NSURL
from ScriptingBridge import SBApplication
def config():
conf = {}
conf['base_dir'] = '/Volumes/eggDrive/downloads/fxphd/{term_name}'
conf['my_classes_dir'] = conf['base_dir'] + '/my_classes'
conf['preview_classes_dir'] = conf['base_dir'] + '/preview_classes'
conf['archives_dir'] = conf['base_dir'] + '/archives'
conf['terms'] = {
'11_07_july_term' : ['bkd221', 'mya214', 'nuk305', 'hou203'],
'11_10_oct_term': ['bkd222', 'pyt201', 'rms101', 'mar301'],
'12_01_jan_term': ['bkd223', 'dct301', 'hou204', 'pft303'],
}
conf['current'] = '12_01_jan_term'
return conf
def getInfoFromZipName(fname):
i = re.match("(?P<course>\w{3,4}\d{3})-class(?P<classno>\d\d)(?:-updated|r)?.zip", fname)
if i:
info = i.groupdict()
info['type'] = 'regularclass'
return info
j = re.match("(?P<course>\w{3}\d{3})-files((?P<classno>\d\d))r?.zip", fname)
if j:
info = j.groupdict()
info['type'] = 'files'
return info
k = re.match("(?P<course>\w{3}\d{3})-class(?P<classno>\d\d)r?.(mov|mp4)", fname)
if k:
info = k.groupdict()
info['type'] = 'uncompressedqt'
return info
raise NotImplementedError(fname)
def pathFromInfo(info):
conf = config()
# Check if course is being taken
for term_name, courses in conf['terms'].items():
if info['course'] in courses:
if info['type'] in ['regularclass']:
fname = "{0[course]}/{0[course]}-class{0[classno]}".format(info)
fpath = conf['my_classes_dir'].format(term_name = term_name)
return os.path.join(fpath, fname)
elif info['type'] == 'uncompressedqt':
# QT's go into root directory
fname = "{0[course]}/".format(info)
fpath = conf['my_classes_dir'].format(term_name = term_name)
return os.path.join(fpath, fname)
elif info['type'] == 'files':
fname = "{0[course]}/{0[course]}-files{0[classno]}".format(info)
fpath = conf['my_classes_dir'].format(term_name = term_name)
return os.path.join(fpath, fname)
else:
raise NotImplementedError(info['type'])
# Preview classes
term_name = conf['current']
print "Preview class for {0}".format(term_name)
if info['type'] in ['regularclass']:
fname = "{0[course]}/{0[course]}-class{0[classno]}".format(info)
fpath = conf['preview_classes_dir'].format(term_name = term_name)
return os.path.join(fpath, fname)
elif info['type'] == 'uncompressedqt':
# QT's go into root directory
fname = "{0[course]}/".format(info)
fpath = conf['preview_classes_dir'].format(term_name = term_name)
return os.path.join(fpath, fname)
elif info['type'] == 'files':
fname = "{0[course]}/{0[course]}-files{0[classno]}".format(info)
fpath = conf['preview_classes_dir'].format(term_name = term_name)
return os.path.join(fpath, fname)
else:
raise NotImplementedError(info['type'])
def copy_file(source, destdir):
print "Copy file {0} to folder {1}".format(source, destdir)
path, sourcefname = os.path.split(source)
try:
os.makedirs(destdir)
except OSError:
pass
targetfilething = os.path.join(destdir, sourcefname)
if os.path.isfile(targetfilething):
print "Archive exists at {0}, not copying".format(targetfilething)
return
shutil.copyfile(source, targetfilething)
return targetfilething
def make_dir_green(path):
applescript = """
tell application "Finder"
set x to POSIX file "{0}" as alias
set the label index of x to 6 -- 6 = green
end tell
""".format(path)
cmd = subprocess.Popen(["osascript"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
cmd.stdin.write(applescript)
def zip_has_single_subdir(sourcezipfile):
zf = ZipFile(sourcezipfile)
allfilenames = [cur.filename for cur in zf.filelist]
zf.close()
if any(x.find("../") > -1 for x in allfilenames):
raise ValueError("ZIP file contains relative .. dirs in filename")
if any(x.startswith("/") for x in allfilenames):
raise ValueError("Files contain absolute dirs")
firstdirs = [x.split("/")[0] for x in allfilenames]
if (len(set(firstdirs)) == 1):
return (True, firstdirs[0])
else:
return (False, '')
def extract_zip_to_dir(sourcezipfile, destdir):
hasonesubdir, subdirname = zip_has_single_subdir(sourcezipfile)
if hasonesubdir:
wheretosir, _ = os.path.split(destdir)
print "Extract to one level up from {0}".format(wheretosir)
else:
wheretosir = destdir
print "Extract to {0}".format(wheretosir)
try:
os.makedirs(wheretosir)
except OSError, e:
if e.errno == 17:
print "Directory already exists"
else:
raise
else:
print "Making full path to {0}".format(wheretosir)
cmd = ["unzip", "-d", wheretosir, sourcezipfile]
print "Running cmd:"
print " ".join(cmd)
proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, stdin = subprocess.PIPE)
proc.stdin.write("None")
so, se = proc.communicate()
make_dir_green(os.path.join(os.path.split(destdir)[0], subdirname))
make_dir_green(os.path.split(destdir)[0])
def trashPath(path):
"""Trashes a path using the Finder, via OS X's Scripting Bridge.
"""
targetfile = NSURL.fileURLWithPath_(path)
finder = SBApplication.applicationWithBundleIdentifier_("com.apple.Finder")
items = finder.items().objectAtLocation_(targetfile)
items.delete()
def main():
for srcpath in sys.argv[1:]:
srcpath = os.path.abspath(srcpath)
fpath, fname = os.path.split(srcpath)
try:
curinfo = getInfoFromZipName(fname)
except NotImplementedError, e:
print "Unable to parse %s (Why: %s)" % (fname, e)
else:
newpath = pathFromInfo(curinfo)
if curinfo['type'] == 'uncompressedqt':
print
print "#" * 20
print "Creating directory..."
try:
os.makedirs(newpath)
except OSError, e:
if e.errno != 17: # Err 17 is "File exists"
raise
print "...directory already exists"
print
else:
print
print "Copying file: %s to %s" % (fname, newpath)
if os.path.isfile(os.path.join(newpath, fname)):
print "File already exists, not copying"
else:
print copy_file(srcpath, newpath)
print "...copied"
print "Colouring files green", newpath
make_dir_green(newpath)
make_dir_green(os.path.join(newpath, fname))
print "Trashing source file %s" % srcpath
trashPath(srcpath)
else:
print
print "#" * 20
print "Copying archive"
print
new_archive = copy_file(srcpath, os.path.join(config()['archives_dir'].format(term_name = config()['current']), curinfo['course']))
print
print "#" * 20
print "Extracting archive"
print
extract_zip_to_dir(new_archive, newpath)
print
print "#" * 20
print "Trashing source archive"
trashPath(srcpath)
print
print
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment