Skip to content

Instantly share code, notes, and snippets.

Last active January 7, 2022 18:20
Show Gist options
  • Save Mikael-Lovqvist/b578a58f00de5271cf0aabb4bb06f7d8 to your computer and use it in GitHub Desktop.
Save Mikael-Lovqvist/b578a58f00de5271cf0aabb4bb06f7d8 to your computer and use it in GitHub Desktop.
Helper script to utilize multiple processes when scanning. It divvies up the files across workers, starting with the biggest ones, to give each process an even workload.
import argparse, multiprocessing, subprocess, pathlib, tempfile
# --- Scanner configuration -------------------------------------------------
# Base command for each worker process; --no-summary/--quiet keep the
# per-process output minimal so the collected logs stay readable.
scanner_tool = ('clamscan', '--no-summary', '--quiet')
log_arg = '-l'  # scanner option that takes a log-file path (falsy disables logging)
log_prefix = 'log-'  # per-process log files are named {log_prefix}{index:03}
file_list_arg = '-f' #Set this to something false-like in order to use file lists as command line arguments instead
file_list_name = 'file-list' #Must not be matching log_prefix
# One worker process per CPU core.
process_count = multiprocessing.cpu_count()
def process_file_list(files):
	"""Classify one batch of paths for scanning.

	Regular files are appended to the module-level ``files_to_scan`` list;
	directories contribute their children to the returned set so the caller
	can process them in a subsequent pass; symlinks are deliberately not
	followed.  Every path is recorded in ``processed_files`` so the
	breadth-first walk terminates even with directory cycles.

	Returns the set of newly discovered entries (empty when the walk is done).
	"""
	new_entries = set()
	for file in files:
		absolute_file = file.absolute()
		if absolute_file in processed_files:
			continue	#Already visited - prevents re-scanning and infinite loops
		processed_files.add(absolute_file)
		try:
			if file.is_symlink():
				pass	#print(f'Warning, not following {file}')
			elif file.is_dir():
				new_entries |= set(absolute_file.iterdir())
			elif file.is_file():
				files_to_scan.append(file)
			else:
				#Sockets, fifos, devices etc. are skipped with a notice
				print(f'Warning, not handling {file}')
		except PermissionError:
			print(f'Warning - could not access {absolute_file}')
	return new_entries
class cycle:
	"""Round-robin selector over a fixed sequence.

	Each call returns the next entry, wrapping back to the first one after
	the last.  Used to deal items out evenly across per-process lists.
	"""

	def __init__(self, entries):
		self.entries = entries
		self.count = len(entries)
		self.current = 0

	def __call__(self):
		#Remember the slot to hand out, then advance with wrap-around
		index = self.current
		self.current = (index + 1) % self.count
		return self.entries[index]
def divy_up_list(source_list, count):
	"""Distribute *source_list* round-robin over *count* lists.

	Item 0 goes to target 0, item 1 to target 1, ... wrapping around.  With
	the caller's size-descending input this gives each worker a roughly even
	workload.  Returns a list of *count* lists (some possibly empty).
	"""
	targets = [list() for _ in range(count)]
	for index, item in enumerate(source_list):
		targets[index % count].append(item)
	return targets
def scan_files(file_list, log_file=None):
	"""Launch the scanner with *file_list* passed as command-line arguments.

	When logging is configured (``log_arg`` truthy and *log_file* given) the
	log option is inserted before the file arguments.  Returns the started
	``subprocess.Popen`` handle; the caller is responsible for waiting.
	"""
	extra = (log_arg, log_file) if log_arg and log_file else ()
	command = (*scanner_tool, *extra, *file_list)
	return subprocess.Popen(command)
def scan_files_using_file(file, log_file=None):
	"""Launch the scanner reading its targets from the list file *file*.

	The file is passed via ``file_list_arg`` (e.g. clamscan's ``-f``); the
	optional log file is appended when logging is configured.  Returns the
	started ``subprocess.Popen`` handle.
	"""
	arguments = [file_list_arg, file]
	if log_arg and log_file:
		arguments.extend((log_arg, log_file))
	return subprocess.Popen((*scanner_tool, *arguments))
#Init state
directories_to_scan = set()	# NOTE(review): never read anywhere below - kept for compatibility, consider removing
files_to_scan = list()		# regular files collected by process_file_list
processed_files = set()		# absolute paths already visited (loop guard)

#Process arguments - default to scanning the current directory
parser = argparse.ArgumentParser()
parser.add_argument("files", type=pathlib.Path, nargs='*', default=[pathlib.Path('./')])
args = parser.parse_args()

#Collect files for scanning - breadth-first walk: each pass returns the
#directory entries discovered during the previous pass, until nothing new
while pending_files := process_file_list(pending_files := args.files if 'pending_files' not in dir() else pending_files):
	pass	#All work happens inside process_file_list
#Divy up per process - biggest files first so the round-robin split yields
#an even workload across workers
per_process_scan_lists = divy_up_list(sorted(files_to_scan, key=lambda file: file.stat().st_size, reverse=True), process_count)

#Create temporary directory for logfiles (removed automatically on exit)
with tempfile.TemporaryDirectory() as temp_dir_path:
	temp_dir = pathlib.Path(temp_dir_path)
	should_be_logs = False

	#Start external processes - one per non-empty scan list
	external_processes = list()
	for index, scan_list in enumerate(per_process_scan_lists):
		if scan_list:
			log_file = temp_dir / f'{log_prefix}{index:03}'
			if file_list_arg:
				#One list file per worker - a single shared name would be
				#clobbered by later iterations before the scanner reads it.
				#surrogateescape round-trips non-UTF-8 filesystem names.
				scan_file = temp_dir / f'{file_list_name}-{index:03}'
				scan_file.write_bytes(b'\n'.join(map(lambda f: str(f).encode('utf-8', 'surrogateescape'), scan_list)))
				external_processes.append(scan_files_using_file(scan_file, log_file=log_file))
			else:
				external_processes.append(scan_files(scan_list, log_file=log_file))

	#Wait for processes to finish; any non-zero exit means there are findings
	for process in external_processes:
		result = process.wait()
		if result == 0:
			pass	#print(f'{process.args[0]} finished')
		else:
			print(f'{process.args[0]} process returned with status {result}')
			should_be_logs = True

	#Retrieve logs while the temporary directory still exists
	if should_be_logs:
		scan_result = 1	# NOTE(review): scan_result is never read below - presumably meant as an exit status; confirm
		print('Retrieving logs')
		for file in temp_dir.glob(f'{log_prefix}*'):
			for line in file.read_text().split('\n'):
				if set(line) - {'-', ' '}: #Ignore lines that only consists of spaces and hyphens
					print(f' {line}')
	else:
		scan_result = 0
		print('Scan result nominal')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment