@SpicySyntax
Created July 9, 2019 15:40
Queue Trigger not working
import azure.functions as func
import json

from . import predict
from ..shared import logger


# Azure Function entry point
async def main(classifyBatch: func.QueueMessage, res: func.Out[str]) -> None:
    await queue_classification_results(classifyBatch, res)


# Obtain and enqueue classification results
async def queue_classification_results(classifyBatch: func.QueueMessage, res: func.Out[str]) -> None:
    """Queue-triggered Azure Function that obtains and enqueues classification results.

    Args:
        classifyBatch: metadata queue message for a batch of images to be classified
        res: output queue binding for the resulting classification message

    Returns:
        None
    """
    # Get message args
    infer_batch_message = classifyBatch.get_json()
    jwt = infer_batch_message.get('Jwt')
    image_uris = infer_batch_message.get('BlobUris')
    activity_id = infer_batch_message.get('ActivityId')
    image_metas = infer_batch_message.get('ImageMetas')
    image_ids = [meta['FileId'] for meta in image_metas]
    user_id = infer_batch_message.get('UserId')
    job_id = infer_batch_message.get('JobId')

    # Init dependencies
    log = logger.Logger(activity_id)
    predictor = predict.Predictor(log)
    log.log_info('Python Classification Queue Triggered')

    # Initialize model
    log.log_info('Initializing model...')
    predictor.initialize()

    # Prediction step
    log.log_info('Classifying images: {}'.format(image_uris))
    results = predictor.predict_uris(image_uris)
    inference_results = []
    for i, result in enumerate(results):
        inference_results.append({
            "ImageId": image_ids[i],
            "ClassificationResults": [result],
            "SegmentationResults": []
        })
    result_message = {
        "Results": inference_results,
        "UserId": user_id,
        "JobId": job_id
    }

    # Push inference results back onto the output queue
    result_json = json.dumps(result_message)
    log.log_info('Obtained results: {}'.format(result_json))
    log.dispatch_logs(jwt)
    res.set(result_json)
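For a Python queue trigger like this to fire at all, the function's function.json must bind the trigger and the output binding to the same parameter names used in the signature (classifyBatch and res); a name mismatch is a common reason a queue trigger never runs. A minimal sketch of that config, assuming the function reads from a queue named classify-batch-items and writes to classify-results (both queue names are placeholders, not taken from this gist):

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "classifyBatch",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "classify-batch-items",
      "connection": "AzureWebJobsStorage"
    },
    {
      "name": "res",
      "type": "queue",
      "direction": "out",
      "queueName": "classify-results",
      "connection": "AzureWebJobsStorage"
    }
  ]
}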
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;
using ObjectDetection.Models;
using ObjectDetection.Services;

namespace ObjectDetection.Functions
{
    // Queue trigger that processes initial upload requests and forwards them to be resized or inferred
    public class ImageQueueTrigger : BaseFunc
    {
        private readonly IInferenceClient _inferenceClient;
        private readonly IAzureStorageClient _azureStorageClient;
        private readonly IAuthorizationHandler _authHandler;

        public ImageQueueTrigger(
            IInferenceClient inferenceClient,
            IAzureStorageClient azureStorageClient,
            ILogDispatcher logger,
            IAuthorizationHandler authHandler) : base(logger, "ImageQueueTriggerFunc")
        {
            _inferenceClient = inferenceClient;
            _azureStorageClient = azureStorageClient;
            _authHandler = authHandler;
        }

        [FunctionName("ImageQueueTrigger")]
        public async Task Run([QueueTrigger("image-batch-items", Connection = "AzureWebJobsStorage")] InferenceQueueMessage inferenceQueueMessage)
        {
            try
            {
                var jwt = await _authHandler.GetClientCredentialsToken(Logger.Scope());
                Logger.LogInformation($"Fetched Service AuthToken: {jwt}");

                // Forward any images that still need resizing to the resize queue
                var resizeFiles = inferenceQueueMessage.ImageMetas.Where(_ => _.Resize).ToList();
                if (resizeFiles.Any())
                {
                    var resizeQueueMessage = new InferenceBackendQueueMessage(inferenceQueueMessage)
                    {
                        Jwt = jwt,
                        ActivityId = ActivityId,
                        ImageMetas = resizeFiles,
                    };
                    var seriResizeQueueMessage = JsonConvert.SerializeObject(resizeQueueMessage);
                    await _azureStorageClient.AddQueueMessage("image-resize-items",
                        new CloudQueueMessage(seriResizeQueueMessage));
                }

                // Images that are already the right size go straight to inference
                var readyFiles = inferenceQueueMessage.ImageMetas.Where(_ => !_.Resize).ToList();
                if (readyFiles.Any())
                {
                    Logger.LogInformation("Fetching Blob Uris With Sas Token");
                    var sasUriTasks = readyFiles
                        .Select(fileMeta => _azureStorageClient.GetBlobUriWithSasToken(fileMeta.FileId))
                        .ToArray();
                    var sasUris = await Task.WhenAll(sasUriTasks);
                    var inferenceBackendQueueMessage = new InferenceBackendQueueMessage(inferenceQueueMessage)
                    {
                        Jwt = jwt,
                        ActivityId = ActivityId,
                        BlobUris = sasUris,
                        ImageMetas = readyFiles,
                    };
                    var modelValid =
                        CvModelDefs.Models.TryGetValue(inferenceBackendQueueMessage.ModelName, out CvModelType type);
                    if (modelValid)
                    {
                        if (type.Value == CvModelType.Classification.Value)
                        {
                            Logger.LogInformation("Queueing Classification Requests For Python Function");
                            if (await _inferenceClient.RequestClassificationResults(inferenceBackendQueueMessage))
                                Logger.LogInformation("Queued Classification Request");
                            else
                                throw new Exception("Error Queueing Classification Request");
                        }
                        else if (type.Value == CvModelType.InstanceSegmentation.Value)
                        {
                            Logger.LogInformation("Queueing Segmentation Requests For Python Function");
                            if (await _inferenceClient.RequestSegmentationResults(inferenceBackendQueueMessage))
                                Logger.LogInformation("Queued Segmentation Request");
                            else
                                throw new Exception("Error Queueing Segmentation Request");
                        }
                    }
                    else
                    {
                        throw new Exception("Error getting Model Type");
                    }
                }
            }
            catch (Exception e)
            {
                Logger.LogError($"Error Processing Blob or Queueing Inference Request: {e}");
            }
            await Logger.DispatchLogs();
        }
    }
}
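To exercise this trigger in isolation, one can drop a message directly onto image-batch-items using the same storage SDK the functions already reference. A minimal sketch, assuming a storage connection string and the message shape implied by how InferenceQueueMessage is read above (ImageMetas with FileId/FileName/Resize, ModelName, UserId, JobId); all values below are placeholders:

using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;

public static class QueueTriggerSmokeTest
{
    public static async Task EnqueueTestMessage(string connectionString)
    {
        // Connect to the queue the ImageQueueTrigger listens on
        var account = CloudStorageAccount.Parse(connectionString);
        var queue = account.CreateCloudQueueClient().GetQueueReference("image-batch-items");
        await queue.CreateIfNotExistsAsync();

        // Field names mirror what the trigger reads from InferenceQueueMessage;
        // the ids and model name are placeholders
        var message = new
        {
            ImageMetas = new[] { new { FileId = "test-file-id", FileName = "test.jpg", Resize = false } },
            ModelName = "some-model",
            UserId = "test-user",
            JobId = "test-job"
        };
        await queue.AddMessageAsync(new CloudQueueMessage(JsonConvert.SerializeObject(message)));
    }
}

If the function host is running and the binding is wired correctly, this message should show up in the ImageQueueTrigger logs within a few seconds of being enqueued.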
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;
using ObjectDetection.Models;
using ObjectDetection.Services;

namespace ObjectDetection.Functions
{
    // HTTP trigger function for image file upload
    public class UploadImage : BaseFunc
    {
        private readonly ICosmosDbClient<ImageRecord> _imageCosmosDbClient;
        private readonly ICosmosDbClient<CvJob> _jobCosmosDbClient;
        private readonly IAzureStorageClient _azureStorageClient;
        private readonly IAuthorizationHandler _authHandler;

        private const int MaxNumberOfFiles = 10;
        private const int MaxFileSize = 20000000; // in bytes => 20 MB
        private const int MaxBlobSize = 400000; // in bytes => 400 KB
        private const double FileSizeDivisor = 700000.0; // Used to ensure resized output is around the max blob size

        public UploadImage(ICosmosDbClient<ImageRecord> imageCosmosDbClient,
            ICosmosDbClient<CvJob> jobCosmosDbClient, IAzureStorageClient azureStorageClient,
            IAuthorizationHandler authHandler, ILogDispatcher logger) :
            base(logger, "UploadImagesFunc")
        {
            _imageCosmosDbClient = imageCosmosDbClient;
            _jobCosmosDbClient = jobCosmosDbClient;
            _azureStorageClient = azureStorageClient;
            _authHandler = authHandler;
        }

        [FunctionName("UploadImage")]
        public async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "upload-image")] HttpRequestMessage req)
        {
            Logger.LogInformation("Upload Image HTTP trigger function processed a request.");
            var names = string.Empty;

            // Reject requests that do not carry a resolvable user identity
            var userInfo = await _authHandler.GetUserInfo(req);
            if (string.IsNullOrEmpty(userInfo?.Email))
            {
                Logger.LogInformation("Unauthorized Request");
                return await ExitWith(new HttpResponseMessage(HttpStatusCode.Unauthorized));
            }

            // Validate the modelName and jobId query parameters
            var queryNameVals = req.RequestUri?.ParseQueryString();
            string modelName = null;
            string jobId = null;
            if (queryNameVals != null)
            {
                modelName = queryNameVals.Get("modelName");
                jobId = queryNameVals.Get("jobId");
            }
            if (modelName == null || !CvModelDefs.IsValidName(modelName)
                || jobId == null || !ParamValidation.IsValidCosmosId(jobId))
                return await ExitWith(new HttpResponseMessage(HttpStatusCode.BadRequest));

            var userId = userInfo.Sub;
            var userJobTableName = _jobCosmosDbClient.AppendCollectionSuffix(userId);
            var targetJob = await _jobCosmosDbClient.GetRecordById(jobId, userJobTableName);
            if (targetJob == null)
                return await ExitWith(new HttpResponseMessage(HttpStatusCode.NotFound));

            Logger.LogInformation($"File Upload request for model: {modelName}");
            Logger.LogInformation("Reading Multipart Async");
            var uploadTasks = await BeginUploadTasks(req, userInfo, modelName, jobId);
            if (uploadTasks.Count < 1)
            {
                Logger.LogInformation("Invalid Number of Files uploaded");
                return await ExitWith(new HttpResponseMessage(HttpStatusCode.BadRequest));
            }
            try
            {
                var imageMetas = await ResolveFileMetas(uploadTasks);
                names = JsonConvert.SerializeObject(await EnqueueInferenceRequest(imageMetas,
                    userInfo, userJobTableName, targetJob, modelName, jobId));
            }
            catch (Exception e)
            {
                Logger.LogError($"Error updating cosmos or uploading file: {e}");
                return await ExitWith(new HttpResponseMessage(HttpStatusCode.InternalServerError));
            }
            return await ExitWith(new HttpResponseMessage(HttpStatusCode.Created)
            {
                Content = new StringContent(names)
            });
        }

        public async Task<IEnumerable<string>> EnqueueInferenceRequest(IList<FileMeta> imageMetas, UserInfo userInfo,
            string userJobTableName, CvJob targetJob, string modelName, string jobId)
        {
            var inferenceQueueMessage = new InferenceQueueMessage()
            {
                ImageMetas = imageMetas,
                ModelName = modelName,
                UserId = userInfo.Sub,
                JobId = jobId
            };
            // Materialize the ids so the projection is not re-evaluated on each use
            var fileIds = imageMetas.Select(_ => _.FileId).ToList();
            await UpdateJobWithIds(targetJob, fileIds, userJobTableName);
            var seriQueueMessage = JsonConvert.SerializeObject(inferenceQueueMessage);
            await _azureStorageClient.AddQueueMessage("image-batch-items", new CloudQueueMessage(seriQueueMessage));
            return fileIds;
        }

        public async Task<IList<FileMeta>> ResolveFileMetas(IList<Task<FileMeta>> uploadTasks)
        {
            var imageMetas = new List<FileMeta>();
            while (uploadTasks.Count > 0)
            {
                // Collect results as each upload completes
                var imageNameTask = await Task.WhenAny(uploadTasks);
                uploadTasks.Remove(imageNameTask);
                var imageMeta = await imageNameTask;
                if (imageMeta == null)
                    throw new Exception("Unable to upload one image");
                imageMetas.Add(imageMeta);
            }
            return imageMetas;
        }

        public async Task<IList<Task<FileMeta>>> BeginUploadTasks(HttpRequestMessage req,
            UserInfo userInfo, string modelName, string jobId)
        {
            var provider = new MultipartMemoryStreamProvider();
            try
            {
                await req.Content.ReadAsMultipartAsync(provider);
            }
            catch (Exception e)
            {
                Logger.LogWarning(new EventId(), e, "Unable to read multipart content");
            }
            var uploadTasks = new List<Task<FileMeta>>();
            if (ContentsAreValid(provider.Contents))
                foreach (var content in provider.Contents)
                    uploadTasks.Add(UploadFile(content, userInfo, modelName, jobId));
            return uploadTasks;
        }

        public async Task<bool> UpdateJobWithIds(CvJob job, IEnumerable<string> newIds, string collectionName)
        {
            job.ImageIds = job.ImageIds.Concat(newIds).ToArray();
            var updates = new List<CosmosDbUpdate<CvJob>>()
            {
                job.GetDbUpdate()
            };
            return await _jobCosmosDbClient.UpdateRecords(updates, collectionName);
        }

        public async Task<FileMeta> UploadFile(HttpContent content, UserInfo userInfo, string targetModelName, string jobId)
        {
            Logger.LogInformation("Converting File to Byte array to be uploaded");
            var fileStream = await content.ReadAsStreamAsync();
            byte[] fileBytes = null;
            using (fileStream)
            {
                using (var ms = new System.IO.MemoryStream())
                {
                    await fileStream.CopyToAsync(ms);
                    fileBytes = ms.ToArray();
                }
            }
            var fileInfo = content.Headers.ContentDisposition;
            var fileName = fileInfo.FileName.Replace("\"", "");
            var fileType = FileType.Resolve(content.Headers.ContentType.ToString().ToLower());

            // Flag files above the blob size limit for the resize pipeline
            var resize = fileBytes.LongLength >= MaxBlobSize;
            double resizeFactor = resize ? (double)fileBytes.LongLength / FileSizeDivisor : 1;
            var newImage = new ImageRecord()
            {
                FileName = fileName,
                Size = resize ? (long)MaxBlobSize : fileBytes.LongLength,
                Status = ImageStatus.Recieved.Value,
                DateTimeCreated = DateTime.UtcNow,
                TargetModelName = targetModelName
            };
            try
            {
                var imageTableSuffix = $"{userInfo.Sub}-{jobId}";
                var userImageTableName = _imageCosmosDbClient.AppendCollectionSuffix(imageTableSuffix);
                Logger.LogInformation("Creating Image Record");
                var imageMeta = await _imageCosmosDbClient.CreateRecord(newImage, userImageTableName);
                var imageName = imageMeta.Id;
                if (!(await _azureStorageClient.SaveToBlobStorage(imageName, fileName,
                    content.Headers.ContentType.ToString().ToLower(), fileBytes)))
                    throw new Exception("Error Saving image file to blob storage");
                var ret = new FileMeta()
                {
                    FileName = fileName,
                    FileId = imageName,
                    Resize = resize,
                    FileType = fileType.Value,
                    ResizeFactor = resizeFactor,
                };
                return ret;
            }
            catch (Exception e)
            {
                Logger.LogError($"Error updating cosmos or uploading file: {e}");
                return null;
            }
        }

        public bool ContentsAreValid(Collection<HttpContent> contents)
        {
            // Require at least one file and no more than the max batch size, and
            // check every file's content type and size against the limits
            return contents.Count > 0 && contents.Count <= MaxNumberOfFiles
                && contents.All(_ => FileType.IsValid(_.Headers.ContentType.ToString().ToLower())
                    && _.Headers.ContentLength <= MaxFileSize);
        }
    }
}
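For reference, a request that passes UploadImage's validation can be sketched with HttpClient: a POST of multipart content to the upload-image route with modelName and jobId query parameters and a bearer token the auth handler can resolve. The host, token, model name, and job id below are placeholders:

using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;

public static class UploadImageClientExample
{
    public static async Task<string> UploadAsync(byte[] imageBytes)
    {
        using (var client = new HttpClient())
        {
            // Placeholder token; GetUserInfo must be able to resolve it to a user
            client.DefaultRequestHeaders.Authorization =
                new AuthenticationHeaderValue("Bearer", "<access-token>");

            // The function reads the file name from Content-Disposition and the
            // type from Content-Type, so both must be set on the file part
            var content = new MultipartFormDataContent();
            var filePart = new ByteArrayContent(imageBytes);
            filePart.Headers.ContentType = MediaTypeHeaderValue.Parse("image/jpeg");
            content.Add(filePart, "file", "test.jpg");

            var response = await client.PostAsync(
                "https://<function-app>.azurewebsites.net/api/upload-image?modelName=<model>&jobId=<job-id>",
                content);

            // On success the function replies 201 Created with the new image ids as JSON
            return await response.Content.ReadAsStringAsync();
        }
    }
}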