@robcowie
Created September 5, 2011 15:09
pika credentials from uri
import re

import pika
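
# amqp://[username:password@]host[:port]/[virtual_host][?query]
amqp_re = re.compile(
    r'^amqp://'
    r'((?P<username>[^:]*)[:](?P<password>[^@]*)[@])?'
    r'((?P<host>[^/:]*)([:](?P<port>[^/]*))?)'
    # [^/?]* stops the virtual host from swallowing a trailing ?query
    r'(?P<virtual_host>/[^/?]*)/?'
    r'([?](?P<query>[^/]*))?$'
)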
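

def pika_credentials_from_url(amqp_url=None):
    """Build pika.ConnectionParameters from an amqp:// URL."""
    if not amqp_url:
        amqp_url = 'amqp://127.0.0.1/'
    match = amqp_re.match(amqp_url)
    if match is None:
        # Without this check a malformed URL fails with an opaque AttributeError.
        raise ValueError('Not a valid AMQP URL: %r' % (amqp_url,))
    params = match.groupdict()
    # Fall back to the usual broker defaults for anything the URL leaves out.
    credentials = pika.PlainCredentials(
        params['username'] or 'guest',
        params['password'] or 'guest',
    )
    conn_params = pika.ConnectionParameters(
        params['host'] or '127.0.0.1',
        port=int(params['port'] or '5672'),
        virtual_host=params['virtual_host'] or '/',
        credentials=credentials,
    )
    return conn_params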
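
The same fields can also be pulled out of the URL without a hand-written regex. The sketch below is an alternative built on the standard library's `urllib.parse.urlsplit`; it is not part of the original snippet. The helper name `amqp_fields_from_url` and the returned dict layout are hypothetical, and it assumes the same defaults and the same virtual host convention (path with its leading slash) as the function above. Where available, pika's own `URLParameters` class accepts an `amqp://` URL directly and may make either approach unnecessary.

```python
from urllib.parse import unquote, urlsplit


def amqp_fields_from_url(amqp_url=None):
    """Hypothetical helper: split an amqp:// URL into plain connection fields."""
    parts = urlsplit(amqp_url or 'amqp://127.0.0.1/')
    if parts.scheme != 'amqp':
        raise ValueError('Not an amqp:// URL: %r' % (amqp_url,))
    return {
        # Percent-decode credentials so characters such as '@' can be escaped.
        'username': unquote(parts.username) if parts.username else 'guest',
        'password': unquote(parts.password) if parts.password else 'guest',
        'host': parts.hostname or '127.0.0.1',
        'port': parts.port or 5672,
        # Assumed convention: keep the leading slash, drop a trailing one.
        'virtual_host': parts.path.rstrip('/') or '/',
        'query': parts.query,
    }


if __name__ == '__main__':
    print(amqp_fields_from_url('amqp://user:secret@broker.example.com:5673/staging'))
```

The resulting values can be passed to `pika.PlainCredentials` and `pika.ConnectionParameters` in the same way as the regex groups above.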