Skip to content

Instantly share code, notes, and snippets.

@kk7ds
Created November 4, 2019 20:12
Show Gist options
  • Save kk7ds/83aa03b9263adb3ac4f25cb32ab4346c to your computer and use it in GitHub Desktop.
Save kk7ds/83aa03b9263adb3ac4f25cb32ab4346c to your computer and use it in GitHub Desktop.
import collections
import logging
import json
import sys
import threading
import time
from oslo_config import cfg
import oslo_messaging
# Configure the root logger so that only WARNING and above are emitted.
logging.basicConfig(level=logging.WARNING)
# getLogger() with no name returns the root logger.
LOG = logging.getLogger()
class NotificationEndpoint(object):
    """Notification endpoint that tracks and prints image-cache progress.

    The ``info`` method is the entry point called for each notification.
    It dispatches ``cache_images.start`` / ``.progress`` / ``.end`` events
    to the matching ``handle_*`` method.  Progress is drawn as a single
    status line that is rewritten in place on stdout, and ``done`` is set
    once the end event has been handled.
    """

    def __init__(self):
        # Set by handle_end() so a waiting thread knows we are finished
        self.done = threading.Event()
        # Image ID -> number of times that image was reported as failed
        self.image_errors = collections.defaultdict(int)
        # Hosts that reported at least one failed image
        self.host_errors = set()
        # 'index' of the most recent progress event
        self.completed = 0
        # Widest status line written so far (used to blank out leftovers)
        self.max_width = 0
        # time.time() when the start event was seen; 0 until then
        self.start_time = 0

    @property
    def errors(self):
        """Total number of image failures recorded across all hosts."""
        return sum(self.image_errors.values())

    def handle_start(self, data):
        """Announce the start of an operation and record the start time.

        :param data: event data; ``data['hosts']`` is the collection of
                     hosts taking part (only its length is used).
        """
        print('Image cache started on %i hosts' % (
            len(data['hosts'])))
        self.start_time = time.time()

    def handle_end(self, data):
        """Print the final summary and signal completion via ``done``.

        :param data: event data (not used).
        """
        # If we never saw the start event we have no reference point, so
        # report a zero duration instead of "seconds since the epoch".
        if self.start_time:
            duration = time.time() - self.start_time
        else:
            duration = 0
        minutes, seconds = divmod(int(duration), 60)
        # The leading newline moves past the in-place progress line
        print('\nCompleted %i hosts, %i errors in %im%02is' % (
            self.completed, self.errors, minutes, seconds))
        if self.host_errors:
            print('Errors from hosts:')
            for host in sorted(self.host_errors):
                print(host)
        for image, errors in self.image_errors.items():
            print('Image %s failed %i times' % (image, errors))
        # Let the main thread know we are finished
        self.done.set()

    def handle_progress(self, data):
        """Record one progress event and redraw the status line.

        :param data: event data; reads ``index``, ``total``, ``name``,
                     ``host`` and ``images_failed``.
        """
        # Keep track of how many hosts have finished
        self.completed = data['index']

        # Record the IDs of images and hostnames that failed for some reason
        failed = data['images_failed']
        for image in failed:
            self.image_errors[image] += 1
        if failed:
            self.host_errors.add(data['host'])

        # Guard against a zero total so an empty run cannot divide by zero
        total = data['total']
        percent = 100 * data['index'] // total if total else 0

        text = 'Aggregate "%s" host %i: %3i%% complete (%i errors)' % (
            data['name'],
            data['index'],
            percent,
            self.errors)
        self.max_width = max(self.max_width, len(text))
        # Pad BEFORE the carriage return so the spaces erase what is left of
        # a longer previous line rather than overwriting the start of this
        # one, then flush because no newline is written.
        sys.stdout.write(text.ljust(self.max_width) + '\r')
        sys.stdout.flush()

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """Handle incoming notifications.

        Dispatches to individual handlers based on what type of event it is.
        Only ``event_type`` and ``payload['nova_object.data']`` are used;
        the remaining arguments are accepted but ignored.  Event types that
        match none of the known names are silently skipped.
        """
        try:
            data = payload['nova_object.data']
            if 'cache_images.progress' in event_type:
                self.handle_progress(data)
            elif 'cache_images.start' in event_type:
                self.handle_start(data)
            elif 'cache_images.end' in event_type:
                self.handle_end(data)
        except Exception as e:
            # Deliberately broad: report the bad notification and keep
            # listening rather than letting the error escape the handler.
            print('Failed: %s' % e)
            print(payload)
def main():
    """Listen for image-cache notifications until the run ends or C-c.

    Starts a notification listener on the ``versioned_notifications``
    topic with a ``NotificationEndpoint`` attached, then blocks until the
    endpoint signals completion or the user interrupts.

    :returns: 1 if any image failures were recorded, otherwise 0
              (suitable for use as the process exit status).
    """
    cfg.CONF()
    # NOTE(review): broker credentials and address are hard-coded in this
    # URL; they should come from configuration or the environment instead
    # of living in source control.
    transport = oslo_messaging.get_notification_transport(
        cfg.CONF,
        url='rabbit://stackrabbit:0235f26b14bfe007fab9@192.168.201.41:5672/')
    target = oslo_messaging.Target(topic='versioned_notifications')
    endpoint = NotificationEndpoint()
    server = oslo_messaging.get_notification_listener(
        transport, [target], [endpoint], executor='threading')
    server.start()
    try:
        # Poll so we can be interrupted by C-c if necessary
        while not endpoint.done.is_set():
            endpoint.done.wait(timeout=1)
    except KeyboardInterrupt:
        # A user interrupt is a normal way to stop; fall through to cleanup
        pass
    finally:
        # Always shut the listener down, wherever the interrupt landed
        server.stop()
        server.wait()
    # Nonzero exit if there were failures
    return 1 if endpoint.errors else 0
if __name__ == '__main__':
    # Run the monitor and hand its result to the shell as the exit status
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment