Skip to content

Instantly share code, notes, and snippets.

@dcollien
Created May 22, 2019 06:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dcollien/4fcd8de165fa152d40c30d97c4a0ab2a to your computer and use it in GitHub Desktop.
Save dcollien/4fcd8de165fa152d40c30d97c4a0ab2a to your computer and use it in GitHub Desktop.
import time
import urllib.parse
import hmac
import hashlib
import base64
SB_NAME = "ol-events"
EH_NAME = "ol-user-metrics"
SAS_NAME = "collect-user-metric"
SAS_VALUE = "EXAMPLE-TOKEN"
def get_auth_token(sb_name, eh_name, sas_name, sas_value):
"""
Returns an authorization token dictionary
for making calls to Event Hubs REST API.
"""
uri = urllib.parse.quote_plus("https://{}.servicebus.windows.net/{}" \
.format(sb_name, eh_name))
sas = sas_value.encode('utf-8')
expiry = str(int(time.time() + 10000))
string_to_sign = (uri + '\n' + expiry).encode('utf-8')
signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
signature = urllib.parse.quote(base64.b64encode(signed_hmac_sha256.digest()))
return {"sb_name": sb_name,
"eh_name": eh_name,
"token":'SharedAccessSignature sr={}&sig={}&se={}&skn={}' \
.format(uri, signature, expiry, sas_name)
}
def get_request_details():
token = get_auth_token(SB_NAME, EH_NAME, SAS_NAME, SAS_VALUE)
sb_url = "https://{}.servicebus.windows.net/".format(SB_NAME)
endpoint = "{}/messages".format(EH_NAME)
query = "?timeout=60&api-version=2014-01"
print("POST {}".format(sb_url + endpoint + query))
print("Authorization: {}".format(token))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment