Single-Device Training Scenario

This tutorial demonstrates how to record and display the information, metrics, and files produced by training in a single-device scenario, using a PyTorch model trained on a single CPU or GPU as an example.

Running the Example

Prepare the environment as described in Usage, then go to the example corresponding to this tutorial and run it by following its README.
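As a quick reference, the script can also be launched directly once the dependencies are installed. A minimal sketch, assuming the file name torch_mnist.py used below (the --no_cuda flag is defined in the script itself); the example's README remains the authoritative set of commands:

python torch_mnist.py            # train on GPU if one is available
python torch_mnist.py --no_cuda  # force CPU training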

The following sections describe the modifications made to the training script in order to track training.

Preparing the Training Script

Prepare a PyTorch training script whose model classifies images in the MNIST dataset, as shown below. Tracking will then be added to this script through a few simple modifications.

torch_mnist.py
import argparse
import logging
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

parser = argparse.ArgumentParser(
    description='Recording of training data of PyTorch model for MNIST with EM.'
)
parser.add_argument('--no_cuda',
                    action='store_true',
                    default=False,
                    help='Disable CUDA training.')
logging.basicConfig(format='%(message)s', level=logging.INFO)


class Net(nn.Module):

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.conv3 = nn.Conv2d(64, 64, 3, 1)
        self.pool = nn.MaxPool2d(2, 2)
        self.dense1 = nn.Linear(576, 64)  # 576 = 64 channels * 3 * 3 feature map
        self.dense2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = F.relu(self.conv3(x))
        x = torch.flatten(x, 1)
        x = F.relu(self.dense1(x))
        output = self.dense2(x)  # raw logits; nn.CrossEntropyLoss applies log-softmax itself
        return output


def train(scheduler):
    global global_step
    for epoch in range(1, epochs + 1):
        model.train()
        for step, (data, target) in enumerate(train_loader, 1):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            if step % 500 == 0:
                train_loss = loss.item()
                logging.info(
                    'epoch {:d}/{:d}, batch {:5d}/{:d} with loss: {:.4f}'.
                    format(epoch, epochs, step, steps_per_epoch, train_loss))
                global_step = (epoch - 1) * steps_per_epoch + step

        scheduler.step()
        global_step = epoch * steps_per_epoch
        test(val=True, epoch=epoch)


def test(val=False, epoch=None):
    label = 'val' if val else 'test'
    model.eval()
    running_loss = 0.0
    correct = 0

    with torch.no_grad():
        loader = val_loader if val else test_loader
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            running_loss += loss.item()
            prediction = output.max(1)[1]
            correct += (prediction == target).sum().item()

    test_loss = running_loss / len(loader)
    test_accuracy = correct / len(loader.dataset)
    msg = '{:s} loss: {:.4f}, {:s} accuracy: {:.4f}'.format(
        label, test_loss, label, test_accuracy)
    if val:
        msg = 'epoch {:d}/{:d} with '.format(epoch, epochs) + msg
    logging.info(msg)


if __name__ == '__main__':
    args = parser.parse_args()

    use_cuda = not args.no_cuda and torch.cuda.is_available()
    if use_cuda:
        logging.info('Using CUDA')
    device = torch.device('cuda' if use_cuda else 'cpu')
    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}

    torch.manual_seed(1)

    model = Net().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.7)

    dataset_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                'data')
    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5,), (0.5,))])
    train_dataset = datasets.MNIST(root=dataset_path,
                                   train=True,
                                   download=True,
                                   transform=transform)
    # split the 60,000 training images into 48,000 for training and 12,000 for validation
    train_dataset, val_dataset = torch.utils.data.random_split(
        train_dataset, [48000, 12000])
    test_dataset = datasets.MNIST(root=dataset_path,
                                  train=False,
                                  download=True,
                                  transform=transform)
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=32,
        shuffle=True,
        **kwargs)
    val_loader = torch.utils.data.DataLoader(val_dataset,
                                             batch_size=400,
                                             shuffle=False,
                                             **kwargs)
    test_loader = torch.utils.data.DataLoader(test_dataset,
                                              batch_size=1000,
                                              shuffle=False,
                                              **kwargs)

    global_step = 0
    epochs = 10
    steps_per_epoch = len(train_loader)
    train(scheduler)
    test()

    torch.save(model.state_dict(), 'model_state_dict.pt')
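Running this baseline script trains the model for 10 epochs, evaluates it on the validation set after each epoch and on the test set at the end, and saves the final weights to model_state_dict.pt. The sections below add tracking on top of it.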

Creating a Run

Before building the model, create and initialize a Run instance.

from t9k import em

if __name__ == '__main__':
    ...
    run = em.create_run(name='mnist_torch')  # name of the Run
    ...

The hparams attribute of the Run instance holds the hyperparameters; update it with dict-like methods to set them.

if __name__ == '__main__':
    ...
    run.hparams.update({
        'batch_size': 32,
        'epochs': 10,
        'learning_rate': 0.001,
        'learning_rate_decay_period': 1,
        'learning_rate_decay_factor': 0.7,
        'conv_channels1': 32,
        'conv_channels2': 64,
        'conv_channels3': 64,
        'conv_kernel_size': 3,
        'maxpool_size': 2,
        'linear_features1': 64,
        'seed': 1,
    })
    hparams = run.hparams
    ...

Configuring the Model with the Hyperparameters

Replace the directly supplied hyperparameter values with the experiment hyperparameters set above, so that the model layers, dataset, optimizer, training procedure, and so on are all configured from them.

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, hparams['conv_channels1'],
                               hparams['conv_kernel_size'], 1)
        self.conv2 = nn.Conv2d(hparams['conv_channels1'],
                               hparams['conv_channels2'],
                               hparams['conv_kernel_size'], 1)
        self.conv3 = nn.Conv2d(hparams['conv_channels2'],
                               hparams['conv_channels3'],
                               hparams['conv_kernel_size'], 1)
        self.pool = nn.MaxPool2d(hparams['maxpool_size'],
                                 hparams['maxpool_size'])
        self.dense1 = nn.Linear(576, hparams['linear_features1'])
        self.dense2 = nn.Linear(hparams['linear_features1'], 10)
    ...

if __name__ == '__main__':
    ...
    torch.manual_seed(hparams['seed'])
    ...
    optimizer = optim.Adam(model.parameters(), lr=hparams['learning_rate'])
    scheduler = optim.lr_scheduler.StepLR(
        optimizer,
        step_size=hparams['learning_rate_decay_period'],
        gamma=hparams['learning_rate_decay_factor'])
    ...
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=hparams['batch_size'],
        shuffle=True,
        **kwargs)
    ...
    epochs = hparams['epochs']
    ...

Logging Metrics

Call the log() method of the Run instance during training and testing to record the metrics the model produces.

def train(scheduler):
    ...
            if step % 500 == 0:
                train_loss = loss.item()
                logging.info(
                    'epoch {:d}/{:d}, batch {:5d}/{:d} with loss: {:.4f}'.
                    format(epoch, epochs, step, steps_per_epoch, train_loss))
                global_step = (epoch - 1) * steps_per_epoch + step

                run.log(type='train',                    # log training metrics
                        metrics={'loss': train_loss},    # metric names and values
                        step=global_step,                # current global step
                        epoch=epoch)                     # current epoch
    ...

def test(val=False, epoch=None):
    ...
    test_loss = running_loss / len(loader)
    test_accuracy = correct / len(loader.dataset)
    msg = '{:s} loss: {:.4f}, {:s} accuracy: {:.4f}'.format(
        label, test_loss, label, test_accuracy)
    if val:
        msg = 'epoch {:d}/{:d} with '.format(epoch, epochs) + msg
    logging.info(msg)

    run.log(type=label,    # log validation/test metrics
            metrics={
                'loss': test_loss,
                'accuracy': test_accuracy,
            },
            step=global_step,
            epoch=epoch)

Creating an Artifact and Adding the Model Checkpoint File

After saving the model checkpoint file, create and initialize a new Artifact instance, add the checkpoint file to it, and mark it as an output of the Run.

if __name__ == '__main__':
    ...
    torch.save(model.state_dict(), 'model_state_dict.pt')
    model_artifact = em.create_artifact(name='mnist_torch_saved_model')
    model_artifact.add_file('model_state_dict.pt')
    run.mark_output(model_artifact)
    ...

Finishing and Uploading the Run

After the model's training and testing finish, call the finish() and upload() methods of the Run instance to finish and upload the Run (the Artifact is uploaded along with it). Before uploading, call the em.login() function to log in to the AIStore server.

if __name__ == '__main__':
    ...
    run.finish()
    em.login()
    run.upload(folder='em-examples', make_folder=True)
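Presumably, the folder parameter names the destination folder on the server and make_folder=True creates it if it does not already exist; the values here follow the example and can be adjusted to your own organization scheme.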

Inspecting the Run and Artifact

After training finishes, open the experiment management console, where you can see that the Run named mnist_torch and its output Artifact have been uploaded:

(Screenshot: the uploaded Run and its output Artifact in the console)

Click the name of the Run or Artifact to open its details page, which shows the Run's platform information, metrics, hyperparameters, and dataflow, as well as the Artifact's files and dataflow. The two dataflows are connected to each other.

(Screenshots: Run details, Artifact details, and the dataflow view)

Next Steps