Skip to content

Instantly share code, notes, and snippets.

@supernlogn
Last active March 2, 2018 13:39
Show Gist options
  • Save supernlogn/d9e92731cd0fd10d8ac76e497ffdc3ce to your computer and use it in GitHub Desktop.
Save supernlogn/d9e92731cd0fd10d8ac76e497ffdc3ce to your computer and use it in GitHub Desktop.
Downloads images from the openImages dataset that contain certain classes, and creates the corresponding annotation files
"""
This file helps researchers download and process the openImages dataset.
Author: Ioannis Athanasiadis(ath.ioannis94@gmail.com)
"""
import urllib2
import csv
import os
import shutil
from subprocess import call
import cv2
# Locations of the openImages index csv files (image urls and bounding boxes).
PATH_TO_IMAGES_CSV = './images.csv'
PATH_TO_ANNOTATION_CSV = './annotations-human-bbox.csv'
CLASSES = ["/m/01g317", "/m/09j2d"] # you can specify the classes here directly or use the class popularity function
# Target canvas size every downloaded image is padded/resized to.
DESIRED_WIDTH = 640 # always greater than any width
DESIRED_HEIGHT = 640 # always greater than any height
DESIRED_SIZE = str(DESIRED_WIDTH) + "x" + str(DESIRED_HEIGHT)
DESTINATION_FOLDER = './'
# When True, main selects CLASSES automatically by annotation popularity.
GET_CLASSES_BY_POPULARITY = False
def get_class_popularity(path_to_annotation_csv):
    """
    Count how many annotation rows exist per label.

    Args:
        path_to_annotation_csv: path to the annotation csv file.
    Returns:
        A list of (LabelName, count) pairs sorted by count in descending
        order, where count is the number of annotation rows carrying that
        LabelName.  (The original docstring claimed a dictionary was
        returned; the function has always returned a sorted list.)
    """
    class_counts = {}
    # BUGFIX: use 'with' so the csv file handle is closed even on error
    # (the original opened the file and never closed it).
    with open(path_to_annotation_csv, "r") as csv_file:
        for row in csv.DictReader(csv_file, delimiter=','):
            label = row["LabelName"]
            class_counts[label] = class_counts.get(label, 0) + 1
    return sorted(class_counts.items(), key=lambda item: item[1], reverse=True)
def create_annotation(image_ids_to_download, destination_folder, path_to_annotation_csv, path_to_images_csv):
    """
    Creates annotation needed for dataset to be read
    and used for object detection algorithms.

    Box coordinates in the source csv are normalized to [0, 1]; they are
    converted here to absolute pixel coordinates inside the final
    DESIRED_WIDTH x DESIRED_HEIGHT canvas.

    Args:
        image_ids_to_download: list of image ids that have been validly downloaded
        destination_folder: directory where the new annotation and the downloaded images will be placed into.
        path_to_annotation_csv: path to the annotation csv file.
        path_to_images_csv: path to the images csv file.
    """
    train_dir = os.path.join(destination_folder, 'train')
    if not os.path.isdir(train_dir):
        os.makedirs(train_dir)
    # Copy the image-level rows of the images that were actually downloaded.
    # BUGFIX: 'with' closes the csv handles (the original leaked them).
    with open(path_to_images_csv, "r") as f_in, \
         open(os.path.join(train_dir, "images.csv"), "w") as f_out:
        reader = csv.DictReader(f_in, delimiter=',')
        writer = csv.DictWriter(f_out, fieldnames=reader.fieldnames)
        writer.writeheader()
        for t in reader:
            if t["ImageID"] in image_ids_to_download:
                writer.writerow(t)
    with open(path_to_annotation_csv, "r") as f_in, \
         open(os.path.join(train_dir, "annotations-human-bbox.csv"), "w") as f_out:
        reader = csv.DictReader(f_in, delimiter=',')
        writer = csv.DictWriter(f_out, fieldnames=reader.fieldnames)
        writer.writeheader()
        for t in reader:
            if t["LabelName"] not in CLASSES:
                continue
            img_path = os.path.join(destination_folder, 'Images', t["ImageID"] + ".jpg")
            img = cv2.imread(img_path)
            if img is None:
                # Image missing or unreadable (e.g. download failed) --
                # skip its boxes instead of crashing on img.shape.
                continue
            height, width = img.shape[:2]
            # BUGFIX (both branches): csv fields are strings; the original
            # computed width * t["XMin"] which is string repetition and then
            # crashed in int().  Convert through float() first.
            if width < DESIRED_WIDTH and height < DESIRED_HEIGHT:
                # Small image: mogrify later pads it to DESIRED_SIZE with
                # -gravity Center, i.e. half of the padding on each side --
                # so shift the absolute coordinates by half the margin
                # (the original shifted by the full margin).
                x_off = (DESIRED_WIDTH - width) // 2
                y_off = (DESIRED_HEIGHT - height) // 2
                t["XMin"] = int(width * float(t["XMin"])) + x_off
                t["XMax"] = int(width * float(t["XMax"])) + x_off
                t["YMin"] = int(height * float(t["YMin"])) + y_off
                t["YMax"] = int(height * float(t["YMax"])) + y_off
            else:
                # Large image: resize it down to the target size.
                # BUGFIX: cv2.resize is not in-place -- it returns a new
                # array.  The original discarded the result and rewrote
                # the unresized image.
                img = cv2.resize(img, (DESIRED_WIDTH, DESIRED_HEIGHT))
                cv2.imwrite(img_path, img)
                t["XMin"] = int(DESIRED_WIDTH * float(t["XMin"]))
                t["XMax"] = int(DESIRED_WIDTH * float(t["XMax"]))
                t["YMin"] = int(DESIRED_HEIGHT * float(t["YMin"]))
                t["YMax"] = int(DESIRED_HEIGHT * float(t["YMax"]))
            writer.writerow(t)
def image_index_to_download(path_to_annotation_csv, classes):
    """
    Collect the ids of all images whose annotations mention any of the
    requested classes; these become the download candidates.

    Args:
        path_to_annotation_csv: path to the annotation csv file.
        classes: The class names that will be used for indexing.
    Returns:
        Candidate image indexes to be downloaded.
    """
    annotation_file = os.path.join(path_to_annotation_csv)
    assert os.path.exists(annotation_file), \
        'File does not exist: {}'.format(annotation_file)
    with open(annotation_file, 'r') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=',')
        return {row["ImageID"] for row in reader if row["LabelName"] in classes}
def get_image_urls(image_ids_to_download, path_to_images_csv):
    """
    Look up the download url of every candidate image id.

    Args:
        image_ids_to_download: candidate image id's to download.
        path_to_images_csv: path to the images csv file.
    Returns:
        2D array [nx2]. The first column holds the candidate image id's to
        download, the second the matching url (the 300K thumbnail when one
        exists, the original url otherwise).
    """
    image_url_file = os.path.join(path_to_images_csv)
    assert os.path.exists(image_url_file), \
        'File does not exist: {}'.format(image_url_file)
    urls = []
    with open(image_url_file, 'r') as csvfile:
        for row in csv.DictReader(csvfile, delimiter=','):
            if row["ImageID"] not in image_ids_to_download:
                continue
            # Prefer the smaller thumbnail; an empty field is falsy.
            url = row["Thumbnail300KURL"] or row["OriginalURL"]
            urls.append([row["ImageID"], url])
    return urls
def download_images(image_urls_to_download, destination_folder):
    """
    Download every candidate image into <destination_folder>/Images as
    <ImageID>.jpg.

    Args:
        image_urls_to_download: 2D array [nx2] . The first collumn contains the candidate image id's to download
        and the second contains the image urls of the candidate image id's to download.
        destination_folder: directory where the new annotation and the downloaded images will be placed into.
    Returns:
        2D array [mx2] . The first collumn contains the candidate image id's which have been validly downloaded
        and the second contains the image urls of the candidate image id's which have been validly downloaded.
    """
    # create a directory
    images_dir = os.path.join(destination_folder, 'Images')
    if not os.path.isdir(images_dir):
        os.mkdir(images_dir)
    image_urls_downloaded = []
    total = len(image_urls_to_download)
    # download jpg images
    # BUGFIX: the original printed str(len) -- the builtin function --
    # instead of the total, and never incremented its counter i.
    for i, img_rec in enumerate(image_urls_to_download, start=1):
        print("\r" + str(i) + "/" + str(total) + " " + img_rec[1])
        try:
            response = urllib2.urlopen(urllib2.Request(img_rec[1]))
            # A redirect (geturl() != requested url) usually means a
            # placeholder page, e.g. "photo unavailable" -- reject it.
            if response.getcode() == 200 and response.geturl() == img_rec[1]:
                with open(os.path.join(images_dir, img_rec[0] + '.jpg'), "wb") as output_file:
                    # The response object itself is file-like; no need to
                    # reach into the .fp implementation detail.
                    shutil.copyfileobj(response, output_file)
                image_urls_downloaded.append(img_rec)
        except (urllib2.HTTPError, urllib2.URLError):
            # Best-effort: a failed url just isn't collected.
            continue
        except IOError:
            # BUGFIX: narrowed from a bare 'except:' which also swallowed
            # KeyboardInterrupt/SystemExit.
            continue
    return image_urls_downloaded
if __name__ == "__main__":
    print("Starting procedure")
    if GET_CLASSES_BY_POPULARITY:
        # Pick the most popular class with 600 < count < 1000 annotation rows.
        # BUGFIX: honor the GET_CLASSES_BY_POPULARITY flag (the original
        # always overwrote the hand-specified CLASSES), and keep only the
        # label name -- the original stored the whole (label, count) tuple,
        # so no annotation row could ever match it.
        class_popularity = get_class_popularity(PATH_TO_ANNOTATION_CSV)
        candidates = [c for c in class_popularity if 600 < c[1] < 1000]
        CLASSES = [candidates[0][0]]
    print("classes found")
    # find image ids
    image_ids_to_download = image_index_to_download(PATH_TO_ANNOTATION_CSV, CLASSES)
    print("ids found")
    # get image url's
    image_urls_to_download = get_image_urls(image_ids_to_download, PATH_TO_IMAGES_CSV)
    print("image urls found")
    # download all images
    image_urls_downloaded = download_images(image_urls_to_download, DESTINATION_FOLDER)
    print("jpg images downloaded")
    # create annotation
    # BUGFIX: create_annotation matches rows against bare image ids, but the
    # original passed the (id, url) pairs -- extract the ids first.
    downloaded_ids = set(rec[0] for rec in image_urls_downloaded)
    create_annotation(downloaded_ids, DESTINATION_FOLDER, PATH_TO_ANNOTATION_CSV, PATH_TO_IMAGES_CSV)
    print("annotation created")
    # Pad all downloaded images to DESIRED_SIZE with black borders.
    # BUGFIX: in ImageMagick the -gravity setting must precede the -extent
    # operator to take effect, and the padding color comes from -background
    # (-fill only affects drawing primitives).
    call(['mogrify', '-gravity', 'Center', '-background', 'black', '-extent', DESIRED_SIZE, 'Images/*.jpg'])
@supernlogn
Copy link
Author

supernlogn commented Mar 2, 2018

main currently uses class popularity to select the classes
and mogrify to resize them.
This can be changed according to user preferences.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment