Skip to content

Instantly share code, notes, and snippets.

@affo
Created August 3, 2016 14:48
Show Gist options
  • Save affo/c543dccdbc84f7cc85c6f91f251c8c09 to your computer and use it in GitHub Desktop.
Save affo/c543dccdbc84f7cc85c6f91f251c8c09 to your computer and use it in GitHub Desktop.
Python scripts for producing inputs and consuming outputs for an Azure Streaming Analytics Job
from azure.storage.blob import BlockBlobService
from utils import get_storage_credentials
if __name__ == '__main__':
import sys, time
if len(sys.argv) < 2:
print 'Specify container name, please.'
sys.exit(1)
C_NAME = sys.argv[1]
account, key = get_storage_credentials()
blob_service = BlockBlobService(account, key)
blobs = blob_service.list_blobs(C_NAME)
for blob in blobs:
blob_service.delete_blob(C_NAME, blob.name)
print 'Blob deleted: {}/{}'.format(C_NAME, blob.name)
'''
Consume only from service bus queues.
'''
if __name__ == '__main__':
import sys, json
from utils import get_servicebus
if len(sys.argv) < 2:
print 'Specify the queue name, please.'
sys.exit(1)
Q_NAME = sys.argv[1]
bus_service = get_servicebus()
def window_repr(data):
r = ', '.join([t.get('value') for t in data.get('collect')])
return '[' + r + ']'
while True:
msg = bus_service.receive_queue_message(Q_NAME, peek_lock=False)
if not msg.body:
continue
start = msg.body.find('{')
end = msg.body.rfind('}')
body = msg.body[start : end + 1]
try:
body = json.loads(body)
window_end = body.get('ts')
body = window_repr(body)
print body, window_end
except ValueError:
print body
import json
from utils import *
MAX_SIM_TUPLES = 5
MAX_GAP = 5
if __name__ == '__main__':
import sys, time
STARTUP_TIME = int(round(time.time() * 1000))
if len(sys.argv) < 2:
print 'Specify resource name, please.'
sys.exit(1)
RES_NAME = sys.argv[1] # could be hub, queue or container
dense = 1
if len(sys.argv) >= 3:
dense = int(sys.argv[2])
# get your production method
prod = get_eventhub_prod_fn(RES_NAME)
# or prod = get_blobstorage_prod_fn(RES_NAME)
# or prod = get_queue_prod_fn(RES_NAME)
def pub(ts, tid):
tapp = STARTUP_TIME + ts * 1000
value = '({},t-{})'.format(ts, tid)
data = dict(tapp=tapp, value=value)
msg = json.dumps(data, encoding='utf-8')
prod(msg)
# or, in the case of blobs
# prod(msg, 'blob' + str(tid))
print '>>>', msg
last_ts = -1
last_id = 0
while True:
# choose number of simultaneous tuples
no_sim_tuples = 1
if not dense:
no_sim_tuples = unbalanced_randint(MAX_SIM_TUPLES)
# choose a gap in seconds
gap = 1
if not dense:
gap = unbalanced_randint(MAX_GAP)
# sleep for gap
time.sleep(gap)
# update timestamps
last_ts += gap
for _ in xrange(no_sim_tuples):
pub(last_ts, last_id)
last_id += 1
azure-servicebus
azure-storage
Endpoint=[...];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[...]
{
"account": "foo",
"key": "..."
}
import random
from azure.servicebus import ServiceBusService
from azure.storage.blob import BlockBlobService
STORAGE_CREDS = 'storage_credentials.json'
SERVICEBUS_CREDS = 'servicebus_credentials.txt'
def get_storage_credentials():
creds = {}
with open(STORAGE_CREDS, 'r') as fp:
creds = json.load(fp)
return creds['account'], creds['key']
def get_servicebus_credentials():
creds = {}
with open(SERVICEBUS_CREDS, 'r') as f:
line = f.readline()
kvs = line.split(';')
for kv in kvs:
k, v = kv.strip().split('=', 1)
creds[k] = v
return creds
def get_servicebus():
creds = get_servicebus_credentials()
namespace = creds['Endpoint'].split('.')[0][5:]
sak_name = creds['SharedAccessKeyName']
sak_value = creds['SharedAccessKey']
return ServiceBusService(
service_namespace=namespace,
shared_access_key_name=sak_name,
shared_access_key_value=sak_value
)
def get_eventhub_prod_fn(hub_name):
bs = get_servicebus()
return lambda msg: bs.send_event(hub_name, msg, device_id="mylaptop")
def get_queue_prod_fn(q_name):
bs = get_servicebus()
return lambda msg: bs.send_queue_message(q_name, msg)
def get_blobstorage_prod_fn(c_name):
account, key = get_storage_credentials()
bs = BlockBlobService(account, key)
return lambda msg, blob_name: bs.create_blob_from_text(c_name, blob_name, msg)
def unbalanced_randint(upper):
assert upper >= 2
if random.random() < 0.7:
return 1
return random.randint(2, upper)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment