Skip to content

Instantly share code, notes, and snippets.

@sileht
Created March 11, 2015 13:40
Show Gist options
  • Save sileht/571a0a9204c3a96896ab to your computer and use it in GitHub Desktop.
Save sileht/571a0a9204c3a96896ab to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import argparse
import logging
import sys
import time
from oslo.config import cfg
from oslo import messaging
from oslo.messaging import rpc
from oslo.messaging import notify
class NoParsingFilter(logging.Filter):
def filter(self, record):
msg = record.getMessage()
for i in ['received {', 'MSG_ID is ']:
if i in msg:
return False
return True
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
LOG = logging.getLogger()
LOG.handlers[0].addFilter(NoParsingFilter())
for i in [
'kombu',
'amqp',
'oslo.messaging._drivers.amqp',
'stevedore',
'qpid.messaging'
]:
logging.getLogger(i).setLevel(logging.WARN)
cfg.CONF.heartbeat_interval = 10
cfg.CONF.notification_topics = "notif"
cfg.CONF.notification_driver = "messaging"
#url='kombu+qpid://guest:password@localhost/test'
url='rabbit://guest:password@localhost/'
transport = messaging.get_transport(cfg.CONF, url=url)
class NotifyEndpoint(object):
def __init__(self):
self.cache = []
def info(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.info('msg rcv')
LOG.info("%s %s %s %s" % (ctxt, publisher_id, event_type, payload))
if payload not in self.cache:
LOG.info('requeue msg')
self.cache.append(payload)
for i in range(15):
eventlet.sleep(1)
return messaging.NotificationResult.REQUEUE
else:
LOG.info('ack msg')
return messaging.NotificationResult.HANDLED
def notify_server():
endpoints = [NotifyEndpoint()]
target = messaging.Target(topic='n-t1')
server = notify.get_notification_listener(transport, [target],
endpoints, executor='eventlet')
server.start()
server.wait()
class RpcEndpoint(object):
def __init__(self, wait_before_answer):
self.count = None
self.wait_before_answer = wait_before_answer
def info(self, ctxt, message):
i = int(message.replace('test ', ''))
if self.count is None:
self.count = i
elif i == 0:
self.count = 0
else:
self.count += 1
LOG.info("######## RCV: %s/%s" % (self.count, message))
if self.wait_before_answer > 0:
time.sleep(self.wait_before_answer)
return "OK: %s" % message
def rpc_server(wait_before_answer):
endpoints = [RpcEndpoint(wait_before_answer)]
target = messaging.Target(topic='t1', server='moi')
server = rpc.get_rpc_server(transport, target,
endpoints, executor='eventlet')
server.start()
server.wait()
def threads_spawner(threads, method, *args, **kwargs):
p = eventlet.GreenPool(size=threads)
for i in range(0, threads):
p.spawn_n(method, i, *args, **kwargs)
p.waitall()
def rpc_call(_id, messages, wait_after_msg, timeout):
target = messaging.Target(topic='t1', server='moi')
c = rpc.RPCClient(transport, target)
c = c.prepare(timeout=timeout)
for i in range(0, messages):
payload = "test %d" % i
LOG.info("SEND: %s" % payload)
try:
res = c.call({}, 'info', message=payload)
except Exception:
LOG.exception('no RCV for %s' % i)
else:
LOG.info("RCV: %s" % res)
if wait_after_msg > 0:
time.sleep(wait_after_msg)
def notifier(_id, messages, wait_after_msg, timeout):
n1 = notify.Notifier(transport, topic="n-t1").prepare(
publisher_id='publisher-%d' % _id)
msg = 0
for i in range(0, messages):
msg = 1 + msg
ctxt = {}
payload = dict(msg=msg, vm='test', otherdata='ahah')
LOG.info("send msg")
LOG.info(payload)
n1.info(ctxt, 'compute.start1', payload)
if wait_after_msg > 0:
time.sleep(wait_after_msg)
def main():
parser = argparse.ArgumentParser(description='RPC DEMO')
subparsers = parser.add_subparsers(dest='mode',
help='notify/rpc server/client mode')
server = subparsers.add_parser('notify-server')
client = subparsers.add_parser('notify-client')
client.add_argument('-p', dest='threads', type=int, default=1,
help='number of client threads')
client.add_argument('-m', dest='messages', type=int, default=1,
help='number of call per threads')
client.add_argument('-w', dest='wait_after_msg', type=int, default=-1,
help='sleep time between two messages')
client.add_argument('-t', dest='timeout', type=int, default=3,
help='client timeout')
server = subparsers.add_parser('rpc-server')
server.add_argument('-w', dest='wait_before_answer', type=int, default=-1)
client = subparsers.add_parser('rpc-client')
client.add_argument('-p', dest='threads', type=int, default=1,
help='number of client threads')
client.add_argument('-m', dest='messages', type=int, default=1,
help='number of call per threads')
client.add_argument('-w', dest='wait_after_msg', type=int, default=-1,
help='sleep time between two messages')
client.add_argument('-t', dest='timeout', type=int, default=3,
help='client timeout')
args = parser.parse_args()
if args.mode == 'rpc-server':
rpc_server(args.wait_before_answer)
elif args.mode == 'notify-server':
notify_server()
elif args.mode == 'notify-client':
threads_spawner(args.threads, notifier, args.messages,
args.wait_after_msg, args.timeout)
elif args.mode == 'rpc-client':
threads_spawner(args.threads, rpc_call, args.messages,
args.wait_after_msg, args.timeout)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment