Created
September 5, 2023 14:56
-
-
Save ineersa/a63b20f2b7487b73a8cb6273609dc1d3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gc | |
import logging | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
import seaborn as sns | |
from keras.applications import EfficientNetV2M | |
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint, LearningRateScheduler | |
from keras.optimizers import Adam | |
from keras.utils import to_categorical | |
from sklearn.metrics import confusion_matrix | |
import os | |
from sklearn.utils import compute_class_weight | |
from utils.config import Config | |
from utils.helpers import get_train_and_validation_data, plot_history | |
import tensorflow as tf | |
import numpy as np | |
import keras | |
from keras import layers | |
import tensorflow_addons as tfa | |
from transformers import AdamWeightDecay, create_optimizer | |
import keras_cv.layers as cv_layers | |
from sklearn.model_selection import StratifiedKFold | |
import albumentations as A | |
from utils.learning_rate_tracker import LearningRateTracker | |
# Drop any Keras/TF graph state left over from a previous run in this process.
tf.keras.backend.clear_session()
# Root logging config; WARN level keeps TF/Keras INFO chatter out of the log.
logging.basicConfig(
    level=logging.WARN,
    format="%(asctime)s - %(name)s - %(levelname)s @ %(message)s",
    datefmt="%y-%m-%d %H:%M:%S",
)
# Module logger used for this training script.
logger = logging.getLogger(name="EfficientNet")
# Training-time geometric augmentation: resize, then a random 224x224 crop.
# NOTE(review): this Sequential is never applied anywhere in the visible code —
# confirm whether it should be mapped over the training dataset.
data_augmentation = keras.Sequential(
    [
        layers.Resizing(256, 256),
        layers.RandomCrop(height=224, width=224),
    ],
    name="data_augmentation",
)
# Deterministic resize only; used for validation and test pipelines.
val_augmentation = keras.Sequential(
    [
        layers.Resizing(256, 256),
    ],
    name="val_augmentation",
)
# --- Hyperparameters ---
input_shape = (256, 256, 3)  # model input: 256x256 RGB images
projection_units = 512  # NOTE(review): unused in the visible code (contrastive-head leftover?)
learning_rate = 2e-4  # peak LR for the warmup/decay schedule built in prepare_and_train
temperature = 0.05  # NOTE(review): unused in the visible code (contrastive-loss leftover?)
num_classes = 6  # six target classes ("class_6" label column)
train_batch_size = 16
eval_batch_size = 16
encoder_epochs = 10  # NOTE(review): unused in the visible code
classifier_epochs = 10  # epochs passed to classifier.fit
weight_decay_rate = 0.01  # decoupled (AdamW-style) weight decay
steps_per_epoch = 5  # NOTE(review): unused in the visible code
def create_encoder():
    """Build the image encoder: EfficientNetV2-M with ImageNet weights.

    The classification head is dropped (include_top=False) and global average
    pooling collapses the spatial dimensions, so the encoder maps an
    ``input_shape`` image to a single feature vector.

    Returns:
        A ``keras.Model`` named "image-encoder".
    """
    # pooling="avg" makes the backbone output a flat feature vector directly.
    backbone = EfficientNetV2M(
        include_top=False, weights="imagenet", input_shape=input_shape, pooling="avg"
    )
    inputs = keras.Input(shape=input_shape)
    outputs = backbone(inputs)
    # (Removed commented-out layer-inspection debug code from the original.)
    return keras.Model(inputs=inputs, outputs=outputs, name="image-encoder")
def create_classifier(encoder, optimizer, trainable=True):
    """Attach a softmax classification head to ``encoder`` and compile it.

    Args:
        encoder: feature-extractor model (see ``create_encoder``).
        optimizer: a compiled-ready Keras optimizer instance.
        trainable: applied to every encoder layer EXCEPT BatchNormalization
            layers, which are deliberately skipped and keep their current
            ``trainable`` state.

    Returns:
        A compiled ``keras.Model`` named "image-classifier" with categorical
        accuracy and macro F1 metrics.
    """
    for layer in encoder.layers:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = trainable
    inputs = keras.Input(shape=input_shape)
    features = encoder(inputs)
    outputs = layers.Dense(num_classes, activation="softmax")(features)
    model = keras.Model(inputs=inputs, outputs=outputs, name="image-classifier")
    metrics = [
        tf.keras.metrics.CategoricalAccuracy(name="accuracy"),
        # Consistency fix: use the module-level num_classes instead of a
        # hard-coded 6 so the metric tracks the Dense head above.
        tfa.metrics.F1Score(num_classes=num_classes, average="macro"),
    ]
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=metrics)
    return model
# Built once at import time: the pipeline is stateless, so rebuilding the
# A.Compose object for every image (as the original did) was wasted work on
# the tf.data hot path. Each transform fires independently with its own `p`.
_train_transform = A.Compose([
    A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.2),
    A.HorizontalFlip(p=0.2),
    A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.1, rotate_limit=20, p=0.2),
    A.GaussianBlur(p=0.2),
    A.RGBShift(r_shift_limit=10, g_shift_limit=10, b_shift_limit=10, p=0.2),
    A.RandomGamma(gamma_limit=(90, 110), p=0.2),
    A.ToGray(p=0.1),
])
def albumentations_transform(image):
    """Apply the stochastic Albumentations training pipeline to one image.

    Args:
        image: uint8 HWC numpy array (delivered via tf.numpy_function).

    Returns:
        The augmented image, same dtype/layout as the input.
    """
    return _train_transform(image=image)['image']
def load_img(img_path, label=None):
    """Read a JPEG from disk and apply the Albumentations training transforms.

    Returns the image alone when ``label`` is None, else an (image, label) pair.
    """
    raw = tf.io.read_file(img_path)
    decoded = tf.image.decode_jpeg(raw, channels=3)
    # Albumentations runs in plain Python, so bridge out of the graph.
    img = tf.numpy_function(func=albumentations_transform, inp=[decoded], Tout=tf.uint8)
    # numpy_function erases static shape info; restore it
    # (assumes source JPEGs are 256x256 RGB — TODO confirm).
    img.set_shape((256, 256, 3))
    if label is None:
        return img
    return img, label
def load_val_img(img_path, label=None):
    """Read a JPEG from disk with no augmentation (validation/test path).

    Returns the image alone when ``label`` is None, else an (image, label) pair.
    """
    decoded = tf.image.decode_jpeg(tf.io.read_file(img_path), channels=3)
    if label is None:
        return decoded
    return decoded, label
def prepare_and_train(train_df, val_df, fold_index):
    """Build tf.data pipelines for one CV fold, train a classifier, and plot results.

    Args:
        train_df: DataFrame with "image_id" / "class_6" columns for training rows.
        val_df: DataFrame with the same columns for validation rows.
        fold_index: fold number; used in the checkpoint filename and plot title.
    """
    train_img_data_files = [os.path.join(Config.train_img_dir, img_id) for img_id in train_df["image_id"].values]
    # One-hot labels for categorical_crossentropy.
    train_label_data_files = list(to_categorical(train_df["class_6"].values, num_classes=num_classes))
    # load_img applies the Albumentations transforms per image.
    # NOTE(review): the module-level `data_augmentation` Sequential is never
    # applied here — confirm whether that is intentional.
    train_dataset = tf.data.Dataset.from_tensor_slices((train_img_data_files, train_label_data_files)).map(load_img)
    classes = [0, 1, 2, 3, 4, 5]
    # Balanced class weights to counter label imbalance.
    class_weights = compute_class_weight('balanced', classes=classes, y=train_df["class_6"].values)
    class_weight_dict = dict(zip(classes, class_weights))
    val_img_data_files = [os.path.join(Config.train_img_dir, img_id) for img_id in val_df["image_id"].values]
    val_label_data_files = list(to_categorical(val_df["class_6"].values, num_classes=num_classes))
    val_dataset = tf.data.Dataset.from_tensor_slices((val_img_data_files, val_label_data_files)).map(load_val_img)
    # Validation gets only the deterministic resize, no random augmentation.
    val_dataset = val_dataset.map(lambda x, y: (val_augmentation(x), y))
    # Batch
    train_dataset = train_dataset.batch(train_batch_size).prefetch(tf.data.AUTOTUNE)
    val_dataset = val_dataset.cache().batch(eval_batch_size).prefetch(tf.data.AUTOTUNE)
    encoder = create_encoder()
    callbacks = [
        # Both callbacks track macro F1 on the validation set (mode="max").
        EarlyStopping(patience=5, monitor="val_f1_score", mode="max"),
        ModelCheckpoint(f'./models/stratified/efficientnetv2m_classifier_{fold_index}.h5', monitor='val_f1_score', verbose=1, save_best_only=True,
                        mode='max'),
        LearningRateTracker(train_batch_size, len(train_df))
    ]
    # create optimizer with weight decay (warmup then decay schedule)
    num_batches_per_epoch = len(train_df) // train_batch_size
    # NOTE(review): the decay horizon assumes 8 epochs while classifier_epochs
    # is 10 — confirm the mismatch is intentional.
    num_train_steps = num_batches_per_epoch * 8
    num_warmup_steps = 2 * num_batches_per_epoch  # 2 warmup epochs
    optimizer, lr_schedule = create_optimizer(
        init_lr=learning_rate,
        num_train_steps=num_train_steps,
        weight_decay_rate=weight_decay_rate,
        num_warmup_steps=num_warmup_steps,
        min_lr_ratio=0.25
    )
    # Train classifier; trainable=True also unfreezes the encoder backbone
    # (see create_classifier), so this is full fine-tuning.
    classifier = create_classifier(encoder, optimizer, trainable=True)
    history = classifier.fit(train_dataset,
                             validation_data=val_dataset,
                             epochs=classifier_epochs,
                             callbacks=callbacks,
                             class_weight=class_weight_dict
                             )
    # confusion matrix on the validation fold
    predictions = classifier.predict(val_dataset)
    predicted_classes = np.argmax(predictions, axis=1)
    true_classes = []
    # val_dataset is not shuffled, so iteration order matches predict() order.
    for images, labels in val_dataset:
        true_classes.extend(np.argmax(labels.numpy(), axis=1))
    class_labels = ["0", "1", "2", "3", "4", "5"]
    cm = confusion_matrix(true_classes, predicted_classes)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels)
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.title(f'Confusion Matrix Fold#{fold_index}')
    plt.show()
    plot_history(history)
def run():
    """Train one classifier per fold of a 5-fold stratified split.

    Known-bad training images are dropped before splitting so every fold
    sees the same cleaned dataset.
    """
    n_splits = 5
    _, _, df = get_train_and_validation_data()
    # Manually identified bad training images to exclude.
    bad_train_im = [
        '098f33ec4437ae0c87eb52485b5070db.jpg', '1993ef21ac6839457f27e273498c3030.jpg',
        '1a78cc758607dd9cb8357c4cf459dcb7.jpg', '1f9bec82fb35a41247b1e5bfb81c45d0.jpg',
        '351cb5528affabd6fbde8556d9822766.jpg', '7bbf37562bbe04c0617ea6c7f44dacb2.jpg',
        '81e22077bd7134e77f8ba530a2747c5d.jpg', 'a34a9002043a408557900f27b6358bff.jpg',
        'add75b20dbd8cb37bfb2a3d6ad9f0826.jpg', 'b6f9cf4dd23440bc6dae46c15c34032f.jpg',
        'b8509a69920fed78bf0f0edcb120c857.jpg', 'c69a0fcd678c03174d298b932aa3dbfa.jpg',
        'dc52b9068dd57e06ed4855e43f519dcc.jpg', 'fc6a93e2e6df61a9ee04f741aff9c352.jpg',
    ]
    df = df[~df.image_id.isin(bad_train_im)]
    # Stratify on the label column so class balance is preserved per fold.
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    folds = skf.split(df["image_id"].values, df["class_6"].values)
    for index, (train_indexes, val_indexes) in enumerate(folds):
        print(f"Training fold #{index}")
        prepare_and_train(df.iloc[train_indexes], df.iloc[val_indexes], index)
def predict_with_model(model_path, data, return_classes=True):
    """Load one saved fold model, predict on ``data``, then free the model.

    Args:
        model_path: path to the .h5 checkpoint.
        data: a batched dataset/array accepted by ``model.predict``.
        return_classes: if True return argmax class indices, else raw probabilities.
    """
    # The checkpoint stores custom metric/optimizer objects that Keras cannot
    # resolve on its own, so they are supplied explicitly.
    custom_objects = {
        'f1_score': tfa.metrics.F1Score(num_classes=6, average="macro"),
        'AdamWeightDecay': AdamWeightDecay
    }
    model = keras.models.load_model(model_path, custom_objects=custom_objects)
    raw = model.predict(data)
    predictions = np.argmax(raw, axis=-1) if return_classes else raw
    # Clean up: drop the model and its graph so successive folds don't
    # accumulate memory.
    del model
    tf.keras.backend.clear_session()
    gc.collect()
    return predictions
def soft_voting_predictions(all_probabilities):
    """Ensemble fold outputs by soft voting.

    Args:
        all_probabilities: list of (n_samples, n_classes) probability arrays,
            one per fold model.

    Returns:
        1-D array of predicted class indices for each sample.
    """
    # Stack the per-fold arrays and average element-wise across folds.
    stacked = np.stack(all_probabilities, axis=0)
    avg_probabilities = stacked.mean(axis=0)
    print(f"Shape of avg_probabilities: {avg_probabilities.shape}")
    # The class with the highest averaged probability wins per sample.
    final_preds = avg_probabilities.argmax(axis=-1)
    print(f"Shape of final_preds: {final_preds.shape}")
    return final_preds
def run_predictions():
    """Predict test-set classes with every fold model and write submission.csv.

    Loads each fold checkpoint in turn, collects per-fold class probabilities,
    soft-votes them into final predictions, and writes an
    image_id/class_6 CSV to ./data/submission.csv.
    """
    results = []
    batch_size = 8
    num_folds = 5
    all_predictions = []
    # BUG FIX: the original assigned the directory PATH to `filenames` and then
    # iterated it, which iterates the characters of the path string. List the
    # image files inside the directory instead (sorted for determinism).
    filenames = sorted(os.listdir(Config.test_img_dir))
    test_data_files = [os.path.join(Config.test_img_dir, img_id) for img_id in
                       filenames]
    test_dataset = tf.data.Dataset.from_tensor_slices(test_data_files)
    test_dataset = test_dataset.map(lambda x: load_val_img(x))
    test_dataset = test_dataset.map(lambda x: val_augmentation(x))
    test_dataset = test_dataset.batch(batch_size)
    for i in range(num_folds):
        model_path = f"./models/stratified/efficientnetv2m_classifier_{i}.h5"
        # return_classes=False: keep raw probabilities so folds can be soft-voted.
        fold_predictions = predict_with_model(model_path, test_dataset, False)
        all_predictions.append(fold_predictions)
    predicted_classes = soft_voting_predictions(all_predictions)
    for filename, predicted_class_idx in zip(filenames, predicted_classes):
        results.append({
            "image_id": filename,
            "class_6": predicted_class_idx
        })
    df_predictions = pd.DataFrame.from_dict(results)
    # BUG FIX: `.head` is a method; the original printed the bound-method repr.
    print(df_predictions.head())
    df_predictions.to_csv('./data/submission.csv', index=False)
if __name__ == '__main__':
    # run()  # uncomment to (re)train the five fold models before predicting
    run_predictions()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment