检测cuda可用性

如可用则将默认张量位置设置为cuda,此处为pytorch 2.1.0 推荐写法

1
2
3
4
5
6
if torch.cuda.is_available():
torch.set_default_dtype(torch.float)
torch.set_default_device("cuda")
print("using cuda:",torch.cuda.get_device_name())

device=torch.device("cuda" if torch.cuda.is_available() else "cpu")

保存和读取模型参数

注意在读取时模型对象应由原模型同一类派生

1
2
3
torch.save(C1.state_dict(),"models/mnist97_state_dict.pth")
C2=mnist_classifier.Classifier()
C2.load_state_dict(torch.load("models/mnist97_state_dict.pth",map_location=device))

mnist_classifier 模型和数据处理类示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import torch
import torch.nn as nn
from torch.utils.data import Dataset

import pandas
import matplotlib.pyplot as plt

class Classifier(nn.Module):

def __init__(self):
super().__init__()

self.model=nn.Sequential(
nn.Linear(784,200), # 全连接层连接
nn.LeakyReLU(0.02), # 带泄漏线性整流函数
nn.LayerNorm(200), # 标准化
nn.Linear(200,10),
nn.Sigmoid() # Sigmoid激活函数
)

self.loss_function=nn.BCELoss() # 二元交叉熵损失函数

self.optimiser=torch.optim.Adam(self.parameters()) # Adam优化器

self.counter=0
self.progress=[]

def forward(self,inputs):
return self.model(inputs)

def train(self,inputs,targets):
outputs=self.forward(inputs)
loss=self.loss_function(outputs,targets)

self.counter+=1

if(self.counter%10==0):
self.progress.append(loss.item())

if(self.counter%10000==0):
print("counter = ",self.counter)

self.optimiser.zero_grad()
loss.backward()
self.optimiser.step()

def plot_progress(self):
df=pandas.DataFrame(self.progress,columns=['loss'])
df.plot(ylim=(0,1.0),figsize=(16,8),alpha=0.1,marker='.',grid=True,yticks=(0,0.25,0.5))



class MnistDataset(Dataset):
def __init__(self,csv_file):
self.data_df=pandas.read_csv(csv_file,header=None)

def __len__(self):
return len(self.data_df)

def __getitem__(self,index):
label = self.data_df.iloc[index,0]
target=torch.zeros((10))
target[label]=1.0

image_values=torch.cuda.FloatTensor(self.data_df.iloc[index,1:].values)/255.0

return label,image_values,target

def plot_image(self,index):
arr=self.data_df.iloc[index,1:].values.reshape(28,28)
plt.title('label = '+str(self.data_df.iloc[index,0]))
plt.imshow(arr,interpolation='None',cmap='Blues')

mnist_classifier 训练代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 读取训练数据集并展示
mnist_dataset=MnistDataset('mnist_data/mnist_train.csv')
mnist_dataset.plot_image(9)

# 创建分类器实例并测试参数传递
C=Classifier()
C.to(device)
C.forward(torch.rand(784))

# 训练过程
epochs=3

for i in range(epochs):
print('train epoch',i+1,'of',epochs)
for label,image_data_tensor,target_tensor in mnist_dataset:
C.train(image_data_tensor,target_tensor)

C.plot_progress() # 展示训练过程

# 加载验证数据集
mnist_test_dataset = MnistDataset('mnist_data/mnist_test.csv')

# 展示图像
record = 19
mnist_test_dataset.plot_image(record)

# 测试分类器
image_data = mnist_test_dataset[record][1]

print(image_data.shape)

output = C.forward(image_data)

pandas.DataFrame(output.cpu().detach().numpy()).plot(kind='bar', legend=False, ylim=(0,1))

# 测试识别精度
score = 0
items = 0

for label, image_data_tensor, target_tensor in mnist_test_dataset:
answer = C.forward(image_data_tensor).cpu().detach().numpy()
if (answer.argmax() == label):
score += 1
pass
items += 1

pass

print(score, items, score/items)

# 保存模型参数
torch.save(C.state_dict(),"models/mnist97_state_dict.pth")

加载并使用mnist_classifier代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import torch
from torch.utils.data import Dataset
import pandas
import matplotlib.pyplot as plt
import numpy as np

import mnist_classifier # 导入之前mnist_classifier的定义文件

if torch.cuda.is_available():
torch.set_default_dtype(torch.float)
torch.set_default_device("cuda")
print("using cuda:",torch.cuda.get_device_name())

device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

C=mnist_classifier.Classifier() # 创建分类器实例

# 加载之前保存的模型参数,加载到device上
C.load_state_dict(torch.load("models/mnist97_state_dict.pth",map_location=device))
print(C)
print(next(C.parameters()).device)

# 导入验证数据集并进行推理
mnist_test_dataset=mnist_classifier.MnistDataset("mnist_data/mnist_test.csv")

def classify_image(idx):
mnist_test_dataset.plot_image(idx)
test_image=mnist_test_dataset[idx][1]
output=C.forward(test_image)
pandas.DataFrame(output.cpu().detach().numpy()).plot(kind='bar',legend=False,ylim=(0,1))
print("模型认为此数字为 ",end='')
print(int(np.argmax(output.cpu().detach().numpy())))

classify_image(78)

mnist_cnn 参考代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import torch
import torch.nn as nn
from torch.utils.data import Dataset

import pandas
import matplotlib.pyplot as plt

# 展开成一维张量
class View(nn.Module):
def __init__(self, shape):
super().__init__()
self.shape = shape,

def forward(self, x):
return x.view(*self.shape)

class MnistDataset(Dataset):

def __init__(self, csv_file):
self.data_df = pandas.read_csv(csv_file, header=None)

def __len__(self):
return len(self.data_df)

def __getitem__(self, index):
# 生成标签对应的独热标签
label = self.data_df.iloc[index,0]
target = torch.zeros((10))
target[label] = 1.0

# 读取图像数据并作正规化
image_values = torch.tensor(self.data_df.iloc[index,1:].values,dtype=torch.float) / 255.0

# 返回 类别 图像数据 独热标签
return label, image_values, target

def plot_image(self, index):
img = self.data_df.iloc[index,1:].values.reshape(28,28)
plt.title("label = " + str(self.data_df.iloc[index,0]))
plt.imshow(img, interpolation='none', cmap='Blues')

class Classifier(nn.Module):

def __init__(self):

super().__init__()

# 定义神经网络层结构
self.model = nn.Sequential(
# 1 --> 10 通道
nn.Conv2d(1, 10, kernel_size=5, stride=2), # out: 12*12
nn.LeakyReLU(0.02),
nn.BatchNorm2d(10),

nn.Conv2d(10, 10, kernel_size=3, stride=2), # out: 5*5
nn.LeakyReLU(0.02),
nn.BatchNorm2d(10),

View(250),
nn.Linear(250, 10),
nn.Sigmoid()
)

# 指定损失函数
self.loss_function = nn.BCELoss()

# 指定随机梯度下降优化器
self.optimiser = torch.optim.Adam(self.parameters())

# 训练计数器和损失函数值记录
self.counter = 0
self.progress = []


def forward(self, inputs):
return self.model(inputs)


def train(self, inputs, targets):

outputs = self.forward(inputs)


loss = self.loss_function(outputs, targets)

self.counter += 1
if (self.counter % 10 == 0):
self.progress.append(loss.item())
pass
if (self.counter % 10000 == 0):
print("counter = ", self.counter)
pass

self.optimiser.zero_grad()
loss.backward()
self.optimiser.step()

def plot_progress(self):
df = pandas.DataFrame(self.progress, columns=['loss'])
df.plot(ylim=(0, 1.0), figsize=(16,8), alpha=0.1, marker='.', grid=True, yticks=(0, 0.25, 0.5))