Skip to content

Instantly share code, notes, and snippets.

@iamOgunyinka
Created August 15, 2019 04:51
Show Gist options
  • Save iamOgunyinka/d8c116a40c506604ca8aecbd72194659 to your computer and use it in GitHub Desktop.
Save iamOgunyinka/d8c116a40c506604ca8aecbd72194659 to your computer and use it in GitHub Desktop.
Cooperative sequenced-before tasks
#!/usr/bin/env python
import subprocess
from threading import Event, Thread, Lock, get_ident
from concurrent.futures import ThreadPoolExecutor
from queue import Empty
# Serialises access to ``my_list`` between the worker threads.
lock = Lock()
# Shared list that ``add_data`` appends to; printed when the script finishes.
my_list = []


def add_data(data):
    """Append *data* to the shared ``my_list`` while holding ``lock``.

    The identifier of the calling thread is printed first, so stdout shows
    which thread handled each entry.
    """
    with lock:
        thread_id = get_ident()
        print(thread_id)
        my_list.append(data)
class Task(object):
    """A single command that can be ordered relative to other tasks.

    A task may play two independent roles:

    * *blocker* -- ``set_lock()`` gives it a completion event that is set
      once the task has run; other tasks can wait on that event.
    * *waiter* -- ``wait_for(event)`` makes ``run()`` block until the given
      event is set before the task itself runs.

    The two events are stored separately, so a task can wait on an earlier
    task and still signal its own completion to later ones.
    """

    def __init__(self, command):
        """Create a task for *command* with no ordering constraints."""
        self.task = command
        # Completion event of this task; created on demand by set_lock().
        self._lock = None
        # Event this task has to wait on before running; see wait_for().
        self._wait_lock = None
        self._waiting_for_task = False

    def run(self):
        """Wait for the prerequisite (if any), run, then signal completion."""
        if self._wait_lock is not None:
            self._wait_lock.wait()
        self._run_task()
        # Signal even when this task had to wait itself, so that chained
        # blockers (A -> B -> C) release their dependants in order.
        if self._lock is not None:
            self._lock.set()

    def wait_for(self, lock):
        """Make ``run()`` block until *lock* (an ``Event``) has been set."""
        self._wait_lock = lock
        self._waiting_for_task = lock is not None

    def set_lock(self):
        """Mark this task as a blocker by giving it a cleared completion event."""
        if self._lock is None:
            self._lock = Event()
        self._lock.clear()

    def name(self):
        """Return the command this task was created with."""
        return self.task

    def get_lock(self):
        """Return this task's completion event, or ``None`` if it has none."""
        return self._lock

    def __call__(self):
        """Allow the task to be used directly as a callable; same as ``run()``."""
        self.run()

    def _run_task(self):
        """Perform the task's work: record the command through ``add_data``."""
        add_data(self.task)
class TaskBlock(Task):
    """A named, ordered group of tasks that is itself run as one task.

    Running the block calls each contained task in insertion order on the
    calling thread. The block inherits the waiting/blocking behaviour of
    ``Task`` and can be iterated and measured with ``len()``.
    """

    def __init__(self, name):
        """Create an empty block labelled *name*."""
        # The base class stores an empty command; the block has a name instead.
        super().__init__('')
        self.tasks = []
        self._name = name

    def __len__(self):
        """Return the number of tasks directly contained in this block."""
        return len(self.tasks)

    def __iter__(self):
        """Iterate over the contained tasks in insertion order."""
        return iter(self.tasks)

    def name(self):
        """Return the label given to this block."""
        return self._name

    def add_task(self, task):
        """Append *task* to the end of the block."""
        self.tasks.append(task)

    def last(self):
        """Return the most recently added task (``IndexError`` if empty)."""
        return self.tasks[-1]

    def _run_task(self):
        """Invoke every contained task, one after another."""
        for member in self.tasks:
            member()
class Worker(object):
    """Callable that drains a shared task queue until it is empty."""

    def __init__(self, task_queue):
        """Remember *task_queue*, a list-like object shared between workers."""
        self.queue = task_queue

    def __call__(self):
        """Pop tasks from the front of the queue and call them in turn.

        The loop ends when the queue is empty. Exceptions raised by a task
        (including ``IndexError``) propagate to the caller instead of being
        mistaken for an empty queue.
        """
        while True:
            # Only the pop is guarded: an IndexError here means "no more
            # work", whereas one raised inside task() is a real failure.
            try:
                task = self.queue.pop(0)
            except IndexError:
                break
            task()
class TaskScheduler(object):
    """Runs a collection of tasks on a bounded number of worker threads."""

    def __init__(self, max_workers, tasks):
        """Copy *tasks* into a queue and size the thread pool.

        The number of threads is the smaller of *max_workers* and the number
        of tasks, so no thread is started without work to pick up.
        """
        self.task_queue = list(tasks)
        self._num_threads = min(len(tasks), max_workers)

    def run_tasks(self):
        """Start one thread per worker and block until all have finished."""
        # Every worker shares the same queue object and pulls from its front.
        pool = [
            Thread(target=Worker(self.task_queue))
            for _ in range(self._num_threads)
        ]
        for runner in pool:
            runner.start()
        for runner in pool:
            runner.join()
class InputHelper(object):
    """Helpers that turn raw command lines into a tree of tasks."""

    @staticmethod
    def _pre_process_commands(command_list, task_name):
        """Build a ``TaskBlock`` named *task_name* from *command_list*.

        Each non-blank line is interpreted as follows:

        * ``_block:NAME_`` opens a nested block called NAME; a second
          ``_block:NAME_`` with the same name closes it.
        * ``_blocker_`` marks the task added just before it as a blocker:
          every task added to this block afterwards waits for it.
        * anything else becomes a plain ``Task``.

        Args:
            command_list: iterable of lines (an open file, a list, an
                iterator, ...).
            task_name: name of the block being built; also the marker that
                ends it when parsing a nested block.

        Returns:
            The populated ``TaskBlock``.
        """
        # iter() returns an iterator unchanged, so nested calls continue from
        # the current position; a plain list is turned into an iterator once
        # instead of being re-read from the start by every recursive call.
        commands = iter(command_list)
        task_block = TaskBlock(task_name)
        parent_task = None
        for command in commands:
            command = str(command).strip()
            if not command:
                continue
            if command == '_blocker_':
                # A blocker with nothing before it has nothing to wait for.
                if len(task_block) == 0:
                    continue
                parent_task = task_block.last()
                parent_task.set_lock()
                continue
            if command.startswith('_block:') and command.endswith('_'):
                new_task_name = command.split('_block:')[1][:-1].strip()
                if task_name == new_task_name:
                    # Closing marker of the block currently being parsed.
                    return task_block
                task = InputHelper._pre_process_commands(commands, new_task_name)
            else:
                task = Task(command)
            # Compare against None explicitly: an empty TaskBlock is falsy
            # because it defines __len__, yet it is still a valid blocker.
            if parent_task is not None:
                task.wait_for(parent_task.get_lock())
            task_block.add_task(task)
        return task_block
def main():
    """Parse ``untitled.txt`` into tasks and run them on up to 7 threads.

    Returns:
        0 once every worker thread has finished.
    """
    # The context manager closes the file even if parsing raises.
    with open('untitled.txt', 'r') as command_list:
        commands = InputHelper._pre_process_commands(command_list, 'Global Task')
    task_scheduler = TaskScheduler(7, commands)
    task_scheduler.run_tasks()
    return 0
if __name__ == '__main__':
    main()
    # main() returns only after the worker threads have been joined, so the
    # shared list is complete here; print each recorded entry on its own line.
    for entry in my_list:
        print(entry)
echo hello
_block:file-creation_
mkdir _target_
mkdir _target_/output
mkdir _target_/output/scans
_block:file-creation_
_blocker_
copy foo bar
exit 0
_block:nmap_
mkdir ___target___
mkdir ___target___/output
mkdir ___target___/output/scans
nmap ___target___ -oN _target_/output/scans/_target_-nmap
_block:nmap_
_blocker_
nikto --host _target_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment