Skip to content

Instantly share code, notes, and snippets.

@bzshang
Last active October 14, 2015 17:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bzshang/d609470e8dbe15e0ac1d to your computer and use it in GitHub Desktop.
Save bzshang/d609470e8dbe15e0ac1d to your computer and use it in GitHub Desktop.
Send normally distributed random numbers at even intervals from Digi Transport to Azure Event Hub
import urllib2, urllib, random, time, hmac, hashlib, base64
from datetime import datetime
def eh():
    """Post ten random samples to an Azure Event Hub over its REST endpoint.

    A Shared Access Signature is generated once with ``_sign_string`` and
    reused as the ``Authorization`` header for every request. Each event is
    a JSON object with a UTC ``Timestamp`` and a ``Value`` drawn from a
    normal distribution with mean 0 and standard deviation 1. The function
    pauses one second after each request.

    The ``<...>`` placeholders in the URI and key arguments must be replaced
    with real Event Hub settings before running.

    Raises:
        urllib2.URLError: propagated from ``urllib2.urlopen`` when a request
            fails (``urllib2.HTTPError`` for non-success status codes).
    """
    # Imported locally so this function does not depend on edits to the
    # file-level import line.
    import json
    from contextlib import closing

    print('starting')
    uri = 'https://<Event Hub namespace>.servicebus.windows.net/<Event Hub path>/messages'
    sas = _sign_string(uri, '<Event Hub key>', '<Event Hub key name>')
    headers = {
        'Content-Type': 'application/atom;type=entry;charset=utf-8',
        'Accept': '*/*',
        'Authorization': sas,
    }
    for _ in range(10):
        ts = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f') + 'Z'
        print(ts)
        # json.dumps emits real JSON (double-quoted keys/strings); str() on a
        # dict would emit Python repr, which JSON consumers cannot parse.
        body = json.dumps({"Timestamp": ts, "Value": random.normalvariate(0, 1)})
        # Supplying data makes urllib2 issue a POST.
        request = urllib2.Request(uri, data=body, headers=headers)
        # Drain and explicitly close the response so the connection is
        # released on every iteration.
        with closing(urllib2.urlopen(request)) as response:
            response.read()
        time.sleep(1)
def _sign_string(uri, key, key_name, ttl_seconds=10000):
    """Build a Shared Access Signature token for ``uri``.

    The URL-encoded URI and the expiry time, joined by a newline, are signed
    with HMAC-SHA256 using ``key``. The base64-encoded digest is then
    URL-quoted and embedded in the returned token.

    Args:
        uri: Resource URI the token is issued for.
        key: Shared access key used as the HMAC secret (text or bytes).
        key_name: Name of the key; copied verbatim into the ``skn`` field.
        ttl_seconds: Token lifetime in seconds, counted from the current
            time. Defaults to 10000 seconds, the value previously hard-coded.

    Returns:
        A string of the form
        ``SharedAccessSignature sr=<uri>&sig=<signature>&se=<expiry>&skn=<key_name>``
        where ``<expiry>`` is an integer number of seconds since the epoch.
    """
    # Resolve the quoting helpers locally so the function works whether the
    # interpreter exposes them on ``urllib`` (Python 2) or ``urllib.parse``.
    try:
        from urllib import quote, quote_plus
    except ImportError:
        from urllib.parse import quote, quote_plus

    expiry = int(time.time() + ttl_seconds)
    encoded_uri = quote_plus(uri)
    string_to_sign = encoded_uri + '\n' + str(expiry)
    print('url: ' + uri)
    print('url encoded: ' + string_to_sign)

    # HMAC operates on bytes; only encode values that are not bytes already.
    key_bytes = key if isinstance(key, bytes) else key.encode('utf-8')
    if isinstance(string_to_sign, bytes):
        payload = string_to_sign
    else:
        payload = string_to_sign.encode('utf-8')

    digest = hmac.new(key_bytes, payload, hashlib.sha256).digest()
    signature = base64.b64encode(digest)

    return ('SharedAccessSignature sr=' + encoded_uri
            + '&sig=' + quote(signature)
            + '&se=' + str(expiry)
            + '&skn=' + key_name)
# Run the sender only when this file is executed as a script, not on import.
if __name__ == "__main__":
    eh()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment