Created
May 7, 2020 18:34
-
-
Save thomasweng15/33359cb34cd5c183450626d12a824d6b to your computer and use it in GitHub Desktop.
Data Loader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
from __future__ import print_function

import os
import random

import cv2
import numpy as np
import torch
import torchvision.transforms as T
import torchvision.transforms.functional as tf
from matplotlib import pyplot as plt
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import utils
def scale_and_mask_depth_image(depth_img):
    """Inpaint the NaN pixels, then the zero-valued pixels, of a depth image.

    Two ``cv2.inpaint`` passes with ``cv2.INPAINT_NS`` and radius 3 are run:
    the first over the pixels that are NaN in ``depth_img``, the second over
    the pixels that are exactly 0 in the result of the first pass.  Despite
    the function's name, no rescaling of the values is performed here.

    Args:
        depth_img: depth array accepted by ``cv2.inpaint``.

    Returns:
        The array produced by the second inpainting pass.
    """
    inpaint_radius = 3

    # Pass 1: fill the pixels where no depth reading is available (NaN).
    nan_pixels = np.isnan(depth_img).astype(np.uint8)
    without_nans = cv2.inpaint(depth_img, nan_pixels, inpaint_radius, cv2.INPAINT_NS)

    # Pass 2: fill the pixels that are still exactly zero after pass 1.
    zero_pixels = (without_nans == 0).astype(np.uint8)
    return cv2.inpaint(without_nans, zero_pixels, inpaint_radius, cv2.INPAINT_NS)
class TowelDataset(Dataset):
    """Towel RGB/depth samples with corner, edge and inner-edge label maps.

    Every file in ``root_dir`` whose name starts with ``rgb`` is one sample.
    The sample id ``<i>`` is the text between the first and second ``_`` of
    the file name with ``.png`` removed (e.g. ``rgb_12.png`` -> ``12``) and
    the companion files are looked up in the same directory:

    * ``<i>_depth.npy``, ``<i>_direction_x.npy``, ``<i>_direction_y.npy``,
      ``<i>_newvar.npy`` (always read),
    * ``<i>_grasp.npy`` (only when ``pretrained`` is non-empty),
    * ``<i>_labels_red.png`` (corners), ``<i>_labels_yellow.png`` (edges),
      ``<i>_labels_green.png`` (inner edges) (train/val only).

    The sorted file list is divided into six equal parts of
    ``len(imgs) // 6`` items: parts 0-3 are ``train``, part 4 is ``val`` and
    part 5 is ``test``.  Files left over by the floor division are unused.

    Args:
        root_dir: directory holding all files of the dataset.
        phase: ``'train'``, ``'val'`` or ``'test'``.
        transform: callable turning a PIL image into a tensor.  When given,
            train/val samples are additionally augmented with a random
            horizontal flip, a random vertical flip and a random rotation in
            [-30, 30] degrees, applied identically to every map of a sample.
            When omitted, ``ToTensor`` is used and nothing is augmented.
        datasize: if not ``None``, keep only the first ``datasize`` files.
            With ``datasize == 1`` the train split has exactly one item.
        pretrained: when non-empty, grasp data is loaded and each train/val
            sample gains a ``'reward'`` entry; the mask then marks the grasp
            pixel instead of the edge pixels.

    Raises:
        ValueError: if ``phase`` is not one of the three supported values.

    NOTE(review): ``transform`` is called once per image, so a transform with
    its own random operations would presumably transform the inputs and the
    labels of one sample differently -- confirm callers pass deterministic
    transforms.
    NOTE(review): the flips/rotation move the direction maps spatially but do
    not alter their values; confirm that is correct for how the direction
    components are encoded.
    """

    _PHASES = ('train', 'val', 'test')

    def __init__(self, root_dir, phase, transform=None, datasize=None, pretrained=""):
        if phase not in self._PHASES:
            raise ValueError(
                "phase must be one of %s, got %r" % (self._PHASES, phase))
        self.root_dir = root_dir
        self.transform = transform
        self.pretrained = pretrained
        self.phase = phase
        print("pretrained model: %s" % self.pretrained)

        names = [f for f in os.listdir(self.root_dir) if f.startswith("rgb")]
        # os.listdir returns names in arbitrary order; sort so that the
        # train/val/test membership is the same on every run and machine.
        names.sort(key=self._sort_key)
        if datasize is not None:
            names = names[0:datasize]
        self.imgs = names

        # Floor division keeps the train range [0, 4 * unit) disjoint from
        # the val offset 4 * unit and the test offset 5 * unit used in
        # __getitem__, also when the file count is not a multiple of six.
        split_unit = len(self.imgs) // 6
        if phase == 'train':
            self.total_data_num = 1 if datasize == 1 else split_unit * 4
        else:
            self.total_data_num = split_unit
        print(self.total_data_num)

    def __len__(self):
        """Return the number of samples in the selected phase."""
        return self.total_data_num

    @staticmethod
    def _sort_key(name):
        """Order file names by their numeric sample id, then by name."""
        parts = name.split("_")
        stem = parts[1].replace(".png", "") if len(parts) > 1 else ""
        if stem.isdigit():
            return (0, int(stem), name)
        return (1, 0, name)

    def _path(self, filename):
        """Return the path of ``filename`` inside ``root_dir``."""
        return os.path.join(self.root_dir, filename)

    def _read_image(self, filename):
        """Read an image file into a numpy array and close the file."""
        with Image.open(self._path(filename)) as handle:
            return np.array(handle)

    @staticmethod
    def _fill_nans_with_max(arr):
        """Replace, in place, every NaN of ``arr`` by its largest finite value."""
        arr[np.isnan(arr)] = np.nanmax(arr)
        return arr

    @staticmethod
    def _normalize_depth(depth):
        """Rescale a depth tensor linearly so that min -> 0 and max -> 1."""
        low = depth.min()
        span = depth.max() - low
        if span == 0:
            # Constant depth: avoid 0 / 0 and return an all-zero map.
            return depth - low
        return (depth - low) / span

    def __getitem__(self, idx):
        """Load, augment and convert the sample at position ``idx``.

        Returns:
            ``{'rgb', 'X'}`` in the test phase, where ``'X'`` is the depth map
            rescaled to [0, 1].  Otherwise additionally ``'Y'`` (corner, edge
            and inner-edge maps concatenated along dim 0), ``'direction'``
            (x and y maps concatenated along dim 0), ``'variance'``
            (``1 - newvar``), ``'mask'``, ``'w'`` (``Y * 49 + 1``) and, when
            ``pretrained`` is non-empty, ``'reward'``.

        Raises:
            IndexError: if ``idx`` is outside ``[0, len(self))``.
            ValueError: if the depth map holds no finite value.
        """
        if not 0 <= idx < self.total_data_num:
            raise IndexError(
                "index %s out of range for %d samples" % (idx, self.total_data_num))
        # val and test live in the 5th and 6th sixth of the file list.
        if self.phase == 'val':
            idx = idx + self.total_data_num * 4
        elif self.phase == 'test':
            idx = idx + self.total_data_num * 5

        rgb_name = self.imgs[idx]
        imidx = rgb_name.split("_")[1].replace(".png", "")
        has_grasp = self.pretrained != ""

        img_rgb = Image.fromarray(self._read_image(rgb_name))
        # Image.fromarray(..., mode='F') reinterprets the raw buffer as
        # float32, so every map must really be float32 before conversion.
        depth_npy = np.load(self._path(imidx + "_depth.npy")).astype(np.float32)
        direction_npy_x = np.load(self._path(imidx + "_direction_x.npy")).astype(np.float32)
        direction_npy_y = np.load(self._path(imidx + "_direction_y.npy")).astype(np.float32)
        variance_npy = 1.0 - np.load(self._path(imidx + "_newvar.npy")).astype(np.float32)

        if has_grasp:
            # NOTE(review): px/py are used as array indices, so the grasp
            # file presumably stores integers -- confirm against its writer.
            grasp_npy = np.load(self._path(imidx + "_grasp.npy"), encoding="latin1")
            px, py, _, _, reward = grasp_npy
            mask_npy = np.zeros_like(depth_npy)
            mask_npy[py, px] = 1
            reward_npy = np.zeros_like(depth_npy)
            reward_npy[py, px] = reward

        for arr in (depth_npy, direction_npy_x, direction_npy_y, variance_npy):
            self._fill_nans_with_max(arr)
        if np.isnan(depth_npy).any():
            raise ValueError("depth map of sample %s has no finite value" % imidx)

        img_depth = Image.fromarray(depth_npy, mode='F')

        if self.phase == 'test':
            convert = self.transform if self.transform else T.ToTensor()
            return {'rgb': convert(img_rgb),
                    'X': self._normalize_depth(convert(img_depth))}

        corners = self._read_image(imidx + '_labels_red.png')
        edges = self._read_image(imidx + '_labels_yellow.png')
        inner_edges = self._read_image(imidx + '_labels_green.png')
        if not has_grasp:
            # Without grasp data the mask marks the saturated edge pixels.
            mask_npy = np.zeros_like(edges, dtype=np.float32)
            mask_npy[edges > 254] = 1.0

        # Kept as an ordered list so that every map receives the same
        # geometric augmentation and `transform` is called in a fixed order.
        fields = [
            ('rgb', img_rgb),
            ('depth', img_depth),
            ('direction_x', Image.fromarray(direction_npy_x, mode='F')),
            ('direction_y', Image.fromarray(direction_npy_y, mode='F')),
            ('variance', Image.fromarray(variance_npy, mode='F')),
            ('mask', Image.fromarray(mask_npy, mode='F')),
        ]
        if has_grasp:
            fields.append(('reward', Image.fromarray(reward_npy, mode='F')))
        fields.extend([
            ('corners', Image.fromarray(corners)),
            ('edges', Image.fromarray(edges)),
            ('inner_edges', Image.fromarray(inner_edges)),
        ])

        if self.transform:
            if random.random() > 0.5:
                fields = [(key, tf.hflip(img)) for key, img in fields]
            if random.random() > 0.5:
                fields = [(key, tf.vflip(img)) for key, img in fields]
            angle = T.RandomRotation.get_params([-30, 30])
            # Nearest-neighbour is tf.rotate's default, so the keyword is
            # omitted: it is spelled `resample` in old torchvision releases
            # and `interpolation` in new ones.
            fields = [(key, tf.rotate(img, angle)) for key, img in fields]
            convert = self.transform
        else:
            convert = T.Compose([T.ToTensor()])
        tensors = dict((key, convert(img)) for key, img in fields)

        label = torch.cat(
            (tensors['corners'], tensors['edges'], tensors['inner_edges']), 0)
        sample = {
            'rgb': tensors['rgb'],
            'X': self._normalize_depth(tensors['depth']),
            'Y': label,
            'direction': torch.cat(
                (tensors['direction_x'], tensors['direction_y']), 0),
            'variance': torch.Tensor(tensors['variance']),
            'mask': torch.Tensor(tensors['mask']),
        }
        if has_grasp:
            sample['reward'] = torch.Tensor(tensors['reward'])
        # Per-pixel loss weights: 50 where a label is 1, 1 where it is 0.
        sample['w'] = label * 49 + 1
        return sample
class ShirtDataset(Dataset):
    """Shirt RGB/depth samples with five label maps and a bounding box.

    Sample ``idx`` is read from paths built by string concatenation with
    ``root_dir``: ``<root_dir><idx>/rgb_0.jpg``, ``<root_dir><idx>/depth.npy``,
    the five ``<root_dir><idx>/<part>_labels.png`` files (corner, edge,
    sleeve, shoulder, collar) and ``<root_dir>shirt_bdbox/<idx>bdbox.npy``.
    Every image is cropped to rows 310:710 and columns 220:920.

    Args:
        root_dir: path prefix of the dataset (normally ends with ``/``).
        phase: ``'train'`` (3664 items), ``'val'`` (628) or ``'test'`` (333).
        transform: callable turning a PIL image into a tensor; ``ToTensor``
            is used when omitted.

    Raises:
        ValueError: if ``phase`` is not one of the three supported values.

    NOTE(review): all phases index samples from 0, so the phases presumably
    use different ``root_dir`` values -- confirm against the callers.
    NOTE(review): the float depth image is converted to 8-bit ``'RGB'`` before
    the transform; confirm that this quantisation is intended.
    """

    _PHASE_SIZES = {'train': 3664, 'val': 628, 'test': 333}
    _CROP_ROWS = slice(310, 710)
    _CROP_COLS = slice(220, 920)
    _LABEL_PARTS = ('corner', 'edge', 'sleeve', 'shoulder', 'collar')

    def __init__(self, root_dir, phase, transform=None):
        if phase not in self._PHASE_SIZES:
            raise ValueError(
                "phase must be one of %s, got %r"
                % (sorted(self._PHASE_SIZES), phase))
        self.root_dir = root_dir
        self.transform = transform
        self.total_data_num = self._PHASE_SIZES[phase]
        self.phase = phase
        print(self.total_data_num)

    def __len__(self):
        """Return the number of samples in the selected phase."""
        return self.total_data_num

    @staticmethod
    def _read_image(path):
        """Read an image file into a numpy array and close the file."""
        with Image.open(path) as handle:
            return np.array(handle)

    @staticmethod
    def _normalize_depth(depth):
        """Rescale a depth tensor linearly so that min -> 0 and max -> 1."""
        low = depth.min()
        span = depth.max() - low
        if span == 0:
            # Constant depth: avoid 0 / 0 and return an all-zero map.
            return depth - low
        return (depth - low) / span

    def __getitem__(self, idx):
        """Load, crop and convert the sample numbered ``idx``.

        Returns:
            ``{'rgb', 'X'}`` in the test phase, where ``'X'`` is the depth
            image rescaled to [0, 1].  Otherwise additionally ``'Y'`` (the
            five label maps concatenated along dim 0), ``'w'``
            (``Y * 49 + 1``) and ``'bdbox'`` (the stored box with entries 0
            and 2 divided by the crop width and entries 1 and 3 divided by
            the crop height).

        Raises:
            IndexError: if ``idx`` is outside ``[0, len(self))``.
            ValueError: if the cropped depth map holds no finite value.
        """
        if not 0 <= idx < self.total_data_num:
            raise IndexError(
                "index %s out of range for %d samples" % (idx, self.total_data_num))
        rows, cols = self._CROP_ROWS, self._CROP_COLS
        sample_prefix = self.root_dir + str(idx)

        rgb = self._read_image(sample_prefix + '/rgb_0.jpg')[rows, cols, :]
        h, w, _ = rgb.shape
        img_rgb = Image.fromarray(rgb)

        # Image.fromarray(..., mode='F') reinterprets the raw buffer as
        # float32, so the crop is cast (and thereby copied) first.
        depth_npy = np.load(sample_prefix + '/depth.npy')[rows, cols].astype(np.float32)
        depth_npy[np.isnan(depth_npy)] = np.nanmax(depth_npy)
        if np.isnan(depth_npy).any():
            raise ValueError("depth map of sample %s has no finite value" % idx)
        img_depth = Image.fromarray(depth_npy, mode='F').convert('RGB')

        convert = self.transform if self.transform else T.ToTensor()

        if self.phase == 'test':
            return {'rgb': convert(img_rgb),
                    'X': self._normalize_depth(convert(img_depth))}

        label_arrays = [
            self._read_image(sample_prefix + '/%s_labels.png' % part)
            for part in self._LABEL_PARTS
        ]
        # Labels get the same unit-stride crop as the RGB and depth images
        # so that all maps of a sample stay pixel-aligned.
        label_images = [Image.fromarray(arr[rows, cols]) for arr in label_arrays]

        img_rgb = convert(img_rgb)
        img_depth = convert(img_depth)
        label = torch.cat([convert(img) for img in label_images], 0)

        # Per-pixel loss weights: 50 where a label is 1, 1 where it is 0.
        weights = label * 49 + 1

        bdbox = np.load(
            self.root_dir + "shirt_bdbox/" + str(idx) + "bdbox.npy").astype(np.float32)
        bdbox[0] = bdbox[0] / w
        bdbox[1] = bdbox[1] / h
        bdbox[2] = bdbox[2] / w
        bdbox[3] = bdbox[3] / h

        return {'rgb': img_rgb, 'X': self._normalize_depth(img_depth),
                'Y': label, 'w': weights, 'bdbox': bdbox}
def show_batch(batch):
    """Draw the ``'X'`` images of a batch as one grid on the current axes.

    The images are tiled with ``torchvision.utils.make_grid``; the grid's
    first axis is then reversed and moved last before it is handed to
    ``plt.imshow``.  The figure is not shown here -- the caller is expected
    to call ``plt.show()``.

    Args:
        batch: mapping with an ``'X'`` entry accepted by ``make_grid``.
    """
    grid = utils.make_grid(batch['X'])
    plt.imshow(grid.numpy()[::-1].transpose((1, 2, 0)))
    plt.title('Batch from dataloader')
if __name__ == "__main__":
    # Smoke test: load one validation sample, print its statistics and
    # display the first batch produced by a DataLoader.
    # NOTE(review): this transform contains random operations and the dataset
    # calls it once per image, so inputs and labels presumably receive
    # different random transforms here -- confirm this is acceptable.
    train_transform = T.Compose([
        T.RandomAffine(180),
        T.RandomHorizontalFlip(),
        T.RandomVerticalFlip(),
        T.ToTensor()])

    train_data = TowelDataset(root_dir="../towel_video_val/", phase='val', transform=train_transform)

    # show a batch
    batch_size = 1
    for i in range(batch_size):
        sample = train_data[i]
        print(i, sample['X'].size())
        for key in ('X', 'Y', 'rgb', 'w'):
            print(sample[key].max(), sample[key].min(), sample[key].type())

        # Print every label value that is neither 0 nor 1, in row-major order.
        labels = sample['Y'].numpy()
        for value in labels[(labels != 0) & (labels != 1)]:
            print(value)

    dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=False, num_workers=4)

    for i, batch in enumerate(dataloader):
        print(i, batch['X'].size())

        # observe the first batch
        if i == 0:
            plt.figure()
            show_batch(batch)
            plt.axis('off')
            plt.ioff()
            plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment