Tensorflow学习之线程队列与IO操作

tensorflow学习之线程队列与IO操作。

队列和线程

在训练样本的时候,希望读入的训练样本时有序的

  • tf.FIFOQueue 先进先出队列,按顺序出队列

  • tf.RandomShuffleQueue 随机出队列

tf.FIFOQueue

FIFOQueue(capacity, dtypes, name=’fifo_queue’)

  • 创建一个以先进先出的顺序对元素进行排队的队列

  • capacity:整数。可能存储在此队列中的元素数量的上限

  • dtypes:DType对象列表。长度dtypes必须等于每个队列元素中的张量数,dtype的类型形状,决定了后面进队列元素形状

method

  • dequeue(name=None)

  • enqueue(vals, name=None):

  • enqueue_many(vals, name=None):vals列表或者元组,返回一个进队列操作

  • size(name=None)

完成一个出队列、+1、入队列操作(同步操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import tensorflow as tf
import os

# 首先定义队列
Q=tf.FIFOQueue(3,tf.float32)

# 放入一些数据
enq_many=Q.enqueue_many([[0.1,0.2,0.3],])

# 定义取数据的过程
out_q=Q.dequeue()
data=out_q+1
en_q=Q.enqueue(data)

with tf.Session() as sess:
# 初始化队列
sess.run(enq_many)
# 处理数据
for i in range(100):
sess.run(en_q)
# 训练数据
for i in range(Q.size().eval()):
print(sess.run(Q.dequeue()))

分析:当数据量很大时,入队操作从硬盘中读取数据,放入内存中,主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个线程,实现异步读取。

队列管理器

tf.train.QueueRunner(queue, enqueue_ops=None):创建一个QueueRunner

  • queue:A Queue

  • enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程

  • create_threads(sess, coord=None,start=False)创建线程来运行给定会话的入队操作

  • start:布尔值,如果True启动线程;如果为False调用者,必须调用start()启动线程

  • coord:线程协调器,后面线程管理需要用到

  • return:

通过队列管理器来实现变量加1,入队,主线程出队列的操作,观察效果?(异步操作)

分析:这时候有一个问题就是,入队自顾自的去执行,在需要的出队操作完成之后,程序没法结束。需要一个实现线程间的同步,终止其他线程。

线程协调器

tf.train.Coordinator():线程协调员,实现一个简单的机制来协调一组线程的终止

  • request_stop()

  • should_stop() 检查是否要求停止

  • join(threads=None, stop_grace_period_secs=120) 等待线程终止

  • return:线程协调员实例

文件读取

文件读取流程

image-20200906080652974

步骤

1、文件读取API-文件队列构造

tf.train.string_input_producer(string_tensor,shuffle=True):将输出字符串(例如文件名)输入到管道队列

  • string_tensor 含有文件名的1阶张量

  • num_epochs:过几遍数据,默认无限过数据

  • return:具有输出字符串的队列

2、文件读取API-文件阅读器•根据文件格式,选择对应的文件阅读器

class tf.TextLineReader:阅读文本文件逗号分隔值(CSV)格式,默认按行读取

  • return:读取器实例

tf.FixedLengthRecordReader(record_bytes)

  • 要读取每个记录是固定数量字节的二进制文件

  • record_bytes:整型,指定每次读取的字节数

  • return:读取器实例

tf.TFRecordReader

  • 读取TfRecords文件

有一个共同的读取方法

read(file_queue):从队列中指定数量内容,返回一个Tensors元组(key文件名字,value默认的内容(行,字节))

3、文件读取API-文件内容解码器

由于从文件中读取的是字符串,需要函数去解析这些字符串到张量

tf.decode_csv(records,record_defaults=None,field_delim = None,name = None)将CSV转换为张量,与tf.TextLineReader搭配使用

  • records:tensor型字符串,每个字符串是csv中的记录行

  • field_delim:默认分割符”,”

  • record_defaults:参数决定了所得张量的类型,并设置一个值在输入字符串中缺少使用默认值,如

  • tf.decode_raw(bytes,out_type,little_endian = None,name = None)

将字节转换为一个数字向量表示,字节为一字符串类型的张量,与函数tf.FixedLengthRecordReader搭配使用,二进制读取为uint8格式

4、开启线程操作

tf.train.start_queue_runners(sess=None,coord=None):收集所有图中的队列线程,并启动线程

  • sess:所在的会话中

  • coord:线程协调器

  • return:返回所有线程队列

如果读取的文件为多个或者样本数量为多个,怎么去管道读取?

管道读端批处理

tf.train.batch(tensors,batch_size,num_threads = 1,capacity = 32,name=None):读取指定大小(个数)的张量

  • tensors:可以是包含张量的列表

  • batch_size:从队列中读取的批处理大小

  • num_threads:进入队列的线程数

  • capacity:整数,队列中元素的最大数量

  • return:tensors

tf.train.shuffle_batch(tensors,batch_size,capacity,min_after_dequeue, num_threads=1)

  • 乱序读取指定大小(个数)的张量

  • min_after_dequeue:留下队列里的张量个数,能够保持随机打乱

文件读取案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import tensorflow as tf
import os

# 定义一个队列,1000
Q=tf.FIFOQueue(1000,tf.float32)

# 定义子线程要做的事情:循环,加一,放入队列
var=tf.Variable(0.0)
# 实现自增
data=tf.assign_add(var,tf.constant(1.0))
en_q=Q.enqueue(data)

# 定义队列管理器op,定义多少个子线程,子线程该干的事情
qr=tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)

# 初始化变量op
init_op=tf.global_variables_initializer()

with tf.Session() as sess:
# 初始化变量
sess.run(init_op)

# 开启线程管理器
coord=tf.train.Coordinator()

# 真正开启子线程
threads=qr.create_threads(sess,coord=coord,start=True)

# 主线程,不断读取数据
for i in range(300):
print(sess.run(Q.dequeue()))

# 回收子线程
coord.request_stop()
coord.join(threads)
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2020 WuXei Si
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信