@ineersa
Created September 5, 2023 14:56
import gc
import logging
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from keras.applications import EfficientNetV2M
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint, LearningRateScheduler
from keras.optimizers import Adam
from keras.utils import to_categorical
from sklearn.metrics import confusion_matrix
import os
from sklearn.utils import compute_class_weight
from utils.config import Config
from utils.helpers import get_train_and_validation_data, plot_history
import tensorflow as tf
import numpy as np
import keras
from keras import layers
import tensorflow_addons as tfa
from transformers import AdamWeightDecay, create_optimizer
import keras_cv.layers as cv_layers
from sklearn.model_selection import StratifiedKFold
import albumentations as A
from utils.learning_rate_tracker import LearningRateTracker
tf.keras.backend.clear_session()
logging.basicConfig(
    level=logging.WARN,
    format="%(asctime)s - %(name)s - %(levelname)s @ %(message)s",
    datefmt="%y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(name="EfficientNet")
# Train-time spatial augmentation (resize then random-crop).
# NOTE: defined here but never mapped onto the training dataset below;
# only albumentations_transform is applied via load_img.
data_augmentation = keras.Sequential(
    [
        layers.Resizing(256, 256),
        layers.RandomCrop(height=224, width=224),
    ],
    name="data_augmentation",
)
val_augmentation = keras.Sequential(
    [
        layers.Resizing(256, 256),
    ],
    name="val_augmentation",
)
input_shape = (256, 256, 3)
projection_units = 512  # unused below (likely a leftover from a contrastive-learning variant)
learning_rate = 2e-4
temperature = 0.05  # unused below
num_classes = 6
train_batch_size = 16
eval_batch_size = 16
encoder_epochs = 10  # unused below; only the classifier is trained
classifier_epochs = 10
weight_decay_rate = 0.01
steps_per_epoch = 5  # unused below
def create_encoder():
    # ImageNet-pretrained EfficientNetV2-M backbone with global average pooling.
    efficientNet = EfficientNetV2M(
        include_top=False, weights="imagenet", input_shape=input_shape, pooling="avg"
    )
    inputs = keras.Input(shape=input_shape)
    outputs = efficientNet(inputs)
    model = keras.Model(inputs=inputs, outputs=outputs, name="image-encoder")
    return model
def create_classifier(encoder, optimizer, trainable=True):
    # Toggle backbone fine-tuning, but leave BatchNormalization layers untouched
    # so their statistics are not disturbed.
    for layer in encoder.layers:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = trainable
    inputs = keras.Input(shape=input_shape)
    features = encoder(inputs)
    outputs = layers.Dense(num_classes, activation="softmax")(features)
    model = keras.Model(inputs=inputs, outputs=outputs, name="image-classifier")
    metrics = [
        tf.keras.metrics.CategoricalAccuracy(name="accuracy"),
        tfa.metrics.F1Score(num_classes=num_classes, average="macro"),
    ]
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=metrics)
    return model
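# Usage sketch (illustrative, not part of the original training flow): a frozen-encoder
# linear probe could be built as
#   probe = create_classifier(create_encoder(), Adam(learning_rate=1e-3), trainable=False)
# whereas prepare_and_train below fine-tunes the whole backbone (trainable=True).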
def albumentations_transform(image):
    # Photometric/geometric augmentations; each op fires with its own probability,
    # so most images pass through only lightly modified.
    transform = A.Compose([
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.2),
        A.HorizontalFlip(p=0.2),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.1, rotate_limit=20, p=0.2),
        # A.RandomResizedCrop(256, 256, scale=(0.9, 1.0), p=1.0),
        A.GaussianBlur(p=0.2),
        A.RGBShift(r_shift_limit=10, g_shift_limit=10, b_shift_limit=10, p=0.2),
        A.RandomGamma(gamma_limit=(90, 110), p=0.2),
        A.ToGray(p=0.1),
    ])
    image = transform(image=image)['image']
    return image
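# A possible micro-optimization (my suggestion, not in the original gist): the Compose
# pipeline is stateless between calls, so it could be built once at module scope and
# reused instead of being reconstructed for every image:
#   _TRANSFORM = A.Compose([...])  # same ops as above
#   def albumentations_transform(image):
#       return _TRANSFORM(image=image)['image']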
def load_img(img_path, label=None):
    img = tf.io.read_file(img_path)
    img = tf.image.decode_jpeg(img, channels=3)
    # tf.numpy_function erases static shape information, so restore it here.
    # This assumes the source JPEGs are already 256x256; nothing in the
    # albumentations pipeline resizes them.
    img = tf.numpy_function(func=albumentations_transform, inp=[img], Tout=tf.uint8)
    img.set_shape((256, 256, 3))
    if label is None:
        return img
    return img, label
def load_val_img(img_path, label=None):
    # No augmentation at validation/test time; val_augmentation resizes later.
    img = tf.io.read_file(img_path)
    img = tf.image.decode_jpeg(img, channels=3)
    if label is None:
        return img
    return img, label
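# Sanity-check helper (my addition, illustrative): builds a tiny dataset from a few
# image paths and prints element shapes, assuming 256x256 source JPEGs as above.
def _debug_pipeline(sample_paths):
    ds = tf.data.Dataset.from_tensor_slices(sample_paths).map(load_img)
    for img in ds.take(2):
        print(img.shape, img.dtype)  # expect (256, 256, 3) uint8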
def prepare_and_train(train_df, val_df, fold_index):
    # Training dataset with albumentations augmentation baked in.
    train_img_data_files = [os.path.join(Config.train_img_dir, img_id) for img_id in train_df["image_id"].values]
    train_label_data_files = list(to_categorical(train_df["class_6"].values, num_classes=num_classes))
    train_dataset = tf.data.Dataset.from_tensor_slices((train_img_data_files, train_label_data_files)).map(load_img)
    # Balanced class weights to counter class imbalance in the loss.
    classes = [0, 1, 2, 3, 4, 5]
    class_weights = compute_class_weight('balanced', classes=classes, y=train_df["class_6"].values)
    class_weight_dict = dict(zip(classes, class_weights))
    val_img_data_files = [os.path.join(Config.train_img_dir, img_id) for img_id in val_df["image_id"].values]
    val_label_data_files = list(to_categorical(val_df["class_6"].values, num_classes=num_classes))
    val_dataset = tf.data.Dataset.from_tensor_slices((val_img_data_files, val_label_data_files)).map(load_val_img)
    val_dataset = val_dataset.map(lambda x, y: (val_augmentation(x), y))
    # Batch
    train_dataset = train_dataset.batch(train_batch_size).prefetch(tf.data.AUTOTUNE)
    val_dataset = val_dataset.cache().batch(eval_batch_size).prefetch(tf.data.AUTOTUNE)
    encoder = create_encoder()
    callbacks = [
        EarlyStopping(patience=5, monitor="val_f1_score", mode="max"),
        ModelCheckpoint(f'./models/stratified/efficientnetv2m_classifier_{fold_index}.h5', monitor='val_f1_score',
                        verbose=1, save_best_only=True, mode='max'),
        LearningRateTracker(train_batch_size, len(train_df)),
    ]
    # Create an optimizer with weight decay: linear warmup for two epochs,
    # then decay towards init_lr * min_lr_ratio over an eight-epoch horizon.
    num_batches_per_epoch = len(train_df) // train_batch_size
    num_train_steps = num_batches_per_epoch * 8
    num_warmup_steps = 2 * num_batches_per_epoch
    optimizer, lr_schedule = create_optimizer(
        init_lr=learning_rate,
        num_train_steps=num_train_steps,
        weight_decay_rate=weight_decay_rate,
        num_warmup_steps=num_warmup_steps,
        min_lr_ratio=0.25,
    )
    # Fine-tune the full backbone plus the softmax head.
    classifier = create_classifier(encoder, optimizer, trainable=True)
    history = classifier.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=classifier_epochs,
        callbacks=callbacks,
        class_weight=class_weight_dict,
    )
    # Confusion matrix on the validation fold (val_dataset is cached and not
    # shuffled, so prediction order matches label order).
    predictions = classifier.predict(val_dataset)
    predicted_classes = np.argmax(predictions, axis=1)
    true_classes = []
    for images, labels in val_dataset:
        true_classes.extend(np.argmax(labels.numpy(), axis=1))
    class_labels = ["0", "1", "2", "3", "4", "5"]
    cm = confusion_matrix(true_classes, predicted_classes)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels)
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.title(f'Confusion Matrix Fold#{fold_index}')
    plt.show()
    plot_history(history)
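# Note (my addition): with five folds trained in a single process, it may help to free
# graph state at the end of each fold, mirroring the cleanup predict_with_model does:
#   tf.keras.backend.clear_session()
#   gc.collect()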
def run():
    n_splits = 5
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    _, _, df = get_train_and_validation_data()
    # Hand-picked training images excluded from all folds.
    bad_train_im = [
        '098f33ec4437ae0c87eb52485b5070db.jpg', '1993ef21ac6839457f27e273498c3030.jpg',
        '1a78cc758607dd9cb8357c4cf459dcb7.jpg', '1f9bec82fb35a41247b1e5bfb81c45d0.jpg',
        '351cb5528affabd6fbde8556d9822766.jpg', '7bbf37562bbe04c0617ea6c7f44dacb2.jpg',
        '81e22077bd7134e77f8ba530a2747c5d.jpg', 'a34a9002043a408557900f27b6358bff.jpg',
        'add75b20dbd8cb37bfb2a3d6ad9f0826.jpg', 'b6f9cf4dd23440bc6dae46c15c34032f.jpg',
        'b8509a69920fed78bf0f0edcb120c857.jpg', 'c69a0fcd678c03174d298b932aa3dbfa.jpg',
        'dc52b9068dd57e06ed4855e43f519dcc.jpg', 'fc6a93e2e6df61a9ee04f741aff9c352.jpg',
    ]
    df = df[~df.image_id.isin(bad_train_im)]
    for index, (train_indexes, val_indexes) in enumerate(skf.split(df["image_id"].values, df["class_6"].values)):
        print(f"Training fold #{index}")
        train_df = df.iloc[train_indexes]
        val_df = df.iloc[val_indexes]
        prepare_and_train(train_df, val_df, index)
def predict_with_model(model_path, data, return_classes=True):
    # custom_objects is required because the checkpoint was compiled with
    # tfa.metrics.F1Score and the transformers AdamWeightDecay optimizer.
    model = keras.models.load_model(
        model_path,
        custom_objects={
            'f1_score': tfa.metrics.F1Score(num_classes=num_classes, average="macro"),
            'AdamWeightDecay': AdamWeightDecay,
        }
    )
    if return_classes:
        predictions = np.argmax(model.predict(data), axis=-1)
    else:
        predictions = model.predict(data)
    # Clean up so repeated loads across folds do not exhaust memory.
    del model
    tf.keras.backend.clear_session()
    gc.collect()
    return predictions
def soft_voting_predictions(all_probabilities):
    # Average the per-fold probability matrices, then argmax into class ids.
    avg_probabilities = np.mean(np.stack(all_probabilities, axis=0), axis=0)
    print(f"Shape of avg_probabilities: {avg_probabilities.shape}")
    final_preds = np.argmax(avg_probabilities, axis=-1)
    print(f"Shape of final_preds: {final_preds.shape}")
    return final_preds
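# Shape contract (illustrative, my addition): each fold contributes an
# (n_samples, num_classes) probability matrix; averaging keeps that shape and
# the argmax reduces it to (n_samples,). Quick check with random inputs:
#   probs = [np.random.rand(4, num_classes) for _ in range(5)]
#   soft_voting_predictions(probs)  # -> array of 4 class ids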
def run_predictions():
    results = []
    batch_size = 8
    num_folds = 5
    all_predictions = []
    # All test images in the test directory, in filesystem order.
    filenames = os.listdir(Config.test_img_dir)
    test_data_files = [os.path.join(Config.test_img_dir, img_id) for img_id in filenames]
    test_dataset = tf.data.Dataset.from_tensor_slices(test_data_files)
    test_dataset = test_dataset.map(lambda x: load_val_img(x))
    test_dataset = test_dataset.map(lambda x: val_augmentation(x))
    test_dataset = test_dataset.batch(batch_size)
    # Collect per-fold probabilities, then soft-vote into final classes.
    for i in range(num_folds):
        model_path = f"./models/stratified/efficientnetv2m_classifier_{i}.h5"
        fold_predictions = predict_with_model(model_path, test_dataset, False)
        all_predictions.append(fold_predictions)
    predicted_classes = soft_voting_predictions(all_predictions)
    for filename, predicted_class_idx in zip(filenames, predicted_classes):
        results.append({
            "image_id": filename,
            "class_6": predicted_class_idx,
        })
    df_predictions = pd.DataFrame.from_dict(results)
    print(df_predictions.head())
    df_predictions.to_csv('./data/submission.csv', index=False)
if __name__ == '__main__':
    # run()
    run_predictions()