Created
January 13, 2016 07:37
-
-
Save IlyaSkriblovsky/7c27ae970ee38c20e73a to your computer and use it in GitHub Desktop.
Dependency-Ordered start/stop demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import codecs | |
import sys | |
import time | |
from threading import Thread | |
import six | |
from six.moves.queue import Queue, Empty | |
class APIError(Exception): | |
def __init__(self, explanation): | |
super().__init__() | |
self.explanation = explanation | |
class Service: | |
def __init__(self, name, dependencies=None, start_delay=0, stop_delay=None, start_error=None, stop_error=None): | |
self.name = name | |
self.dependencies = dependencies or set() | |
self.start_delay = start_delay | |
self.stop_delay = stop_delay if stop_delay is not None else start_delay | |
self.start_error = start_error | |
self.stop_error = stop_error | |
self.dependents = set() | |
for dep in self.dependencies: | |
dep.dependents.add(self) | |
def __str__(self): | |
return self.name | |
__repr__ = __str__ | |
def start(self): | |
time.sleep(self.start_delay) | |
if self.start_error: | |
raise self.start_error | |
def stop(self): | |
time.sleep(self.stop_delay) | |
if self.stop_error: | |
raise self.stop_error | |
def ordered_start(services): | |
_ordered_op(services, 'Starting', 'start', 'dependencies') | |
def ordered_stop(services): | |
_ordered_op(services, 'Stopping', 'stop', 'dependents') | |
def _ordered_op(services, operation_name, method, deps_field): | |
stream = get_output_stream(sys.stdout) | |
lines = [] | |
for svc in services: | |
write_out_msg(stream, lines, str(svc), operation_name) | |
results = Queue() | |
def do_stop(service): | |
try: | |
result = getattr(service, method)() | |
results.put((service, result, None)) | |
except Exception as e: | |
results.put((service, None, e)) | |
stopping = set() # services, stopping threads were started for | |
stopped = set() # services already stopped | |
def feed(): | |
for svc in services: | |
can_be_stopped = svc not in stopping and all( | |
dep not in services or dep in stopped | |
for dep in getattr(svc, deps_field) | |
) | |
if can_be_stopped: | |
stopping.add(svc) | |
t = Thread(target=do_stop, args=(svc,)) | |
t.daemon = True | |
t.start() | |
done = 0 | |
expect = len(services) | |
errors = {} | |
error_to_reraise = None | |
feed() | |
while done < expect: | |
try: | |
svc, result, exception = results.get(timeout=1) | |
if exception is None: | |
write_out_msg(stream, lines, str(svc), operation_name) | |
elif isinstance(exception, APIError): | |
errors[svc.name] = exception.explanation | |
write_out_msg(stream, lines, str(svc), operation_name, status='error') | |
else: | |
errors[svc.name] = exception | |
error_to_reraise = exception | |
stopped.add(svc) | |
feed() | |
done += 1 | |
except Empty: | |
pass | |
if errors: | |
stream.write('\n') | |
for svc_name, error in errors.items(): | |
stream.write("ERROR: for {} {}\n".format(svc_name, error)) | |
if error_to_reraise: | |
raise error_to_reraise | |
def get_output_stream(stream): | |
if six.PY3: | |
return stream | |
return codecs.getwriter('utf-8')(stream) | |
def write_out_msg(stream, lines, msg_index, msg, status="done"): | |
""" | |
Using special ANSI code characters we can write out the msg over the top of | |
a previous status message, if it exists. | |
""" | |
obj_index = msg_index | |
if msg_index in lines: | |
position = lines.index(obj_index) | |
diff = len(lines) - position | |
# move up | |
stream.write("%c[%dA" % (27, diff)) | |
# erase | |
stream.write("%c[2K\r" % 27) | |
stream.write("{} {} ... {}\r".format(msg, obj_index, status)) | |
# move back down | |
stream.write("%c[%dB" % (27, diff)) | |
else: | |
diff = 0 | |
lines.append(obj_index) | |
stream.write("{} {} ... \r\n".format(msg, obj_index)) | |
stream.flush() | |
if __name__ == '__main__': | |
redis = Service('redis', start_delay=1) | |
mongo = Service('mongo', start_delay=1) | |
backend1 = Service('backend1', dependencies={redis, mongo}, start_delay=1) #, start_error = APIError('BOOM')) | |
backend2 = Service('backend2', dependencies={redis, mongo}, start_delay=1) | |
nginx = Service('nginx', dependencies={backend2, backend1}, start_delay=1) | |
services = { | |
redis, mongo, | |
backend1, backend2, | |
nginx | |
} | |
ordered_start(services) | |
print('\n# Running... Press Ctrl+C to stop') | |
while True: | |
try: | |
time.sleep(999) | |
except KeyboardInterrupt: | |
break | |
print('\n') | |
ordered_stop(services) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment