NSDT工具推荐Three.js AI纹理开发包 - YOLO合成数据生成器 - GLTF/GLB在线编辑 - 3D模型格式在线转换 - 可编程3D场景编辑器 - REVIT导出3D模型插件 - 3D模型语义搜索引擎 - Three.js虚拟轴心开发包

在上一篇文章中我们全面介绍了如何使用 fast.ai 构建自动驾驶卡车模拟器,最终这些方法可以处理任何你需要微调预训练模型或开发预测包围框和分类的情况。

现在我的目标是逐步了解训练和推理过程的一些更技术性的方面,并解释它们如何在 PyTorch 中实现的细节。 你可以参考此 Github 存储库中的代码库。

回想上一篇文章,有两个神经网络在工作。

  • 预测转弯方向的 DNN。
  • 预测汽车、人等的包围框和类别的 DNN。

1、微调转弯方向模型

两个网络都以预训练的 resnet34 网络开始,并针对适当的任务进行微调。

可以从 torchvision.models 获得预训练的 resnet34:

import torchvision.models as models
arch = models.resnet34(pretrained=True)

所有预训练模型都已在 1000 个分类的 Imagenet 数据集上进行了预训练。

为了微调预训练网络,我们基本上是从一组权重开始,这些权重已经包含了很多关于嵌入其中的 Imagenet 数据集的信息。 所以我们可以用两种方法之一来做到这一点。 一种方法是通过设置 requires_grad=False 来冻结所有早期层,然后只为最后一层设置 requires_grad=True。 另一种方法只使用所有权重作为初始化值并继续对我们的新训练数据进行训练。

对于冻结早期层并仅训练最终层的选项 1,我们可以为所有层设置 requires_grad=False,然后删除并替换最后一层。无论何时将层分配给网络,它都会自动将  requires_grad 属性设置为 True 。

class Flatten(nn.Module):
    def __init__(self):
        super(Flatten, self).__init__()
def forward(self, x):
        x = x.view(x.size(0), -1)
        return x
class normalize(nn.Module):
    def __init__(self):
        super(normalize, self).__init__()
def forward(self, x):
        x = F.normalize(x, p=2, dim=1)
        return x
layer_list = list(arch.children())[-2:]
arch = nn.Sequential(*list(arch.children())[:-2])
arch.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1))
arch.fc = nn.Sequential(
    Flatten(),
    nn.Linear(in_features=layer_list[1].in_features, 
              out_features=3, 
              bias=True),
    normalize()
)
arch = arch.to(device)

如果查看 resnet34 的架构,你会发现最后一个 conv 块后跟一个 AdaptiveAvgPool2d 和一个 Linear 层。

我们可以使用  nn.Sequential(*list(arch.children())[:-2]) 删除最后两层,然后使用 arch.avgpool = nn.AdaptiveAvgPool2d(output_size=(1 ,1)) 和另一个带有 Flatten、Linear 和 normalize 层的 nn.Sequential。 我们最终想要预测 3 个类别:左、右、直——所以我们的 out_features 将为 3。

现在我们将为方向模型创建数据集和数据加载器。 由于我们的数据只是图像和分类 [left, right, straight],我们可以只使用内置的 torch 数据集类,但无论如何我都喜欢使用自定义类,因为我可以准确地看到数据是如何更容易地提取的。

class DirectionsDataset(Dataset):
    """Directions dataset."""
    def __init__(self, csv_file, root_dir, transform=None):
        """
        Args:
            csv_file (string): Path to the csv file with labels.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform
        """
        self.label = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
    def __len__(self):
        return len(self.label)
    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir,
                                self.label.iloc[idx, 0])
        image = io.imread(img_name+'.jpg')
        sample = image
        label = self.label.iloc[idx, 1]
        if self.transform:
            sample = self.transform(sample)
        return sample, label

在 csv 文件中的图像名称没有扩展名,因此是 img_name+'.jpg'

tensor_dataset = DirectionsDataset(csv_file='data/labels_directions.csv',
                  root_dir='data/train3/',
                  transform=transforms.Compose([
                                transforms.ToTensor(),
                                transforms.Normalize(
                  (0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]))
dataloader = DataLoader(tensor_dataset, batch_size=16, shuffle=True)

所以我们准备开始训练模型。

def train_model(model, criterion, optimizer, scheduler, 
                dataloader, num_epochs=25):
    since = time.time()
    FT_losses = []
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    iters = 0
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        scheduler.step()
        model.train()  # Set model to training mode
        running_loss = 0.0
        running_corrects = 0
        # Iterate over data.
        for i, (inputs, labels) in enumerate(dataloader):
            #set_trace()
            inputs = inputs.to(device)
            labels = labels.to(device)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward
            # track history if only in train
            model.eval()   # Set model to evaluate mode
            with torch.no_grad():
                outputs = model(inputs)
                #set_trace()
                _, preds = torch.max(outputs, 1)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            # backward + optimize only if in training phase
            loss.backward()
            optimizer.step()
            FT_losses.append(loss.item())
            # statistics
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
            #set_trace()
            iters += 1
            
            if iters % 2 == 0:
                 print('Prev Loss: {:.4f} Prev Acc: {:.4f}'.format(
                     loss.item(), torch.sum(preds == labels.data) / inputs.size(0)))
        epoch_loss = running_loss / dataset_size
        epoch_acc = running_corrects.double() / dataset_size
        print('Loss: {:.4f} Acc: {:.4f}'.format(
            epoch_loss, epoch_acc))
        # deep copy the model
        if epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))
    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, FT_losses

在这个训练循环中,如果 epoch 准确度是目前为止最好的,我们可以跟踪最佳模型权重。 我们还可以跟踪每次迭代和每个时期的损失,并在最后返回以绘制并查看调试或演示的样子。

请记住,模型在每次迭代时都在接受训练,如果你停止训练循环,它将保留这些权重,只需再次运行 train_model() 命令即可再次继续训练。 要再次从头开始,请返回并使用预训练架构重新初始化权重。

criterion = nn.CrossEntropyLoss()
# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(arch.parameters(), lr=1e-2, momentum=0.9)
# Decay LR by a factor of *gamma* every *step_size* epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
arch, FT_losses = train_model(arch, criterion, optimizer_ft, exp_lr_scheduler, dataloader, num_epochs=5)

2、微调包围框模型

同样,我们将使用预训练的 resnet34 架构。 然而,这一次我们将不得不对其进行更实质性的编辑,以输出类别预测和边界框值。 此外,这是一个多类别预测问题,因此可能有 1 个边界框,也可能有 15 个——因此同时也有 1 个或 15 个类别。

我们将以类似于替换方向模型中的图层的方式为架构创建自定义头。

class StdConv(nn.Module):
    def __init__(self, nin, nout, stride=2, drop=0.1):
        super().__init__()
        self.conv = nn.Conv2d(nin, nout, 3, stride=stride, padding=1)
        self.bn = nn.BatchNorm2d(nout)
        self.drop = nn.Dropout(drop)
        
    def forward(self, x): 
        return self.drop(self.bn(F.relu(self.conv(x))))
        
def flatten_conv(x,k):
    bs,nf,gx,gy = x.size()
    x = x.permute(0,2,3,1).contiguous()
    return x.view(bs,-1,nf//k)
class OutConv(nn.Module):
    def __init__(self, k, nin, bias):
        super().__init__()
        self.k = k
        self.oconv1 = nn.Conv2d(nin, (len(id2cat)+1)*k, 3, padding=1)
        self.oconv2 = nn.Conv2d(nin, 4*k, 3, padding=1)
        self.oconv1.bias.data.zero_().add_(bias)
        
    def forward(self, x):
        return [flatten_conv(self.oconv1(x), self.k),
                flatten_conv(self.oconv2(x), self.k)]
drop=0.4
class SSD_MultiHead(nn.Module):
    def __init__(self, k, bias):
        super().__init__()
        self.drop = nn.Dropout(drop)
        self.sconv0 = StdConv(512,256, stride=1, drop=drop)
        self.sconv1 = StdConv(256,256, drop=drop)
        self.sconv2 = StdConv(256,256, drop=drop)
        self.sconv3 = StdConv(256,256, drop=drop)
        self.out0 = OutConv(k, 256, bias)
        self.out1 = OutConv(k, 256, bias)
        self.out2 = OutConv(k, 256, bias)
        self.out3 = OutConv(k, 256, bias)
    def forward(self, x):
        x = self.drop(F.relu(x))
        x = self.sconv0(x)
        x = self.sconv1(x)
        o1c,o1l = self.out1(x)
        x = self.sconv2(x)
        o2c,o2l = self.out2(x)
        x = self.sconv3(x)
        o3c,o3l = self.out3(x)
        return [torch.cat([o1c,o2c,o3c], dim=1),
                torch.cat([o1l,o2l,o3l], dim=1)]

现在我们需要将这个自定义头连接到 resnet34 架构,有一个方便的函数可以做到这一点:

class ConvnetBuilder():
    def __init__(self, f, c, is_multi, is_reg, ps=None,
                 xtra_fc=None, xtra_cut=0, 
                 custom_head=None,pretrained=True):
        self.f,self.c,self.is_multi,self.is_reg,self.xtra_cut = f,c,is_multi,is_reg,xtra_cut
        xtra_fc = [512]
        ps = [0.25]*len(xtra_fc) + [0.5]
        self.ps,self.xtra_fc = ps,xtra_fc
        cut,self.lr_cut = [8,6] # specific to resnet_34 arch
        cut-=xtra_cut
        layers = cut_model(f(pretrained), cut)
        self.nf = num_features(layers)*2
        self.top_model = nn.Sequential(*layers)
        n_fc = len(self.xtra_fc)+1
        self.ps = [self.ps]*n_fc
        fc_layers = [custom_head]
        self.n_fc = len(fc_layers)
        self.fc_model = nn.Sequential(*fc_layers).to(device)
        self.model = nn.Sequential(*(layers+fc_layers)).to(device)
def cut_model(m, cut):
    return list(m.children())[:cut] if cut else [m]
def num_features(m):
    c=children(m)
    if len(c)==0: return None
    for l in reversed(c):
        if hasattr(l, 'num_features'): return l.num_features
        res = num_features(l)
        if res is not None: return res
def children(m): return m if isinstance(m, (list, tuple)) else list(m.children())

使用这个 ConvnetBuilder 类,我们可以结合自定义头和 resnet34 架构。

k = len(anchor_scales)
head_reg4 = SSD_MultiHead(k, -4.)
f_model = models.resnet34
modelss = ConvnetBuilder(f_model, 0, 0, 0, custom_head=head_reg4)

k是9。

我们现在可以通过 modelsmodel 属性访问模型。

损失函数必须能够接受分类(类)和连续值(边界框)并输出单个损失值。

def ssd_loss(pred,targ,print_it=False):
    lcs,lls = 0.,0.
    for b_c,b_bb,bbox,clas in zip(*pred,*targ):
        loc_loss,clas_loss = ssd_1_loss(b_c,b_bb,bbox,clas,print_it)
        lls += loc_loss
        lcs += clas_loss
    if print_it: 
        print(f'loc: {lls.data.item()}, clas: {lcs.data.item()}')
    return lls+lcs
def ssd_1_loss(b_c,b_bb,bbox,clas,print_it=False):
    bbox,clas = get_y(bbox,clas)
    a_ic = actn_to_bb(b_bb, anchors)
    overlaps = jaccard(bbox.data, anchor_cnr.data)
    gt_overlap,gt_idx = map_to_ground_truth(overlaps,print_it)
    gt_clas = clas[gt_idx]
    pos = gt_overlap > 0.4
    pos_idx = torch.nonzero(pos)[:,0]
    gt_clas[1-pos] = len(id2cat)
    gt_bbox = bbox[gt_idx]
    loc_loss = ((a_ic[pos_idx] - gt_bbox[pos_idx]).abs()).mean()
    clas_loss  = loss_f(b_c, gt_clas)
    return loc_loss, clas_loss
def one_hot_embedding(labels, num_classes):
    return torch.eye(num_classes)[labels.data.long().cpu()]
class BCE_Loss(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.num_classes = num_classes
def forward(self, pred, targ):
        t = one_hot_embedding(targ, self.num_classes+1)
        t = V(t[:,:-1].contiguous()).cpu()
        x = pred[:,:-1]
        w = self.get_weight(x,t)
        return F.binary_cross_entropy_with_logits(x, t, w, size_average=False)/self.num_classes
    
    def get_weight(self,x,t): return None
loss_f = BCE_Loss(len(id2cat))
def get_y(bbox,clas):
    bbox = bbox.view(-1,4)/sz
    bb_keep = ((bbox[:,2]-bbox[:,0])>0).nonzero()[:,0]
    return bbox[bb_keep],clas[bb_keep]
def actn_to_bb(actn, anchors):
    actn_bbs = torch.tanh(actn)
    actn_centers = (actn_bbs[:,:2]/2 * grid_sizes) + anchors[:,:2]
    actn_hw = (actn_bbs[:,2:]/2+1) * anchors[:,2:]
    return hw2corners(actn_centers, actn_hw)
def intersect(box_a, box_b):
    max_xy = torch.min(box_a[:, None, 2:], box_b[None, :, 2:])
    min_xy = torch.max(box_a[:, None, :2], box_b[None, :, :2])
    inter = torch.clamp((max_xy - min_xy), min=0)
    return inter[:, :, 0] * inter[:, :, 1]
def box_sz(b): return ((b[:, 2]-b[:, 0]) * (b[:, 3]-b[:, 1]))
def jaccard(box_a, box_b):
    inter = intersect(box_a, box_b)
    union = box_sz(box_a).unsqueeze(1) + box_sz(box_b).unsqueeze(0) - inter
    return inter / union

一旦我们设置了数据集和数据加载器,就可以在 bbox 模型的批量输出上测试损失函数。

这里我们实际上需要一个自定义数据集类来处理这些数据类型。

class BboxDataset(Dataset):
    """Bbox dataset."""
    def __init__(self, csv_file, root_dir, transform=None):
        """
        Args:
            csv_file (string): Path to csv file with bounding boxes.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform.
        """
        self.label = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
        self.sz = 224
    def __len__(self):
        return len(self.label)
    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir,
                                self.label.iloc[idx, 0])
        image = io.imread(img_name)
        sample = image
        
        h, w = sample.shape[:2]; new_h, new_w = (224,224)
        bb = np.array([float(x) for x in self.label.iloc[idx, 1].split(' ')], dtype=np.float32)
        bb = np.reshape(bb, (int(bb.shape[0]/2),2))
        bb = bb * [new_h / h, new_w / w]
        bb = bb.flatten()
        bb = T(np.concatenate((np.zeros((189*4) - len(bb)), bb), axis=None)) # 189 is 21 * 9 where 9 = k
        if self.transform:
            sample = self.transform(sample)
        return sample, bb

这个自定义数据集类处理边界框,但我们想要一个处理类和边界框的数据集类。

bb_dataset = BboxDataset(csv_file='data/pascal/tmp/mbb.csv',
             root_dir='data/pascal/VOCdevkit2/VOC2007/JPEGImages/',
             transform=transforms.Compose([
                       transforms.ToPILImage(),
                       transforms.Resize((224,224)),
                       transforms.ToTensor(),
                       transforms.Normalize(
             (0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]))
bb_dataloader = DataLoader(bb_dataset, batch_size=16, shuffle=True)

在这里,我们可以连接两个数据集类,以便为每个图像返回类和边界框。

class ConcatLblDataset(Dataset):
    def __init__(self, ds, y2):
        self.ds,self.y2 = ds,y2
        self.sz = ds.sz
    def __len__(self): return len(self.ds)
    
    def __getitem__(self, i):
        self.y2[i] = np.concatenate((np.zeros(189 - len(self.y2[i])), self.y2[i]), axis=None)
        x,y = self.ds[i]
        return (x, (y,self.y2[i]))
trn_ds2 = ConcatLblDataset(bb_dataset, mcs)

其中 mcs 是一个 numpy 数组,其中包含每个训练图像的类。

PATH_pascal = Path('data/pascal')
trn_j = json.load((PATH_pascal / 'pascal_train2007.json').open())
cats = dict((o['id'], o['name']) for o in trn_j['categories'])
mc = [[cats[p[1]] for p in trn_anno[o]] for o in trn_ids]
id2cat = list(cats.values())
cat2id = {v:k for k,v in enumerate(id2cat)}
mcs = np.array([np.array([cat2id[p] for p in o]) for o in mc])

现在我们可以测试自定义损失。

sz=224
x,y = next(iter(bb_dataloader2))
batch = modelss.model(x)
ssd_loss(batch, y, True)
tensor([0.6254])
tensor([0.6821, 0.7257, 0.4922])
tensor([0.9563])
tensor([0.6522, 0.5276, 0.6226])
tensor([0.6811, 0.3338])
tensor([0.7008])
tensor([0.5316, 0.2926])
tensor([0.9422])
tensor([0.5487, 0.7187, 0.3620, 0.1578])
tensor([0.6546, 0.3753, 0.4231, 0.4663, 0.2125, 0.0729])
tensor([0.3756, 0.5085])
tensor([0.2304, 0.1390, 0.0853])
tensor([0.2484])
tensor([0.6419])
tensor([0.5954, 0.5375, 0.5552])
tensor([0.2383])
loc: 1.844399333000183, clas: 79.79206085205078

Out[1024]:

tensor(81.6365, grad_fn=<AddBackward0>)

现在训练ssd模型:

beta1 = 0.5
optimizer = optim.Adam(modelss.model.parameters(), lr=1e-3, betas=(beta1, 0.99))
# Decay LR by a factor of *gamma* every *step_size* epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

我们可以使用与以前基本相同的 train_model() 函数,但这次我们将边界框和类的列表传递给损失函数 ssd_loss()

现在我们已经在新的训练数据集上训练了我们的两个模型,准备好使用它们对我们的卡车模拟器游戏进行推理。

玩得开心!


原文链接:Autonomous Truck Simulator with PyTorch — finetuning and single shot detectors

BimAnt翻译整理,转载请标明出处