import collections | |
import logging | |
import json | |
import sys | |
import threading | |
import time | |
from oslo_config import cfg | |
import oslo_messaging | |
# Module-wide handle on the root logger (getLogger() with no name).
LOG = logging.getLogger()
# Only surface warnings and above by default.
logging.basicConfig(level=logging.WARNING)
class NotificationEndpoint(object):
    """Notification endpoint that tracks an image-cache operation.

    The ``info`` method receives notifications and dispatches the
    ``cache_images.start`` / ``.progress`` / ``.end`` events to the
    matching handler.  Progress and a final summary are printed to stdout,
    and ``done`` is set once the end event has been processed.
    """

    def __init__(self):
        # Set when the end event has been handled.
        self.done = threading.Event()
        # image id -> number of times caching that image failed
        self.image_errors = collections.defaultdict(int)
        # hosts that reported at least one failed image
        self.host_errors = set()
        # 'index' of the most recent progress event
        self.completed = 0
        # widest progress line printed so far (used for padding)
        self.max_width = 0
        # time.time() of the start event; 0 means "no start event seen"
        self.start_time = 0

    @property
    def errors(self):
        """Total number of image failures recorded so far."""
        return sum(self.image_errors.values())

    def handle_start(self, data):
        """Record the start time and print how many hosts are involved."""
        print('Image cache started on %i hosts' % (
            len(data['hosts'])))
        self.start_time = time.time()

    def handle_end(self, data):
        """Print summary statistics and signal that the operation is done."""
        try:
            # If the start event was never seen there is no meaningful
            # baseline; report a zero duration rather than seconds since
            # the epoch.
            if self.start_time:
                duration = time.time() - self.start_time
            else:
                duration = 0
            minutes, seconds = divmod(int(duration), 60)
            print('\nCompleted %i hosts, %i errors in %im%02is' % (
                self.completed, self.errors, minutes, seconds))
            if self.host_errors:
                print('Errors from hosts:')
                for host in sorted(self.host_errors):
                    print(host)
            for image, errors in self.image_errors.items():
                print('Image %s failed %i times' % (image, errors))
        finally:
            # Let the main thread know we are finished, even if printing
            # the report failed; otherwise the waiter would never wake up.
            self.done.set()

    def handle_progress(self, data):
        """Update counters from a progress event and redraw the status line."""
        # Keep track of how many hosts have finished
        self.completed = data['index']

        # Record the IDs of images and hostnames that failed for some reason
        for image in data['images_failed']:
            self.image_errors[image] += 1
            self.host_errors.add(data['host'])

        # Avoid dividing by zero on a degenerate (empty) operation.
        total = data['total']
        percent = 100 * data['index'] / total if total else 0

        # Print out information about progress
        msg = 'Aggregate "%s" host %i: %3i%% complete (%i errors)' % (
            data['name'],
            data['index'],
            percent,
            self.errors)

        # Pad to the widest line printed so far so that leftovers from a
        # longer previous line are erased.  The padding must come before
        # the carriage return, otherwise it overwrites the start of the
        # line that was just printed.
        self.max_width = max(self.max_width, len(msg))
        sys.stdout.write(msg.ljust(self.max_width))
        sys.stdout.write('\r')
        # No newline is written, so flush explicitly to make it visible.
        sys.stdout.flush()

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Handle incoming notifications.

        Dispatches to individual handlers based on what type of event it is.
        Any failure while handling a notification is printed together with
        the payload instead of being propagated to the caller.
        """
        try:
            data = payload['nova_object.data']
            if 'cache_images.progress' in event_type:
                self.handle_progress(data)
            elif 'cache_images.start' in event_type:
                self.handle_start(data)
            elif 'cache_images.end' in event_type:
                self.handle_end(data)
        except Exception as e:
            # Best effort: report the problem and keep listening.
            print('Failed: %s' % e)
            print(payload)
def main(transport_url=None):
    """Listen for image-cache notifications until the operation ends.

    Starts a notification listener on the ``versioned_notifications``
    topic, waits until the endpoint reports completion (or the user
    presses Ctrl-C), then shuts the listener down.

    :param transport_url: Optional messaging transport URL.  When omitted,
        the script's built-in URL is used.
    :returns: 1 if any image failures were recorded, otherwise 0.
    """
    cfg.CONF()

    if transport_url is None:
        # NOTE(review): this URL embeds broker credentials and a fixed
        # address in source; pass transport_url (or move this into
        # configuration) instead of relying on the built-in value.
        transport_url = ('rabbit://stackrabbit:0235f26b14bfe007fab9'
                         '@192.168.201.41:5672/')

    transport = oslo_messaging.get_notification_transport(
        cfg.CONF, url=transport_url)
    target = oslo_messaging.Target(topic='versioned_notifications')
    endpoint = NotificationEndpoint()
    server = oslo_messaging.get_notification_listener(
        transport, [target], [endpoint], executor='threading')
    server.start()

    try:
        # Poll so we can be interrupted by C-c if necessary
        while not endpoint.done.wait(timeout=1):
            pass
    except KeyboardInterrupt:
        pass
    finally:
        # Always shut the listener down, including when the interrupt
        # arrives outside of the wait() call.
        server.stop()
        server.wait()

    # Nonzero exit if there were failures
    return 1 if endpoint.errors else 0
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment