Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
from typing import Dict
import torch
import ray.train as train
from ray.train.trainer import Trainer
from ray.train.callbacks import JsonLoggerCallback
from torch import nn
from torch.utils.data import DataLoader
from dataset import training_data,test_data
import time
# Prints a marker line as soon as the module body starts executing
# (on direct run or on import), before any definition below is created.
print("file-----------")
# Define model
class NeuralNetwork(nn.Module):
    """Fully connected classifier producing 10 raw class scores per sample.

    The input is flattened and passed through linear layers of width
    784 -> 1024 -> 1024 -> 512 -> 10, with a ReLU after every layer except
    the last one.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 1024),
            nn.ReLU(),
            nn.Linear(1024, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            # No activation after the output layer: the scores are fed to
            # nn.CrossEntropyLoss, which expects unbounded logits. A trailing
            # ReLU would clamp every negative score to zero.
            nn.Linear(512, 10),
        )

    def forward(self, x):
        """Return the unnormalised class scores (logits) for ``x``.

        Args:
            x: Batch whose non-batch dimensions flatten to 28 * 28 values.

        Returns:
            Tensor with 10 scores per sample.
        """
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
def train_epoch(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Every batch goes through forward pass, loss, backward pass and an
    optimizer step. On every 100th batch (starting with the first) the
    current loss and a progress counter are printed.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches exposing a
            ``dataset`` attribute with a length.
        model: Module to train; switched to training mode.
        loss_fn: Callable taking ``(predictions, targets)``.
        optimizer: Optimizer that updates the parameters of ``model``.
    """
    # Progress denominator: dataset length divided by train.world_size().
    size = len(dataloader.dataset) // train.world_size()
    model.train()
    for step, (inputs, targets) in enumerate(dataloader):
        # Forward pass and prediction error.
        batch_loss = loss_fn(model(inputs), targets)

        # Backpropagation and parameter update.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        if step % 100 != 0:
            continue
        loss = batch_loss.item()
        current = step * len(inputs)
        print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
def validate_epoch(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and mean loss.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches exposing a
            ``dataset`` attribute with a length.
        model: Module to evaluate; switched to evaluation mode.
        loss_fn: Callable taking ``(predictions, targets)``.

    Returns:
        The summed batch losses divided by the number of batches.
    """
    # Accuracy denominator: dataset length divided by train.world_size().
    size = len(dataloader.dataset) // train.world_size()
    num_batches = len(dataloader)
    model.eval()

    loss_sum = 0
    hits = 0
    with torch.no_grad():  # gradients are not needed for evaluation
        for inputs, targets in dataloader:
            outputs = model(inputs)
            loss_sum += loss_fn(outputs, targets).item()
            matches = outputs.argmax(1) == targets
            hits += matches.type(torch.float).sum().item()

    test_loss = loss_sum / num_batches
    correct = hits / size
    summary = (
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    print(summary)
    return test_loss
def train_func(config: Dict):
    """Per-worker training loop: build loaders, model and optimizer, then train.

    Args:
        config: Mapping with the keys ``"batch_size"``, ``"lr"`` and
            ``"epochs"``.

    Returns:
        List with the validation loss of every epoch, in order.
    """
    batch_size, lr, epochs = (
        config[key] for key in ("batch_size", "lr", "epochs")
    )

    # The configured batch size is split across train.world_size().
    per_worker_batch = batch_size // train.world_size()

    # Build both plain loaders first, then hand each to Ray Train.
    raw_train_loader = DataLoader(training_data, batch_size=per_worker_batch)
    raw_test_loader = DataLoader(test_data, batch_size=per_worker_batch)
    train_loader = train.torch.prepare_data_loader(raw_train_loader)
    test_loader = train.torch.prepare_data_loader(raw_test_loader)

    # Model, loss and optimizer.
    model = train.torch.prepare_model(NeuralNetwork())
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    history = []
    for _ in range(epochs):
        train_epoch(train_loader, model, loss_fn, optimizer)
        epoch_loss = validate_epoch(test_loader, model, loss_fn)
        train.report(loss=epoch_loss)
        history.append(epoch_loss)
    return history
def train_fashion_mnist(num_workers=2, use_gpu=False, resources=None):
    """Run ``train_func`` on a Ray Train ``Trainer`` and print time and losses.

    The trainer is always shut down once it has been started, including
    when the training run raises.

    Args:
        num_workers: Forwarded to ``Trainer`` as ``num_workers``.
        use_gpu: Forwarded to ``Trainer`` as ``use_gpu``.
        resources: Forwarded to ``Trainer`` as ``resources_per_worker``.
    """
    trainer = Trainer(
        backend="torch",
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker=resources,
    )
    trainer.start()
    try:
        start_time = time.time()
        result = trainer.run(
            train_func=train_func,
            config={"lr": 1e-3, "batch_size": 2000, "epochs": 10},
            callbacks=[JsonLoggerCallback()],
        )
        end_time = time.time()
        print("train time ----------------", end_time - start_time)
    finally:
        # Shut the trainer down even if the run failed, so a started
        # trainer is never left behind by an exception.
        trainer.shutdown()
    print(f"Loss results: {result}")
if __name__ == "__main__":
    print("main--------------")
    import ray

    # Initialise Ray against the hard-coded address, then launch training
    # with 3 workers, no GPU, and 2 CPUs requested per worker.
    ray.init("ray://192.168.29.7:10001")
    train_fashion_mnist(num_workers=3, use_gpu=False, resources={"CPU": 2})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment