Last active
June 2, 2024 20:57
-
-
Save danslinky/c927972bd3d980be9d6711cb68ea9a7a to your computer and use it in GitHub Desktop.
Python worker pool that uses a thread pool to dispatch jobs to a process pool, reporting each job's status and execution time.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# %% | |
import os
import random
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

import pandas as pd
# Filenames that JobProcessor.thread_worker refuses to process; jobs for
# these files are marked as failed with "File is blacklisted.".
file_blacklist = ["file2.pdf", "malware2.txt"]
class TextExtractor:
    """Simulated text extractor for a single file.

    Sleeps to mimic processing time, takes longer for ``file1.pdf`` and
    always fails for ``file5.pdf``, so the job pool has slow and failing
    work to report on.
    """

    def __init__(self, filename, delay_scale=1):
        """Store the file to extract text from.

        Args:
            filename: Name of the file whose text is "extracted".
            delay_scale: Non-negative multiplier applied to every simulated
                sleep. The default of 1 keeps the original timings (1-2 s,
                plus 1 s for ``file1.pdf``); 0 disables sleeping entirely.
        """
        self.filename = filename
        self.delay_scale = delay_scale

    def extract_text(self):
        """Return the simulated text for ``self.filename``.

        Returns:
            The string ``"Text from <filename>"``.

        Raises:
            Exception: ``"Text Extraction Error."`` for ``file5.pdf``. Any
                error raised here is printed before being re-raised.
        """
        try:
            # Random sleep to simulate processing time.
            time.sleep(random.randint(1, 2) * self.delay_scale)
            # Simulate a long-running job.
            if self.filename == 'file1.pdf':
                time.sleep(1 * self.delay_scale)
            text = f"Text from {self.filename}"
            # Simulate an error.
            if self.filename == 'file5.pdf':
                raise Exception("Text Extraction Error.")
        except Exception as e:
            print(f"ERROR: {self.filename} - {e}")
            # Bare ``raise`` re-raises the active exception with its
            # original traceback, without adding this re-raise site.
            raise
        return text
class EntityExtractor:
    """Toy entity extractor that counts a fixed set of keywords."""

    def __init__(self):
        """Create an extractor; it holds no configuration or state."""

    def extract_entities(self, text):
        """Count case-insensitive occurrences of 'entity1' and 'entity2'.

        ``str.count`` counts non-overlapping substring matches, so e.g.
        ``"entity10"`` also counts as one ``entity1``.

        Args:
            text: The text to scan.

        Returns:
            A list of ``{'entity': name, 'count': n}`` dicts, one per
            keyword, in the order 'entity1', 'entity2'.
        """
        lowered = text.lower()
        return [
            {'entity': name, 'count': lowered.count(name)}
            for name in ('entity1', 'entity2')
        ]
class JobProcessor: | |
def __init__(self, jobs): | |
self.jobs = jobs | |
self.results = [] | |
def extract_text_from_file(self, job): | |
filename = job['filename'] | |
job['text'] = "" | |
text = TextExtractor(filename).extract_text() | |
job['output'] = text | |
return job | |
def get_entities(self, job): | |
text = job['text'] | |
entities = EntityExtractor().extract_entities(text) | |
job['output'] = entities | |
return job | |
def get_summary(self, job): | |
text = job['text'] | |
summary = f"summary of {job['filename']}: {text[:10]}..." | |
job['output'] = summary | |
return job | |
def process_job(self, process_executor, job): | |
if job['type'] == 'text_extract': | |
worker = self.extract_text_from_file | |
elif job['type'] == 'entity_extract': | |
if 'text' not in job: | |
job = process_executor.submit(self.extract_text_from_file, job).result() | |
worker = self.get_entities | |
elif job['type'] == 'text_summarize': | |
if 'text' not in job: | |
job = process_executor.submit(self.extract_text_from_file, job).result() | |
worker = self.get_summary | |
else: | |
job['failed'] = True | |
job['status_message'] = "Unknown job type." | |
return job | |
start = time.time() | |
try: | |
job_result = process_executor.submit(worker, job).result() | |
except Exception as e: | |
end = time.time() | |
duration = end - start | |
job['duration'] = duration | |
job['failed'] = True | |
job['status_message'] = str(e) | |
return job | |
end = time.time() | |
duration = end - start | |
if job_result is None: | |
job['failed'] = True | |
else: | |
job = job_result | |
job['failed'] = False | |
job['duration'] = duration | |
return job | |
def thread_worker(self, process_executor, job): | |
if job['filename'] in file_blacklist: | |
job['failed'] = True | |
job['status_message'] = "File is blacklisted." | |
return job | |
return self.process_job(process_executor, job) | |
def process_jobs(self): | |
with ProcessPoolExecutor() as process_executor: | |
with ThreadPoolExecutor(os.cpu_count()) as thread_executor: | |
futures = [ | |
thread_executor.submit(self.thread_worker, process_executor, job) | |
for job in self.jobs | |
] | |
for future in as_completed(futures): | |
result = future.result() | |
self.results.append(result) | |
def get_results(self): | |
df = pd.DataFrame(self.results) | |
df['duration'] = df['duration'].fillna(0) | |
df.fillna("", inplace=True) | |
df['duration'] = df['duration'].apply(lambda x: round(x)) | |
df['status_message'] = df['status_message'].fillna("") | |
df['status'] = df['failed'].apply(lambda x: 'FAILED' if x else 'SUCCESS') | |
return df | |
if __name__ == '__main__':
    process_start = time.time()

    # Create some jobs: ten text extractions plus entity/summary jobs, an
    # unknown job type and a blacklisted file.
    limit = 10
    jobs = [{'filename': f'file{i}.pdf', 'type': 'text_extract'} for i in range(1, limit + 1)]
    jobs.extend([
        {'filename': 'entites.pdf', 'type': 'entity_extract'},
        {'filename': 'malware.zip', 'type': 'dunno'},
        {'filename': 'malware2.txt', 'type': 'text_summarize'},
        {'filename': 'summary.txt', 'type': 'text_summarize'},
        {'filename': 'more_entities.txt', 'type': 'entity_extract'},
    ])
    jobs_df = pd.DataFrame(jobs)

    # Process the jobs; results are collected in completion order.
    processor = JobProcessor(jobs_df.to_dict('records'))
    processor.process_jobs()
    results = processor.get_results()

    process_duration = time.time() - process_start
    print(f"Executed {len(jobs)} jobs")
    print(f"Total duration: {round(process_duration)} seconds")
    print(f"Average job duration: {round(results['duration'].mean())} seconds")

    from matplotlib import pyplot as plt

    # Success/failure bar chart. value_counts() orders by frequency and
    # omits absent values, so pin the order to [success, failure] with a
    # zero fill. This keeps the fixed tick labels aligned with their bars
    # and avoids a KeyError when every job succeeds or every job fails.
    plt.figure(figsize=(3, 1))
    counts = results['failed'].value_counts().reindex([False, True], fill_value=0)
    colors = ['red' if failed else 'green' for failed in counts.index]
    counts.plot(kind='bar', title='Job report', ylabel='Count', xlabel="Status", color=colors)
    percentage_failed = round(counts.loc[True] / len(results) * 100)
    percentage_success = round(counts.loc[False] / len(results) * 100)
    plt.xticks([0, 1], [f'{percentage_success}% Success', f'{percentage_failed}% Failed'], rotation=0)
    plt.show()

    # Duration distribution of successful jobs; skipped when there are none.
    successful_durations = results.loc[results['failed'].eq(False), 'duration']
    if not successful_durations.empty:
        plt.figure(figsize=(3, 1))
        successful_durations.plot(kind='box', vert=False, patch_artist=True)
        plt.xlabel('Seconds')
        plt.show()

    from IPython.display import display
    display(results[['filename', 'type', 'status', 'status_message', 'output', 'duration']])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment