@orwa-te
Created August 20, 2020 18:50
TFOS (TensorFlowOnSpark) code to train my U-Net model on Spark
# Adapted from: https://www.tensorflow.org/beta/tutorials/distribute/multi_worker_with_keras
from __future__ import absolute_import, division, print_function, unicode_literals
def main_fun(args, ctx):
    import tensorflow as tf
    import numpy as np
    import imagecodecs

    # My params: number of training images to read from HDFS
    work_images = 2

    import time
    start_time = time.time()
    def build_unet_model(shape=(None, None, 4)):
        from tensorflow.keras import Input
        from tensorflow.keras.models import Model
        from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, BatchNormalization, Dropout, concatenate
        from tensorflow.keras.optimizers import Adam
        # import tensorflow as tf
        # from keras.callbacks import ModelCheckpoint, LearningRateScheduler
        # from keras.preprocessing.image import ImageDataGenerator
        # from keras import backend as keras

        # Left side of the U-Net
        inputs = Input(shape)
        # in_shape = inputs.shape
        # print(in_shape)
        conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')(inputs)
        conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv1)
        conv1 = BatchNormalization()(conv1)
        pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
        conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(pool1)
        conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv2)
        conv2 = BatchNormalization()(conv2)
        pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
        conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='random_normal')(pool2)
        conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv3)
        conv3 = BatchNormalization()(conv3)
        pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
        conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='random_normal')(pool3)
        conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv4)
        conv4 = BatchNormalization()(conv4)
        drop4 = Dropout(0.5)(conv4)
        pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

        # Bottom of the U-Net
        conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='random_normal')(pool4)
        conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv5)
        conv5 = BatchNormalization()(conv5)
        drop5 = Dropout(0.5)(conv5)

        # Upsampling starts, right side of the U-Net
        up6 = Conv2D(512, 2, activation='relu', padding='same', kernel_initializer='random_normal')(UpSampling2D(size=(2, 2))(drop5))
        merge6 = concatenate([drop4, up6], axis=3)
        conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='random_normal')(merge6)
        conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv6)
        conv6 = BatchNormalization()(conv6)
        up7 = Conv2D(256, 2, activation='relu', padding='same', kernel_initializer='random_normal')(UpSampling2D(size=(2, 2))(conv6))
        merge7 = concatenate([conv3, up7], axis=3)
        conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='random_normal')(merge7)
        conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv7)
        conv7 = BatchNormalization()(conv7)
        up8 = Conv2D(128, 2, activation='relu', padding='same', kernel_initializer='random_normal')(UpSampling2D(size=(2, 2))(conv7))
        merge8 = concatenate([conv2, up8], axis=3)
        conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(merge8)
        conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv8)
        conv8 = BatchNormalization()(conv8)
        up9 = Conv2D(64, 2, activation='relu', padding='same', kernel_initializer='random_normal')(UpSampling2D(size=(2, 2))(conv8))
        merge9 = concatenate([conv1, up9], axis=3)
        conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')(merge9)
        conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv9)
        conv9 = Conv2D(16, 3, activation='relu', padding='same', kernel_initializer='random_normal')(conv9)
        conv9 = BatchNormalization()(conv9)

        # Output layer of the U-Net with a softmax activation
        conv10 = Conv2D(9, 1, activation='softmax')(conv9)

        model = Model(inputs=inputs, outputs=conv10)
        model.compile(optimizer=Adam(lr=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
        return model
    # Resizing the image to the nearest dimensions that are multiples of 'stride' (not used below)
    def resize(img, stride, n_h, n_w):
        # h, l, _ = img.shape
        ne_h = (n_h * stride) + stride
        ne_w = (n_w * stride) + stride
        # img_resized = imresize(img, (ne_h, ne_w))
        img_resized = img.reshape(ne_h, ne_w)
        return img_resized
    # Padding at the bottom and right of images so they can be cropped into 128*128 tiles for training
    def padding(img, w, h, c, crop_size, stride, n_h, n_w):
        w_extra = w - ((n_w-1) * stride)
        w_toadd = crop_size - w_extra
        h_extra = h - ((n_h-1) * stride)
        h_toadd = crop_size - h_extra
        img_pad = np.zeros(((h + h_toadd), (w + w_toadd), c))
        # img_pad[:h, :w, :] = img
        # img_pad = img_pad + img
        img_pad = np.pad(img, [(0, h_toadd), (0, w_toadd), (0, 0)], mode='constant')
        return img_pad
    # Adding pixels to make the image shape a multiple of the stride (not used below)
    def add_pixals(img, h, w, c, n_h, n_w, crop_size, stride):
        w_extra = w - ((n_w-1) * stride)
        w_toadd = crop_size - w_extra
        h_extra = h - ((n_h-1) * stride)
        h_toadd = crop_size - h_extra
        img_add = np.zeros(((h + h_toadd), (w + w_toadd), c))
        img_add[:h, :w, :] = img
        img_add[h:, :w, :] = img[:h_toadd, :, :]
        img_add[:h, w:, :] = img[:, :w_toadd, :]
        img_add[h:, w:, :] = img[h-h_toadd:h, w-w_toadd:w, :]
        return img_add
    # Slicing the image into crop_size*crop_size crops with a fixed stride and making a list out of them
    def crops(a, crop_size=128):
        # stride = int(crop_size/2)
        stride = 48
        croped_images = []
        h, w, c = a.shape
        n_h = int(h / stride)
        n_w = int(w / stride)
        # Padding using the padding function we wrote
        a = padding(a, w, h, c, crop_size, stride, n_h, n_w)
        # Resizing as required
        # a = resize(a, stride, n_h, n_w)
        # Adding pixels as required
        # a = add_pixals(a, h, w, c, n_h, n_w, crop_size, stride)
        # Slicing the image into 128*128 crops with a stride of 48
        for i in range(n_h-1):
            for j in range(n_w-1):
                crop_x = a[(i*stride):((i*stride)+crop_size), (j*stride):((j*stride)+crop_size), :]
                croped_images.append(crop_x)
        return croped_images
    # Build HDFS paths of the form <parent>/sat/<i>.tif (inputs) or <parent>/gt/<i>.tif (labels)
    def get_images_paths(parent_path, is_label):
        sub_path = 'gt'
        if not is_label:
            sub_path = 'sat'
        paths = []
        for i in range(work_images):
            paths.append(parent_path + '/' + sub_path + '/' + str(i+1) + '.tif')
        return paths
    # Read a TIFF image from HDFS (namenode 'master', port 9000) and decode it into a numpy array
    def read_tif_image_from_hdfs(path, imagecodecs):
        from pyarrow import hdfs
        connect = hdfs.connect(host='master', port=9000)
        img_file = connect.open(path, mode='rb')
        img_bytes = img_file.read()
        numpy_img = imagecodecs.tiff_decode(img_bytes)
        return numpy_img
    # Class index -> RGB colour mapping for the 9 ground-truth classes
    color_dict = {0: (0, 0, 0),
                  1: (0, 125, 0),
                  2: (150, 80, 0),
                  3: (255, 255, 0),
                  4: (100, 100, 100),
                  5: (0, 255, 0),
                  6: (0, 0, 150),
                  7: (150, 150, 255),
                  8: (255, 255, 255)}
    # Convert an RGB ground-truth image into a one-hot (H, W, num_classes) array
    def rgb_to_onehot(rgb_arr, color_dict):
        num_classes = len(color_dict)
        shape = rgb_arr.shape[:2] + (num_classes,)
        # print(shape)
        arr = np.zeros(shape, dtype=np.int8)
        for i, cls in enumerate(color_dict):
            arr[:, :, i] = np.all(rgb_arr.reshape((-1, 3)) == color_dict[i], axis=1).reshape(shape[:2])
        return arr
    # Convert a one-hot (or softmax) map back into an RGB image
    def onehot_to_rgb(onehot, color_dict):
        single_layer = np.argmax(onehot, axis=-1)
        output = np.zeros(onehot.shape[:2] + (3,))
        for k in color_dict.keys():
            output[single_layer == k] = color_dict[k]
        return np.uint8(output)
    hdfs_path = '/Inter-IIT-CSRE/The-Eye-in-the-Sky-dataset'
    filelist_trainx = get_images_paths(hdfs_path, False)
    filelist_trainy = get_images_paths(hdfs_path, True)

    # Reading, padding and cropping all the training sat (input) images, then stacking the crops into one array
    trainx_list = []
    for fname in filelist_trainx[:work_images]:
        # Reading the image
        image = read_tif_image_from_hdfs(fname, imagecodecs)
        # Padding as required and cropping
        crops_list = crops(image)
        # print(len(crops_list))
        trainx_list = trainx_list + crops_list
    # Array of all the cropped training sat images
    trainx = np.asarray(trainx_list)
    # print("size=" + str(len(trainx)))

    # Reading, padding and cropping all the training gt (label) images, then stacking the crops into one array
    trainy_list = []
    for fname in filelist_trainy[:work_images]:
        # Reading the image
        image = read_tif_image_from_hdfs(fname, imagecodecs)
        # Padding as required and cropping
        crops_list = crops(image)
        trainy_list = trainy_list + crops_list
    # Array of all the cropped training gt images
    trainy = np.asarray(trainy_list)
    # Convert the trainy labels into one-hot encoding
    trainy_hot = []
    for i in range(trainy.shape[0]):
        hot_img = rgb_to_onehot(trainy[i], color_dict)
        trainy_hot.append(hot_img)
    trainy_hot = np.asarray(trainy_hot)

    # Use only the required data: drop leftover examples so the dataset is an exact number of batches
    import math
    int_num_batches = math.floor(trainx.shape[0] / args.batch_size)
    trainx = trainx[:int_num_batches * args.batch_size]
    trainy_hot = trainy_hot[:int_num_batches * args.batch_size]
    num_training_examples = trainx.shape[0]

    # Scale the input data to [0, 1]
    trainx = trainx.astype(np.float16) / np.max(trainx)
    import os
    import json
    # Hard-coded single-worker TF_CONFIG; this overrides any TF_CONFIG already set for this executor
    os.environ["TF_CONFIG"] = json.dumps({"cluster": {"worker": ["192.168.198.131:33035"]}, "task": {"type": "worker", "index": 0}})
    # print("\ntf_param=" + os.environ["TF_CONFIG"] + "\n")

    # Create the distribution strategy
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

    # Convert the numpy data to a tf.data.Dataset
    ds = tf.data.Dataset.from_tensor_slices((trainx, trainy_hot))
    # Set the sharding policy, then shuffle and batch
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    ds = ds.with_options(options).shuffle(num_training_examples)
    ds = ds.batch(args.batch_size)
    # Delete data that is no longer needed and free memory
    del trainx
    del trainy
    del trainy_hot
    del trainx_list
    del trainy_list
    import gc
    gc.collect()

    # The arrays were trimmed to an exact number of batches above, so this integer division is exact
    steps_per_epoch = num_training_examples // args.batch_size
    print("training " + str(num_training_examples) + " examples....\n batch_size=" + str(args.batch_size) + "\n steps_per_epoch=" + str(steps_per_epoch))

    # Build and compile the model inside the strategy scope, then train and save it
    with strategy.scope():
        multi_worker_model = build_unet_model()
    history = multi_worker_model.fit(ds, epochs=args.epochs, verbose=1, steps_per_epoch=steps_per_epoch)
    multi_worker_model.save("my_model.h5")

    end_time = time.time()
    print("time elapsed_inside_worker=" + str(end_time - start_time) + "\n")
if __name__ == '__main__':
    import argparse
    from pyspark.context import SparkContext
    from pyspark.conf import SparkConf
    from tensorflowonspark import TFCluster
    import time

    start_time = time.time()
    sc = SparkContext(conf=SparkConf().setAppName("unet_train_keras"))
    executors = sc._conf.get("spark.executor.instances")
    num_executors = int(executors) if executors is not None else 1

    parser = argparse.ArgumentParser()
    parser.add_argument("--batch_size", help="number of records per batch", type=int, default=32)
    parser.add_argument("--buffer_size", help="size of shuffle buffer", type=int, default=10000)
    parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
    parser.add_argument("--epochs", help="number of epochs", type=int, default=10)
    parser.add_argument("--model_dir", help="path to save model/checkpoint", default="mnist_model")
    parser.add_argument("--export_dir", help="path to export saved_model", default="mnist_export")
    parser.add_argument("--steps_per_epoch", help="number of steps per epoch", type=int, default=-1)
    parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
    args = parser.parse_args()
    print("args:", args)

    # Start the TensorFlowOnSpark cluster and run main_fun on the executors
    cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, num_ps=0, tensorboard=args.tensorboard, input_mode=TFCluster.InputMode.TENSORFLOW, master_node='chief', log_dir=args.model_dir)
    end_time = time.time()
    print("time elapsed=" + str(end_time - start_time) + "\n")
    cluster.shutdown()
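# ---------------------------------------------------------------------------
# Example launch (a hedged sketch, not part of the original gist): one possible
# way to submit this script to a Spark cluster with spark-submit. The script
# name (unet_train_tfos.py), master/deploy mode, executor sizing and the
# --model_dir value are placeholders; TensorFlow, tensorflowonspark, pyarrow
# and imagecodecs must already be installed on every executor.
#
#   ${SPARK_HOME}/bin/spark-submit \
#       --master yarn --deploy-mode client \
#       --num-executors 2 --executor-memory 8G \
#       unet_train_tfos.py \
#       --cluster_size 2 --batch_size 32 --epochs 10 --model_dir unet_model
# ---------------------------------------------------------------------------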