Tensorflow CIFAR10 图像分类

1. CIFAR-10数据集

1.1. 下载

进入 http://www.cs.toronto.edu/~kriz/cifar.html ,下载 CIFAR-10 python 版本

解压后,得到以下文件。data_batch_xxx是图片数据,都是二进制文件。

1
2
3
4
5
6
7
8
batches.meta
data_batch_1
data_batch_2
data_batch_3
data_batch_4
data_batch_5
readme.html
test_batch

1.2. 查看数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pickle

# Peek inside one raw CIFAR-10 batch file and print what it contains.
with open("./data/cifar/data_batch_1", "rb") as f:
    batch = pickle.load(f, encoding="bytes")

print(type(batch))                    # <class 'dict'>
print(batch.keys())                   # dict_keys([b'batch_label', b'labels', b'data', b'filenames'])
print(type(batch[b'batch_label']))    # <class 'bytes'>
print(type(batch[b'labels']))         # <class 'list'>
print(type(batch[b'data']))           # <class 'numpy.ndarray'>
print(type(batch[b'filenames']))      # <class 'list'>

print(batch[b'data'].shape)           # (10000, 3072): 3072 = 32x32x3 (32x32 pixels, 3 RGB channels)
print(batch[b'data'].dtype)           # uint8
print(batch[b'data'][0])              # pixel values in [0, 255]

print(batch[b'labels'])               # class labels, integers 0-9 (10 classes)

print(batch[b'filenames'])            # file name of each of the 10000 images

1.3. 显示图片

1
2
3
4
5
6
7
8
9
10
11
12
import matplotlib.pyplot as plt
import pickle

with open("./data/cifar/data_batch_1", "rb") as f:
    batch = pickle.load(f, encoding="bytes")

# Take image #100: a flat length-3072 vector stored channel-first (3, 32, 32),
# then move the channel axis last so matplotlib gets (32, 32, 3).
img = batch[b'data'][100]
img = img.reshape((3, 32, 32)).transpose((1, 2, 0))

plt.imshow(img)   # queue the image for rendering
plt.show()        # open the window

2. 二分类logistic回归模型

没有隐藏层,输出层只有1个神经元(即经典的logistic回归:输入直接经过 sigmoid(wx+b) 得到输出)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import pickle
import numpy as np
import tensorflow as tf
import os

def load_data(filename):
    """Read one pickled CIFAR-10 batch file.

    :param filename: path to a batch file (e.g. data_batch_1)
    :return: tuple of (image data, labels) as stored under the
             b'data' and b'labels' keys of the pickled dict
    """
    with open(filename, 'rb') as fp:
        batch = pickle.load(fp, encoding='bytes')
    return batch[b'data'], batch[b'labels']


class CifarData:
    def __init__(self, filenames, need_shuffle):
        """Build the two-class (labels 0 and 1) CIFAR-10 subset.

        :param filenames: one or more batch file paths
        :param need_shuffle: whether to reshuffle the samples between epochs
        """
        items = []
        item_labels = []
        for name in filenames:
            data, labels = load_data(name)
            for item, label in zip(data, labels):
                # A single logistic neuron can only separate two classes,
                # so keep only the samples labelled 0 or 1.
                if label in (0, 1):
                    items.append(item)
                    item_labels.append(label)
        # Stack the python lists into numpy arrays.
        self._data = np.vstack(items)
        # Rescale pixel values from [0, 255] to [-1, 1].
        self._data = self._data / 127.5 - 1
        self._labels = np.hstack(item_labels)
        print(self._data.shape)
        print(self._labels.shape)
        # Total number of retained samples.
        self._num_examples = self._data.shape[0]
        self._need_shuffle = need_shuffle
        # Cursor marking where the next batch starts.
        self._indicator = 0
        if self._need_shuffle:
            self._shuffle_data()

    def _shuffle_data(self):
        """Apply one random permutation to data and labels in lockstep."""
        order = np.random.permutation(self._num_examples)
        self._data = self._data[order]
        self._labels = self._labels[order]

    def next_batch(self, batch_size):
        """return batch_size examples as a batch."""
        end = self._indicator + batch_size
        if end > self._num_examples:
            # Not enough samples left for a full batch.
            if not self._need_shuffle:
                raise Exception("have no more examples")
            # Epoch finished: reshuffle and restart from the beginning.
            self._shuffle_data()
            self._indicator = 0
            end = batch_size
        # Guard against a batch larger than the whole dataset.
        if end > self._num_examples:
            raise Exception("batch size is larger than all examples")
        batch_data = self._data[self._indicator:end]
        batch_labels = self._labels[self._indicator:end]
        self._indicator = end
        return batch_data, batch_labels

def logistic():
    """Train a single-neuron logistic-regression binary classifier.

    Relies on the module-level globals `train_data` and `test_filenames`
    set up in the __main__ section. Uses TensorFlow 1.x graph/session API.
    """
    # Flattened 32x32x3 images, shape [None, 3072].
    x = tf.placeholder(tf.float32, [None, 3072])
    # Integer labels, shape [None].
    y = tf.placeholder(tf.int64, [None])

    # One output neuron: w is (3072, 1) and b is (1,).
    w = tf.get_variable('w', [x.get_shape()[-1], 1],
                        initializer=tf.random_normal_initializer(0, 1))
    b = tf.get_variable('b', [1],
                        initializer=tf.constant_initializer(0.0))

    # [None, 3072] x [3072, 1] -> [None, 1] raw scores.
    y_ = tf.matmul(x, w) + b
    # Probability that the sample belongs to class 1.
    p_y_1 = tf.nn.sigmoid(y_)

    # Labels reshaped to [None, 1] to match p_y_1; mean-squared-error loss.
    y_reshaped = tf.reshape(y, (-1, 1))
    y_reshaped_float = tf.cast(y_reshaped, tf.float32)
    loss = tf.reduce_mean(tf.square(y_reshaped_float - p_y_1))

    # Hard prediction: probability thresholded at 0.5.
    predict = p_y_1 > 0.5
    correct_prediction = tf.equal(tf.cast(predict, tf.int64), y_reshaped)
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float64))

    with tf.name_scope('train_op'):
        train_op = tf.train.AdamOptimizer(1e-3).minimize(loss)

    init = tf.global_variables_initializer()
    batch_size = 20
    train_steps = 30000
    test_steps = 100

    with tf.Session() as sess:
        sess.run(init)
        for step in range(train_steps):
            batch_data, batch_labels = train_data.next_batch(batch_size)
            loss_val, acc_val, _ = sess.run(
                [loss, accuracy, train_op],
                feed_dict={x: batch_data, y: batch_labels})
            if (step + 1) % 500 == 0:
                print('[Train] Step: %d, loss: %4.5f, acc: %4.5f' % (step + 1, loss_val, acc_val))
            if (step + 1) % 5000 == 0:
                # Evaluate on a freshly loaded, unshuffled test set.
                eval_data = CifarData(test_filenames, False)
                acc_vals = []
                for _ in range(test_steps):
                    test_batch_data, test_batch_labels = eval_data.next_batch(batch_size)
                    test_acc_val = sess.run(
                        [accuracy],
                        feed_dict={x: test_batch_data, y: test_batch_labels})
                    acc_vals.append(test_acc_val)
                test_acc = np.mean(acc_vals)
                print('[Test ] Step: %d, acc: %4.5f' % (step + 1, test_acc))


if __name__ == '__main__':
    # Locate the five training batch files and the single test batch file.
    CIFAR_DIR = "./data/cifar"
    train_filenames = [os.path.join(CIFAR_DIR, 'data_batch_%d' % i) for i in range(1, 6)]
    test_filenames = [os.path.join(CIFAR_DIR, 'test_batch')]
    # Shuffle the training data between epochs; keep test order fixed.
    train_data = CifarData(train_filenames, True)
    test_data = CifarData(test_filenames, False)

    logistic()

3. 多分类logistic回归模型

没有隐藏层,输出层有10个神经元(即softmax回归:输入直接经过 softmax(wx+b) 得到10类概率)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import pickle
import numpy as np
import tensorflow as tf
import os


def load_data(filename):
    """Read one pickled CIFAR-10 batch file.

    :param filename: path to a batch file (e.g. data_batch_1)
    :return: tuple of (image data, labels) as stored under the
             b'data' and b'labels' keys of the pickled dict
    """
    with open(filename, 'rb') as fp:
        batch = pickle.load(fp, encoding='bytes')
    return batch[b'data'], batch[b'labels']


class CifarData:
    def __init__(self, filenames, need_shuffle):
        """Load all ten CIFAR-10 classes from the given batch files.

        :param filenames: one or more batch file paths
        :param need_shuffle: whether to reshuffle the samples between epochs
        """
        items = []
        item_labels = []
        for name in filenames:
            data, labels = load_data(name)
            # Keep every sample: softmax handles all 10 classes.
            for item, label in zip(data, labels):
                items.append(item)
                item_labels.append(label)
        self._data = np.vstack(items)
        # Rescale pixel values from [0, 255] to [-1, 1].
        self._data = self._data / 127.5 - 1
        self._labels = np.hstack(item_labels)
        print(self._data.shape)
        print(self._labels.shape)
        # Total number of samples.
        self._num_examples = self._data.shape[0]
        self._need_shuffle = need_shuffle
        # Cursor marking where the next batch starts.
        self._indicator = 0
        if self._need_shuffle:
            self._shuffle_data()

    def _shuffle_data(self):
        """Apply one random permutation to data and labels in lockstep."""
        order = np.random.permutation(self._num_examples)
        self._data = self._data[order]
        self._labels = self._labels[order]

    def next_batch(self, batch_size):
        """return batch_size examples as a batch."""
        end = self._indicator + batch_size
        if end > self._num_examples:
            # Not enough samples left for a full batch.
            if not self._need_shuffle:
                raise Exception("have no more examples")
            # Epoch finished: reshuffle and restart from the beginning.
            self._shuffle_data()
            self._indicator = 0
            end = batch_size
        # Guard against a batch larger than the whole dataset.
        if end > self._num_examples:
            raise Exception("batch size is larger than all examples")
        batch_data = self._data[self._indicator:end]
        batch_labels = self._labels[self._indicator:end]
        self._indicator = end
        return batch_data, batch_labels


def logistic():
    """Train a 10-way softmax (multinomial logistic) classifier.

    Same structure as the binary version, but with 10 output neurons,
    softmax probabilities, one-hot targets and argmax predictions.
    Relies on the module-level globals `train_data` and `test_filenames`.
    """
    # Flattened 32x32x3 images, shape [None, 3072].
    x = tf.placeholder(tf.float32, [None, 3072])
    # Integer labels 0-9, shape [None].
    y = tf.placeholder(tf.int64, [None])

    # Ten output neurons: w is (3072, 10) and b is (10,).
    w = tf.get_variable('w', [x.get_shape()[-1], 10],
                        initializer=tf.random_normal_initializer(0, 1))
    b = tf.get_variable('b', [10],
                        initializer=tf.constant_initializer(0.0))

    # [None, 3072] x [3072, 10] -> [None, 10] class scores.
    y_ = tf.matmul(x, w) + b
    # Per-class probabilities (replaces the sigmoid of the binary model).
    p_y = tf.nn.softmax(y_)

    # Mean-squared error between one-hot labels and the probabilities.
    y_one_hot = tf.one_hot(y, 10, dtype=tf.float32)
    loss = tf.reduce_mean(tf.square(y_one_hot - p_y))

    # Predicted class = index of the largest score.
    predict = tf.argmax(y_, 1)
    correct_prediction = tf.equal(predict, y)
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float64))

    with tf.name_scope('train_op'):
        train_op = tf.train.AdamOptimizer(1e-3).minimize(loss)

    init = tf.global_variables_initializer()
    batch_size = 20
    train_steps = 30000
    test_steps = 100

    with tf.Session() as sess:
        sess.run(init)
        for step in range(train_steps):
            batch_data, batch_labels = train_data.next_batch(batch_size)
            loss_val, acc_val, _ = sess.run(
                [loss, accuracy, train_op],
                feed_dict={x: batch_data, y: batch_labels})
            if (step + 1) % 500 == 0:
                print('[Train] Step: %d, loss: %4.5f, acc: %4.5f' % (step + 1, loss_val, acc_val))
            if (step + 1) % 5000 == 0:
                # Evaluate on a freshly loaded, unshuffled test set.
                eval_data = CifarData(test_filenames, False)
                acc_vals = []
                for _ in range(test_steps):
                    test_batch_data, test_batch_labels = eval_data.next_batch(batch_size)
                    test_acc_val = sess.run(
                        [accuracy],
                        feed_dict={x: test_batch_data, y: test_batch_labels})
                    acc_vals.append(test_acc_val)
                test_acc = np.mean(acc_vals)
                print('[Test ] Step: %d, acc: %4.5f' % (step + 1, test_acc))


if __name__ == '__main__':
    # Locate the five training batch files and the single test batch file.
    CIFAR_DIR = "./data/cifar"
    train_filenames = [os.path.join(CIFAR_DIR, 'data_batch_%d' % i) for i in range(1, 6)]
    test_filenames = [os.path.join(CIFAR_DIR, 'test_batch')]
    # Shuffle the training data between epochs; keep test order fixed.
    train_data = CifarData(train_filenames, True)
    test_data = CifarData(test_filenames, False)

    logistic()

3.1. 使用交叉熵损失

1
2
3
4
5
6
7
8
9
10
11
12
13
""" 平方差损失
p_y = tf.nn.softmax(y_)
y_one_hot = tf.one_hot(y, 10, dtype=tf.float32)
loss = tf.reduce_mean(tf.square(y_one_hot - p_y))
"""

"""
sparse_softmax_cross_entropy:
1、预测值y_计算 softmax(y_)
2、真实值y计算one_hot
3、计算ylog(y_)
"""
loss = tf.losses.sparse_softmax_cross_entropy(labels=y, logits=y_)
panchaoxin wechat
关注我的公众号
支持一下