Skip to content

Instantly share code, notes, and snippets.

@OElesin
Last active March 13, 2021 23:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save OElesin/b1653ef6f40d57c524291e1aebd2575b to your computer and use it in GitHub Desktop.
Save OElesin/b1653ef6f40d57c524291e1aebd2575b to your computer and use it in GitHub Desktop.
Detect Anomalies with Amazon Lookout for Vision and publish to Amazon Kinesis Data Firehose
from time import sleep, time
from picamera import PiCamera
from datetime import datetime, timedelta
from boto3 import client
import json
import os
firehose = client('firehose')
lookout_vision = client('lookoutvision')
def detect_anomaly(img_file_path):
"""
Function will detect anomaly with Amazon Lookout for vision
and publish inference details to Amazon Kinesis Data Firehose.
"""
inference_response = lookout_vision.detect_anomalies(
ProjectName='my-cassava-project-name',
ModelVersion='4',
Body=open(img_file_path, 'rb'),
ContentType='image/jpeg'
)
inference_metadata = {
'InferenceTime': int(time()*1000.0),
'InferenceImageSize': os.stat(img_file_path).st_size,
# You may use PIL to get the image dimensions, this may be good for analytics later
}
payload = json.dumps({**inference_response, **inference_metadata})
firehose.put_record(
DeliveryStreamName='<my-delivery-stream-name>',
Record={
'Data': payload
}
)
# You may delete the file once inference is completed to converse space
# on your Raspberry Pi microSD
def wait():
# Calculate the delay to the start of the work day
start_of_work_day = datetime.today().replace(hour=8, minute=0, second=0, microsecond=0)
daily_work_hours = 9
end_of_work_day = (start_of_work_day + timedelta(hours=daily_work_hours)).replace(minute=0, second=0, microsecond=0)
delay = (end_of_work_day - start_of_work_day).seconds
sleep(delay)
camera = PiCamera()
camera.start_preview()
sleep(5)
for filename in camera.capture_continuous('img{timestamp:%Y-%m-%d-%H-%M}.jpg'):
print('Captured %s' % filename)
detect_anomaly(filename)
wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment