Retrain an Azure ML model using the Azure Batch Execution Service (BES)
# coding=UTF-8
# How this works:
#
# 1. Export the input data from SQL Server to a local CSV file
# 2. Upload the file to an Azure blob - you'd need an Azure storage account
# 3. Call BES to process the data in the blob
# 4. The results get written to another Azure blob
# 5. Download the output blob to a local file
#
# Note: You may need to download/install the Azure SDK for Python.
# See: http://azure.microsoft.com/en-us/documentation/articles/python-how-to-install/
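# Dependencies (a note added here, not part of the original gist): this script
# targets Python 2 and the SDKs of its era, roughly:
#   pip install pymssql azure-storage
# BlockBlobService lives in the legacy azure-storage package; the newer
# azure-storage-blob (v12+) SDK exposes a different API (BlobServiceClient)
# and would require rewriting the upload helper below.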
import csv
import pymssql
import urllib2
import json
import time
from azure.storage.blob import BlockBlobService

setServer = ""                   # MSSQL server host
setUser = ""                     # MSSQL user name
setPass = ""                     # MSSQL password
setDB = ""                       # MSSQL database name
setLocalInputBlob1 = ""          # Local path of the input CSV to generate and upload
setAzureInputBlob1 = ""          # Blob name for the uploaded input; same extension as the file
setBlobOutput = ""               # Local path for the downloaded output (.csv or .ilearner)
set_storage_account_name = ""    # Replace this with your Azure Storage Account name
set_storage_account_key = ""     # Replace this with your Azure Storage Key
set_storage_container_name = ""  # Replace this with your Azure Storage Container name
setAzureWebApiKey = ""           # Replace this with the API key for the web service
setAzureWebUrl = ""              # Replace this with the BES endpoint URL of the web service
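
# A minimal sanity check (an addition, not in the original gist): fail fast if
# any of the placeholder settings above were left empty, since the errors
# raised later by pymssql or Azure are much less obvious than this one.
_required = {
    "setServer": setServer, "setUser": setUser, "setPass": setPass,
    "setDB": setDB, "setLocalInputBlob1": setLocalInputBlob1,
    "setAzureInputBlob1": setAzureInputBlob1, "setBlobOutput": setBlobOutput,
    "set_storage_account_name": set_storage_account_name,
    "set_storage_account_key": set_storage_account_key,
    "set_storage_container_name": set_storage_container_name,
    "setAzureWebApiKey": setAzureWebApiKey, "setAzureWebUrl": setAzureWebUrl,
}
_missing = [name for name, value in _required.items() if not value]
if _missing:
    raise ValueError("Please fill in these settings first: " + ", ".join(_missing))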
# Connect to MSSQL Server
conn = pymssql.connect(server=setServer,
                       user=setUser,
                       password=setPass,
                       database=setDB)

# Create a database cursor
cursor = conn.cursor()

# Replace this nonsense with your own query :)
query = """
SELECT * FROM DB
"""

# Execute the query
cursor.execute(query)

# Go through the results row-by-row and write the output to a CSV file
# (QUOTE_NONNUMERIC applies quotes to non-numeric data; change this to
# QUOTE_NONE for no quotes. See https://docs.python.org/2/library/csv.html
# for other settings options)
with open(setLocalInputBlob1, "wb") as outfile:
    writer = csv.writer(outfile, quoting=csv.QUOTE_NONNUMERIC)
    column_names = [unicode(i[0]).encode('utf-8') for i in cursor.description]
    writer.writerow(column_names)
    for row in cursor:
        row = [unicode(s).encode('utf-8') for s in row]
        writer.writerow(row)

# Close the cursor and the database connection
cursor.close()
conn.close()
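
# Optional check (an addition, not in the original gist): confirm the export
# produced at least one data row before starting a BES run on an empty file.
with open(setLocalInputBlob1) as _f:
    _row_count = sum(1 for _ in _f) - 1  # subtract the header row
if _row_count <= 0:
    raise ValueError("No data rows were exported to " + setLocalInputBlob1)
print("Exported " + str(_row_count) + " data rows to " + setLocalInputBlob1)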
##############################################################################################
def printHttpError(httpError):
    print("The request failed with status code: " + str(httpError.code))
    # Print the headers - they include the request ID and the timestamp,
    # which are useful for debugging the failure
    print(httpError.info())
    print(json.loads(httpError.read()))
    return
def saveBlobToFile(blobUrl, resultsLabel):
    # setBlobOutput is the local path for the output file; use a valid file
    # extension (usually .csv for scoring results, or .ilearner for trained models)
    output_file = setBlobOutput
    print("Reading the result from " + blobUrl)
    try:
        response = urllib2.urlopen(blobUrl)
    except urllib2.HTTPError, error:
        printHttpError(error)
        return
    # Write in binary mode: .ilearner model files are not text
    with open(output_file, "wb") as f:
        f.write(response.read())
    print(resultsLabel + " have been written to the file " + output_file)
    return
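
# Note (added): response.read() above buffers the entire blob in memory, which
# is fine for small CSVs but wasteful for large .ilearner models. A chunked
# copy is a possible alternative, sketched here rather than wired in:
#
#   with open(output_file, "wb") as f:
#       for chunk in iter(lambda: response.read(64 * 1024), ""):
#           f.write(chunk)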
def processResults(result):
    first = True
    results = result["Results"]
    for outputName in results:
        result_blob_location = results[outputName]
        sas_token = result_blob_location["SasBlobToken"]
        base_url = result_blob_location["BaseLocation"]
        relative_url = result_blob_location["RelativeLocation"]
        print("The results for " + outputName + " are available at the following Azure Storage location:")
        print("BaseLocation: " + base_url)
        print("RelativeLocation: " + relative_url)
        print("SasBlobToken: " + sas_token)
        if first:
            # Only the first output is downloaded; the locations of the
            # remaining outputs are just printed
            first = False
            url3 = base_url + relative_url + sas_token
            saveBlobToFile(url3, "The results for " + outputName)
    return
def uploadFileToBlob(input_file, input_blob_name, storage_container_name, storage_account_name, storage_account_key):
    blob_service = BlockBlobService(account_name=storage_account_name, account_key=storage_account_key)
    print("Uploading the input to blob storage...")
    blob_service.create_blob_from_path(storage_container_name, input_blob_name, input_file)
def invokeBatchExecutionService():
    storage_account_name = set_storage_account_name
    storage_account_key = set_storage_account_key
    storage_container_name = set_storage_container_name
    connection_string = "DefaultEndpointsProtocol=https;AccountName=" + storage_account_name + ";AccountKey=" + storage_account_key
    api_key = setAzureWebApiKey
    url = setAzureWebUrl
    uploadFileToBlob(setLocalInputBlob1,  # Local input file (usually .csv)
                     setAzureInputBlob1,  # Blob name; must have the same extension as the input file
                     storage_container_name, storage_account_name, storage_account_key)
    payload = {
        "Inputs": {
            "input1": {
                "ConnectionString": connection_string,
                # Point the web service at the blob we just uploaded
                "RelativeLocation": "/" + storage_container_name + "/" + setAzureInputBlob1
            },
        },
        "Outputs": {
            "output2": {
                "ConnectionString": connection_string,
                # Trained-model output (.ilearner for retrained models)
                "RelativeLocation": "/" + storage_container_name + "/output2results.ilearner"
            },
            "output1": {
                "ConnectionString": connection_string,
                # Scoring/evaluation output (usually .csv)
                "RelativeLocation": "/" + storage_container_name + "/output1results.csv"
            },
        },
        "GlobalParameters": {
        }
    }
    body = str.encode(json.dumps(payload))
    headers = {"Content-Type": "application/json", "Authorization": ("Bearer " + api_key)}

    # Submit the job
    print("Submitting the job...")
    req = urllib2.Request(url + "?api-version=2.0", body, headers)
    try:
        response = urllib2.urlopen(req)
    except urllib2.HTTPError, error:
        printHttpError(error)
        return
    result = response.read()
    job_id = result[1:-1]  # strip the surrounding quotes from the returned JSON string
    print("Job ID: " + job_id)

    # Start the job
    print("Starting the job...")
    body = str.encode(json.dumps({}))
    req = urllib2.Request(url + "/" + job_id + "/start?api-version=2.0", body, headers)
    try:
        response = urllib2.urlopen(req)
    except urllib2.HTTPError, error:
        printHttpError(error)
        return
    # Poll the job status until it reaches a terminal state
    url2 = url + "/" + job_id + "?api-version=2.0"
    while True:
        print("Checking the job status...")
        req = urllib2.Request(url2, headers={"Authorization": ("Bearer " + api_key)})
        try:
            response = urllib2.urlopen(req)
        except urllib2.HTTPError, error:
            printHttpError(error)
            return
        result = json.loads(response.read())
        status = result["StatusCode"]
        if status == 0 or status == "NotStarted":
            print("Job " + job_id + " not yet started...")
        elif status == 1 or status == "Running":
            print("Job " + job_id + " running...")
        elif status == 2 or status == "Failed":
            print("Job " + job_id + " failed!")
            print("Error details: " + result["Details"])
            break
        elif status == 3 or status == "Cancelled":
            print("Job " + job_id + " cancelled!")
            break
        elif status == 4 or status == "Finished":
            print("Job " + job_id + " finished!")
            processResults(result)
            break
        time.sleep(1)  # wait one second between status checks
    return

invokeBatchExecutionService()
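
# Usage (added note): fill in the settings at the top, then run the script with
# a Python 2 interpreter, e.g.
#   python retrain_model.py
# (the script name is hypothetical). BES jobs are asynchronous, so expect the
# status-polling loop above to run for a while before the .ilearner and .csv
# outputs are downloaded.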