Skip to content

Instantly share code, notes, and snippets.

@aydinemre
Created May 9, 2020 16:22
Show Gist options
  • Save aydinemre/171a86ad3ca567753cb646a251fb6066 to your computer and use it in GitHub Desktop.
Save aydinemre/171a86ad3ca567753cb646a251fb6066 to your computer and use it in GitHub Desktop.
Multiprocess image-annotation example using the Clarifai object-detection API.
#!/usr/bin/env python
# coding: utf-8
import json
import os
from datetime import datetime
from multiprocessing import Pool, cpu_count
from pathlib import Path
import pandas as pd
from PIL import Image
from clarifai.rest import ClarifaiApp
from clarifai.rest import Model
from tqdm import tqdm
# Directory layout: raw input images, cropped outputs, and cached API responses.
RAW_IMAGE_DIR = 'Data/images/raw'
CROPPED_IMAGE_DIR = 'Data/images/cropped'
IMAGE_RESPONSE_DIR = 'Data/clarifai_responses'
# NOTE(review): '#API_KEY#' is a placeholder — supply a real key before running
# (preferably from an environment variable rather than hard-coding it).
app = ClarifaiApp(api_key='#API_KEY#')
# Clarifai model used for prediction — presumably an object-detection model
# given the gist description; confirm the model id against the Clarifai account.
model = Model(app.api, model_id='72c523807f93e18b431676fb9a58e6ad')
def api_call(image_path):
    """Return the Clarifai prediction for *image_path*, caching it as JSON.

    The raw API response is persisted under ``IMAGE_RESPONSE_DIR`` keyed by
    the image's base name, so repeated runs skip the network call.

    :param image_path: file name of an image inside ``RAW_IMAGE_DIR``.
    :return: the (possibly cached) Clarifai response dict.
    """
    # Strip the extension to build the cache-file name.
    filename = os.path.splitext(image_path)[0]
    # Path of the cached JSON response.
    response_file = os.path.join(IMAGE_RESPONSE_DIR, filename + '.json')
    if os.path.exists(response_file):
        # Cached earlier: load from the file system instead of calling the API.
        with open(response_file, 'r') as f:
            response = json.load(f)
    else:
        # No cache yet: call the API and persist the raw response.
        print(datetime.now())
        response = model.predict_by_filename(filename=os.path.join(RAW_IMAGE_DIR, image_path))
        # 'w' suffices here (the original's redundant os.path.join() and 'w+' removed).
        with open(response_file, 'w') as f:
            json.dump(response, f)
    return response
def converter(bounding_box, width, height):
    """Convert a Clarifai relative bounding box to absolute pixel coordinates.

    Clarifai returns fractional coordinates in [0, 1]; scaling them by the
    image size yields the ``(left, upper, right, lower)`` tuple that PIL's
    ``Image.crop`` expects.

    :param bounding_box: dict with ``left_col``/``top_row``/``right_col``/``bottom_row``.
    :param width: image width in pixels.
    :param height: image height in pixels.
    :return: ``(left, upper, right, lower)`` in pixels.
    """
    # Direct scaling — the original built a width/height dict and then
    # reconstructed right/bottom via x + (x2 - x), which only adds
    # floating-point round-off on top of these same products.
    return (bounding_box['left_col'] * width,
            bounding_box['top_row'] * height,
            bounding_box['right_col'] * width,
            bounding_box['bottom_row'] * height)
def process_response(image_path, response):
    """Crop every detected region of *image_path* into ``CROPPED_IMAGE_DIR``.

    Each region's bounding box is scaled to pixels via :func:`converter`, and
    the crop is saved under a name that encodes the source file, the top
    concept's name and confidence, and the region index. Existing crop files
    are not overwritten.

    :param image_path: file name of an image inside ``RAW_IMAGE_DIR``.
    :param response: Clarifai response dict as returned by :func:`api_call`.
    :return: ``True`` (unconditionally, matching the original contract).
    """
    file_name = os.path.splitext(image_path)[0]
    # Use a context manager so the image handle is always closed
    # (the original leaked the file handle opened by Image.open).
    with Image.open(os.path.join(RAW_IMAGE_DIR, image_path)) as im:
        regions = response["outputs"][0]['data'].get('regions')
        if regions:
            for index, box in enumerate(regions):
                # Convert the relative bounding box to pixel coordinates.
                bounding_box = converter(box['region_info']['bounding_box'],
                                         width=im.width, height=im.height)
                # The first concept is used to label the crop file; '/' is
                # stripped from the name so it stays a valid file name.
                data = box['data']['concepts'][0]
                crop_file_name = ("crop_" + file_name + "_" + data['name'].replace('/', '')
                                  + "_" + str('%.5f' % data['value']) + "_" + str(index) + '.jpg')
                crop_path = os.path.join(CROPPED_IMAGE_DIR, crop_file_name)
                if not os.path.exists(crop_path):
                    im.crop(bounding_box).save(crop_path)
    return True
def image_annotation(image_path):
    """Annotate one image: fetch (or load cached) detections, then crop them."""
    return process_response(image_path, api_call(image_path))
if __name__ == '__main__':
    # The guard is required for multiprocessing: on spawn-based platforms
    # (Windows, and macOS by default on modern Pythons) each worker re-imports
    # this module, and unguarded Pool creation would recurse/crash.
    samples = os.listdir(RAW_IMAGE_DIR)
    print(len(samples))
    # Two worker processes annotate the images in parallel.
    with Pool(2) as p:
        print(p.map(image_annotation, samples))
    print("End")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment