data_partitioning.py
import os
import random
from shutil import copy

def mkfile(path):
    # Create the directory if it does not already exist
    if not os.path.exists(path):
        os.makedirs(path)

# Collect the class names: every subdirectory of `dataset` is one class
file_path = 'dataset'
flower_class = [cla for cla in os.listdir(file_path)
                if os.path.isdir(os.path.join(file_path, cla))]

# Create the training set folder data/train with one subdirectory per class
mkfile('data/train')
for cla in flower_class:
    mkfile('data/train/' + cla)

# Create the test set folder data/test with one subdirectory per class
mkfile('data/test')
for cla in flower_class:
    mkfile('data/test/' + cla)

# Split ratio: train : test = 9 : 1
split_rate = 0.1

# Walk every class and split its images between the training and test sets
for cla in flower_class:
    cla_path = file_path + '/' + cla + '/'  # subdirectory of one class
    images = os.listdir(cla_path)  # names of all images in this class
    num = len(images)
    # Randomly sample k image names for the test set
    eval_index = random.sample(images, k=int(num * split_rate))
    for index, image in enumerate(images):
        image_path = cla_path + image
        if image in eval_index:
            # Sampled images are copied into the test set
            copy(image_path, 'data/test/' + cla)
        else:
            # The remaining images are copied into the training set
            copy(image_path, 'data/train/' + cla)
        print("\r[{}] processing [{}/{}]".format(cla, index + 1, num), end="")  # progress bar
    print()
print("processing done!")
mean_std.py
import os
import numpy as np
from PIL import Image

# Folder that contains all the images
folder_path = 'dataset'

# Accumulators: per-channel pixel count and per-channel sum of normalized values
total_pixels = 0
sum_normalized_pixel_values = np.zeros(3)  # three channels for RGB images

# First pass: accumulate the per-channel sums
for root, dirs, files in os.walk(folder_path):
    for filename in files:
        if filename.endswith(('.jpg', '.jpeg', '.png', '.bmp')):  # add other formats as needed
            image_path = os.path.join(root, filename)
            # Convert to RGB so grayscale or RGBA images do not break the channel math
            image = Image.open(image_path).convert('RGB')
            image_array = np.array(image)
            # Normalize pixel values to [0, 1]
            normalized_image_array = image_array / 255.0
            # Count pixels per channel (H * W), not total array elements,
            # so the division below yields a true per-channel mean
            total_pixels += normalized_image_array.shape[0] * normalized_image_array.shape[1]
            sum_normalized_pixel_values += np.sum(normalized_image_array, axis=(0, 1))

# Per-channel mean
mean = sum_normalized_pixel_values / total_pixels

# Second pass: accumulate squared deviations for the variance
sum_squared_diff = np.zeros(3)
for root, dirs, files in os.walk(folder_path):
    for filename in files:
        if filename.endswith(('.jpg', '.jpeg', '.png', '.bmp')):
            image_path = os.path.join(root, filename)
            image = Image.open(image_path).convert('RGB')
            image_array = np.array(image)
            normalized_image_array = image_array / 255.0
            diff = (normalized_image_array - mean) ** 2
            sum_squared_diff += np.sum(diff, axis=(0, 1))

# Per-channel variance
variance = sum_squared_diff / total_pixels
print("Mean:", mean)
print("Variance:", variance)
model.py
import torch
from torch import nn
from torchsummary import summary

class Residual(nn.Module):
    """Basic residual block: two 3x3 convolutions, with an optional 1x1
    projection on the shortcut when the shape changes."""
    def __init__(self, input_channels, num_channels, use_1conv=False, strides=1):
        super(Residual, self).__init__()
        self.ReLU = nn.ReLU()
        self.conv1 = nn.Conv2d(in_channels=input_channels, out_channels=num_channels,
                               kernel_size=3, padding=1, stride=strides)
        self.conv2 = nn.Conv2d(in_channels=num_channels, out_channels=num_channels,
                               kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)
        if use_1conv:
            # 1x1 convolution that matches the shortcut to the new shape
            self.conv3 = nn.Conv2d(in_channels=input_channels, out_channels=num_channels,
                                   kernel_size=1, stride=strides)
        else:
            self.conv3 = None

    def forward(self, x):
        y = self.ReLU(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        if self.conv3 is not None:
            x = self.conv3(x)
        # Add the shortcut, then apply the final activation
        y = self.ReLU(y + x)
        return y

class ResNet18(nn.Module):
    def __init__(self, Residual):
        super(ResNet18, self).__init__()
        # Stem: 7x7 convolution, batch norm, ReLU, then 3x3 max pooling
        # (standard ResNet order: Conv -> BN -> ReLU)
        self.b1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
        # Four stages of two residual blocks each; the first block of stages
        # b3-b5 halves the spatial size and doubles the channel count
        self.b2 = nn.Sequential(Residual(64, 64, use_1conv=False, strides=1),
                                Residual(64, 64, use_1conv=False, strides=1))
        self.b3 = nn.Sequential(Residual(64, 128, use_1conv=True, strides=2),
                                Residual(128, 128, use_1conv=False, strides=1))
        self.b4 = nn.Sequential(Residual(128, 256, use_1conv=True, strides=2),
                                Residual(256, 256, use_1conv=False, strides=1))
        self.b5 = nn.Sequential(Residual(256, 512, use_1conv=True, strides=2),
                                Residual(512, 512, use_1conv=False, strides=1))
        # Head: global average pooling and a 2-way classifier (mask / no mask)
        self.b6 = nn.Sequential(nn.AdaptiveAvgPool2d((1, 1)),
                                nn.Flatten(),
                                nn.Linear(512, 2))

    def forward(self, x):
        x = self.b1(x)
        x = self.b2(x)
        x = self.b3(x)
        x = self.b4(x)
        x = self.b5(x)
        x = self.b6(x)
        return x

if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = ResNet18(Residual).to(device)
    # The stem expects 3-channel RGB input, so the summary shape is (3, 224, 224)
    summary(model, (3, 224, 224))
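
Besides torchsummary, a dummy forward pass is a quick shape check that needs no extra dependency. A minimal sketch, assuming model.py is importable from the working directory:

import torch
from model import ResNet18, Residual

model = ResNet18(Residual).eval()  # eval mode: batch norm uses running statistics
x = torch.randn(1, 3, 224, 224)    # one fake RGB image at the training resolution
with torch.no_grad():
    print(model(x).shape)          # expected: torch.Size([1, 2])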
model_test.py
import torch
import torch.utils.data as Data
from torchvision import transforms
from torchvision.datasets import ImageFolder
from PIL import Image
from model import ResNet18, Residual

def test_data_process():
    # Path of the test set
    ROOT_TEST = r'data\test'
    normalize = transforms.Normalize([0.17263485, 0.15147247, 0.14267451],
                                     [0.0736155, 0.06216329, 0.05930814])
    # Preprocessing pipeline for the test images
    test_transform = transforms.Compose([transforms.Resize((224, 224)),
                                         transforms.ToTensor(),
                                         normalize])
    # Load the test set
    test_data = ImageFolder(ROOT_TEST, transform=test_transform)
    test_dataloader = Data.DataLoader(dataset=test_data,
                                      batch_size=1,
                                      shuffle=True,
                                      num_workers=0)
    return test_dataloader

def test_model_process(model, test_dataloader):
    # Use the GPU if one is available, otherwise the CPU
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # Move the model to the test device
    model = model.to(device)
    # Running totals for the accuracy computation
    test_corrects = 0.0
    test_num = 0
    # Evaluation mode: batch norm uses running statistics
    model.eval()
    # Forward passes only; skipping gradient tracking saves memory and time
    with torch.no_grad():
        for test_data_x, test_data_y in test_dataloader:
            # Move features and labels to the test device
            test_data_x = test_data_x.to(device)
            test_data_y = test_data_y.to(device)
            # Forward pass: one prediction per sample in the batch
            output = model(test_data_x)
            # Index of the largest logit in each row is the predicted class
            pre_lab = torch.argmax(output, dim=1)
            # Count correct predictions
            test_corrects += torch.sum(pre_lab == test_data_y.data)
            # Count processed samples
            test_num += test_data_x.size(0)
    # Overall test accuracy
    test_acc = test_corrects.double().item() / test_num
    print("Test accuracy:", test_acc)

if __name__ == "__main__":
    # Load the model and its best weights
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = ResNet18(Residual)
    model.load_state_dict(torch.load('best_model.pth', map_location=device))
    # Evaluate the trained model on the test set
    test_dataloader = test_data_process()
    test_model_process(model, test_dataloader)

    model = model.to(device)
    # Index order must match ImageFolder's alphabetical class_to_idx mapping
    classes = ['with mask', 'without mask']
    model.eval()
    with torch.no_grad():
        for b_x, b_y in test_dataloader:
            b_x = b_x.to(device)
            b_y = b_y.to(device)
            output = model(b_x)
            pre_lab = torch.argmax(output, dim=1)
            result = pre_lab.item()
            label = b_y.item()
            print("Predicted:", classes[result], "------", "Actual:", classes[label])

    # Classify a single image from disk
    image = Image.open('no_mask.jfif').convert('RGB')
    normalize = transforms.Normalize([0.17263485, 0.15147247, 0.14267451],
                                     [0.0736155, 0.06216329, 0.05930814])
    # Same preprocessing pipeline as the test set
    test_transform = transforms.Compose([transforms.Resize((224, 224)),
                                         transforms.ToTensor(),
                                         normalize])
    image = test_transform(image)
    # Add a batch dimension: (C, H, W) -> (1, C, H, W)
    image = image.unsqueeze(0)
    with torch.no_grad():
        image = image.to(device)
        output = model(image)
        pre_lab = torch.argmax(output, dim=1)
        result = pre_lab.item()
        print("Predicted:", classes[result])
model_train.py
import copy
import time
import torch
import torch.nn as nn
import torch.utils.data as Data
import pandas as pd
import matplotlib.pyplot as plt
from torchvision import transforms
from torchvision.datasets import ImageFolder
from model import ResNet18, Residual

def train_val_data_process():
    # Path of the training set
    ROOT_TRAIN = r'data\train'
    normalize = transforms.Normalize([0.17263485, 0.15147247, 0.14267451],
                                     [0.0736155, 0.06216329, 0.05930814])
    # Preprocessing pipeline for the training images
    train_transform = transforms.Compose([transforms.Resize((224, 224)),
                                          transforms.ToTensor(),
                                          normalize])
    # Load the dataset and split it 80/20 into training and validation sets;
    # computing the second length explicitly guarantees the sizes add up
    train_data = ImageFolder(ROOT_TRAIN, transform=train_transform)
    num_train = round(0.8 * len(train_data))
    train_data, val_data = Data.random_split(train_data,
                                             [num_train, len(train_data) - num_train])
    train_dataloader = Data.DataLoader(dataset=train_data,
                                       batch_size=32,
                                       shuffle=True,
                                       num_workers=2)
    val_dataloader = Data.DataLoader(dataset=val_data,
                                     batch_size=32,
                                     shuffle=True,
                                     num_workers=2)
    return train_dataloader, val_dataloader

def train_model_process(model, train_dataloader, val_dataloader, num_epochs):
    # Use the GPU if one is available, otherwise the CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Adam optimizer with a learning rate of 0.001
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    # Cross-entropy loss for classification
    criterion = nn.CrossEntropyLoss()
    # Move the model to the training device
    model = model.to(device)
    # Keep a copy of the best-performing weights
    best_model_wts = copy.deepcopy(model.state_dict())
    # Best validation accuracy so far
    best_acc = 0.0
    # Per-epoch histories of loss and accuracy
    train_loss_all = []
    val_loss_all = []
    train_acc_all = []
    val_acc_all = []
    # Start time for the elapsed-time report
    since = time.time()

    for epoch in range(num_epochs):
        print("Epoch {}/{}".format(epoch, num_epochs - 1))
        print("-" * 10)
        # Per-epoch accumulators for loss, correct predictions, and sample counts
        train_loss = 0.0
        train_corrects = 0
        val_loss = 0.0
        val_corrects = 0
        train_num = 0
        val_num = 0

        # Train on each mini-batch
        model.train()
        for step, (b_x, b_y) in enumerate(train_dataloader):
            # Move features and labels to the training device
            b_x = b_x.to(device)
            b_y = b_y.to(device)
            # Forward pass: predictions for one batch
            output = model(b_x)
            # Index of the largest logit in each row is the predicted class
            pre_lab = torch.argmax(output, dim=1)
            # Batch loss
            loss = criterion(output, b_y)
            # Reset gradients, backpropagate, and update the parameters
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Accumulate the loss, correct predictions, and sample count
            train_loss += loss.item() * b_x.size(0)
            train_corrects += torch.sum(pre_lab == b_y.data)
            train_num += b_x.size(0)

        # Validate on each mini-batch; no gradients are needed here
        model.eval()
        with torch.no_grad():
            for step, (b_x, b_y) in enumerate(val_dataloader):
                # Move features and labels to the validation device
                b_x = b_x.to(device)
                b_y = b_y.to(device)
                output = model(b_x)
                pre_lab = torch.argmax(output, dim=1)
                loss = criterion(output, b_y)
                # Accumulate the loss, correct predictions, and sample count
                val_loss += loss.item() * b_x.size(0)
                val_corrects += torch.sum(pre_lab == b_y.data)
                val_num += b_x.size(0)

        # Record this epoch's average loss and accuracy
        train_loss_all.append(train_loss / train_num)
        train_acc_all.append(train_corrects.double().item() / train_num)
        val_loss_all.append(val_loss / val_num)
        val_acc_all.append(val_corrects.double().item() / val_num)
        print("{} train loss:{:.4f} train acc: {:.4f}".format(epoch, train_loss_all[-1], train_acc_all[-1]))
        print("{} val loss:{:.4f} val acc: {:.4f}".format(epoch, val_loss_all[-1], val_acc_all[-1]))

        # Keep the weights with the highest validation accuracy so far
        if val_acc_all[-1] > best_acc:
            best_acc = val_acc_all[-1]
            best_model_wts = copy.deepcopy(model.state_dict())

        # Report the elapsed training and validation time
        time_use = time.time() - since
        print("Elapsed time: {:.0f}m{:.0f}s".format(time_use // 60, time_use % 60))

    # Restore the best weights and save them where model_test.py expects them
    model.load_state_dict(best_model_wts)
    torch.save(best_model_wts, "best_model.pth")

    train_process = pd.DataFrame(data={"epoch": range(num_epochs),
                                       "train_loss_all": train_loss_all,
                                       "val_loss_all": val_loss_all,
                                       "train_acc_all": train_acc_all,
                                       "val_acc_all": val_acc_all})
    return train_process

def matplot_acc_loss(train_process):
    # Plot the per-epoch loss and accuracy of the training and validation sets
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_process['epoch'], train_process.train_loss_all, "ro-", label="Train loss")
    plt.plot(train_process['epoch'], train_process.val_loss_all, "bs-", label="Val loss")
    plt.legend()
    plt.xlabel("epoch")
    plt.ylabel("Loss")
    plt.subplot(1, 2, 2)
    plt.plot(train_process['epoch'], train_process.train_acc_all, "ro-", label="Train acc")
    plt.plot(train_process['epoch'], train_process.val_acc_all, "bs-", label="Val acc")
    plt.xlabel("epoch")
    plt.ylabel("acc")
    plt.legend()
    plt.show()

if __name__ == '__main__':
    # Build the model
    model = ResNet18(Residual)
    # Build the data loaders
    train_dataloader, val_dataloader = train_val_data_process()
    # Train the model and plot the training curves
    train_process = train_model_process(model, train_dataloader, val_dataloader, num_epochs=50)
    matplot_acc_loss(train_process)
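
Since train_model_process returns the history as a DataFrame, it is easy to keep a record of the run alongside the plot. A minimal sketch, assuming the train_process variable from the call above; the file name is illustrative:

train_process.to_csv("train_process.csv", index=False)  # hypothetical log file name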