目录
完整代码:
进阶:
📂 项目目录结构
1. models/cnn_model.py (模型定义)
2. utils/visualizer.py (绘图工具)
3. data_loader.py (数据准备)
4. train_engine.py (训练引擎)
5. main.py (主入口)
在kaggle 找到一个图像数据集,用 cnn 网络进行训练并且用 grad-cam 做可视化
以Dogs vs. Cats —— 经典的二分类问题为例
真实场景:图片的分辨率不一,背景复杂,更接近现实项目。
进阶必备:你会学到如何调整图片大小(Resizing)、数据增强(Data Augmentation)以防止过拟合。
迁移学习入门:这是练习使用预训练模型(如 VGG16, ResNet)进行迁移学习(Transfer Learning)的最佳战场。
适合练习:二分类交叉熵损失函数(Binary Crossentropy)、数据流加载(ImageDataGenerator)。
Kaggle 链接:Dogs vs. Cats
完整代码:
import os import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pyplot as plt # 设置中文字体支持 plt.rcParams["font.family"] = ["SimHei"] plt.rcParams['axes.unicode_minus'] = False # 4. 定义CNN模型 class CNN(nn.Module): def __init__(self): super(CNN, self).__init__() # 第一个卷积块: 128x128 -> 64x64 self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1) self.bn1 = nn.BatchNorm2d(32) self.relu1 = nn.ReLU() self.pool1 = nn.MaxPool2d(2, 2) # 第二个卷积块: 64x64 -> 32x32 self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1) self.bn2 = nn.BatchNorm2d(64) self.relu2 = nn.ReLU() self.pool2 = nn.MaxPool2d(2, 2) # 第三个卷积块: 32x32 -> 16x16 self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1) self.bn3 = nn.BatchNorm2d(128) self.relu3 = nn.ReLU() self.pool3 = nn.MaxPool2d(2, 2) # 全连接层 # 注意:128x128 经过 3 次池化变为 16x16 self.fc1 = nn.Linear(128 * 16 * 16, 512) self.dropout = nn.Dropout(p=0.5) self.fc2 = nn.Linear(512, 2) # 猫狗双分类 def forward(self, x): x = self.pool1(self.relu1(self.bn1(self.conv1(x)))) x = self.pool2(self.relu2(self.bn2(self.conv2(x)))) x = self.pool3(self.relu3(self.bn3(self.conv3(x)))) x = x.view(x.size(0), -1) # 动态展平 x = torch.relu(self.fc1(x)) x = self.dropout(x) x = self.fc2(x) return x # 绘图函数保留在外面 def plot_iter_losses(losses, indices): plt.figure(figsize=(10, 4)) plt.plot(indices, losses, 'b-', alpha=0.7, label='Iteration Loss') plt.xlabel('Iteration(Batch序号)') plt.ylabel('损失值') plt.title('每个 Iteration 的训练损失') plt.legend(); plt.grid(True); plt.tight_layout(); plt.show() def plot_epoch_metrics(train_acc, test_acc, train_loss, test_loss): epochs = range(1, len(train_acc) + 1) plt.figure(figsize=(12, 4)) plt.subplot(1, 2, 1) plt.plot(epochs, train_acc, 'b-', label='训练准确率') plt.plot(epochs, test_acc, 'r-', label='测试准确率') plt.title('准确率曲线'); plt.legend(); plt.grid(True) plt.subplot(1, 2, 2) plt.plot(epochs, train_loss, 'b-', label='训练损失') plt.plot(epochs, test_loss, 'r-', label='测试损失') plt.title('损失曲线'); plt.legend(); plt.grid(True) plt.tight_layout(); plt.show() # 5. 训练函数 def train(model, train_loader, val_loader, test_loader, criterion, optimizer, scheduler, device, epochs): """ 完整的训练逻辑 :param val_loader: 验证集加载器,用于训练过程中调整超参数 :param test_loader: 测试集加载器,用于最后评估模型泛化能力 """ # 记录数据用于绘图 all_iter_losses, iter_indices = [], [] train_acc_history, val_acc_history = [], [] train_loss_history, val_loss_history = [], [] print(f"开始训练,共 {epochs} 个 Epoch...") for epoch in range(epochs): # ==================== 1. 训练阶段 (Training) ==================== model.train() running_loss, correct, total = 0.0, 0, 0 for batch_idx, (data, target) in enumerate(train_loader): data, target = data.to(device), target.to(device) optimizer.zero_grad() # 梯度清零 output = model(data) # 前向传播 loss = criterion(output, target) # 计算损失 loss.backward() # 反向传播 optimizer.step() # 更新参数 # 记录 iteration 级别的数据 iter_loss = loss.item() all_iter_losses.append(iter_loss) iter_indices.append(epoch * len(train_loader) + batch_idx + 1) running_loss += iter_loss _, predicted = output.max(1) total += target.size(0) correct += predicted.eq(target).sum().item() # 每 50 个 batch 打印一次进度 if (batch_idx + 1) % 50 == 0: print(f'Epoch: {epoch+1}/{epochs} [{batch_idx+1}/{len(train_loader)}] ' f'Loss: {iter_loss:.4f} | Acc: {100.*correct/total:.2f}%') epoch_train_loss = running_loss / len(train_loader) epoch_train_acc = 100. * correct / total train_acc_history.append(epoch_train_acc) train_loss_history.append(epoch_train_loss) # ==================== 2. 验证阶段 (Validation) ==================== # 每个 epoch 跑完都要去验证集“考试”,根据考试成绩调整学习率 model.eval() val_loss, correct_val, total_val = 0, 0, 0 with torch.no_grad(): # 验证阶段不计算梯度,节省内存和显存 for data, target in val_loader: data, target = data.to(device), target.to(device) output = model(data) val_loss += criterion(output, target).item() _, predicted = output.max(1) total_val += target.size(0) correct_val += predicted.eq(target).sum().item() epoch_val_loss = val_loss / len(val_loader) epoch_val_acc = 100. * correct_val / total_val val_acc_history.append(epoch_val_acc) val_loss_history.append(epoch_val_loss) # 根据验证集的损失调整学习率 scheduler.step(epoch_val_loss) # 获取当前学习率(用于打印) current_lr = optimizer.param_groups[0]['lr'] print(f'--- Epoch {epoch+1} 结束 | Train Acc: {epoch_train_acc:.2f}% | Val Acc: {epoch_val_acc:.2f}% | LR: {current_lr} ---') # ==================== 3. 最终测试阶段 (Testing) ==================== # 所有的训练都结束后,用完全没见过的测试集做最后的评估 print("\n" + "="*30) print("训练完成!正在进行最终测试...") model.eval() test_correct, test_total = 0, 0 with torch.no_grad(): for data, target in test_loader: data, target = data.to(device), target.to(device) output = model(data) _, predicted = output.max(1) test_total += target.size(0) test_correct += predicted.eq(target).sum().item() final_test_acc = 100. * test_correct / test_total print(f'终极测试准确率: {final_test_acc:.2f}%') print("="*30) # 绘制图表 plot_iter_losses(all_iter_losses, iter_indices) plot_epoch_metrics(train_acc_history, val_acc_history, train_loss_history, val_loss_history) return final_test_acc # 6. 主执行入口 if __name__ == '__main__': device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"使用设备: {device}") # 数据路径处理 current_dir = os.path.dirname(os.path.abspath(__file__)) train_path = os.path.join(current_dir, 'dataset', 'train') val_path = os.path.join(current_dir, 'dataset', 'validation') test_path = os.path.join(current_dir, 'dataset', 'test') # 数据预处理 target_size = (128, 128) train_transform = transforms.Compose([ transforms.Resize(target_size), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) test_transform = transforms.Compose([ transforms.Resize(target_size), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) # 加载器 train_dataset = datasets.ImageFolder(train_path, transform=train_transform) test_dataset = datasets.ImageFolder(test_path, transform=test_transform) train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2) test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=2) # 加载验证集 val_dataset = datasets.ImageFolder(val_path, transform=test_transform) val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=2) # 模型初始化 model = CNN().to(device) criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=0.001) scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5) # 启动训练 final_acc = train(model, train_loader, val_loader, test_loader, criterion, optimizer, scheduler, device, epochs=20) print(f"最终准确率: {final_acc:.2f}%")进阶:
将代码拆分为多个文件(模块化)是开发深度学习项目的标准操作。这样做不仅让代码清晰,还方便你以后更换模型(比如换成 ResNet)或更换数据集而不需要大规模改动代码。
按照以下结构拆分:
📂 项目目录结构
day49/ ├── dataset/ # 数据集文件夹 ├── models/ │ └── cnn_model.py # 存放模型结构 (CNN类) ├── utils/ │ └── visualizer.py # 存放绘图函数 (plot_... 函数) ├── data_loader.py # 存放数据预处理和 DataLoader 逻辑 ├── train_engine.py # 存放 train 核心函数 └── main.py # 执行入口1.models/cnn_model.py(模型定义)
将模型单独拎出来,方便以后在其他项目复用。
import torch import torch.nn as nn class CNN(nn.Module): def __init__(self): super(CNN, self).__init__() self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1) self.bn1 = nn.BatchNorm2d(32) self.relu1 = nn.ReLU() self.pool1 = nn.MaxPool2d(2, 2) self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1) self.bn2 = nn.BatchNorm2d(64) self.relu2 = nn.ReLU() self.pool2 = nn.MaxPool2d(2, 2) self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1) self.bn3 = nn.BatchNorm2d(128) self.relu3 = nn.ReLU() self.pool3 = nn.MaxPool2d(2, 2) self.fc1 = nn.Linear(128 * 16 * 16, 512) self.dropout = nn.Dropout(p=0.5) self.fc2 = nn.Linear(512, 2) def forward(self, x): x = self.pool1(self.relu1(self.bn1(self.conv1(x)))) x = self.pool2(self.relu2(self.bn2(self.conv2(x)))) x = self.pool3(self.relu3(self.bn3(self.conv3(x)))) x = x.view(x.size(0), -1) x = torch.relu(self.fc1(x)) x = self.dropout(x) x = self.fc2(x) return x2.utils/visualizer.py(绘图工具)
绘图逻辑通常比较占篇幅,且与训练逻辑无关。
import matplotlib.pyplot as plt # 设置中文字体支持 plt.rcParams["font.family"] = ["SimHei"] plt.rcParams['axes.unicode_minus'] = False def plot_iter_losses(losses, indices): plt.figure(figsize=(10, 4)) plt.plot(indices, losses, 'b-', alpha=0.7, label='Iteration Loss') plt.xlabel('Iteration') plt.ylabel('损失值') plt.title('训练损失') plt.legend(); plt.grid(True); plt.show() def plot_epoch_metrics(train_acc, val_acc, train_loss, val_loss): epochs = range(1, len(train_acc) + 1) plt.figure(figsize=(12, 4)) # ... 之前的绘图逻辑 ... plt.tight_layout(); plt.show() import cv2 import numpy as np import torch import torch.nn.functional as F import matplotlib.pyplot as plt def show_gradcam(model, img_tensor, original_img_path, target_layer): """ model: 训练好的模型 img_tensor: 经过 transform 处理后的图像张量 [1, 3, 128, 128] original_img_path: 原始图片的路径(用于叠加显示) target_layer: 想要可视化的卷积层(通常是最后一个卷积层) """ model.eval() # 1. 注册 Hook 获取梯度和特征图 gradients = [] activations = [] def backward_hook(module, grad_input, grad_output): gradients.append(grad_output[0]) def forward_hook(module, input, output): activations.append(output) # 绑定到目标层 handle_forward = target_layer.register_forward_hook(forward_hook) handle_backward = target_layer.register_full_backward_hook(backward_hook) # 2. 前向传播 output = model(img_tensor) category_index = output.argmax(dim=1).item() # 3. 反向传播获取梯度 model.zero_grad() loss = output[0, category_index] loss.backward() # 4. 计算 Grad-CAM grads = gradients[0].cpu().data.numpy()[0] # [C, H, W] f_maps = activations[0].cpu().data.numpy()[0] # [C, H, W] # 对通道维度取平均值作为权重 weights = np.mean(grads, axis=(1, 2)) cam = np.zeros(f_maps.shape[1:], dtype=np.float32) for i, w in enumerate(weights): cam += w * f_maps[i] # ReLU 激活并归一化 cam = np.maximum(cam, 0) cam = cv2.resize(cam, (128, 128)) cam = (cam - np.min(cam)) / (np.max(cam) - np.min(cam)) # 5. 叠加到原图 img = cv2.imdecode(np.fromfile(original_img_path, dtype=np.uint8), cv2.IMREAD_COLOR) img = cv2.resize(img, (128, 128)) heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET) result = heatmap * 0.4 + img * 0.6 # 0.4 是热力图透明度 # 移除 Hook handle_forward.remove() handle_backward.remove() # 展示结果 plt.figure(figsize=(8, 4)) plt.subplot(1, 2, 1) plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) plt.title("Original Image") plt.subplot(1, 2, 2) plt.imshow(cv2.cvtColor(np.uint8(result), cv2.COLOR_BGR2RGB)) plt.title(f"Grad-CAM (Class: {'Dog' if category_index==1 else 'Cat'})") plt.show()3.data_loader.py(数据准备)
这部分负责把原始图片变成模型能吃的DataLoader。
import os from torchvision import datasets, transforms from torch.utils.data import DataLoader def get_loaders(current_dir, batch_size=64): target_size = (128, 128) train_transform = transforms.Compose([ transforms.Resize(target_size), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) test_transform = transforms.Compose([ transforms.Resize(target_size), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]) train_path = os.path.join(current_dir, 'dataset', 'train') val_path = os.path.join(current_dir, 'dataset', 'validation') test_path = os.path.join(current_dir, 'dataset', 'test') train_loader = DataLoader(datasets.ImageFolder(train_path, train_transform), batch_size=batch_size, shuffle=True, num_workers=2) val_loader = DataLoader(datasets.ImageFolder(val_path, test_transform), batch_size=batch_size, shuffle=False, num_workers=2) test_loader = DataLoader(datasets.ImageFolder(test_path, test_transform), batch_size=batch_size, shuffle=False, num_workers=2) return train_loader, val_loader, test_loader, test_transform4.train_engine.py(训练引擎)
把train函数放进来。注意要从其他模块导入绘图工具。
import torch from utils.visualizer import plot_iter_losses, plot_epoch_metrics def train(model, train_loader, val_loader, test_loader, criterion, optimizer, scheduler, device, epochs): """ 完整的训练逻辑 :param val_loader: 验证集加载器,用于训练过程中调整超参数 :param test_loader: 测试集加载器,用于最后评估模型泛化能力 """ # 记录数据用于绘图 all_iter_losses, iter_indices = [], [] train_acc_history, val_acc_history = [], [] train_loss_history, val_loss_history = [], [] print(f"开始训练,共 {epochs} 个 Epoch...") for epoch in range(epochs): # ==================== 1. 训练阶段 (Training) ==================== model.train() running_loss, correct, total = 0.0, 0, 0 for batch_idx, (data, target) in enumerate(train_loader): data, target = data.to(device), target.to(device) optimizer.zero_grad() # 梯度清零 output = model(data) # 前向传播 loss = criterion(output, target) # 计算损失 loss.backward() # 反向传播 optimizer.step() # 更新参数 # 记录 iteration 级别的数据 iter_loss = loss.item() all_iter_losses.append(iter_loss) iter_indices.append(epoch * len(train_loader) + batch_idx + 1) running_loss += iter_loss _, predicted = output.max(1) total += target.size(0) correct += predicted.eq(target).sum().item() # 每 50 个 batch 打印一次进度 if (batch_idx + 1) % 50 == 0: print(f'Epoch: {epoch+1}/{epochs} [{batch_idx+1}/{len(train_loader)}] ' f'Loss: {iter_loss:.4f} | Acc: {100.*correct/total:.2f}%') epoch_train_loss = running_loss / len(train_loader) epoch_train_acc = 100. * correct / total train_acc_history.append(epoch_train_acc) train_loss_history.append(epoch_train_loss) # ==================== 2. 验证阶段 (Validation) ==================== # 每个 epoch 跑完都要去验证集“考试”,根据考试成绩调整学习率 model.eval() val_loss, correct_val, total_val = 0, 0, 0 with torch.no_grad(): # 验证阶段不计算梯度,节省内存和显存 for data, target in val_loader: data, target = data.to(device), target.to(device) output = model(data) val_loss += criterion(output, target).item() _, predicted = output.max(1) total_val += target.size(0) correct_val += predicted.eq(target).sum().item() epoch_val_loss = val_loss / len(val_loader) epoch_val_acc = 100. * correct_val / total_val val_acc_history.append(epoch_val_acc) val_loss_history.append(epoch_val_loss) # 根据验证集的损失调整学习率 scheduler.step(epoch_val_loss) # 获取当前学习率(用于打印) current_lr = optimizer.param_groups[0]['lr'] print(f'--- Epoch {epoch+1} 结束 | Train Acc: {epoch_train_acc:.2f}% | Val Acc: {epoch_val_acc:.2f}% | LR: {current_lr} ---') # ==================== 3. 最终测试阶段 (Testing) ==================== # 所有的训练都结束后,用完全没见过的测试集做最后的评估 print("\n" + "="*30) print("训练完成!正在进行最终测试...") model.eval() test_correct, test_total = 0, 0 with torch.no_grad(): for data, target in test_loader: data, target = data.to(device), target.to(device) output = model(data) _, predicted = output.max(1) test_total += target.size(0) test_correct += predicted.eq(target).sum().item() final_test_acc = 100. * test_correct / test_total print(f'终极测试准确率: {final_test_acc:.2f}%') print("="*30) # 绘制图表 plot_iter_losses(all_iter_losses, iter_indices) plot_epoch_metrics(train_acc_history, val_acc_history, train_loss_history, val_loss_history) return final_test_acc5.main.py(主入口)
主文件现在变得非常干净,只负责调度。
import os import torch import torch.nn as nn import torch.optim as optim # 导入你拆分的模块 from models.cnn_model import CNN from data_loader import get_loaders from train_engine import train if __name__ == '__main__': # 1. 配置 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") current_dir = os.path.dirname(os.path.abspath(__file__)) # 2. 获取数据 train_loader, val_loader, test_loader,test_transform = get_loaders(current_dir) # 3. 初始化模型 model = CNN().to(device) criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=0.001) scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5) # 4. 运行 final_acc = train(model, train_loader, val_loader, test_loader, criterion, optimizer, scheduler, device, epochs=2) print(f"最终准确率: {final_acc:.2f}%") # --- Grad-CAM 可视化部分 --- from utils.visualizer import show_gradcam # 1. 挑一张测试图片(或者你本地找一张猫/狗的图) # 手动拼接完整路径 target_img_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dataset', 'test', 'dogs', 'dog (1001).jpg') # 2. 对这张图做相同的预处理 from PIL import Image raw_img = Image.open(target_img_path).convert('RGB') input_tensor = test_transform(raw_img).unsqueeze(0).to(device) # 增加 batch 维度并移至 GPU # 3. 指定可视化最后一层卷积层 target_layer = model.conv3 # 4. 绘图 print("生成 Grad-CAM 可视化中...") show_gradcam(model, input_tensor, target_img_path, target_layer)