"import os\n",
"from random import shuffle\n",
"import torch\n",
"import torch.nn as nn\n",
"from torch.autograd import Variable\n",
"import as data\n",
"import torch.optim as optim\n",
"from torch.optim import lr_scheduler\n",
"import torchvision.models as models\n",
"from torchvision import transforms\n",
"from PIL import Image\n",
"from tqdm import tqdm"
"device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n",
"if torch.cuda.is_available():\n",
" use_gpu = True\n",
" print(\"Using GPU\")\n",
" use_gpu = False\n",
"FloatTensor = torch.cuda.FloatTensor if use_gpu else torch.FloatTensor\n",
"LongTensor = torch.cuda.LongTensor if use_gpu else torch.LongTensor\n",
"ByteTensor = torch.cuda.ByteTensor if use_gpu else torch.ByteTensor\n",
"Tensor = FloatTensor"
"class ImageDataset(data.Dataset):\n",
" \n",
" def __init__(self, image_paths, class_dict, transform=None):\n",
" self.image_paths = image_paths\n",
" self.transform = transform\n",
" self.class_dict = class_dict\n",
" \n",
" def __len__(self):\n",
" return len(self.image_paths)\n",
" \n",
" def __getitem__(self, index):\n",
" img_path = self.image_paths[index]\n",
" image =\n",
" if self.transform:\n",
" image = self.transform(image)\n",
" for key, value in class_dict.items():\n",
" if key in img_path:\n",
" label = value\n",
" break\n",
" label = torch.tensor(label)\n",
" return [image, label]"
"def get_resnet_model(num_classes):\n",
" resnet = models.resnet18(pretrained='imagenet')\n",
" num_hidden = resnet.fc.in_features\n",
" resnet.fc = nn.Linear(num_hidden, num_classes)\n",
" for name, params in resnet.named_parameters():\n",
" if 'layer3' not in name:\n",
" params.requires_grad = False\n",
" else:\n",
" break\n",
" return resnet"
"def train_model(model, dataloaders, dataset_sizes, criterion, optimizer, scheduler, num_epochs=25):\n",
" best_model_wts = model.state_dict()\n",
" best_acc = 0.0\n",
" for epoch in range(num_epochs):\n",
" print('Epoch {}/{}'.format(epoch, num_epochs - 1))\n",
" print('-' * 10)\n",
" for phase in ['train', 'val']:\n",
" \n",
" if phase == 'train':\n",
" scheduler.step()\n",
" model.train(True)\n",
" else:\n",
" model.train(False)\n",
" running_loss = 0.0\n",
" running_corrects = 0.0\n",
" for data in tqdm(dataloaders[phase]):\n",
" inputs, labels = data\n",
" inputs = Variable(inputs.type(Tensor))\n",
" labels = Variable(labels.type(LongTensor))\n",
" optimizer.zero_grad()\n",
" outputs = model(inputs)\n",
" if type(outputs) == tuple:\n",
" outputs, _ = outputs\n",
" _, preds = torch.max(, 1)\n",
" loss = criterion(outputs, labels)\n",
" if phase == 'train':\n",
" loss.backward()\n",
" optimizer.step()\n",
" \n",
" batch_loss =[0]\n",
" batch_acc = torch.sum(preds ==\n",
" \n",
" running_loss += batch_loss\n",
" running_corrects += batch_acc\n",
" epoch_loss = running_loss / dataset_sizes[phase]\n",
" epoch_acc = float(running_corrects) / dataset_sizes[phase]\n",
" print('{} Loss: {:.4f} Acc: {:.4f}'.format(\n",
" phase, epoch_loss, epoch_acc))\n",
" if phase == 'val' and epoch_acc > best_acc:\n",
" best_acc = epoch_acc\n",
" best_model_wts = model.state_dict()\n",
" print()\n",
" print('Best validation accuarcy: {:4f}'.format(best_acc))\n",
" model.load_state_dict(best_model_wts)\n",
" return model"
"data_dir = './natural_images/'\n",
"mean = [0.485, 0.456, 0.406]\n",
"std = [0.229, 0.224, 0.225]\n",
"scale = 360\n",
"input_shape = 224\n",
"batch_size = 128\n",
"epochs = 20\n",
"num_classes = 8"
"img_files = []\n",
"classes = os.listdir(data_dir)\n",
"class_dict = {label: i for i, label in enumerate(classes)}\n",
"for directory in os.listdir(data_dir):\n",
" for filename in os.listdir(os.path.join(data_dir, directory)):\n",
" img_files.append(os.path.join(data_dir, directory, filename))\n",
"train_length = int(0.6*len(img_files))\n",
"val_length = int(0.8*len(img_files))\n",
"img_files = {'train': img_files[:train_length],\n",
" 'val': img_files[train_length:val_length],\n",
" 'test': img_files[val_length:]}"
"data_transforms = {\n",
" 'train': transforms.Compose([\n",
" transforms.Resize(scale),\n",
" transforms.RandomResizedCrop(input_shape),\n",
" transforms.RandomHorizontalFlip(),\n",
" transforms.RandomVerticalFlip(),\n",
" transforms.ToTensor(),\n",
" transforms.Normalize(mean, std)]),\n",
" \n",
" 'val': transforms.Compose([\n",
" transforms.Resize(scale),\n",
" transforms.CenterCrop(input_shape),\n",
" transforms.ToTensor(),\n",
" transforms.Normalize(mean, std)])\n",
"image_datasets = {x: ImageDataset(img_files[x], class_dict, data_transforms[x]) for x in ['train', 'val']}\n",
"dataloaders = {x:[x], batch_size=batch_size,\n",
" shuffle=True, num_workers=4) for x in ['train', 'val']}\n",
"dataset_sizes = {x: float(len(image_datasets[x])) for x in ['train', 'val']}"
"model = get_resnet_model(num_classes).to(device)\n",
"criterion = nn.CrossEntropyLoss()\n",
"optimizer = optim.SGD(list(filter(lambda p: p.requires_grad, model.parameters())), lr=0.001, momentum=0.9)\n",
"exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)\n",
"model = train_model(model, dataloaders, dataset_sizes, criterion, optimizer, exp_lr_scheduler, num_epochs=epochs)"
"test_transform = transforms.Compose([\n",
" transforms.Resize(scale),\n",
" transforms.CenterCrop(input_shape),\n",
" transforms.ToTensor(),\n",
" transforms.Normalize(mean, std)])\n",
"test_set = ImageDataset(img_files['test'], class_dict, test_transform)\n",
"test_loader =, batch_size=batch_size,\n",
" shuffle=True, num_workers=4)\n",
"test_size = float(len(test_set))"
"running_loss = 0.0\n",
"running_corrects = 0.0\n",
"for data in tqdm(test_loader):\n",
" inputs, labels = data\n",
" inputs = Variable(inputs.type(Tensor))\n",
" labels = Variable(labels.type(LongTensor))\n",
" outputs = model(inputs)\n",
" if type(outputs) == tuple:\n",
" outputs, _ = outputs\n",
" _, preds = torch.max(, 1)\n",
" \n",
" running_corrects += torch.sum(preds ==\n",
"accuracy = float(running_corrects) / test_size\n",
"print(\"Test Accuracy: {}\".format(accuracy))"
