Skip to content

Instantly share code, notes, and snippets.

@IlyaSkriblovsky
Created January 13, 2016 07:37
Show Gist options
  • Save IlyaSkriblovsky/7c27ae970ee38c20e73a to your computer and use it in GitHub Desktop.
Save IlyaSkriblovsky/7c27ae970ee38c20e73a to your computer and use it in GitHub Desktop.
Dependency-Ordered start/stop demo
#!/usr/bin/env python3
import codecs
import sys
import time
from threading import Thread
import six
from six.moves.queue import Queue, Empty
class APIError(Exception):
def __init__(self, explanation):
super().__init__()
self.explanation = explanation
class Service:
def __init__(self, name, dependencies=None, start_delay=0, stop_delay=None, start_error=None, stop_error=None):
self.name = name
self.dependencies = dependencies or set()
self.start_delay = start_delay
self.stop_delay = stop_delay if stop_delay is not None else start_delay
self.start_error = start_error
self.stop_error = stop_error
self.dependents = set()
for dep in self.dependencies:
dep.dependents.add(self)
def __str__(self):
return self.name
__repr__ = __str__
def start(self):
time.sleep(self.start_delay)
if self.start_error:
raise self.start_error
def stop(self):
time.sleep(self.stop_delay)
if self.stop_error:
raise self.stop_error
def ordered_start(services):
_ordered_op(services, 'Starting', 'start', 'dependencies')
def ordered_stop(services):
_ordered_op(services, 'Stopping', 'stop', 'dependents')
def _ordered_op(services, operation_name, method, deps_field):
stream = get_output_stream(sys.stdout)
lines = []
for svc in services:
write_out_msg(stream, lines, str(svc), operation_name)
results = Queue()
def do_stop(service):
try:
result = getattr(service, method)()
results.put((service, result, None))
except Exception as e:
results.put((service, None, e))
stopping = set() # services, stopping threads were started for
stopped = set() # services already stopped
def feed():
for svc in services:
can_be_stopped = svc not in stopping and all(
dep not in services or dep in stopped
for dep in getattr(svc, deps_field)
)
if can_be_stopped:
stopping.add(svc)
t = Thread(target=do_stop, args=(svc,))
t.daemon = True
t.start()
done = 0
expect = len(services)
errors = {}
error_to_reraise = None
feed()
while done < expect:
try:
svc, result, exception = results.get(timeout=1)
if exception is None:
write_out_msg(stream, lines, str(svc), operation_name)
elif isinstance(exception, APIError):
errors[svc.name] = exception.explanation
write_out_msg(stream, lines, str(svc), operation_name, status='error')
else:
errors[svc.name] = exception
error_to_reraise = exception
stopped.add(svc)
feed()
done += 1
except Empty:
pass
if errors:
stream.write('\n')
for svc_name, error in errors.items():
stream.write("ERROR: for {} {}\n".format(svc_name, error))
if error_to_reraise:
raise error_to_reraise
def get_output_stream(stream):
if six.PY3:
return stream
return codecs.getwriter('utf-8')(stream)
def write_out_msg(stream, lines, msg_index, msg, status="done"):
"""
Using special ANSI code characters we can write out the msg over the top of
a previous status message, if it exists.
"""
obj_index = msg_index
if msg_index in lines:
position = lines.index(obj_index)
diff = len(lines) - position
# move up
stream.write("%c[%dA" % (27, diff))
# erase
stream.write("%c[2K\r" % 27)
stream.write("{} {} ... {}\r".format(msg, obj_index, status))
# move back down
stream.write("%c[%dB" % (27, diff))
else:
diff = 0
lines.append(obj_index)
stream.write("{} {} ... \r\n".format(msg, obj_index))
stream.flush()
if __name__ == '__main__':
redis = Service('redis', start_delay=1)
mongo = Service('mongo', start_delay=1)
backend1 = Service('backend1', dependencies={redis, mongo}, start_delay=1) #, start_error = APIError('BOOM'))
backend2 = Service('backend2', dependencies={redis, mongo}, start_delay=1)
nginx = Service('nginx', dependencies={backend2, backend1}, start_delay=1)
services = {
redis, mongo,
backend1, backend2,
nginx
}
ordered_start(services)
print('\n# Running... Press Ctrl+C to stop')
while True:
try:
time.sleep(999)
except KeyboardInterrupt:
break
print('\n')
ordered_stop(services)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment