Last active
May 4, 2021 14:49
-
-
Save elbruno/6aa03bc9d28011d473fb639b1e006fa1 to your computer and use it in GitHub Desktop.
azureiotmodulecustomvision.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Bruno Capuano - 2021 | |
# Azure IoT Module running an Azure Custom Vision webapp | |
import time | |
import os | |
import sys | |
import asyncio | |
from six.moves import input | |
import threading | |
from azure.iot.device.aio import IoTHubModuleClient | |
# Custom Vision for ARM imports | |
import json | |
import io | |
# Imports for the REST API | |
from flask import Flask, request, jsonify | |
# Imports for image procesing | |
from PIL import Image | |
# Imports for prediction | |
from predict import initialize, predict_image, predict_url | |
print ( "init flask app" ) | |
app = Flask(__name__) | |
# 4MB Max image size limit | |
app.config['MAX_CONTENT_LENGTH'] = 4 * 1024 * 1024 | |
print ( "Load and intialize the model" ) | |
# Load and intialize the model | |
initialize() | |
async def initAzureIoTModule(): | |
try: | |
if not sys.version >= "3.5.3": | |
raise Exception( "The sample requires python 3.5.3+. Current version of Python: %s" % sys.version ) | |
print ( "IoT Hub Client for Python" ) | |
# The client object is used to interact with your Azure IoT hub. | |
module_client = IoTHubModuleClient.create_from_edge_environment() | |
# connect the client. | |
await module_client.connect() | |
# define behavior for receiving an input message on input1 | |
async def input1_listener(module_client): | |
while True: | |
input_message = await module_client.receive_message_on_input("input1") # blocking call | |
print("the data in the message received on input1 was ") | |
print(input_message.data) | |
print("custom properties are") | |
print(input_message.custom_properties) | |
print("forwarding mesage to output1") | |
await module_client.send_message_to_output(input_message, "output1") | |
# Schedule task for C2D Listener | |
listeners = asyncio.gather(input1_listener(module_client)) | |
print ( "Azure IoT Module registerd and waiting for messages.") | |
except Exception as e: | |
print('EXCEPTION:', str(e)) | |
# ============================================= | |
# webapp for custom vision | |
# ============================================= | |
async def webAppCustomVision(): | |
print ( "Start CV webapp" ) | |
# Run the server | |
app.run(host='0.0.0.0', port=8089) | |
# Default route just shows simple text | |
@app.route('/') | |
def index(): | |
return 'CustomVision.ai simple drawing server.' | |
# Like the CustomVision.ai Prediction service /image route handles either | |
# - octet-stream image file | |
# - a multipart/form-data with files in the imageData parameter | |
@app.route('/image', methods=['POST']) | |
@app.route('/<project>/image', methods=['POST']) | |
@app.route('/<project>/image/nostore', methods=['POST']) | |
@app.route('/<project>/classify/iterations/<publishedName>/image', methods=['POST']) | |
@app.route('/<project>/classify/iterations/<publishedName>/image/nostore', methods=['POST']) | |
@app.route('/<project>/detect/iterations/<publishedName>/image', methods=['POST']) | |
@app.route('/<project>/detect/iterations/<publishedName>/image/nostore', methods=['POST']) | |
def predict_image_handler(project=None, publishedName=None): | |
try: | |
imageData = None | |
if ('imageData' in request.files): | |
imageData = request.files['imageData'] | |
elif ('imageData' in request.form): | |
imageData = request.form['imageData'] | |
else: | |
imageData = io.BytesIO(request.get_data()) | |
img = Image.open(imageData) | |
results = predict_image(img) | |
return jsonify(results) | |
except Exception as e: | |
print('EXCEPTION:', str(e)) | |
return 'Error processing image', 500 | |
# Like the CustomVision.ai Prediction service /url route handles url's | |
# in the body of hte request of the form: | |
# { 'Url': '<http url>'} | |
@app.route('/url', methods=['POST']) | |
@app.route('/<project>/url', methods=['POST']) | |
@app.route('/<project>/url/nostore', methods=['POST']) | |
@app.route('/<project>/classify/iterations/<publishedName>/url', methods=['POST']) | |
@app.route('/<project>/classify/iterations/<publishedName>/url/nostore', methods=['POST']) | |
@app.route('/<project>/detect/iterations/<publishedName>/url', methods=['POST']) | |
@app.route('/<project>/detect/iterations/<publishedName>/url/nostore', methods=['POST']) | |
def predict_url_handler(project=None, publishedName=None): | |
try: | |
image_url = json.loads(request.get_data().decode('utf-8'))['url'] | |
results = predict_url(image_url) | |
return jsonify(results) | |
except Exception as e: | |
print('EXCEPTION:', str(e)) | |
return 'Error processing image' | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(initAzureIoTModule()) | |
loop.run_until_complete(webAppCustomVision()) | |
#loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment