Last active
August 29, 2015 14:07
-
-
Save poseidon4o/1c14cc6af767d8ab7357 to your computer and use it in GitHub Desktop.
Extract script for source files downloaded from moodle's 'mass download' option.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from re import compile, findall, match, sub | |
from argparse import ArgumentParser | |
from mimetypes import guess_type | |
from subprocess import call | |
from datetime import date | |
from random import randint | |
from collections import defaultdict | |
from sys import exit | |
from shutil import rmtree | |
import os | |
arg_parser = ArgumentParser() | |
arg_parser.add_argument('-pr', '--project', help='Project number - 1 or 2') | |
arg_parser.add_argument('-hw', '--homework', help='Homework number - 1 to 5') | |
arg_parser.add_argument('-clean', '--clean', | |
help='Clean up fixer files', action='store_true') | |
class Log(): | |
class LogLevel(): | |
def __init__(self, value, name=''): | |
self.name = name | |
self.value = value | |
def __str__(self): | |
return self.name.upper() | |
def __int__(self): | |
return int(self.value) | |
def __ge__(self, other): | |
return self.value >= other.value | |
NOTICE = LogLevel(1, 'NOTICE') | |
WARNING = LogLevel(2, 'WARNING') | |
ERROR = LogLevel(3, 'ERROR') | |
_log_file_name = 'fixer-{}.log'.format(date.today().isoformat()) | |
open(_log_file_name, 'w').write('') | |
@staticmethod | |
def log(message, level=ERROR): | |
pretty = '{}: [{}]'.format(level, message) | |
level >= Log.WARNING and print(pretty) | |
with open(Log._log_file_name, 'a') as ouput: | |
ouput.write(pretty + '\n') | |
class HWException(Exception): | |
def __init__(self, *arg): | |
super(HWException, self).__init__(arg) | |
class HomewrokFixer(): | |
source_types = {'text/x-c++src'} | |
archive_types = {'application/x-tar', | |
'application/rar', | |
'application/zip', | |
'application/x-7z-compressed'} | |
def __init__(self, project, homework): | |
self.project = project | |
self.homework = homework | |
self.root_dir = os.getcwd() | |
self.project_dir = os.path.join(self.root_dir, | |
'project{}-hw{}'.format(self.project, | |
self.homework)) | |
self.hw_dir = os.path.join(self.project_dir, 'extracted') | |
Log.log('Homework dir {}'.format(self.hw_dir), Log.NOTICE) | |
self.fn_matcher = compile('(\d{3,6})') | |
self.fn_low = compile('71\d{3}') | |
self.fn_high = compile('855\d{3}') | |
self.unmached = set() | |
self.ext_match = compile('^(c|h){1,2}(p|x|\+){0,2}$') | |
self.unmached_folders = defaultdict( | |
lambda: '--ERROR_' + str(randint(10 ** 6, 10 ** 7 - 1)) | |
) | |
self.init_project() | |
def get_moss_dir(self): | |
return self.hw_dir + '/*/*' | |
def init_project(self): | |
for file_name in os.listdir(self.project_dir): | |
file_type = guess_type(file_name)[0] | |
if file_type.split('/')[-1] == 'zip': | |
res = call(['unzip', os.path.join(self.project_dir, file_name), | |
'-d', os.path.join(self.project_dir, 'extracted')], | |
stdout=open('/dev/null')) | |
if res != 0: | |
raise HWException('Can\'t init project dir') | |
def check(self): | |
if self.project not in ('1', '2'): | |
raise HWException('Invalid project number') | |
if self.homework not in ('1', '2', '3', '4', '5'): | |
raise HWException('Invalid homework number') | |
if not os.path.isdir(self.project_dir): | |
raise HWException( | |
'Target path not found ({})'.format(self.project_dir) | |
) | |
def extract_fn(self, file_name): | |
return '_'.join(set(findall(self.fn_matcher, file_name))) | |
def init_student(self, name): | |
fn = self.extract_fn(name) | |
if fn == None: | |
fn = self.unmached_folders[name] | |
Log.log('File name parsed - invalid FN ({})'.format(name)) | |
Log.log('File name parsed ({}) -> ({})'.format(name, fn), Log.NOTICE) | |
student_dir = os.path.join(self.hw_dir, str(fn)) | |
if student_dir == None: | |
raise HWException('Failed to get FN for {}'.format(name)) | |
if not os.path.isdir(student_dir): | |
try: | |
os.mkdir(student_dir) | |
Log.log( | |
'Student dir created {}'.format(student_dir), Log.NOTICE) | |
except Exception as e: | |
Log.log('mkdir error {}'.format(e)) | |
elif student_dir != 'ERROR': | |
Log.log('Multiple inits for {}'.format(name), Log.WARNING) | |
return student_dir | |
def process_archive(self, name, file_type): | |
student_dir = self.init_student(name) | |
if student_dir[-1] != '/': | |
student_dir += '/' | |
archiver = file_type.split('/')[-1] | |
target = os.path.join(self.hw_dir, name) | |
if archiver == 'zip': | |
arg_list = ['unzip', '-qq', '-o', target, '-d', student_dir] | |
elif archiver == 'x-tar': | |
arg_list = ['tar', 'xf', target, '-C', student_dir] | |
elif archiver == 'rar': | |
arg_list = ['unrar', 'e', '-y', target, student_dir] | |
elif archiver == 'x-7z-compressed': | |
arg_list = ['7z', 'x', target, '-o{}'.format(student_dir), '-y'] | |
else: | |
Log.log('Unknown archiver {}:[{}]'.format(file_name, file_type)) | |
if call(arg_list, stdout=open('/dev/null')) != 0: | |
Log.log( | |
'Failed to extract archive [{}]'.format(' '.join(arg_list)) | |
) | |
os.remove(target) | |
Log.log('Extracted {} to {}'.format(name, student_dir), Log.NOTICE) | |
def process_text_file(self, name): | |
student_dir = self.init_student(name) | |
os.rename(os.path.join(self.hw_dir, name), | |
os.path.join(student_dir, name)) | |
def run(self): | |
self.check() | |
for file_name in os.listdir(self.hw_dir): | |
file_type = guess_type(os.path.join(self.hw_dir, file_name))[0] | |
if file_type in self.source_types: | |
self.process_text_file(file_name) | |
elif file_type in self.archive_types: | |
self.process_archive(file_name, file_type) | |
for sub_dir in os.listdir(self.hw_dir): | |
sub_dir = os.path.join(self.hw_dir, sub_dir) | |
if os.path.isdir(sub_dir): | |
self.flatten_dir(sub_dir) | |
for user_file in os.listdir(sub_dir): | |
if not self.is_filename_allowed(user_file): | |
to_remove = os.path.join(sub_dir, user_file) | |
if os.path.isdir(to_remove): | |
rmtree(to_remove) | |
else: | |
os.remove(to_remove) | |
Log.log('File deleted: {}'.format(user_file), | |
Log.NOTICE) | |
else: | |
os.rename(os.path.join(sub_dir, user_file), | |
os.path.join(sub_dir, | |
self.normalize_name(user_file))) | |
Log.log('File skipped: {}'.format(user_file), | |
Log.NOTICE) | |
def normalize_name(self, name): | |
return sub('[^a-zA-Z0-9_\-.]+', '_', name) | |
def is_filename_allowed(self, file_name): | |
file_ext = os.path.splitext(file_name)[1][1:] | |
return file_name in {'Makefile'} or match(self.ext_match, file_ext) | |
def flatten_dir(self, dir_to_flatten): | |
for dirpath, dirnames, filenames in os.walk(dir_to_flatten): | |
for file_name in filenames: | |
os.rename(os.path.join(dirpath, file_name), | |
os.path.join(dir_to_flatten, file_name)) | |
def clean_files(): | |
for file_name in os.listdir(os.getcwd()): | |
if match(r'^fixer-.*?\.log$', file_name): | |
os.remove(file_name) | |
print('Deleted {}'.format(file_name)) | |
if os.path.isdir(file_name): | |
for sub_dir in os.listdir(file_name): | |
if sub_dir == 'extracted': | |
rmtree(os.path.join(file_name, sub_dir)) | |
if __name__ == '__main__': | |
argv = arg_parser.parse_args() | |
if argv.clean: | |
clean_files() | |
exit() | |
hw = HomewrokFixer(argv.project, argv.homework) | |
try: | |
hw.run() | |
except HWException as e: | |
Log.log(e) | |
Log.log('UNMACHED: {}'.format(' '.join([str(fn) for fn in hw.unmached])), | |
Log.NOTICE) | |
arg_list = ['perl', 'moss.pl', | |
'-l', 'cc', | |
'-m', '1000', | |
'-d', hw.get_moss_dir()] | |
print(' '.join(arg_list)) | |
# cant expand dir/* from call, so print command for now | |
# if call(arg_list) != 0: | |
# Log.log('Moss plugin failed!') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment