Skip to content

Instantly share code, notes, and snippets.

@thomasweng15
Created May 7, 2020 18:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thomasweng15/33359cb34cd5c183450626d12a824d6b to your computer and use it in GitHub Desktop.
Save thomasweng15/33359cb34cd5c183450626d12a824d6b to your computer and use it in GitHub Desktop.
Data Loader
# -*- coding: utf-8 -*-
from __future__ import print_function
from matplotlib import pyplot as plt
from PIL import Image
import numpy as np
import random
import os
import cv2
import random
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import utils
import torchvision.transforms.functional as tf
import torchvision.transforms as T
def scale_and_mask_depth_image(depth_img):
mask = np.array(np.isnan(depth_img), dtype=np.uint8)
depth_img = cv2.inpaint(depth_img, mask, 3, cv2.INPAINT_NS)
mask = np.array(depth_img == 0, dtype=np.uint8)
depth_img_out = cv2.inpaint(depth_img, mask, 3, cv2.INPAINT_NS)
return depth_img_out
class TowelDataset(Dataset):
def __init__(self, root_dir, phase, transform=None, datasize=None, pretrained=""):
self.root_dir = root_dir
self.transform = transform
self.pretrained = pretrained
print("pretrained model: %s" % self.pretrained)
def index(x):
return(int(x.split("_")[1]))
filename = os.listdir(self.root_dir)
filename = [f for f in filename if f.startswith("rgb")]
self.imgs = filename
if datasize is not None:
self.imgs = filename[0:datasize]
# print(self.imgs)
#self.imgs = sorted(filename, key = index)
if phase == 'train':
self.total_data_num = int(len(self.imgs)/6*4) if datasize != 1 else 1
elif phase == 'val':
self.total_data_num = int(len(self.imgs)/6)
elif phase == 'test':
self.total_data_num = int(len(self.imgs)/6)
self.phase = phase
print(self.total_data_num)
def __len__(self):
return self.total_data_num
def __getitem__(self, idx):
row_start = 150
row_end = 660
col_start = 415
col_end = 900
step = 2
if self.phase == 'val':
idx = idx + self.total_data_num*4
elif self.phase == 'test':
idx = idx + self.total_data_num*5
imidx = self.imgs[idx].split("_")[1].replace(".png", "")
img_path = self.root_dir + self.imgs[idx]
depth_path = self.root_dir + imidx+"_depth.npy"
direction_path_x = self.root_dir + imidx+"_direction_x.npy"
direction_path_y = self.root_dir + imidx+"_direction_y.npy"
#direction_path_x = self.root_dir + ("1")+"_direction_x.npy"
#direction_path_y = self.root_dir + ("1")+"_direction_y.npy"
# variance_path = self.root_dir + imidx +"_variance.npy"
# newvar_path = "/home/jianingq/Downloads/newvar.npy"
newvar_path = self.root_dir + imidx + "_newvar.npy"
if self.pretrained != "":
grasp_path = self.root_dir + imidx +"_grasp.npy"
#print(img_path)
#print(depth_path)
img_rgb = Image.open(img_path)
img_rgb = np.array(img_rgb)
#img_rgb = img_rgb[row_start:row_end:step, col_start:col_end:step, :3]
h, w, _ = img_rgb.shape
img_rgb = Image.fromarray(img_rgb)
depth_npy = np.load(depth_path)
#depth_npy = depth_npy[row_start:row_end:step, col_start:col_end:step]
direction_npy_x = np.load(direction_path_x).astype(np.float32)
#direction_npy_x = direction_npy_x[row_start:row_end:step, col_start:col_end:step]
direction_npy_y = np.load(direction_path_y).astype(np.float32)
#direction_npy_y = direction_npy_y[row_start:row_end:step, col_start:col_end:step]
#print(np.max(direction_npy_x))
#print(np.min(direction_npy_x))
# variance_npy = np.load(variance_path).astype(np.float32)
variance_npy = np.load(newvar_path).astype(np.float32)
variance_npy = 1.0 - variance_npy
#variance_npy = variance_npy[row_start:row_end:step, col_start:col_end:step]
# make a mask for grasp info
if self.pretrained != "":
grasp_npy = np.load(grasp_path, encoding="latin1")
px, py, _, _, reward = grasp_npy
mask_npy = np.zeros_like(depth_npy)
mask_npy[py, px] = 1
reward_npy = np.zeros_like(depth_npy)
reward_npy[py, px] = reward
"""
for num in range(5):
depth_npy = scale_and_mask_depth_image(depth_npy)
if np.sum(np.isnan(depth_npy)) == 0:
break
"""
max_d = np.nanmax(depth_npy)
depth_npy[np.isnan(depth_npy)] = max_d
max_d = np.nanmax(direction_npy_x)
direction_npy_x[np.isnan(direction_npy_x)] = max_d
max_d = np.nanmax(direction_npy_y)
direction_npy_y[np.isnan(direction_npy_y)] = max_d
#print(max_d)
#print(np.sum(np.isnan(direction_npy_y)))
#print(np.max(direction_npy_y))
max_d = np.nanmax(variance_npy)
variance_npy[np.isnan(variance_npy)] = max_d
#print(variance_npy.max())
#variance_npy /= variance_npy.max() # Normalize
assert np.sum(np.isnan(depth_npy)) == 0
img_depth = Image.fromarray(depth_npy, mode='F')
img_direction_x = Image.fromarray(direction_npy_x, mode='F')
img_direction_y = Image.fromarray(direction_npy_y, mode='F')
#print(np.max(img_direction_y))
#img_depth = img_depth.convert('RGB')
img_variance = Image.fromarray(variance_npy, mode='F')
if self.pretrained != "":
img_mask = Image.fromarray(mask_npy, mode='F')
img_reward = Image.fromarray(reward_npy, mode='F')
if self.phase == 'test':
if self.transform:
img_rgb = self.transform(img_rgb)
img_depth = self.transform(img_depth)
min_I = img_depth.min()
# min_I = self.depth_thresh
max_I = img_depth.max()
img_depth[img_depth<=min_I] = min_I
img_depth = (img_depth - min_I) / (max_I - min_I)
sample = {'rgb': img_rgb, 'X': img_depth}
else:
corners = Image.open(self.root_dir + imidx + '_labels_red.png')
corners = np.array(corners)
edges = Image.open(self.root_dir + imidx + '_labels_yellow.png')
#edges = Image.open(self.root_dir + (self.imgs[idx].split("_")[1]) + '_labels_blue.png')
edges = np.array(edges)
inner_edges = Image.open(self.root_dir + imidx + '_labels_green.png')
inner_edges = np.array(inner_edges)
#corners = corners[row_start:row_end:step, col_start:col_end:step]
corners_label = Image.fromarray(corners)
#edges = edges[row_start:row_end:step, col_start:col_end:step]
edges_label = Image.fromarray(edges)
#inner_edges = inner_edges[row_start:row_end:step, col_start:col_end:step]
inner_edges_label = Image.fromarray(inner_edges)
if self.pretrained == "":
mask_npy = np.zeros_like(edges, dtype=np.float32)
mask_npy[edges > 254] = 1.0
img_mask = Image.fromarray(mask_npy, mode='F')
if self.transform:
if random.random() > 0.5:
img_rgb = tf.hflip(img_rgb)
img_depth = tf.hflip(img_depth)
img_direction_x = tf.hflip(img_direction_x)
img_direction_y = tf.hflip(img_direction_y)
img_variance = tf.hflip(img_variance)
img_mask = tf.hflip(img_mask)
if self.pretrained != "":
img_reward = tf.hflip(img_reward)
corners_label = tf.hflip(corners_label)
edges_label = tf.hflip(edges_label)
inner_edges_label = tf.hflip(inner_edges_label)
if random.random() > 0.5:
img_rgb = tf.vflip(img_rgb)
img_depth = tf.vflip(img_depth)
img_direction_x = tf.vflip(img_direction_x)
img_direction_y = tf.vflip(img_direction_y)
img_variance = tf.vflip(img_variance)
img_mask = tf.vflip(img_mask)
if self.pretrained != "":
img_reward = tf.vflip(img_reward)
corners_label = tf.vflip(corners_label)
edges_label = tf.vflip(edges_label)
inner_edges_label = tf.vflip(inner_edges_label)
angle = T.RandomRotation.get_params([-30, 30])
img_rgb = tf.rotate(img_rgb, angle, resample=Image.NEAREST)
img_depth = tf.rotate(img_depth, angle, resample=Image.NEAREST)
img_direction_x = tf.rotate(img_direction_x, angle, resample=Image.NEAREST)
img_direction_y = tf.rotate(img_direction_y, angle, resample=Image.NEAREST)
img_variance = tf.rotate(img_variance, angle, resample=Image.NEAREST)
img_mask = tf.rotate(img_mask, angle, resample=Image.NEAREST)
if self.pretrained != "":
img_reward = tf.rotate(img_reward, angle, resample=Image.NEAREST)
corners_label = tf.rotate(corners_label, angle, resample=Image.NEAREST)
edges_label = tf.rotate(edges_label, angle, resample=Image.NEAREST)
inner_edges_label = tf.rotate(inner_edges_label, angle, resample=Image.NEAREST)
img_rgb = self.transform(img_rgb)
img_depth = self.transform(img_depth)
img_direction_x = self.transform(img_direction_x)
img_direction_y = self.transform(img_direction_y)
img_variance = self.transform(img_variance)
img_mask = self.transform(img_mask)
if self.pretrained != "":
img_reward = self.transform(img_reward)
corners_label = self.transform(corners_label)
edges_label = self.transform(edges_label)
inner_edges_label = self.transform(inner_edges_label)
else:
transform = T.Compose([T.ToTensor()])
corners_label = transform(corners_label)
edges_label = transform(edges_label)
inner_edges_label = transform(inner_edges_label)
img_direction_x = transform(img_direction_x)
img_direction_y = transform(img_direction_y)
img_variance = transform(img_variance)
img_mask = transform(img_mask)
if self.pretrained != "":
img_reward = transform(img_reward)
img_rgb = transform(img_rgb)
img_depth = transform(img_depth)
label = torch.cat((corners_label, edges_label, inner_edges_label), 0)
#label = torch.cat((corners_label, edges_label), 0)
label_direction = torch.cat((img_direction_x,img_direction_y),0)
label_variance = torch.Tensor(img_variance)
label_mask = torch.Tensor(img_mask)
if self.pretrained != "":
label_reward = torch.Tensor(img_reward)
min_I = img_depth.min()
# min_I = self.depth_thresh
max_I = img_depth.max()
img_depth[img_depth<=min_I] = min_I
img_depth = (img_depth - min_I) / (max_I - min_I)
#print(torch.max(label_direction))
# weights
weights = label * 49 + 1
if self.pretrained != "":
sample = {'rgb': img_rgb, 'X': img_depth, 'Y': label, 'direction' : label_direction, 'variance' : label_variance, 'mask': label_mask, 'reward': label_reward, 'w': weights}
else:
sample = {'rgb': img_rgb, 'X': img_depth, 'Y': label, 'direction' : label_direction, 'variance' : label_variance, 'mask': label_mask, 'w': weights}
return sample
class ShirtDataset(Dataset):
def __init__(self, root_dir, phase, transform=None):
self.root_dir = root_dir
self.transform = transform
if phase == 'train':
self.total_data_num = 3664
elif phase == 'val':
self.total_data_num = 628
elif phase == 'test':
self.total_data_num = 333
self.phase = phase
print(self.total_data_num)
def __len__(self):
return self.total_data_num
def __getitem__(self, idx):
row_start = 310
row_end = 710
col_start = 220
col_end = 920
img_path = self.root_dir + str(idx) + '/rgb_0.jpg'
depth_path = self.root_dir + str(idx) + '/depth.npy'
img_rgb = Image.open(img_path)
img_rgb = np.array(img_rgb)
img_rgb = img_rgb[row_start:row_end, col_start:col_end, :]
h, w, _ = img_rgb.shape
img_rgb = Image.fromarray(img_rgb)
depth_npy = np.load(depth_path)
depth_npy = depth_npy[row_start:row_end, col_start:col_end]
"""
for num in range(5):
depth_npy = scale_and_mask_depth_image(depth_npy)
if np.sum(np.isnan(depth_npy)) == 0:
break
"""
max_d = np.nanmax(depth_npy)
depth_npy[np.isnan(depth_npy)] = max_d
assert np.sum(np.isnan(depth_npy)) == 0
img_depth = Image.fromarray(depth_npy, mode='F')
img_depth = img_depth.convert('RGB')
if self.phase == 'test':
if self.transform:
img_rgb = self.transform(img_rgb)
img_depth = self.transform(img_depth)
min_I = img_depth.min()
max_I = img_depth.max()
img_depth = (img_depth - min_I) / (max_I - min_I)
sample = {'rgb': img_rgb, 'X': img_depth}
else:
corners = Image.open(self.root_dir + str(idx) + '/corner_labels.png')
corners = np.array(corners)
edges = Image.open(self.root_dir + str(idx) + '/edge_labels.png')
edges = np.array(edges)
sleeves = Image.open(self.root_dir + str(idx) + '/sleeve_labels.png')
sleeves = np.array(sleeves)
shoulders = Image.open(self.root_dir + str(idx) + '/shoulder_labels.png')
shoulders = np.array(shoulders)
collars = Image.open(self.root_dir + str(idx) + '/collar_labels.png')
collars = np.array(collars)
corners = corners[row_start:row_end:step, col_start:col_end:step]
corners_label = Image.fromarray(corners)
edges = edges[row_start:row_end:step, col_start:col_end:step]
edges_label = Image.fromarray(edges)
sleeves = sleeves[row_start:row_end:step, col_start:col_end:step]
sleeves_label = Image.fromarray(sleeves)
shoulders = shoulders[row_start:row_end:step, col_start:col_end:step]
shoulders_label = Image.fromarray(shoulders)
collars = collars[row_start:row_end:step, col_start:col_end:step]
collars_label = Image.fromarray(collars)
if self.transform:
img_rgb = self.transform(img_rgb)
img_depth = self.transform(img_depth)
corners_label = self.transform(corners_label)
edges_label = self.transform(edges_label)
sleeves_label = self.transform(sleeves_label)
shoulders_label = self.transform(shoulders_label)
collars_label = self.transform(collars_label)
label = torch.cat((corners_label, edges_label, sleeves_label, shoulders_label, collars_label), 0)
min_I = img_depth.min()
max_I = img_depth.max()
img_depth = (img_depth - min_I) / (max_I - min_I)
# weights
weights = label * 49 + 1
bdbox = np.load(self.root_dir+"shirt_bdbox/" + str(idx) + "bdbox.npy").astype(np.float32)
bdbox[0] = bdbox[0] / w
bdbox[1] = bdbox[1] / h
bdbox[2] = bdbox[2] / w
bdbox[3] = bdbox[3] / h
sample = {'rgb': img_rgb, 'X': img_depth, 'Y': label, 'w': weights, 'bdbox': bdbox}
return sample
def show_batch(batch):
img_batch = batch['X']
batch_size = len(img_batch)
grid = utils.make_grid(img_batch)
plt.imshow(grid.numpy()[::-1].transpose((1, 2, 0)))
plt.title('Batch from dataloader')
if __name__ == "__main__":
train_transform = T.Compose([
T.RandomAffine(180),
T.RandomHorizontalFlip(),
T.RandomVerticalFlip(),
T.ToTensor()])
train_data = TowerDataset(root_dir="../towel_video_val/", phase='val', transform=train_transform)
# show a batch
batch_size = 1
for i in range(batch_size):
sample = train_data[i]
print(i, sample['X'].size())
print(sample['X'].max(), sample['X'].min(), sample['X'].type())
print(sample['Y'].max(), sample['Y'].min(), sample['Y'].type())
print(sample['rgb'].max(), sample['rgb'].min(), sample['rgb'].type())
print(sample['w'].max(), sample['w'].min(), sample['w'].type())
a = sample['Y'].numpy()
for i in range(a.shape[0]):
for j in range(a.shape[1]):
for k in range(a.shape[2]):
if a[i,j,k] != 0 and a[i,j,k] != 1:
print(a[i,j,k])
dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=False, num_workers=4)
for i, batch in enumerate(dataloader):
print(i, batch['X'].size())
# observe 4th batch
if i == 0:
plt.figure()
show_batch(batch)
plt.axis('off')
plt.ioff()
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment