前言:学习了一些基础的机器学习和深度学习的知识,对tensorflow框架有了简单的了解,想本身作个小项目练习一下,没想到遇到了许多坑,故把此次项目记录下来以备之后回顾。python
Feature:github
Target:bash
为了将目标值为空、重复的数据和错误的数据删除,并将图片重命名为n.png形式与csv文件中的目标值相对应,定义了一个DataClean类。网络
def readTxt(self):
""" 读取txt文件并分割文件名与目标值 :return: """
with open("./yzm_labels.txt", "r") as f:
text_list = f.readlines()
for text in text_list:
# 取出目标值,因为最后一位为\n,故有[:-1]取到倒数第二位
feature = text.split(",")[1][:-1]
# print(feature)
# 取出文件名
file_name = text.split(",")[0].split("/")[1]
# print(file_name)
# 存入字典
self.file_dic[file_name] = feature
# print(len(self.file_dic))
复制代码
def renameFile(self):
""" 修改文件名并创建新的目标值文档 :return: """
# 命名图片
n = 0
with open("labels.csv", "a") as f:
for key in self.file_dic:
oldname = "./yzm/" + key
newname = "./yzm/" + str(n) + ".png"
# 删除目标值为空、图片错误、重复的图片和目标值不足的图片
if len(self.file_dic[key]) == 4 and self.file_dic[key] not in self.value_list:
try:
os.rename(oldname, newname)
except FileNotFoundError:
print("出现错误")
continue
self.value_list.append(self.file_dic[key])
f.writelines(self.file_dic[key] + ",")
n += 1
else:
os.remove("./yzm/{}".format(key))
复制代码
def delFile(self):
""" 删除不存在的图片 :return: """
name_list = os.listdir("./yzm")
for name in name_list:
if len(name) > 10:
os.remove("./yzm/{}".format(name))
复制代码
def martixT(self):
""" labels.txt转置 :return: """
df = pd.read_csv("./labels.csv")
df.T.to_csv("./newlabels.csv")
os.remove("./labels.csv")
复制代码
处理后的数据app
Feature: 框架
此处有两个大坑,会致使在sess.run(label_bat/image_bat)时报错 机器学习
OutOfRangeError (see above for traceback): FIFOQueue '_2_batch/fifo_queue' is closed and has insufficient elements (requested 8789, current size 0)
会出现各类各样的解决方法,我搞了一下午都解决不了,最后发现问题出在数据上。先把代码贴上来。
def get_captcha_image():
""" 获取验证码图片数据 :return: image """
# 构造文件名
filename = []
for i in range(8789):
string = str(i) + ".png"
filename.append(string)
# 构造路径+文件
file_list = [os.path.join(r'D:\My_project\CNN_captcha\yzm', file) for file in filename]
# print(file_list)
# 构造文件队列
file_queue = tf.train.string_input_producer(file_list, shuffle=False)
# 构造阅读器
reader = tf.WholeFileReader()
# 读取图片数据内容
_, value = reader.read(file_queue)
# 解码图片数据
image = tf.image.decode_png(value)
# 图片分辨率:120 * 48 * 4,将一张png图像使用PIL读入的时候,发现是一个四通道图像,即:RGBA,分别表明Red(红色)Green(绿色)Blue(蓝色)和Alpha的色彩空间。
image.set_shape([48, 120, 4])
print(image)
# 批处理数据 [1000, 48, 120, 4]
image_batch = tf.train.batch([image], batch_size=3000, num_threads=1, capacity=3000)
return image_batch
复制代码
这一段代码在sess.run(image_batch)
时报错的缘由是我一开始将图片数据的形状设置成了image.set_shape([48, 120, 3])
而非image.set_shape([48, 120, 4])
,但其指望的是一个四通道图像,由于PNG图像有一个透明空间。 另外当batch_size设置过大时也会报一样的错误。函数
def get_captcha_label():
""" 读取验证码图片标签数据 :return: label_bat """
# 构造文件队列
file_queue = tf.train.string_input_producer([r"D:\My_project\CNN_captcha\newlabels.csv"], shuffle=False)
# 构造阅读器
reader = tf.TextLineReader()
_, value = reader.read(file_queue)
# 这里的参数设置取决于读取的值,读取的值有几列就设置几行,其中[1]表明整形,[1.]表明float型,["None"]表明字符型
records = [["None"]]
label = tf.decode_csv(records=value, record_defaults=records)
# print(label)
# [b'95m8'],[b'sr3e']
label_batch = tf.train.batch([label], batch_size=3000, num_threads=1, capacity=3000)
return label_batch
复制代码
这一段代码报错的缘由是将records = [["None"]]
的值设置错误,这里的records取决于读取的值,读取的值有几列就设置几行,其中[1]表明整形,[1.]表明float型,["None"]表明字符型。学习
def dealWithLabel(self, label_str):
""" 将标签数据处理为数字 :param label_str: :return: """
# 构建字符索引 {0:'A', 1:'B'......}
num_letter = dict(enumerate(list(self.letter)))
# 键值对反转 {'A':0, 'B':1......}
letter_num = dict(zip(num_letter.values(), num_letter.keys()))
# print(letter_num)
# 构建标签的列表
array = []
# 给标签数据进行处理[[b"NZPP"], ......]
for string in label_str:
letter_list = [] # [1,2,3,4]
# 修改编码,b'FVQJ'到字符串,而且循环找到每张验证码的字符对应的数字标记
for letter in string[0].decode('utf-8'):
letter_list.append(letter_num[letter])
array.append(letter_list)
# [[13, 25, 15, 15], [22, 10, 7, 10], [22, 15, 18, 9], [16, 6, 13, 10], [1, 0, 8, 17], [0, 9, 24, 14].....]
# print(array[:10])
# 将array转换成tensor类型
label = tf.constant(array)
return label
复制代码
这里在label = tf.constant(array)进行类型转换的时候有个坑,若是array里有坏数据(例如标签为ABC,即4位验证码缺乏了一位),则会报ValueError: Argument must be a dense tensor错误,即数据类型不符。
这里一次性写入了3000个数据。
def write_to_tfr(self, image_batch, label_deal):
""" 写入tfr文件 :param image_batch: 特征值 :param label_deal: 目标值 :return: """
# 转换类型
label_uint8 = tf.cast(label_deal, tf.uint8)
# print(label_batch)
# 创建TFRecords 存储器
writer = tf.python_io.TFRecordWriter(self.dir)
# 循环将每个图片上的数据构造example协议块,序列化后写入
for i in range(3000):
# 取出第i个图片数据,转换相应类型,图片的特征值要转换成字符串形式
image_string = image_batch[i].eval().tostring()
# 标签值,转换成整型
label_string = label_uint8[i].eval().tostring()
print(i)
# 构造协议块
example = tf.train.Example(features=tf.train.Features(feature={
"image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_string])),
"label": tf.train.Feature(bytes_list=tf.train.BytesList(value=[label_string]))
}))
writer.write(example.SerializeToString())
# 关闭文件
writer.close()
复制代码
def read_captcha_tfrecords(self):
""" 读取验证码特征值和目标值数据 :return: """
# 一、构造文件的队列
file_queue = tf.train.string_input_producer([self.dir])
# 二、tf.TFRecordReader 读取TFRecords数据
reader = tf.TFRecordReader()
# 单个样本数据
_, value = reader.read(file_queue)
# 三、解析example协议
feature = tf.parse_single_example(value, features={
"image": tf.FixedLenFeature([], tf.string),
"label": tf.FixedLenFeature([], tf.string)
})
# 四、解码操做、数据类型、形状
image = tf.decode_raw(feature["image"], tf.uint8)
label = tf.decode_raw(feature["label"], tf.uint8)
# 肯定类型和形状
# 图片形状 [48, 120, 4]
# 目标值 [4]
image_reshape = tf.reshape(image, [self.height, self.width, self.channel])
label_reshape = tf.reshape(label, [self.label_num])
# 类型
image_type = tf.cast(image_reshape, tf.float32)
label_type = tf.cast(label_reshape, tf.int32)
# 五、 批处理
# print(image_type, label_type)
# 提供每批次多少样本去进行训练
image_batch, label_batch = tf.train.batch([image_type, label_type],
batch_size=self.train_batch,
num_threads=1,
capacity=self.train_batch)
print(image_batch, label_batch)
return image_batch, label_batch
复制代码
def loss(self, y_true, y_predict):
""" 创建验证码4个目标值的损失 :param y_true: 真实值 :param y_predict: 预测值 :return: loss """
with tf.variable_scope("loss"):
# 先进行网络输出的值的几率计算softmax,在进行交叉熵损失计算
# y_true:[100, 4, 63]------>[100, 252]
# y_predict:[100, 252]
y_reshape = tf.reshape(y_true,
[self.train_batch, self.label_num * self.feature_num])
all_loss = tf.nn.softmax_cross_entropy_with_logits(labels=y_reshape,
logits=y_predict,
name="compute_loss")
# 求出平均损失
loss = tf.reduce_mean(all_loss)
return loss
复制代码
def sgd(loss):
""" 梯度降低优化损失 :param loss: :return: train_op """
with tf.variable_scope("sgd"):
train_op = tf.train.AdamOptimizer(0.001).minimize(loss)
return train_op
复制代码
四个目标值彻底符合结果为True
def acc(self, y_true, y_predict):
""" 计算准确率 :param y_true: 真实值 :param y_predict: 预测值 :return: accuracy """
with tf.variable_scope("acc"):
# y_true:[100, 4, 63]
# y_predict:[100, 252] --> [100,4,63]
y_predict_reshape = tf.reshape(y_predict, [self.train_batch, self.label_num, self.feature_num])
# 先对最大值的位置去求解
euqal_list = tf.equal(tf.argmax(y_true, 2),
tf.argmax(y_predict_reshape, 2))
# 须要对每一个样本进行判断
# euqal_list:[True, True,True, True], [True, False,True, True]
# x = tf.constant([[True, True], [True, False]])
# tf.reduce_all(x, 1),四个特征值全为True结果为T,求与逻辑[True, False]
accuracy = tf.reduce_mean(tf.cast(tf.reduce_all(euqal_list, 1), tf.float32))
return accuracy
复制代码
def model_nn(self, image_batch):
""" 创建全链接模型 :param image_batch:特征值 :return: y_predict """
# 全链接层
# [100,48,120,4] --> [100,48*120*4]
# y_pre:[100,48*120*4] * [48*120*4,252] = [100,252]
with tf.variable_scope("model"):
# 初始化权重和偏置
weight = self.weight_variables([48 * 120 * 4, 252])
bias = self.bias_variables([252])
# 特征值四维转二维
x_re = tf.reshape(image_batch, [self.train_batch, 48 * 120 * 4])
y_predict = tf.matmul(x_re, weight) + bias
return y_predict
复制代码
全链接层模型训练结果,效果很差
最初直接使用上面的方法,只将5.5的全链接层神经网络拓展为两层卷积神经网络,发现训练准确率一直为0。通过查阅资料,将损失计算函数作了修改。
def loss(self, y_true, y_predict):
""" 创建验证码4个目标值的损失 :param y_true: 真实值 :param y_predict: 预测值 :return: loss """
with tf.variable_scope("loss"):
# 先进行网络输出的值的几率计算sigmiod,在进行交叉熵损失计算
# y_true:[100, 4, 63]------>[100, 252]
# y_predict:[100, 252]
y_reshape = tf.reshape(y_true,
[self.train_batch, self.label_num * self.feature_num])
all_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=y_predict, labels=y_reshape, name="compute_loss")
# 求出平均损失
loss = tf.reduce_mean(all_loss)
复制代码
将softmax损失计算改成了sigmoid损失计算,训练效果大大提高。
其缘由在于:
sigmoid:计算网络输出logits和标签labels的sigmoid cross entropy loss用来衡量独立不互斥离散分类任务的偏差。 说独立不互斥离散分类任务是由于,在这些任务中类与类之间是独立可是不互斥的。拿多分类任务中的多目标检测来举例子,一张图中能够有各类instance,好比有一只狗和一只猫。对于一个总共有五类的多目标检测任务,假如网络的输出层有5个节点,label的形式是[1,1,0,0,1]这种,1表示该图片有某种instance,0表示没有。那么,每一个instance在这张图中是否存在显然是独立事件,可是多个instance能够存在一张图中,这就说明事件们并非互斥的。因此咱们能够直接将网络的输出用做该方法的logits输入,从而进行输出与label的cross entropy loss。
softmax:计算网络输出logits和标签labels的softmax cross entropy loss,衡量独立互斥离散分类任务的偏差。 说独立互斥离散分类任务是由于,在这些任务中类与类之间是独立并且互斥的,好比VOC classification、Imagenet、CIFAR-10甚至MNIST,这些都是多分类任务,可是一张图就对应着一个类,class在图片中是否存在是独立的,而且一张图中只能有一个class,因此是独立且互斥事件。
显然,在本次项目的场景,使用sigmoid函数来衡量偏差是更加合适的。
此外,对准确率计算函数进行了优化,除计算每一个样本的准确率外,还计算了单个字符的准确率,以便更直观的观察训练过程。 单个字符的准确率可理解为将每一个样本都拆分为4个字符,而后计算全部字符的当前训练准确率。
def acc(self, y_true, y_predict):
""" 计算准确率 :param y_true: 真实值 :param y_predict: 预测值 :return: accuracy """
with tf.variable_scope("acc"):
# y_true:[100, 4, 63]
# y_predict:[100, 252] --> [100,4,63]
y_predict_reshape = tf.reshape(y_predict, [self.train_batch, self.label_num, self.feature_num])
# 先对最大值的位置去求解
equal_list = tf.equal(tf.argmax(y_true, 2),
tf.argmax(y_predict_reshape, 2))
# 字符准确率
# 直接对equal_list求平均
# equal_list:[True, True,True, True], [True, False,True, True]
accuracy_char = tf.reduce_mean(tf.cast(equal_list, tf.float32))
# 图片准确率
# 须要对每一个样本进行判断
# equal_list:[True, True,True, True], [True, False,True, True]
# x = tf.constant([[True, True], [True, False]])
# tf.reduce_all(x, 1),四个特征值全为True结果为T,求与逻辑[True, False]
accuracy_image = tf.reduce_mean(tf.cast(tf.reduce_all(equal_list, 1), tf.float32))
return accuracy_char, accuracy_image
复制代码
网络结构
序号 | 层级 |
---|---|
输入 | input |
1 | 卷积层 + 池化层 + 降采样层 + ReLU |
2 | 全链接 + sigmoid |
输出 | output |
训练结果
网络结构
序号 | 层级 |
---|---|
输入 | input |
1 | 卷积层 + 池化层 + 降采样层 + ReLU |
2 | 卷积层 + 池化层 + 降采样层 + ReLU |
2 | 全链接 + sigmoid |
输出 | output |
训练结果
用测试集进行预测并进行结果展现
def predict(self):
""" 进行预测,打印结果 :return: """
# 构建字符索引 {0:'A', 1:'B'......}
num_letter = dict(enumerate(list(self.letter)))
# 更改获取的样本数
self.get_batch = 10
# 经过接口获取特征值和目标值
# image_batch:[100, 48, 120, 4]
# label_batch: [100, 4]
# [[13, 25, 15, 15], [22, 10, 7, 10]]
image_batch, label_batch = self.read_captcha_tfrecords(self.testdir)
# 创建卷积模型,y_predict:[100,252]
# CNN
y_predict = self.model_cnn(image_batch)
# 转换label_batch到one_hot编码
# y_true:[100, 4, 63]
y_true = self.turn_to_onehot(label_batch)
# 计算准确率,获取reshape后的y_predict:[100, 4, 63]
char_acc, image_acc, y_predict = self.acc(y_true, y_predict)
# 建立读取模型的OP
saver = tf.train.Saver()
# 会话训练
with tf.Session() as sess:
# 初始化变量
sess.run(tf.global_variables_initializer())
# 生成线程的管理
coord = tf.train.Coordinator()
# 指定开启子线程去读取数据
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# 加载保存的模型,从模型中找出与当前代码中名字同样的OP操做,覆盖原来的值
ckpt = tf.train.latest_checkpoint(self.modeldir)
if ckpt:
saver.restore(sess, ckpt)
# 获取准确率,预测值和真实值
char_run, image_run, predict, label = sess.run([char_acc, image_acc, tf.argmax(y_predict, 2), label_batch])
print("预测准确率为:%f,字符准确率为:%f" % (image_run, char_run))
# 将数字目标值改成字母
# 打印预测结果
array_true = [] # 真实值列表
array_predict = [] # 预测值列表
for i in range(self.get_batch):
array_true_single = [] # 真实值单个样本
array_predict_sin = [] # 预测值单个样本
for num in label[i]:
# 将数字转换为字母加入单个样本列表
array_true_single.append(num_letter[num])
# 将单个样本加入总列表
array_true.append(array_true_single)
for num in predict[i]:
# 将数字转换为字母加入单个样本列表
array_predict_sin.append(num_letter[num])
# 将单个样本加入总列表
array_predict.append(array_predict_sin)
# 打印预测结果
print("第 %d 次预测,真实值:" % i, array_true[i], ",", "预测值:", array_predict[i])
# 回收线程
coord.request_stop()
coord.join(threads)
复制代码
下面这段代码的目的是将以数字形式进行打包的样本转换为原始的字母形式,并将真实值与预测值进行比对。
# 将数字目标值改成字母
# 打印预测结果
array_true = [] # 真实值列表
array_predict = [] # 预测值列表
for i in range(self.get_batch):
array_true_single = [] # 真实值单个样本
array_predict_sin = [] # 预测值单个样本
for num in label[i]:
# 将数字转换为字母加入单个样本列表
array_true_single.append(num_letter[num])
# 将单个样本加入总列表
array_true.append(array_true_single)
for num in predict[i]:
# 将数字转换为字母加入单个样本列表
array_predict_sin.append(num_letter[num])
# 将单个样本加入总列表
array_predict.append(array_predict_sin)
# 打印预测结果
print("第 %d 次预测,真实值:" % i, array_true[i], ",", "预测值:", array_predict[i])
复制代码
训练集使用了7000个样本,其中有标签不许确的数据,训练结果达到100%的准确率。测试集使用了100个样本,测试结果并不理想,考虑是训练集不够的问题,测试结果以下图。
考虑使用更多的数据集进行训练,提升模型的兼容性和鲁棒性,未完待续...
代码已开源到GitHub:github.com/hy-struggle…:,有兴趣的朋友能够一块儿交流和进步哦~