Skip to content

Instantly share code, notes, and snippets.

@simone-gasparini
Last active August 29, 2015 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save simone-gasparini/ce923dd794e67cbecf67 to your computer and use it in GitHub Desktop.
Save simone-gasparini/ce923dd794e67cbecf67 to your computer and use it in GitHub Desktop.
Azure Event Hub SAS Token creation
import time
from azure import _validate_not_none
from azure.http import HTTPRequest
from azure.servicebus.servicebusservice import ServiceBusService, ServiceBusSASAuthentication
# Example configuration consumed by the script at the bottom of this file.
# Service Bus namespace handed to the service constructor.
service_namespace = 'test-namespace'
# Name of the shared access key (passed as shared_access_key_name).
key_name = 'Test'
# Value of the shared access key (passed as shared_access_key_value);
# this literal is a placeholder to be replaced with a real key.
key_value = 'YOUR-TOKEN='
class ServiceBusSASAuthenticationExt(ServiceBusSASAuthentication, object):
    '''SAS authentication with a configurable token lifetime.

    Accepts an extra ``max_time`` keyword argument: the lifetime of a signed
    request in minutes (5 when omitted). All other arguments are forwarded
    unchanged to ServiceBusSASAuthentication.

    NOTE(review): ``object`` is listed as a second base presumably so that
    ``super()`` works when the SDK base is an old-style class on Python 2 --
    confirm before removing it.
    '''

    def __init__(self, *args, **kwargs):
        # Remove our own option first; the parent constructor is only given
        # the arguments that remain.
        lifetime_minutes = kwargs.pop('max_time', 5)
        self.max_time = int(lifetime_minutes)
        super(ServiceBusSASAuthenticationExt, self).__init__(*args, **kwargs)

    def _get_expiry(self):
        '''Returns the expiry of a signed request as whole seconds since the
        Epoch: the current time plus max_time minutes.'''
        lifetime_seconds = self.max_time * 60
        expires_at = time.time() + lifetime_seconds
        return int(round(expires_at))
class ServiceBusServiceExt(ServiceBusService):
    '''ServiceBusService variant that can hand out Event Hub SAS tokens.

    If the base constructor ends up with a ServiceBusSASAuthentication
    object, it is replaced by a ServiceBusSASAuthenticationExt whose tokens
    stay valid for TOKEN_MAX_TIME_MINUTES.
    '''

    # Lifetime, in minutes, given to generated tokens (365 days). Subclasses
    # may override this to issue shorter- or longer-lived tokens.
    TOKEN_MAX_TIME_MINUTES = 365 * 24 * 60

    def __init__(self, *args, **kwargs):
        '''Accepts exactly the arguments of ServiceBusService.'''
        super(ServiceBusServiceExt, self).__init__(*args, **kwargs)
        current = self.authentication
        if isinstance(current, ServiceBusSASAuthentication):
            # Reuse the credentials the base class actually resolved, so that
            # keys passed positionally are not lost. Reading them only from
            # kwargs would silently produce None/None in that case.
            # NOTE(review): assumes the SDK authentication object exposes
            # key_name / key_value; when it does not, fall back to the
            # keyword arguments as before -- confirm against the SDK.
            key_name = getattr(current, 'key_name', None)
            if key_name is None:
                key_name = kwargs.get('shared_access_key_name')
            key_value = getattr(current, 'key_value', None)
            if key_value is None:
                key_value = kwargs.get('shared_access_key_value')
            self.authentication = ServiceBusSASAuthenticationExt(
                key_name,
                key_value,
                max_time=self.TOKEN_MAX_TIME_MINUTES
            )

    def get_event_hub_auth(self, hub_name, device_id=None):
        '''
        Retrieves an event hub SAS Token.

        A POST request for the hub's messages path is built and passed
        through the service's header/signing step; the request itself is
        never sent by this method.

        hub_name:
            Name of the event hub. Validated with _validate_not_none.
        device_id:
            Optional publisher id. When truthy the request path is
            /<hub_name>/publishers/<device_id>/messages, otherwise
            /<hub_name>/messages.

        Returns the value of the 'Authorization' header found on the
        prepared request, or None when no such header is present.
        '''
        _validate_not_none('hub_name', hub_name)
        request = HTTPRequest()
        request.method = 'POST'
        request.host = self._get_host()
        if device_id:
            request.path = '/{0}/publishers/{1}/messages'.format(
                hub_name, device_id)
        else:
            request.path = '/{0}/messages'.format(hub_name)
        request.headers = self._update_service_bus_header(request)

        # Scan every header; if several 'Authorization' entries exist the
        # last one wins.
        sas_token = None
        for header in request.headers:
            if header[0] == 'Authorization':
                sas_token = header[1]
        return sas_token
# Example usage: build the service from the constants defined above and print
# a token scoped to publisher 'sensor0001' of the event hub 'hubname'.
sbs = ServiceBusServiceExt(
    service_namespace,
    shared_access_key_name=key_name,
    shared_access_key_value=key_value
)
# Parenthesised single-argument form: prints the same text on Python 2 and is
# also valid syntax on Python 3.
print('SAS Token: "%s"' % sbs.get_event_hub_auth('hubname', 'sensor0001'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment