Skip to content

Instantly share code, notes, and snippets.

@djosix
Last active June 25, 2021 10:01
Show Gist options
  • Save djosix/071a03bcda9dae8ae9ca7cf8726e8798 to your computer and use it in GitHub Desktop.
Save djosix/071a03bcda9dae8ae9ca7cf8726e8798 to your computer and use it in GitHub Desktop.
Python directory-based command queue executor
import os
import json
import re
import glob
import time
import traceback
import functools
import subprocess as sp
from datetime import datetime
from threading import Thread, Event
def no_raise_wrapper(function):
    """Wrap *function* so that exceptions are printed instead of raised.

    The returned callable forwards all positional and keyword arguments to
    *function* and returns its result. If *function* raises an ``Exception``,
    the formatted traceback is printed to stdout and ``None`` is returned.

    Only ``Exception`` subclasses are swallowed. ``KeyboardInterrupt``,
    ``SystemExit`` and other bare ``BaseException`` subclasses still
    propagate, so a polling loop built on wrapped calls can be interrupted.
    """
    @functools.wraps(function)
    def _wrapper(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except Exception:
            print(traceback.format_exc())
            return None
    return _wrapper
def sanitize_env_dict(env_dict):
    """Return a copy of *env_dict* with validated keys and string values.

    Every key must consist solely of ASCII letters, digits and underscores;
    every value is converted with ``str()``. The input mapping is not
    modified.

    Raises:
        AssertionError: if a key contains any other character (including a
            trailing newline). Raised explicitly rather than via ``assert``
            so the check still runs under ``python -O``.
        TypeError: if a key is not a string (raised by ``re.fullmatch``).
    """
    sanitized = env_dict.copy()
    for key, value in env_dict.items():
        # fullmatch, not match with '$': '$' also matches just before a
        # trailing newline, which would let a key like 'FOO\n' through.
        if re.fullmatch(r'[a-zA-Z0-9_]+', key) is None:
            raise AssertionError(
                'invalid environment variable name: {!r}'.format(key)
            )
        sanitized[key] = str(value)
    return sanitized
class Worker(Thread):
    """Daemon thread that executes one submitted task at a time.

    A task is a dict whose ``'popen'`` entry holds keyword arguments for
    ``subprocess.check_call``. The owning thread hands a task over with
    :meth:`submit`, polls :meth:`is_available`, and collects the finished
    task (now carrying a ``'run'`` result dict) with :meth:`retrieve`.
    Submitting ``None`` makes the thread leave its loop.

    Attributes:
        env: sanitized environment variables merged into every task's
            environment (task-level ``env`` entries win on conflict).
        attrs: free-form metadata about this worker.
    """

    def __init__(self, name, env=None, attrs=None):
        """Create the worker thread (not started).

        Args:
            name: thread name.
            env: mapping of environment variables for this worker's tasks;
                validated and stringified by ``sanitize_env_dict``.
            attrs: arbitrary metadata; a shallow copy is stored so that
                workers never share one dict (e.g. a caller's default).
        """
        super().__init__(name=name, daemon=True)
        self.env = sanitize_env_dict({} if env is None else env)
        self.attrs = {} if attrs is None else dict(attrs)
        # Set while a submitted task has not finished yet.
        self.__running_flag = Event()
        self.__current_task = None

    def submit(self, task):
        """Hand *task* (a dict, or ``None`` to stop the thread) to the worker.

        The worker must be available, i.e. alive and idle.
        """
        assert self.is_available()
        assert isinstance(task, (dict, type(None)))
        self.__current_task = task
        # Setting the flag wakes up the loop in run().
        self.__running_flag.set()

    def is_available(self):
        """Return True if the thread is alive and not running a task."""
        return self.is_alive() and not self.__running_flag.is_set()

    def retrieve(self):
        """Return and forget the last finished task.

        Returns ``None`` while a task is still running or when there is
        nothing to collect.
        """
        if self.__running_flag.is_set():
            return None
        task = self.__current_task
        self.__current_task = None
        return task

    def run(self):
        """Thread body: wait for a task, execute it, repeat until ``None``."""
        while True:
            self.__running_flag.wait()
            if self.__current_task is None:
                # A submitted None is the stop signal.
                self.__running_flag.clear()
                break
            if isinstance(self.__current_task, dict):
                self.__execute_task()
            else:
                # Not reachable through submit(), which asserts the type;
                # kept so a foreign value is reported as a failed task.
                self.__current_task = {
                    '__invalid_task': self.__current_task,
                    'run': {'completed': False}
                }
            # Cleared only after the result is recorded, so retrieve()
            # never sees a half-finished task.
            self.__running_flag.clear()

    def __execute_task(self):
        """Run the current task's command and record the outcome in it.

        Fills ``task['run']`` with ``start_time``, ``completed``,
        ``traceback`` (only on failure), ``end_time`` and ``worker``.
        """
        task = self.__current_task
        run_info = task['run'] = {}
        run_info['start_time'] = time.time()
        try:
            kwargs = task['popen'].copy()
            # NOTE(review): the child environment is built only from the
            # worker env plus the task env; os.environ is not merged in.
            # Confirm that this is intended.
            env = self.env.copy()
            env.update(kwargs.get('env', {}))
            kwargs['env'] = env
            # NOTE(review): QueueExecutor.submit_task stores stdin/stdout/
            # stderr as strings, which check_call does not accept as
            # redirection targets -- such tasks will fail here.
            sp.check_call(**kwargs)
        except Exception:
            # Any failure (bad kwargs, missing 'popen', non-zero exit
            # status) marks the task as not completed.
            run_info['completed'] = False
            run_info['traceback'] = traceback.format_exc()
        else:
            run_info['completed'] = True
        run_info['end_time'] = time.time()
        run_info['worker'] = self  # XXX: not serializable
class QueueExecutor:
    """Directory-based command queue served by a pool of ``Worker`` threads.

    Tasks are JSON files named ``*.task.json``. A file starts in
    ``<queue_dir>/pending``, is moved to ``running`` when a worker takes it,
    and ends up in ``done`` or ``failed`` depending on the outcome.

    Attributes:
        queue_dir: root directory of the queue.
        polling_interval: seconds to sleep between polling rounds.
        workers: list of registered ``Worker`` objects.
        callbacks: maps a stage name to its list of wrapped callbacks.
        subdirs: maps each stage name to its directory path.
    """

    def __init__(
        self,
        queue_dir='queue',
        polling_interval=0.5,
    ):
        """Remember the settings and create the four stage directories."""
        self.queue_dir = queue_dir
        self.polling_interval = polling_interval
        self.workers = []
        self.callbacks = {}
        self.subdirs = {
            name: os.path.join(self.queue_dir, name)
            for name in ['pending', 'running', 'done', 'failed']
        }
        for dir_path in self.subdirs.values():
            os.makedirs(dir_path, exist_ok=True)

    def run(self):
        """Start all workers and dispatch pending tasks forever.

        Each polling round visits every available worker, collects its
        finished task (moving the file to ``done``/``failed`` and firing the
        matching callbacks) and, while pending tasks remain, assigns it the
        oldest one. At least one worker must have been added.
        """
        assert len(self.workers) > 0
        for worker in self.workers:
            worker.start()

        @no_raise_wrapper
        def handle_retrieved_task(task):
            next_stage = 'done' if task['run']['completed'] else 'failed'
            self.__move_to_subdir(task, next_stage)
            self.__callback(next_stage, task)

        while True:
            has_pending = True
            for worker in self.workers:
                if not worker.is_available():
                    continue
                task = worker.retrieve()
                if task is not None:
                    handle_retrieved_task(task)
                if not has_pending:
                    # Keep visiting the remaining workers so their finished
                    # tasks are still collected, but skip re-scanning the
                    # pending directory within this round.
                    continue
                new_task = self.__get_pending()
                if new_task is None:
                    has_pending = False
                    continue
                worker.submit(new_task)
                self.__move_to_subdir(new_task, 'running')
                self.__callback('running', new_task)
            time.sleep(self.polling_interval)

    def add_worker(self, name, env=None, attrs=None):
        """Register a new ``Worker`` called *name*.

        Args:
            name: worker (thread) name.
            env: environment variables for the worker's tasks.
            attrs: arbitrary metadata attached to the worker.
        """
        # Fresh dicts per call: a shared default would be stored by
        # reference and end up common to every worker.
        env = {} if env is None else env
        attrs = {} if attrs is None else attrs
        self.workers.append(Worker(name, env, attrs))

    def add_callback(self, stage, callback):
        """Register *callback(task)* for ``'running'``, ``'done'`` or
        ``'failed'``; exceptions it raises are printed, not propagated."""
        assert stage in ('running', 'done', 'failed')
        stage_callbacks = self.callbacks.setdefault(stage, [])
        stage_callbacks.append(no_raise_wrapper(callback))

    def submit_task(
        self,
        name,
        args,
        executable=None,
        stdin=None,
        stdout=None,
        stderr=None,
        cwd=None,
        env=None,
        attrs=None,
    ):
        """Write a task file into the pending directory.

        Args:
            name: task name; becomes part of the file name.
            args: command as a string (run through the shell) or as a
                list/tuple of arguments (each converted with ``str``).
            executable, stdin, stdout, stderr, cwd, env: optional values
                stored under the same keys in the task's ``'popen'`` dict.
            attrs: extra metadata; ``name``, ``file_name`` and
                ``create_time`` are filled in unless already present.
        """
        assert isinstance(args, (str, tuple, list))
        if isinstance(args, str):
            shell = True
        else:
            args = list(map(str, args))
            shell = False
        popen_kwargs = {'args': args, 'shell': shell}
        if executable is not None:
            assert isinstance(executable, str)
            popen_kwargs['executable'] = executable
        if stdin is not None:
            assert isinstance(stdin, str) or stdin == sp.DEVNULL
            popen_kwargs['stdin'] = stdin
        if stdout is not None:
            assert isinstance(stdout, str)
            popen_kwargs['stdout'] = stdout
        if stderr is not None:
            assert isinstance(stderr, str) or stderr == sp.STDOUT
            popen_kwargs['stderr'] = stderr
        if cwd is not None:
            assert isinstance(cwd, str)
            popen_kwargs['cwd'] = cwd
        if env is not None:
            assert isinstance(env, dict)
            popen_kwargs['env'] = sanitize_env_dict(env)

        create_time = time.time()
        datetime_str = datetime.fromtimestamp(create_time).strftime('%Y%m%d_%H%M%S')
        file_name = '{}-{}.task.json'.format(datetime_str, name)
        json_path = os.path.join(self.subdirs['pending'], file_name)

        attrs = {} if attrs is None else attrs.copy()
        attrs.setdefault('name', name)
        attrs.setdefault('file_name', file_name)
        attrs.setdefault('create_time', create_time)
        task_dict = {
            'popen': popen_kwargs,
            'attrs': attrs
        }

        # Write under a name the '*.task.json' glob does not match, then
        # rename, so a polling executor never reads a half-written file.
        tmp_path = json_path + '.tmp'
        try:
            with open(tmp_path, 'w') as f:
                json.dump(task_dict, f, indent=2)
            os.replace(tmp_path, json_path)
        except Exception:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

    def __get_pending(self):
        """Load and return the oldest readable pending task, or ``None``.

        Files are tried in order of modification time. A file that cannot
        be parsed into a dict-like task is moved to ``failed`` and the next
        one is tried. The returned task carries its file path under
        ``'__path'``.
        """
        pattern = os.path.join(self.subdirs['pending'], '*.task.json')
        for pending_path in sorted(glob.glob(pattern), key=os.path.getmtime):
            try:
                with open(pending_path) as f:
                    task = json.load(f)
                task['__path'] = pending_path
            except Exception:
                # The file is closed by now, so the rename also works on
                # platforms that refuse to move open files.
                self.__move_to_subdir({'__path': pending_path}, 'failed')
                continue
            return task
        return None

    @no_raise_wrapper
    def __move_to_subdir(self, task, stage):
        """Move the task's file into the *stage* directory and update
        ``task['__path']``; errors are printed rather than raised."""
        assert stage in ('running', 'done', 'failed')
        source_path = task['__path']
        source_name = os.path.basename(source_path)
        target_path = os.path.join(self.subdirs[stage], source_name)
        os.rename(source_path, target_path)
        # Updated only after a successful rename, so a failed move leaves
        # the task pointing at where its file really is.
        task['__path'] = target_path

    def __callback(self, stage, task):
        """Invoke every callback registered for *stage* with *task*."""
        # .get(): a stage without registered callbacks is not an error.
        for callback in self.callbacks.get(stage, ()):
            callback(task)
if __name__ == '__main__':
    # Demo: queue 30 short shell tasks and run them on four workers, each
    # with its own CUDA_VISIBLE_DEVICES value.
    import random

    qe = QueueExecutor()

    for i in range(30):
        duration = random.randint(1, 3)
        # The command text is left-aligned on purpose: it is the literal
        # string handed to the shell.
        command = f'''
echo Training with CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES
sleep {duration}
'''
        qe.submit_task(f'task_{i}', command)

    for gpu_id in range(4):
        qe.add_worker(f'GPU-{gpu_id}', {'CUDA_VISIBLE_DEVICES': gpu_id})
    print('workers:', qe.workers)

    def make_callback(stage):
        """Build a callback that reports *stage* and the task's name."""
        def _callback(task):
            task_name = task['attrs']['name']
            run_info = task.get('run', {})
            if 'traceback' in run_info:
                # Show why the task failed before the status line.
                print(run_info['traceback'])
            print(f'[callback at stage={stage!r} task={task_name!r}]')
        return _callback

    for stage_name in ('running', 'done', 'failed'):
        qe.add_callback(stage_name, make_callback(stage_name))
    print('callbacks:', qe.callbacks)

    print('run')
    qe.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment