Created
July 9, 2019 15:40
-
-
Save SpicySyntax/50fd369966ca46ba64a615c627e19214 to your computer and use it in GitHub Desktop.
Queue Trigger not working
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import azure.functions as func | |
import json | |
from . import predict | |
from ..shared import logger | |
# Azure Function Main Method | |
async def main(classifyBatch: func.QueueMessage, res: func.Out[func.QueueMessage]) -> None: | |
await queue_classification_results(classifyBatch, res) | |
# Method to obtain and enqueue classification results | |
async def queue_classification_results(classifyBatch: func.QueueMessage, res: func.Out[func.QueueMessage]) -> None: | |
"""Queue Trigger Azure Function that obtains and enqueus classification results | |
Args: | |
classifyBatch: meta-data queue message for a batch images to be classified | |
res: resulting queue message with classification results | |
Returns: | |
void | |
""" | |
# Get Message Args | |
infer_batch_message = classifyBatch.get_json() | |
jwt = infer_batch_message.get('Jwt') | |
image_uris = infer_batch_message.get('BlobUris') | |
activity_id = infer_batch_message.get('ActivityId') | |
image_metas = infer_batch_message.get('ImageMetas') | |
image_ids = list(map(lambda _ : _['FileId'], image_metas)) | |
user_id = infer_batch_message.get('UserId') | |
job_id = infer_batch_message.get('JobId') | |
# Init Dependencies | |
log = logger.Logger(activity_id) | |
predictor = predict.Predictor(log) | |
log.log_info('Python Classificaion Queue Triggered') | |
# Initialize model | |
log.log_info("Initializing model...") | |
predictor.initialize() | |
# Prediction step | |
log.log_info('Classifying images: {}'.format(image_uris)) | |
results = predictor.predict_uris(image_uris) | |
inference_results = [] | |
for i, result in enumerate(results): | |
inference_result = { | |
"ImageId": image_ids[i], | |
"ClassificationResults": [result], | |
"SegmentationResults": [] | |
} | |
inference_results.append(inference_result) | |
inference_results = { | |
"Results": inference_results, | |
"UserId": user_id, | |
"JobId": job_id | |
} | |
# Push Inference Results back onto Azure Message Queue | |
result_json = json.dumps(inference_results) | |
log.log_info('Obtained results: {}'.format(result_json)) | |
log.dispatch_logs(jwt) | |
res.set(result_json) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using Microsoft.Azure.WebJobs; | |
using Microsoft.Extensions.Logging; | |
using Microsoft.WindowsAzure.Storage.Queue; | |
using Newtonsoft.Json; | |
using ObjectDetection.Models; | |
using ObjectDetection.Services; | |
namespace ObjectDetection.Functions | |
{ | |
// Queue Trigger for processing initial upload requests, forwarding to be Resized or Infered | |
public class ImageQueueTrigger: BaseFunc | |
{ | |
private readonly IInferenceClient _inferenceClient; | |
private readonly IAzureStorageClient _azureStorageClient; | |
private readonly IAuthorizationHandler _authHandler; | |
public ImageQueueTrigger( | |
IInferenceClient inferenceClient, | |
IAzureStorageClient azureStorageClient, | |
ILogDispatcher logger, | |
IAuthorizationHandler authHandler): base(logger, "ImageQueueTriggerFunc") | |
{ | |
_inferenceClient = inferenceClient; | |
_azureStorageClient = azureStorageClient; | |
_authHandler = authHandler; | |
} | |
[FunctionName("ImageQueueTrigger")] | |
public async Task Run([QueueTrigger("image-batch-items", Connection = "AzureWebJobsStorage")]InferenceQueueMessage inferenceQueueMessage) | |
{ | |
try | |
{ | |
var jwt = await _authHandler.GetClientCredentialsToken(Logger.Scope()); | |
Logger.LogInformation($"Fetched Service AuthToken{jwt}"); | |
var resizeFiles = inferenceQueueMessage.ImageMetas.Where(_ => _.Resize).ToList(); | |
if (resizeFiles.Any()) | |
{ | |
var resizeQueueMessage = new InferenceBackendQueueMessage(inferenceQueueMessage) | |
{ | |
Jwt = jwt, | |
ActivityId = ActivityId, | |
ImageMetas = resizeFiles, | |
}; | |
var seriResizeQueueMessage = JsonConvert.SerializeObject(resizeQueueMessage); | |
await _azureStorageClient.AddQueueMessage("image-resize-items", | |
new CloudQueueMessage(seriResizeQueueMessage)); | |
} | |
var readyFiles = inferenceQueueMessage.ImageMetas.Where(_ => !_.Resize).ToList(); | |
if (readyFiles.Any()) | |
{ | |
Logger.LogInformation("Fetching Blob Uris With Sas Token"); | |
var sasUriTasks = new Task<string>[readyFiles.Count]; | |
foreach ((int index, var fileMeta) in readyFiles.Enumerate()) | |
sasUriTasks[index] = _azureStorageClient.GetBlobUriWithSasToken(fileMeta.FileId); | |
Task.WaitAll(sasUriTasks); | |
var sasUris = new string[sasUriTasks.Length]; | |
foreach ((int index, var sasUriTask) in sasUriTasks.Enumerate()) | |
sasUris[index] = await sasUriTask; | |
var inferenceBackendQueueMessage = new InferenceBackendQueueMessage(inferenceQueueMessage) | |
{ | |
Jwt = jwt, | |
ActivityId = ActivityId, | |
BlobUris = sasUris, | |
ImageMetas = readyFiles, | |
}; | |
var modelValid = | |
CvModelDefs.Models.TryGetValue(inferenceBackendQueueMessage.ModelName, out CvModelType type); | |
if (modelValid) | |
{ | |
if (type.Value == CvModelType.Classification.Value) | |
{ | |
Logger.LogInformation("Queueing Classification Requests For Python Function"); | |
if (await _inferenceClient.RequestClassificationResults(inferenceBackendQueueMessage)) | |
Logger.LogInformation("Queued Classification Request"); | |
else | |
throw new Exception("Error Queueing Classicifation Request"); | |
} | |
if (type.Value == CvModelType.InstanceSegmentation.Value) | |
{ | |
Logger.LogInformation("Queueing Segmentation Requests For Python Function"); | |
if (await _inferenceClient.RequestSegmentationResults(inferenceBackendQueueMessage)) | |
Logger.LogInformation("Queued Segmentation Request"); | |
else | |
throw new Exception("Error Queueing Segmentation Request"); | |
} | |
} | |
else | |
{ | |
throw new Exception("Error getting Model Type"); | |
} | |
} | |
} | |
catch (Exception e) | |
{ | |
Logger.LogError($"Error Processing Blob or Queueing Inference Request: {e.ToString()}"); | |
} | |
await Logger.DispatchLogs(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Collections.ObjectModel; | |
using System.Linq; | |
using System.Net.Http; | |
using System.Net; | |
using System.Threading.Tasks; | |
using Microsoft.Azure.WebJobs; | |
using Microsoft.Azure.WebJobs.Extensions.Http; | |
using Microsoft.Extensions.Logging; | |
using Microsoft.WindowsAzure.Storage.Queue; | |
using ObjectDetection.Services; | |
using ObjectDetection.Models; | |
using Newtonsoft.Json; | |
namespace ObjectDetection.Functions | |
{ | |
// Http Trigger Function for Image File Upload | |
public class UploadImage: BaseFunc | |
{ | |
private readonly ICosmosDbClient<ImageRecord> _imageCosmosDbClient; | |
private readonly ICosmosDbClient<CvJob> _jobCosmosDbClient; | |
private readonly IAzureStorageClient _azureStorageClient; | |
private readonly IAuthorizationHandler _authHandler; | |
private const int MaxNumberOfFiles = 10; | |
private const int MaxFileSize = 20000000; // in bytes => 20 MB | |
private const int MaxBlobSize = 400000; // in bytes => 400KB | |
private const double FileSizeDivisor = 700000.0; // Used to ensure resized output is arount max-blob-size | |
public UploadImage(ICosmosDbClient<ImageRecord> imageCosmosDbClient, | |
ICosmosDbClient<CvJob> jobCosmosDbClient, IAzureStorageClient azureStorageClient, | |
IAuthorizationHandler authHandler, ILogDispatcher logger) : | |
base(logger, "UploadImagesFunc") | |
{ | |
_imageCosmosDbClient = imageCosmosDbClient; | |
_jobCosmosDbClient = jobCosmosDbClient; | |
_azureStorageClient = azureStorageClient; | |
_authHandler = authHandler; | |
} | |
[FunctionName("UploadImage")] | |
public async Task<HttpResponseMessage> Run( | |
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "upload-image")] HttpRequestMessage req) | |
{ | |
Logger.LogInformation("Upload Image HTTP trigger function processed a request."); | |
var names = string.Empty; | |
var userInfo = await _authHandler.GetUserInfo(req); | |
if (string.IsNullOrEmpty(userInfo?.Email)) | |
{ | |
Logger.LogInformation("Unauthorized Request"); | |
return await ExitWith(new HttpResponseMessage(HttpStatusCode.Unauthorized)); | |
} | |
var queryNameVals = req.RequestUri?.ParseQueryString(); | |
string modelName = null; | |
string jobId = null; | |
if (queryNameVals != null) | |
{ | |
modelName = queryNameVals.Get("modelName"); | |
jobId = queryNameVals.Get("jobId"); | |
} | |
if (modelName == null || !CvModelDefs.IsValidName(modelName) | |
|| jobId == null || !ParamValidation.IsValidCosmosId(jobId)) | |
return await ExitWith(new HttpResponseMessage(HttpStatusCode.BadRequest)); | |
var userId = userInfo.Sub; | |
var userJobTableName = _jobCosmosDbClient.AppendCollectionSuffix(userId); | |
var targetJob = await _jobCosmosDbClient.GetRecordById(jobId, userJobTableName); | |
if (targetJob == null) | |
return await ExitWith(new HttpResponseMessage(HttpStatusCode.NotFound)); | |
Logger.LogInformation($"File Upload request for model: {modelName}"); | |
Logger.LogInformation("Reading Multipart Async"); | |
var uploadTasks = await BeginUploadTasks(req, userInfo, modelName, jobId); | |
if (uploadTasks.Count < 1) | |
{ | |
Logger.LogInformation("Invalid Number of Files uploaded"); | |
return await ExitWith(new HttpResponseMessage(HttpStatusCode.BadRequest)); | |
} | |
try | |
{ | |
var imageMetas = await ResolveFileMetas(uploadTasks); | |
names = JsonConvert.SerializeObject(await EnqueueInferenceReqeust(imageMetas, | |
userInfo, userJobTableName, targetJob, modelName, jobId)); | |
} | |
catch (Exception e) | |
{ | |
Logger.LogError($"Error updating cosmos or uploading file: {e}"); | |
return await ExitWith(new HttpResponseMessage(HttpStatusCode.InternalServerError)); | |
} | |
return await ExitWith(new HttpResponseMessage(HttpStatusCode.Created) | |
{ | |
Content = new StringContent(names) | |
}); | |
} | |
public async Task<IEnumerable<string>> EnqueueInferenceReqeust(IList<FileMeta> imageMetas, UserInfo userInfo, | |
string userJobTableName, CvJob targetJob, string modelName, string jobId) | |
{ | |
var inferenceQueueMessage = new InferenceQueueMessage() | |
{ | |
ImageMetas = imageMetas, | |
ModelName = modelName, | |
UserId = userInfo.Sub, | |
JobId = jobId | |
}; | |
var fileIds = imageMetas.Select(_ => _.FileId); | |
await UpdateJobWithIds(targetJob, fileIds, userJobTableName); | |
var seriQueueMessage = JsonConvert.SerializeObject(inferenceQueueMessage); | |
await _azureStorageClient.AddQueueMessage("image-batch-items", new CloudQueueMessage(seriQueueMessage)); | |
return fileIds; | |
} | |
public async Task<IList<FileMeta>> ResolveFileMetas(IList<Task<FileMeta>> uploadTasks) | |
{ | |
var imageMetas = new List<FileMeta>(); | |
while (uploadTasks.Count > 0) | |
{ | |
var imageNameTask = await Task.WhenAny(uploadTasks); | |
uploadTasks.Remove(imageNameTask); | |
var imageMeta = await imageNameTask; | |
if (imageMeta == null) | |
throw new Exception("Unable to upload one image"); | |
imageMetas.Add(imageMeta); | |
} | |
return imageMetas; | |
} | |
public async Task<IList<Task<FileMeta>>> BeginUploadTasks(HttpRequestMessage req, | |
UserInfo userInfo, string modelName, string jobId) | |
{ | |
var provider = new MultipartMemoryStreamProvider(); | |
try | |
{ | |
await req.Content.ReadAsMultipartAsync(provider); | |
} | |
catch (Exception e) | |
{ | |
Logger.LogWarning(new EventId(), e, "Unable to read multipart content"); | |
} | |
var uploadTasks = new List<Task<FileMeta>>(); | |
if (ContentsAreValid(provider.Contents)) | |
foreach (var content in provider.Contents) | |
uploadTasks.Add(UploadFile(content, userInfo, modelName, jobId)); | |
return uploadTasks; | |
} | |
public async Task<bool> UpdateJobWithIds(CvJob job, IEnumerable<string> newIds, string collectionName) | |
{ | |
job.ImageIds = job.ImageIds.Concat(newIds).ToArray(); | |
var updates = new List<CosmosDbUpdate<CvJob>>() | |
{ | |
job.GetDbUpdate() | |
}; | |
return await _jobCosmosDbClient.UpdateRecords(updates, collectionName); | |
} | |
public async Task<FileMeta> UploadFile(HttpContent content, UserInfo userInfo, string targetModelName, string jobId) | |
{ | |
Logger.LogInformation("Converting File to Byte array to be uploaded"); | |
var fileStream = await content.ReadAsStreamAsync(); | |
byte[] fileBytes = null; | |
using (fileStream) | |
{ | |
using (var ms = new System.IO.MemoryStream()) | |
{ | |
await fileStream.CopyToAsync(ms); | |
fileBytes = ms.ToArray(); | |
} | |
} | |
var fileInfo = content.Headers.ContentDisposition; | |
var fileName = fileInfo.FileName.Replace("\"", ""); | |
var fileType = FileType.Resolve(content.Headers.ContentType.ToString().ToLower()); | |
var resize = fileBytes.LongLength >= MaxBlobSize; | |
double resizeFactor = resize ? (double)fileBytes.LongLength / FileSizeDivisor : 1; | |
var newImage = new ImageRecord() | |
{ | |
FileName = fileName, | |
Size = resize ? (long)MaxBlobSize : fileBytes.LongLength, | |
Status = ImageStatus.Recieved.Value, | |
DateTimeCreated = DateTime.UtcNow, | |
TargetModelName = targetModelName | |
}; | |
try | |
{ | |
var imageTableSuffix = $"{userInfo.Sub}-{jobId}"; | |
var userImageTableName = _imageCosmosDbClient.AppendCollectionSuffix(imageTableSuffix); | |
Logger.LogInformation("Creating Image Record"); | |
var imageMeta = await _imageCosmosDbClient.CreateRecord(newImage, userImageTableName); | |
var imageName = imageMeta.Id; | |
if (!(await _azureStorageClient.SaveToBlobStorage(imageName, fileName, | |
content.Headers.ContentType.ToString().ToLower(), fileBytes))) | |
throw new Exception("Error Saving image file to blob storage"); | |
var ret = new FileMeta() | |
{ | |
FileName = fileName, | |
FileId = imageName, | |
Resize = resize, | |
FileType = fileType.Value, | |
ResizeFactor = resizeFactor, | |
}; | |
return ret; | |
} | |
catch (Exception e) | |
{ | |
Logger.LogError($"Error updating cosmos or uploading file: {e}"); | |
return null; | |
} | |
} | |
public bool ContentsAreValid(Collection<HttpContent> contents) | |
{ | |
var uploadCount = contents.Count; | |
return uploadCount > 0 && contents.All(_ => FileType.IsValid(_.Headers.ContentType.ToString().ToLower()) | |
&& uploadCount <= MaxNumberOfFiles || _.Headers.ContentLength <= MaxFileSize); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment