Last active
June 9, 2022 09:17
-
-
Save Muosvr/2d2275f2e1543fead19f1da035d6d27c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Converts an angle from a float to a one-hot encoded array for the neural net classifier.
# In this case the range is divided into 15 brackets, represented by an array of 14 zeros and 1 one.
# The index of the one among the zeros indicates which bracket the float falls in.
def to_bin(a, n_bins=15):
    """One-hot encode a value from [-1, 1] into ``n_bins`` evenly spaced bins.

    Args:
        a: Value to encode; -1 maps to the first bin and 1 to the last.
            Values outside [-1, 1] are clamped to the nearest end bin
            rather than raising IndexError (too large) or silently
            selecting a wrong bin through negative indexing (too small).
        n_bins: Length of the returned array. Must be at least 2.

    Returns:
        A numpy array of length ``n_bins`` containing zeros and a single 1.

    Raises:
        ValueError: If ``n_bins`` is less than 2.
    """
    if n_bins < 2:
        raise ValueError(f"n_bins must be at least 2, got {n_bins}")
    arr = np.zeros(n_bins)
    # Shift [-1, 1] to [0, 2], then scale so it spans indices 0..n_bins-1.
    idx = int(round((a + 1) / (2 / (n_bins - 1))))
    # Clamp so out-of-range inputs land in an end bin.
    idx = min(max(idx, 0), n_bins - 1)
    arr[idx] = 1
    return arr
# Retrieve images and convert them into numpy arrays to be used as input for the neural net
def read_record(record_dict, path = PATH):
    """Return a copy of a record with its image location replaced by pixel data.

    Args:
        record_dict: Mapping of record field names to values. The value
            stored under 'cam/image_array' is passed to ``Image.open``.
        path: Not used by this function; kept so existing callers that
            pass it keep working.

    Returns:
        A new dict with the same keys. The 'cam/image_array' entry holds
        the image as a numpy array; all other entries are copied as-is.
    """
    data = {}
    for key, val in record_dict.items():
        if key == 'cam/image_array':
            # The context manager closes the underlying file as soon as
            # np.array has pulled the pixel data out of the image, so a
            # long-running generator does not accumulate open handles.
            with Image.open(f'{val}') as img:
                val = np.array(img)
        data[key] = val
    return data
# Shuffle the records and transform each one if a transform function is passed in
def get_record_gen(record_transform=None, shuffle=True, df=None):
    """Yield loaded (and optionally transformed) records from ``df`` forever.

    Args:
        record_transform: Optional callable applied to each record after
            ``read_record``; its return value is what gets yielded.
        shuffle: If True, every yielded record is an independent random
            draw (``df.sample(n=1)``), so rows can repeat within one pass.
            If False, rows are yielded in DataFrame order.
        df: pandas DataFrame holding one record per row.

    Yields:
        The record produced by ``read_record`` and, if given,
        ``record_transform``.

    Raises:
        ValueError: If ``df`` is None or has no rows; without this check
            the outer loop would spin forever without yielding.
    """
    if df is None or df.empty:
        raise ValueError("df must be a non-empty DataFrame")
    while True:
        for _, row in df.iterrows():
            if shuffle:
                # 'records' is the full orient name; the abbreviation
                # 'record' is rejected by pandas >= 2.0.
                record_dict = df.sample(n=1).to_dict(orient='records')[0]
            else:
                # Sequential mode: use the row the loop is currently on.
                record_dict = row.to_dict()
            record_dict = read_record(record_dict)
            if record_transform:
                record_dict = record_transform(record_dict)
            yield record_dict
# Create batches of batch_size records, because the neural net trains in batches
def get_batch_gen(keys=None, batch_size=128, record_transform=None, shuffle=True, df=None):
    """Yield batches of records as a dict of stacked numpy arrays, forever.

    Args:
        keys: Record fields to include in each batch. When None, every
            column of ``df`` is used.
        batch_size: Number of records drawn from the record generator for
            each yielded batch.
        record_transform: Passed through to ``get_record_gen``.
        shuffle: Passed through to ``get_record_gen``.
        df: pandas DataFrame passed through to ``get_record_gen``.

    Yields:
        A dict mapping each key to ``np.array`` of that field's values
        across the ``batch_size`` records of the batch.
    """
    if keys is None:
        # The default used to crash in enumerate(None); fall back to
        # batching every column instead.
        keys = list(df.columns)
    record_gen = get_record_gen(record_transform=record_transform, shuffle=shuffle, df=df)
    while True:
        record_list = [next(record_gen) for _ in range(batch_size)]
        yield {k: np.array([r[k] for r in record_list]) for k in keys}
# Create training set using the above batch function | |
def get_train_gen(X_keys, Y_keys, batch_size=128, record_transform=None, df=None):
    """Yield ``(X, Y)`` pairs split out of the batches from ``get_batch_gen``.

    Args:
        X_keys: List of record fields used as model inputs.
        Y_keys: List of record fields used as model targets.
        batch_size: Passed through to ``get_batch_gen``.
        record_transform: Passed through to ``get_batch_gen``.
        df: pandas DataFrame passed through to ``get_batch_gen``.

    Yields:
        A tuple ``(X, Y)`` where ``X`` is the list of batch entries for
        ``X_keys`` and ``Y`` the list of batch entries for ``Y_keys``,
        each in key order.
    """
    all_keys = X_keys + Y_keys
    batches = get_batch_gen(
        all_keys,
        batch_size=batch_size,
        record_transform=record_transform,
        df=df,
    )
    # The batch generator never finishes, so this loop runs indefinitely.
    for batch in batches:
        inputs = [batch[name] for name in X_keys]
        targets = [batch[name] for name in Y_keys]
        yield inputs, targets
# Create a validation set from the training set | |
def get_train_val_gen(df, X_keys, Y_keys, batch_size = 128, train_frac=0.8,
                      train_record_transform=None, val_record_transform=None):
    """Split ``df`` into training and validation rows and build a generator for each.

    Args:
        df: pandas DataFrame holding one record per row.
        X_keys: Record fields used as model inputs.
        Y_keys: Record fields used as model targets.
        batch_size: Batch size used by both generators.
        train_frac: Fraction of rows sampled into the training split.
        train_record_transform: Record transform for the training generator.
        val_record_transform: Record transform for the validation generator.

    Returns:
        A tuple ``(train_gen, val_gen)`` of generators from ``get_train_gen``.
    """
    # Fixed random_state keeps the split identical from call to call.
    train_rows = df.sample(frac=train_frac, random_state=200)
    # Validation rows are those whose index labels were not sampled.
    val_rows = df.drop(train_rows.index)

    def make_gen(rows, transform):
        return get_train_gen(X_keys=X_keys, Y_keys=Y_keys, batch_size=batch_size,
                             record_transform=transform, df=rows)

    return (
        make_gen(train_rows, train_record_transform),
        make_gen(val_rows, val_record_transform),
    )
# By default only the user/angle data is transformed into a one-hot array; the user/throttle data is left as a single float
# So the neural net actually has both a classification output and a regression output
def record_transform(record):
    """Replace the record's 'user/angle' value with its ``to_bin`` encoding.

    The record is modified in place and also returned.
    """
    angle_key = 'user/angle'
    encoded_angle = to_bin(record[angle_key])
    record[angle_key] = encoded_angle
    return record
# Retrieve the data generators. Their output is what gets passed into the neural net
# Both splits use the same record transform, so the angle is encoded
# identically for training and validation data.
train_gen, val_gen = get_train_val_gen(
    df=df,
    X_keys=X_keys,
    Y_keys=y_keys,
    batch_size=bs,
    train_frac=train_frac,
    train_record_transform=record_transform,
    val_record_transform=record_transform,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment