Skip to content

Instantly share code, notes, and snippets.

Last active January 7, 2022 18:20
Show Gist options
  • Save Mikael-Lovqvist/b578a58f00de5271cf0aabb4bb06f7d8 to your computer and use it in GitHub Desktop.
Save Mikael-Lovqvist/b578a58f00de5271cf0aabb4bb06f7d8 to your computer and use it in GitHub Desktop.
Helper script to utilize multiple processes when scanning. It divvies up the files across workers, starting with the biggest ones, to give each process an even workload.
import argparse, multiprocessing, subprocess, pathlib, tempfile
# --- Scanner configuration -------------------------------------------------
# Base command for each worker process; --no-summary/--quiet keep the
# per-process output minimal so the collected logs stay readable.
scanner_tool = ('clamscan', '--no-summary', '--quiet')
log_arg = '-l'  # scanner option that takes a log-file path (falsy disables logging)
log_prefix = 'log-'  # per-process log files are named {log_prefix}{index:03}
file_list_arg = '-f' #Set this to something false-like in order to use file lists as command line arguments instead
file_list_name = 'file-list' #Must not be matching log_prefix
# One worker process per CPU core.
process_count = multiprocessing.cpu_count()
def process_file_list(files):
	"""Classify one batch of paths for scanning.

	Regular files are appended to the module-level ``files_to_scan`` list;
	directories contribute their children to the returned set so the caller
	can process them in a subsequent pass; symlinks are deliberately not
	followed.  Every path is recorded in ``processed_files`` so the
	breadth-first walk terminates even with directory cycles.

	Returns the set of newly discovered entries (empty when the walk is done).
	"""
	new_entries = set()
	for file in files:
		absolute_file = file.absolute()
		if absolute_file in processed_files:
			continue	#Already visited - prevents re-scanning and infinite loops
		processed_files.add(absolute_file)
		try:
			if file.is_symlink():
				pass	#print(f'Warning, not following {file}')
			elif file.is_dir():
				new_entries |= set(absolute_file.iterdir())
			elif file.is_file():
				files_to_scan.append(file)
			else:
				#Sockets, fifos, devices etc. are skipped with a notice
				print(f'Warning, not handling {file}')
		except PermissionError:
			print(f'Warning - could not access {absolute_file}')
	return new_entries
class cycle:
	"""Round-robin selector over a fixed sequence.

	Each call returns the next entry, wrapping back to the first one after
	the last.  Used to deal items out evenly across per-process lists.
	"""

	def __init__(self, entries):
		self.entries = entries
		self.count = len(entries)
		self.current = 0

	def __call__(self):
		#Remember the slot to hand out, then advance with wrap-around
		index = self.current
		self.current = (index + 1) % self.count
		return self.entries[index]
def divy_up_list(source_list, count):
	"""Distribute *source_list* round-robin over *count* lists.

	Item 0 goes to target 0, item 1 to target 1, ... wrapping around.  With
	the caller's size-descending input this gives each worker a roughly even
	workload.  Returns a list of *count* lists (some possibly empty).
	"""
	targets = [list() for _ in range(count)]
	for index, item in enumerate(source_list):
		targets[index % count].append(item)
	return targets
def scan_files(file_list, log_file=None):
	"""Launch the scanner with *file_list* passed as command-line arguments.

	When logging is configured (``log_arg`` truthy and *log_file* given) the
	log option is inserted before the file arguments.  Returns the started
	``subprocess.Popen`` handle; the caller is responsible for waiting.
	"""
	extra = (log_arg, log_file) if log_arg and log_file else ()
	command = (*scanner_tool, *extra, *file_list)
	return subprocess.Popen(command)
def scan_files_using_file(file, log_file=None):
	"""Launch the scanner reading its targets from the list file *file*.

	The file is passed via ``file_list_arg`` (e.g. clamscan's ``-f``); the
	optional log file is appended when logging is configured.  Returns the
	started ``subprocess.Popen`` handle.
	"""
	arguments = [file_list_arg, file]
	if log_arg and log_file:
		arguments.extend((log_arg, log_file))
	return subprocess.Popen((*scanner_tool, *arguments))
#Init state
directories_to_scan = set()	# NOTE(review): never read anywhere below - kept for compatibility, consider removing
files_to_scan = list()		# regular files collected by process_file_list
processed_files = set()		# absolute paths already visited (loop guard)

#Process arguments - default to scanning the current directory
parser = argparse.ArgumentParser()
parser.add_argument("files", type=pathlib.Path, nargs='*', default=[pathlib.Path('./')])
args = parser.parse_args()

#Collect files for scanning - breadth-first walk: each pass returns the
#directory entries discovered during the previous pass, until nothing new
while pending_files := process_file_list(pending_files := args.files if 'pending_files' not in dir() else pending_files):
	pass	#All work happens inside process_file_list
#Divy up per process - biggest files first so the round-robin split yields
#an even workload across workers
per_process_scan_lists = divy_up_list(sorted(files_to_scan, key=lambda file: file.stat().st_size, reverse=True), process_count)

#Create temporary directory for logfiles (removed automatically on exit)
with tempfile.TemporaryDirectory() as temp_dir_path:
	temp_dir = pathlib.Path(temp_dir_path)
	should_be_logs = False

	#Start external processes - one per non-empty scan list
	external_processes = list()
	for index, scan_list in enumerate(per_process_scan_lists):
		if scan_list:
			log_file = temp_dir / f'{log_prefix}{index:03}'
			if file_list_arg:
				#One list file per worker - a single shared name would be
				#clobbered by later iterations before the scanner reads it.
				#surrogateescape round-trips non-UTF-8 filesystem names.
				scan_file = temp_dir / f'{file_list_name}-{index:03}'
				scan_file.write_bytes(b'\n'.join(map(lambda f: str(f).encode('utf-8', 'surrogateescape'), scan_list)))
				external_processes.append(scan_files_using_file(scan_file, log_file=log_file))
			else:
				external_processes.append(scan_files(scan_list, log_file=log_file))

	#Wait for processes to finish; any non-zero exit means there are findings
	for process in external_processes:
		result = process.wait()
		if result == 0:
			pass	#print(f'{process.args[0]} finished')
		else:
			print(f'{process.args[0]} process returned with status {result}')
			should_be_logs = True

	#Retrieve logs while the temporary directory still exists
	if should_be_logs:
		scan_result = 1	# NOTE(review): scan_result is never read below - presumably meant as an exit status; confirm
		print('Retrieving logs')
		for file in temp_dir.glob(f'{log_prefix}*'):
			for line in file.read_text().split('\n'):
				if set(line) - {'-', ' '}: #Ignore lines that only consists of spaces and hyphens
					print(f' {line}')
	else:
		scan_result = 0
		print('Scan result nominal')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment