Skip to content

Instantly share code, notes, and snippets.

@drivendata
Created October 18, 2017 17:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drivendata/70638e8a9e6a10fa020623f143259df3 to your computer and use it in GitHub Desktop.
Save drivendata/70638e8a9e6a10fa020623f143259df3 to your computer and use it in GitHub Desktop.
from itertools import cycle
import numpy as np
import os
import pandas as pd
import skvideo.io as skv
from warnings import filterwarnings
filterwarnings('ignore')
from multilabel import multilabel_train_test_split
class Dataset(object):
    """Video dataset wrapper that serves train/validation/test batches.

    Loads labels and the submission format from ``datapath``, splits the
    training data into train/validation sets (preserving per-class label
    distribution), and exposes infinite batch generators for each split.
    """

    def __init__(self, datapath, dataset_type='nano', reduce_frames=True, val_size=0.3, batch_size=16, test=False):
        # datapath: root dir holding train_labels.csv, submission_format.csv,
        # and one folder per dataset_type containing the video files
        self.datapath = datapath
        self.dataset_type = dataset_type
        self.reduce_frames = reduce_frames
        self.val_size = val_size
        self.batch_size = batch_size
        # boolean for test mode
        self.test = test
        # params based on dataset type: frame height/width in pixels
        if self.dataset_type == 'nano':
            self.height = 16
            self.width = 16
        elif self.dataset_type == 'micro':
            self.height = 64
            self.width = 64
        elif self.dataset_type == 'raw':
            print("\nRaw videos have variable size... \nsetting height and width to None... \nfirst video in test will determine size (test must be True)")
            self.height = None
            # BUG FIX: was `self.weidth = None`, which left self.width unset
            # in 'raw' mode (later AttributeError in _fill_video).
            self.width = None
        else:
            raise NotImplementedError("Please set dataset_type as raw, micro, or nano.")
        # params based on frame reduction: keeping every other frame halves 30 -> 15
        if self.reduce_frames:
            self.num_frames = 15
        else:
            self.num_frames = 30
        # for tracking videos that failed to load/reduce
        self.bad_videos = []
        # training and validation splits
        self.X_train, self.X_val, self.y_train, self.y_val = self.split_training_into_validation()
        # params of data based on training data
        self.num_classes = self.y_train.shape[1]
        self.class_names = self.y_train.columns.values
        assert self.num_classes == self.y_val.shape[1]
        self.num_samples = self.y_train.shape[0]
        self.num_batches = self.num_samples // self.batch_size
        # test paths and prediction matrix
        self.X_test_ids, self.predictions = self.prepare_test_data_and_prediction()
        # cycling counters let the batch generators wrap around each epoch
        self.batch_idx = cycle(range(self.num_batches))
        self.batch_num = next(self.batch_idx)
        self.num_val_batches = self.y_val.shape[0] // self.batch_size
        self.val_batch_idx = cycle(range(self.num_val_batches))
        self.val_batch_num = next(self.val_batch_idx)
        self.num_test_samples = self.X_test_ids.shape[0]
        self.num_test_batches = self.num_test_samples // self.batch_size
        self.test_batch_idx = cycle(range(self.num_test_batches))
        self.test_batch_num = next(self.test_batch_idx)
        # for testing the iterator in test mode: marks which train rows were served
        self.train_data_seen = pd.DataFrame(data={'seen': 0}, index=self.y_train.index)
        # test the generator
        if test:
            self._test_batch_generator()
def prepare_test_data_and_prediction(self):
"""
Returns paths to test data indexed by subject_id
and preallocates prediction dataframe.
"""
predpath = os.path.join(self.datapath, 'submission_format.csv')
predictions = pd.read_csv(predpath, index_col='filename')
test_idx = predictions.index
subjpath = os.path.join(self.datapath, self.dataset_type)
#subject_ids = pd.read_csv(subjpath, index_col=0)
subject_ids = pd.DataFrame(data=subjpath, columns=['filepath'], index=test_idx)
for row in subject_ids.itertuples():
subject_ids.loc[row.Index] = os.path.join(row.filepath, row.Index)
return test_idx, predictions
def split_training_into_validation(self):
    """Split labeled videos into train/validation sets.

    Uses multilabel_train_test_split so the per-class label distribution
    is (approximately) preserved between the two splits; asserts that the
    distributions match within tolerance before returning.
    """
    # training labels, indexed by video filename
    label_file = os.path.join(self.datapath, 'train_labels.csv')
    labels = pd.read_csv(label_file, index_col='filename')
    # full filepath for every labeled video: <datapath>/<dataset_type>/<filename>
    video_dir = os.path.join(self.datapath, self.dataset_type)
    paths = [os.path.join(video_dir, fname) for fname in labels.index]
    subject_ids = pd.DataFrame({'filepath': paths}, index=labels.index)
    # stratified multilabel split, deterministic via fixed seed
    X_train, X_val, y_train, y_val = multilabel_train_test_split(
        subject_ids, labels, size=self.val_size, min_count=1, seed=0)
    # sanity-check: class frequencies should be close between splits
    dist_diff = (y_train.sum() / y_train.shape[0] - y_val.sum() / y_val.shape[0]).sum()
    assert np.isclose(dist_diff, 0, rtol=1e-04, atol=1e-02)
    return X_train, X_val, y_train, y_val
def batches(self, verbose=False):
    """Infinite generator of (videos, labels) training batches.

    Slices X_train/y_train by the cycling batch counter, drops videos
    that failed to load, and records which training rows were served.
    """
    while 1:
        # slice bounds for the current batch
        lo = self.batch_size * self.batch_num
        hi = self.batch_size * (self.batch_num + 1)
        # print batch ranges if testing
        if self.test:
            print(f"batch {self.batch_num}:\t{lo} --> {hi - 1}")
        batch_paths = self.X_train.iloc[lo:hi]
        x, failed = self._get_video_batch(batch_paths,
                                          reduce_frames=self.reduce_frames,
                                          verbose=verbose)
        batch_paths = batch_paths.drop(failed)
        self.bad_videos += failed
        # matching labels, minus any videos that failed
        y = self.y_train.iloc[lo:hi].drop(failed)
        # videos and labels must stay aligned
        assert (batch_paths.index == y.index).all()
        assert x.shape[0] == y.shape[0]
        # report failures if verbose
        if failed and verbose:
            print(f"\t\t\t*** ERROR FETCHING BATCH {self.batch_num}/{self.num_batches} ***")
            print(f"Dropped {len(failed)} videos:")
            for bad_id in failed:
                print(f"\t{bad_id}\n\n")
        # advance the cycling batch counter
        self.batch_num = next(self.batch_idx)
        # track seen training rows (used by _test_batch_generator)
        self.train_data_seen.loc[y.index.values] = 1
        yield (x, y)
def val_batches(self, verbose=False):
    """Infinite generator of (videos, labels) validation batches.

    BUG FIX: previously sliced X_train/y_train, so "validation" batches
    were actually training data; now slices X_val/y_val. Also reports
    val_batch_num/num_val_batches (not the train counters) on failure.
    """
    reduce_frames = self.reduce_frames
    while 1:
        # slice bounds for the current validation batch
        start = self.batch_size * self.val_batch_num
        stop = self.batch_size * (self.val_batch_num + 1)
        x_paths = self.X_val.iloc[start:stop]
        x, failed = self._get_video_batch(x_paths,
                                          reduce_frames=reduce_frames,
                                          verbose=verbose)
        x_paths = x_paths.drop(failed)
        self.bad_videos += failed
        # get labels, dropping any videos that failed to load
        y = self.y_val.iloc[start:stop]
        y = y.drop(failed)
        # check match for labels and videos
        assert (x_paths.index == y.index).all()
        assert x.shape[0] == y.shape[0]
        # report failures if verbose
        if len(failed) != 0 and verbose == True:
            print(f"\t\t\t*** ERROR FETCHING BATCH {self.val_batch_num}/{self.num_val_batches} ***")
            print(f"Dropped {len(failed)} videos:")
            for failure in failed:
                print(f"\t{failure}\n\n")
        # increment batch number
        self.val_batch_num = next(self.val_batch_idx)
        yield (x, y)
def test_batches(self, verbose=False):
    """Infinite generator of test-set video batches (no labels).

    BUG FIX: each per-file path was built from the corrupted constant
    string "(unknown)" while the loop variable `filename` went unused;
    paths are now <datapath>/<dataset_type>/<filename>.
    """
    reduce_frames = self.reduce_frames
    test_dir = os.path.join(self.datapath, self.dataset_type)
    while 1:
        # slice bounds for the current test batch
        start = self.batch_size * self.test_batch_num
        stop = self.batch_size * (self.test_batch_num + 1)
        x_ids = self.X_test_ids[start:stop]
        x_paths = pd.DataFrame(data=[os.path.join(test_dir, filename) for filename in x_ids],
                               columns=['filepath'],
                               index=x_ids)
        x, failed = self._get_video_batch(x_paths,
                                          reduce_frames=reduce_frames,
                                          verbose=verbose)
        # remember which ids this batch covered so update_predictions
        # can write results back to the right rows
        self.test_batch_ids = x_ids.values
        # increment batch number
        self.test_batch_num = next(self.test_batch_idx)
        yield x
def _get_video_batch(self, x_paths, as_grey=True, reduce_frames=True, verbose=False):
"""
Returns ndarray of shape (batch_size, num_frames, width, height, channels).
If as_grey, then channels dimension is squeezed out.
"""
videos = []
failed = []
for row in x_paths.itertuples():
filepath = row.filepath
obf_id = row.Index
# load
video = skv.vread(filepath, as_grey=as_grey)
# fill video if neccessary
if video.shape[0] < self.num_frames:
video = self._fill_video(video)
# reduce
if reduce_frames:
frames = np.arange(0, video.shape[0], 2)
try:
video = video[frames, :, :] #.squeeze()
videos.append(video)
except IndexError:
if verbose:
print(f"FAILED TO REDUCE: {filepath}")
print(f"id:\t{obf_id}")
failed.append(obf_id)
return np.array(videos), failed
def _fill_video(self, video):
"""Returns a video with self.num_frames given at least one frame."""
# establish boundaries
target_num_frames = self.num_frames
num_to_fill = target_num_frames - video.shape[0]
# preallocate array for filler
filler_frames = np.zeros(shape=(num_to_fill, self.width, self.height, 1)) # assumes grey
# fill frames
source_frame = cycle(np.arange(0, video.shape[0]))
for i in range(num_to_fill):
filler_frames[i, :, :] = video[next(source_frame), :, :]
return np.concatenate((video, filler_frames), axis=0)
def _test_batch_generator(self):
    """Smoke-test the training batch generator over one full epoch.

    Draws num_batches batches, checking shapes each time, then verifies
    that (nearly) all training rows were served and the batch counter
    wrapped back to 0. Finally turns off test mode.
    """
    print('Testing train batch generation...')
    # FIX: create the generator once; the original built a fresh generator
    # object every iteration (correct only because batch state lives on
    # self, and wasteful either way).
    batch = self.batches(verbose=True)
    for i in range(self.num_batches):
        if self.batch_num % 10 == 0:
            print(f"\n\t\t\tBATCH \t{self.batch_num}/{self.num_batches}\n")
        x, y = next(batch)
        # same batches for videos and labels
        assert x.shape[0] == y.shape[0]
        # square videos
        assert x.shape[2] == x.shape[3]
        # black and white
        assert x.shape[4] == 1
    # assert we've seen all data up to remainder of a batch
    assert (self.y_train.shape[0] - self.train_data_seen.sum().values[0]) < self.batch_size
    # check that batch_num is reset
    assert self.batch_num == 0
    # turn off test mode
    if self.test == True:
        self.test = False
    print('Test passed.')
def update_predictions(self, results):
self.predictions.loc[self.test_batch_ids] = results
@simonguist
Copy link

@drivendata I think there is an error in val_batches() in lines 194 and 202:
Videos and labels should be taken from X_val and y_val, not from X_train and y_train.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment