Created
November 4, 2019 20:12
-
-
Save kk7ds/83aa03b9263adb3ac4f25cb32ab4346c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import logging | |
import json | |
import sys | |
import threading | |
import time | |
from oslo_config import cfg | |
import oslo_messaging | |
logging.basicConfig(level=logging.WARNING) | |
LOG = logging.getLogger() | |
class NotificationEndpoint(object): | |
def __init__(self): | |
self.done = threading.Event() | |
self.image_errors = collections.defaultdict(int) | |
self.host_errors = set() | |
self.completed = 0 | |
self.max_width = 0 | |
self.start_time = 0 | |
@property | |
def errors(self): | |
return sum(v for v in self.image_errors.values()) | |
def handle_start(self, data): | |
# We noticed an operation started, record the time | |
print('Image cache started on %i hosts' % ( | |
len(data['hosts']))) | |
self.start_time = time.time() | |
def handle_end(self, data): | |
# Calculate the time from start to finish, | |
# and print some stats about the process, including failures. | |
duration = time.time() - self.start_time | |
print('\nCompleted %i hosts, %i errors in %im%02is' % ( | |
self.completed, self.errors, | |
duration / 60, duration % 60)) | |
if self.host_errors: | |
print('Errors from hosts:') | |
for host in sorted(self.host_errors): | |
print(host) | |
for image, errors in self.image_errors.items(): | |
print('Image %s failed %i times' % (image, errors)) | |
# Let the main thread know we are finished | |
self.done.set() | |
def handle_progress(self, data): | |
# Keep track of how many hosts have finished | |
self.completed = data['index'] | |
# Record the IDs of images and hostnames that failed for some reason | |
for image in data['images_failed']: | |
self.image_errors[image] += 1 | |
self.host_errors.add(data['host']) | |
# Print out information about progress | |
msg = 'Aggregate "%s" host %i: %3i%% complete (%i errors)\r' % ( | |
data['name'], | |
data['index'], | |
100 * data['index'] / data['total'], | |
self.errors) | |
self.max_width = max(self.max_width, len(msg)) | |
pad = self.max_width - len(msg) | |
sys.stdout.write(msg) | |
sys.stdout.write(' ' * pad) | |
sys.stdout.write('\r') | |
def info(self, ctxt, publisher_id, event_type, payload, metadata): | |
"""Handle incoming notifications. | |
Dispatches to individual handlers based on what type of event it is. | |
""" | |
try: | |
data = payload['nova_object.data'] | |
if 'cache_images.progress' in event_type: | |
self.handle_progress(data) | |
elif 'cache_images.start' in event_type: | |
self.handle_start(data) | |
elif 'cache_images.end' in event_type: | |
self.handle_end(data) | |
except Exception as e: | |
print('Failed: %s' % e) | |
print(payload) | |
def main(): | |
cfg.CONF() | |
transport = oslo_messaging.get_notification_transport( | |
cfg.CONF, | |
url='rabbit://stackrabbit:0235f26b14bfe007fab9@192.168.201.41:5672/') | |
target = oslo_messaging.Target(topic='versioned_notifications') | |
endpoint = NotificationEndpoint() | |
server = oslo_messaging.get_notification_listener( | |
transport, [target], [endpoint], executor='threading') | |
server.start() | |
# Poll so we can be interrpted by C-c if necessary | |
while not endpoint.done.is_set(): | |
try: | |
endpoint.done.wait(timeout=1) | |
except KeyboardInterrupt: | |
break | |
server.stop() | |
server.wait() | |
# Nonzero exit if there were failures | |
return endpoint.errors and 1 or 0 | |
if __name__ == '__main__': | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment