Skip to content

Instantly share code, notes, and snippets.

@danslinky
Last active June 2, 2024 20:57
Show Gist options
  • Save danslinky/c927972bd3d980be9d6711cb68ea9a7a to your computer and use it in GitHub Desktop.
Save danslinky/c927972bd3d980be9d6711cb68ea9a7a to your computer and use it in GitHub Desktop.
Python worker pool that processes jobs using threads that dispatch work to a process pool, reporting each job's status and execution time.
# %%
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import random
import time
import os
import pandas as pd
# Filenames that are never processed: thread_worker marks jobs for these
# files as failed ("File is blacklisted.") without running them.
file_blacklist = [
    'file2.pdf',
    'malware2.txt',
]
class TextExtractor:
    """Simulated text extractor for a single file.

    Work is faked with sleeps: every call sleeps 1-2 seconds, ``file1.pdf``
    sleeps one extra second, and ``file5.pdf`` always fails.
    """

    def __init__(self, filename):
        self.filename = filename

    def extract_text(self):
        """Return ``"Text from <filename>"`` after a simulated delay.

        Raises:
            Exception: "Text Extraction Error." for ``file5.pdf``. Any error is
                printed as ``ERROR: <filename> - <error>`` before being re-raised.
        """
        name = self.filename
        try:
            time.sleep(random.randint(1, 2))  # simulated processing cost
            if name == 'file1.pdf':
                time.sleep(1)  # simulated long-running job
            text = f"Text from {name}"
            if name == 'file5.pdf':
                raise Exception("Text Extraction Error.")  # simulated failure
        except Exception as err:
            print(f"ERROR: {self.filename} - {err}")
            raise
        return text
class EntityExtractor:
    """Count occurrences of a fixed set of entity names in a piece of text."""

    def extract_entities(self, text):
        """Return ``[{'entity': name, 'count': n}, ...]`` for entity1 and entity2.

        Matching is case-insensitive (the text is lower-cased first) and uses
        ``str.count``, so substrings match too: ``'entity10'`` counts as an
        occurrence of ``'entity1'``.
        """
        lowered = text.lower()
        return [
            {'entity': name, 'count': lowered.count(name)}
            for name in ('entity1', 'entity2')
        ]
class JobProcessor:
    """Process a list of job dicts concurrently and collect their outcomes.

    Each job is a dict with at least ``'filename'`` and ``'type'`` keys. A thread
    pool runs one ``thread_worker`` per job, and each thread submits the actual
    work to a shared process pool. Every finished job dict carries ``'failed'``
    and, depending on the path taken, ``'output'``, ``'duration'`` (seconds) and
    ``'status_message'``.
    """

    # Job types whose worker reads job['text'], so the text must be extracted first.
    _TEXT_DEPENDENT_TYPES = ('entity_extract', 'text_summarize')

    def __init__(self, jobs):
        self.jobs = jobs
        self.results = []

    def extract_text_from_file(self, job):
        """Extract the text of ``job['filename']`` and return the updated job.

        The text is stored both in ``'text'`` (read by ``get_entities`` and
        ``get_summary``) and in ``'output'`` (the result of a text_extract job).
        Exceptions raised by the extractor propagate to the caller.
        """
        text = TextExtractor(job['filename']).extract_text()
        # Keep the text on the job so dependent steps operate on real input.
        job['text'] = text
        job['output'] = text
        return job

    def get_entities(self, job):
        """Set ``job['output']`` to the entity counts found in ``job['text']``."""
        job['output'] = EntityExtractor().extract_entities(job['text'])
        return job

    def get_summary(self, job):
        """Set ``job['output']`` to a short summary built from the first 10 characters of ``job['text']``."""
        job['output'] = f"summary of {job['filename']}: {job['text'][:10]}..."
        return job

    def process_job(self, process_executor, job):
        """Run one job on *process_executor* and record its outcome and timing.

        Unknown job types are marked failed without running anything. Entity and
        summary jobs first extract the file's text unless ``'text'`` is already
        present. An exception from either step marks the job failed, with the
        exception text in ``'status_message'``, instead of propagating.
        ``'duration'`` covers all steps the job ran.
        """
        workers = {
            'text_extract': self.extract_text_from_file,
            'entity_extract': self.get_entities,
            'text_summarize': self.get_summary,
        }
        worker = workers.get(job['type'])
        if worker is None:
            job['failed'] = True
            job['status_message'] = "Unknown job type."
            return job

        # NOTE(review): submitting bound methods to a ProcessPoolExecutor pickles
        # this JobProcessor (including self.jobs/self.results) on every submit.
        start = time.time()
        try:
            if job['type'] in self._TEXT_DEPENDENT_TYPES and 'text' not in job:
                job = process_executor.submit(self.extract_text_from_file, job).result()
            job_result = process_executor.submit(worker, job).result()
        except Exception as e:
            # Record the failure on this job rather than aborting the whole batch.
            job['duration'] = time.time() - start
            job['failed'] = True
            job['status_message'] = str(e)
            return job
        duration = time.time() - start

        if job_result is None:
            job['failed'] = True
        else:
            # The worker ran on a copy of the job in another process; adopt its result.
            job = job_result
            job['failed'] = False
        job['duration'] = duration
        return job

    def thread_worker(self, process_executor, job):
        """Reject blacklisted files, otherwise delegate to ``process_job``."""
        if job['filename'] in file_blacklist:
            job['failed'] = True
            job['status_message'] = "File is blacklisted."
            return job
        return self.process_job(process_executor, job)

    def process_jobs(self):
        """Run every job and append the finished job dicts to ``self.results`` in completion order."""
        with ProcessPoolExecutor() as process_executor, \
                ThreadPoolExecutor(os.cpu_count()) as thread_executor:
            futures = [
                thread_executor.submit(self.thread_worker, process_executor, job)
                for job in self.jobs
            ]
            for future in as_completed(futures):
                self.results.append(future.result())

    def get_results(self):
        """Return the processed jobs as a DataFrame with a derived ``'status'`` column.

        ``'duration'`` defaults to 0 and is rounded to whole seconds, and other
        missing cells become "". The optional columns ``'duration'``,
        ``'status_message'`` and ``'failed'`` are always present, even when no job
        set them (e.g. no job failed) or there are no results.
        ``'status'`` is 'FAILED' for failed jobs, otherwise 'SUCCESS'.
        """
        df = pd.DataFrame(self.results)
        # Create columns that no job produced so the steps below never KeyError.
        for column in ('duration', 'status_message', 'failed'):
            if column not in df.columns:
                df[column] = None
        df['duration'] = df['duration'].fillna(0)
        df = df.fillna("")
        df['duration'] = df['duration'].apply(round)
        df['status'] = df['failed'].apply(lambda failed: 'FAILED' if failed else 'SUCCESS')
        return df
if __name__ == '__main__':
process_start = time.time()
# Create some jobs
limit = 10
jobs = [{'filename': f'file{i}.pdf', 'type': 'text_extract'} for i in range(1, limit + 1)]
jobs.append({'filename': f'entites.pdf', 'type': 'entity_extract'})
jobs.append({'filename': f'malware.zip', 'type': 'dunno'})
jobs.append({'filename': f'malware2.txt', 'type': 'text_summarize'})
jobs.append({'filename': f'summary.txt', 'type': 'text_summarize'})
jobs.append({'filename': f'more_entities.txt', 'type': 'entity_extract'})
jobs_df = pd.DataFrame(jobs)
# process the jobs
processor = JobProcessor(jobs_df.to_dict('records'))
processor.process_jobs()
# get job results as they finish
results = processor.get_results()
process_end = time.time()
process_duration = process_end - process_start
print(f"Executed {len(jobs)} jobs")
print(f"Total duration: {round(process_duration)} seconds")
print(f"Average job duration: {round(results['duration'].mean())} seconds")
from matplotlib import pyplot as plt
plt.figure(figsize=(3, 1))
counts = results['failed'].value_counts()
colors = ['red' if index else 'green' for index in counts.index]
counts.plot(kind='bar', title='Job report', ylabel='Count', xlabel="Status", color=colors)
percentage_failed = round(counts[True] / len(results) * 100)
percentage_success = round(counts[False] / len(results) * 100)
plt.xticks([0, 1], [f'{percentage_success}% Success', f'{percentage_failed}% Failed'], rotation=0)
plt.show()
plt.figure(figsize=(3, 1))
results[results['failed'] == False]['duration'].plot(kind='box', vert=False, patch_artist=True)
plt.xlabel('Seconds')
plt.show()
from IPython.display import display
display(results[['filename', 'type', 'status', 'status_message','output','duration']])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment