Skip to content

Instantly share code, notes, and snippets.

@poseidon4o
Last active August 29, 2015 14:07
Show Gist options
  • Save poseidon4o/1c14cc6af767d8ab7357 to your computer and use it in GitHub Desktop.
Save poseidon4o/1c14cc6af767d8ab7357 to your computer and use it in GitHub Desktop.
Extract script for source files downloaded from moodle's 'mass download' option.
#!/usr/bin/python3
from re import compile, findall, match, sub
from argparse import ArgumentParser
from mimetypes import guess_type
from subprocess import call
from datetime import date
from random import randint
from collections import defaultdict
from sys import exit
from shutil import rmtree
import os
arg_parser = ArgumentParser()
arg_parser.add_argument('-pr', '--project', help='Project number - 1 or 2')
arg_parser.add_argument('-hw', '--homework', help='Homework number - 1 to 5')
arg_parser.add_argument('-clean', '--clean',
help='Clean up fixer files', action='store_true')
class Log():
class LogLevel():
def __init__(self, value, name=''):
self.name = name
self.value = value
def __str__(self):
return self.name.upper()
def __int__(self):
return int(self.value)
def __ge__(self, other):
return self.value >= other.value
NOTICE = LogLevel(1, 'NOTICE')
WARNING = LogLevel(2, 'WARNING')
ERROR = LogLevel(3, 'ERROR')
_log_file_name = 'fixer-{}.log'.format(date.today().isoformat())
open(_log_file_name, 'w').write('')
@staticmethod
def log(message, level=ERROR):
pretty = '{}: [{}]'.format(level, message)
level >= Log.WARNING and print(pretty)
with open(Log._log_file_name, 'a') as ouput:
ouput.write(pretty + '\n')
class HWException(Exception):
def __init__(self, *arg):
super(HWException, self).__init__(arg)
class HomewrokFixer():
source_types = {'text/x-c++src'}
archive_types = {'application/x-tar',
'application/rar',
'application/zip',
'application/x-7z-compressed'}
def __init__(self, project, homework):
self.project = project
self.homework = homework
self.root_dir = os.getcwd()
self.project_dir = os.path.join(self.root_dir,
'project{}-hw{}'.format(self.project,
self.homework))
self.hw_dir = os.path.join(self.project_dir, 'extracted')
Log.log('Homework dir {}'.format(self.hw_dir), Log.NOTICE)
self.fn_matcher = compile('(\d{3,6})')
self.fn_low = compile('71\d{3}')
self.fn_high = compile('855\d{3}')
self.unmached = set()
self.ext_match = compile('^(c|h){1,2}(p|x|\+){0,2}$')
self.unmached_folders = defaultdict(
lambda: '--ERROR_' + str(randint(10 ** 6, 10 ** 7 - 1))
)
self.init_project()
def get_moss_dir(self):
return self.hw_dir + '/*/*'
def init_project(self):
for file_name in os.listdir(self.project_dir):
file_type = guess_type(file_name)[0]
if file_type.split('/')[-1] == 'zip':
res = call(['unzip', os.path.join(self.project_dir, file_name),
'-d', os.path.join(self.project_dir, 'extracted')],
stdout=open('/dev/null'))
if res != 0:
raise HWException('Can\'t init project dir')
def check(self):
if self.project not in ('1', '2'):
raise HWException('Invalid project number')
if self.homework not in ('1', '2', '3', '4', '5'):
raise HWException('Invalid homework number')
if not os.path.isdir(self.project_dir):
raise HWException(
'Target path not found ({})'.format(self.project_dir)
)
def extract_fn(self, file_name):
return '_'.join(set(findall(self.fn_matcher, file_name)))
def init_student(self, name):
fn = self.extract_fn(name)
if fn == None:
fn = self.unmached_folders[name]
Log.log('File name parsed - invalid FN ({})'.format(name))
Log.log('File name parsed ({}) -> ({})'.format(name, fn), Log.NOTICE)
student_dir = os.path.join(self.hw_dir, str(fn))
if student_dir == None:
raise HWException('Failed to get FN for {}'.format(name))
if not os.path.isdir(student_dir):
try:
os.mkdir(student_dir)
Log.log(
'Student dir created {}'.format(student_dir), Log.NOTICE)
except Exception as e:
Log.log('mkdir error {}'.format(e))
elif student_dir != 'ERROR':
Log.log('Multiple inits for {}'.format(name), Log.WARNING)
return student_dir
def process_archive(self, name, file_type):
student_dir = self.init_student(name)
if student_dir[-1] != '/':
student_dir += '/'
archiver = file_type.split('/')[-1]
target = os.path.join(self.hw_dir, name)
if archiver == 'zip':
arg_list = ['unzip', '-qq', '-o', target, '-d', student_dir]
elif archiver == 'x-tar':
arg_list = ['tar', 'xf', target, '-C', student_dir]
elif archiver == 'rar':
arg_list = ['unrar', 'e', '-y', target, student_dir]
elif archiver == 'x-7z-compressed':
arg_list = ['7z', 'x', target, '-o{}'.format(student_dir), '-y']
else:
Log.log('Unknown archiver {}:[{}]'.format(file_name, file_type))
if call(arg_list, stdout=open('/dev/null')) != 0:
Log.log(
'Failed to extract archive [{}]'.format(' '.join(arg_list))
)
os.remove(target)
Log.log('Extracted {} to {}'.format(name, student_dir), Log.NOTICE)
def process_text_file(self, name):
student_dir = self.init_student(name)
os.rename(os.path.join(self.hw_dir, name),
os.path.join(student_dir, name))
def run(self):
self.check()
for file_name in os.listdir(self.hw_dir):
file_type = guess_type(os.path.join(self.hw_dir, file_name))[0]
if file_type in self.source_types:
self.process_text_file(file_name)
elif file_type in self.archive_types:
self.process_archive(file_name, file_type)
for sub_dir in os.listdir(self.hw_dir):
sub_dir = os.path.join(self.hw_dir, sub_dir)
if os.path.isdir(sub_dir):
self.flatten_dir(sub_dir)
for user_file in os.listdir(sub_dir):
if not self.is_filename_allowed(user_file):
to_remove = os.path.join(sub_dir, user_file)
if os.path.isdir(to_remove):
rmtree(to_remove)
else:
os.remove(to_remove)
Log.log('File deleted: {}'.format(user_file),
Log.NOTICE)
else:
os.rename(os.path.join(sub_dir, user_file),
os.path.join(sub_dir,
self.normalize_name(user_file)))
Log.log('File skipped: {}'.format(user_file),
Log.NOTICE)
def normalize_name(self, name):
return sub('[^a-zA-Z0-9_\-.]+', '_', name)
def is_filename_allowed(self, file_name):
file_ext = os.path.splitext(file_name)[1][1:]
return file_name in {'Makefile'} or match(self.ext_match, file_ext)
def flatten_dir(self, dir_to_flatten):
for dirpath, dirnames, filenames in os.walk(dir_to_flatten):
for file_name in filenames:
os.rename(os.path.join(dirpath, file_name),
os.path.join(dir_to_flatten, file_name))
def clean_files():
for file_name in os.listdir(os.getcwd()):
if match(r'^fixer-.*?\.log$', file_name):
os.remove(file_name)
print('Deleted {}'.format(file_name))
if os.path.isdir(file_name):
for sub_dir in os.listdir(file_name):
if sub_dir == 'extracted':
rmtree(os.path.join(file_name, sub_dir))
if __name__ == '__main__':
argv = arg_parser.parse_args()
if argv.clean:
clean_files()
exit()
hw = HomewrokFixer(argv.project, argv.homework)
try:
hw.run()
except HWException as e:
Log.log(e)
Log.log('UNMACHED: {}'.format(' '.join([str(fn) for fn in hw.unmached])),
Log.NOTICE)
arg_list = ['perl', 'moss.pl',
'-l', 'cc',
'-m', '1000',
'-d', hw.get_moss_dir()]
print(' '.join(arg_list))
# cant expand dir/* from call, so print command for now
# if call(arg_list) != 0:
# Log.log('Moss plugin failed!')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment