队列和变量相似,都是计算图上有状态的节点,能够经过赋值操做修改变量的取值。对于队列,队列的操做主要有Enqueue、EnqueueMany和Dequeue。
如下代码展现如何进行队列的初始化 入队 出队数据结构
# coding utf-8 import tensorflow as tf # 建立一个先进先出队列,指定队列中能够保存两个元素,并指定类型为整数。 q = tf.FIFOQueue(2, 'int32') # 使用enqueue_many函数来初始化队列中的元素。和变量初始化相似,在使用队列以前 # 须要明确调用这个初始化过程. init = q.enqueue_many(([0, 10],)) # 使用Dequeue函数将队列中的第一个元素出队列。这个元素值,将被存在变量x中。 x = q.dequeue() y = x + 1 # 将加1后的值在从新加入队列中。 q_inc = q.enqueue_many([y]) with tf.Session() as sess: # 运行初始化队列的操做。 init.run() for _ in range(5): # 运行q_inc将执行数据出队列、出队的元素+1,、从新加入队列的整个过程。 v, _ = sess.run([x, q_inc]) # 打印出队元素的值。 print(v)
在TensorFlow中,队列不单单是一种数据结构,仍是异步计算张量取值的一个重要机制。好比多个线程能够同时向一个队列中写元素,或者同时读取一个队列中的元素。多线程
TF提供了tf.Coordinator和tf.QueueRunner两个类来完成多线程协同的功能。dom
tf.Coordinator主要用于协同多个线程一块儿中止,并提供了should_stop、request_stop和join三个函数。在启动线程以前须要声明一个tf.Coordinator类,并将这个类传入每个建立的线程中。启动的线程须要一直查询tf.Coordinatorl类中提供的should_stop函数,当这个函数的返回值为Truez时,则当前线程也须要退出。每个启动的线程均可以经过调用request_stop函数来通知其余线程退出。当某一个线程调用request_stop函数以后,should_stop函数的返回值将被设置为True,这样其余线程就能够同时终止。异步
tf.Coordinator演示代码以下函数
# coding utf-8 import tensorflow as tf import numpy as np import threading import time # 线程中运行的程序,这个程序每隔1秒判断是否须要中止并打印本身的ID。 def MyLoop(coord, worker_id): # 使用tf.Coordinator类提供的协同工具判断当前线程是否须要中止并打印本身的ID while not coord.should_stop(): # 随机中止全部线程 if np.random.rand() < 0.1: print('Stoping from id: %d\n' % worker_id) # 调用coord.request_stop()函数来通知其余线程中止 coord.request_stop() else: # 打印当前线程的ID print('Working on id: %d\n' % worker_id) # 暂停1秒 time.sleep(1) # 声明一个tf.train.Coordinator类来协同多个线程 coord = tf.train.Coordinator() # 声明建立5个线程 threads = [threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)] # 启动全部的线程 for t in threads: t.start() # 等待全部线程退出 coord.join(threads)
tf.QueueRunner主要用于启动多个线程来操做同一个队列,启动的这些线程能够经过上面介绍的tf.Coordinator类来统一管理。如下代码展现如何使用tf.QueueRunner和tf.Coordinator来管理多线程队列操做。工具
import tensorflow as tf # 声明一个先进先出的队列,队列中最多100个元素,类型为实数 queue = tf .FIFOQueue(100, 'float') # 定义队列的入队操做 enqueue_op = queue.enqueue([tf.random_normal([1])]) # 使用 tf.train.QueueRunner来建立多个线程运行队列的入队操做 # tf.train.QueueRunner给出了被操做的队列,[enqueue_op] * 5 # 表示了须要启动5个线程,每一个线程中运行的是enqueue_op操做 qr = tf.train.QueueRunner(queue, [enqueue_op] * 5) # 将定义过的QueueRunner加入TensorFlow计算图上指定的集合 # tf.train.add_queue_runner函数没有指定集合, # 则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。 # 下面的函数就是将刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS结合 tf.train.add_queue_runner(qr) # 定义出队操做 out_tensor = queue.dequeue() with tf.Session() as sess: # 使用tf.train.Coordinator来协同启动的线程 coord = tf.train.Coordinator() # 使用tf.train.QueueRunner时,须要明确调用tf.train.start_queue_runners # 来启动全部线程。不然由于没有线程运行入队操做,当调用出队操做时,程序一直等待 # 入队操做被运行。tf.train.start_queue_runners函数会默认启动 # tf.GraphKeys.QUEUE_RUNNERS中全部QueueRunner.由于这个函数只支持启动指定集合中的QueueRunner, # 因此通常来讲tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个结合 threads = tf.train.start_queue_runners(sess=sess, coord=coord) # 获取队列中的取值 for _ in range(3): print(sess.run(out_tensor)[0]) # 使用tf.train.Coordinator来中止全部线程 coord.request_stop() coord.join(threads)