Skip to content

Instantly share code, notes, and snippets.

@MrRjxrby
Last active June 28, 2025 21:04
Show Gist options
  • Select an option

  • Save MrRjxrby/3581af98bb0a5ac51b480c36f70a6c50 to your computer and use it in GitHub Desktop.

Select an option

Save MrRjxrby/3581af98bb0a5ac51b480c36f70a6c50 to your computer and use it in GitHub Desktop.
import numpy as np
import torch
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler
import gc
# Конфигурация устройства (GPU/CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Загрузка и предобработка данных
def data_loader(data_dir, batch_size, random_seed=42, valid_size=0.1, shuffle=True, test=False):
# Нормализация данных для CIFAR-10
normalize = transforms.Normalize(
mean=[0.4914, 0.4822, 0.4465], # Средние значения для каналов RGB
std=[0.2023, 0.1994, 0.2010], # Стандартные отклонения
)
# Преобразования данных
transform = transforms.Compose([
transforms.Resize((224,224)), # Изменение размера изображений
transforms.ToTensor(), # Конвертация в тензор
normalize, # Нормализация
])
# Загрузка тестовых данных
if test:
dataset = datasets.CIFAR10(
root=data_dir,
train=False,
download=True,
transform=transform
)
return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
# Загрузка и разделение тренировочных данных
train_dataset = datasets.CIFAR10(
root=data_dir,
train=True,
download=True,
transform=transform
)
valid_dataset = datasets.CIFAR10(
root=data_dir,
train=True,
download=True,
transform=transform
)
# Разделение данных на тренировочные и валидационные
num_train = len(train_dataset)
indices = list(range(num_train))
split = int(np.floor(valid_size * num_train))
if shuffle:
np.random.seed(random_seed)
np.random.shuffle(indices)
# Создание загрузчиков данных
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=batch_size,
sampler=SubsetRandomSampler(indices[split:]))
valid_loader = torch.utils.data.DataLoader(
valid_dataset,
batch_size=batch_size,
sampler=SubsetRandomSampler(indices[:split]))
return train_loader, valid_loader
# Реализация остаточного блока
class ResidualBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride=1, downsample=None):
super(ResidualBlock, self).__init__()
# Первая свертка
self.conv1 = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1),
nn.BatchNorm2d(out_channels),
nn.ReLU()
)
# Вторая свертка
self.conv2 = nn.Sequential(
nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(out_channels)
)
self.downsample = downsample # Опциональное преобразование для skip-connection
self.relu = nn.ReLU()
def forward(self, x):
residual = x # Сохраняем вход для skip-connection
out = self.conv1(x)
out = self.conv2(out)
# Если нужно изменить размерность или количество каналов
if self.downsample:
residual = self.downsample(x)
# Складываем выход и skip-connection
out += residual
return self.relu(out)
# Реализация архитектуры ResNet
class ResNet(nn.Module):
def __init__(self, block, layers, num_classes=10):
super(ResNet, self).__init__()
self.inplanes = 64 # Начальное количество каналов
# Первоначальная свертка
self.conv1 = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
nn.BatchNorm2d(64),
nn.ReLU()
)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
# Создание слоев с остаточными блоками
self.layer0 = self._make_layer(block, 64, layers[0], stride=1)
self.layer1 = self._make_layer(block, 128, layers[1], stride=2)
self.layer2 = self._make_layer(block, 256, layers[2], stride=2)
self.layer3 = self._make_layer(block, 512, layers[3], stride=2)
# Финальные слои
self.avgpool = nn.AvgPool2d(7, stride=1)
self.fc = nn.Linear(512, num_classes)
def _make_layer(self, block, planes, blocks, stride=1):
downsample = None
# Если нужно изменить размерность или количество каналов
if stride != 1 or self.inplanes != planes:
downsample = nn.Sequential(
nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
nn.BatchNorm2d(planes),
)
layers = []
layers.append(block(self.inplanes, planes, stride, downsample))
self.inplanes = planes
# Добавляем оставшиеся блоки
for _ in range(1, blocks):
layers.append(block(self.inplanes, planes))
return nn.Sequential(*layers)
def forward(self, x):
# Прямой проход через все слои
x = self.conv1(x)
x = self.maxpool(x)
x = self.layer0(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.avgpool(x)
x = x.view(x.size(0), -1) # Вытягиваем в вектор
x = self.fc(x) # Финальный полносвязный слой
return x
# Параметры обучения
num_classes = 10 # Количество классов в CIFAR-10
num_epochs = 20 # Количество эпох
batch_size = 16 # Размер батча
learning_rate = 0.01 # Скорость обучения
# Инициализация загрузчиков данных и модели
train_loader, valid_loader = data_loader(data_dir='./data', batch_size=batch_size)
test_loader = data_loader(data_dir='./data', batch_size=batch_size, test=True)
model = ResNet(ResidualBlock, [3, 4, 6, 3]).to(device) # ResNet34-архитектура
# Функция потерь и оптимизатор
criterion = nn.CrossEntropyLoss() # Функция потерь
optimizer = torch.optim.SGD( # SGD-оптимизатор
model.parameters(),
lr=learning_rate,
weight_decay=0.001, # L2-регуляризация
momentum=0.9 # Момент
)
# Процесс обучения
total_step = len(train_loader)
for epoch in range(num_epochs):
for i, (images, labels) in enumerate(train_loader):
# Перенос данных на устройство (GPU/CPU)
images, labels = images.to(device), labels.to(device)
# Прямой проход
outputs = model(images)
loss = criterion(outputs, labels)
# Обратный проход и оптимизация
optimizer.zero_grad() # Обнуляем градиенты
loss.backward() # Вычисляем градиенты
optimizer.step() # Обновляем веса
# Очистка памяти
del images, labels, outputs
torch.cuda.empty_cache()
gc.collect()
# Валидация после каждой эпохи
with torch.no_grad():
correct, total = 0, 0
for images, labels in valid_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
del images, labels, outputs
print(f'Эпоха [{epoch+1}/{num_epochs}], Потери: {loss.item():.4f}, '
f'Точность на валидации: {100 * correct/total:.2f}%')
# Тестирование модели
with torch.no_grad():
correct, total = 0, 0
for images, labels in test_loader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print(f'Точность на тестовых данных: {100 * correct/total:.2f}%')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment