Python自动化开发学习9-多线程、队列

threading 模块

先理解一下进程与线程的概念和区别,而后经过threading模块来学习理解线程。进程要下次讲了。
以后看一下两种调用线程的方式,效果和实现都同样。貌似也没有何时用哪一种,反正爱用哪一种用哪一种。通常的话直接调用就行了。python

线程与进程

线程,是操做系统可以进行运算调度的最小单位。
进程,是对各类资源管理的集合。
进程就是一个执行中的程序。程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
进程要进行运算,必需要先建立一个线程。由于进程不具有执行的动做,可是他包含线程,经过线程来进行运算。全部在同一个进程里的线程,是共享同一块内存空间的。编程

直接调用

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    t1 = threading.Thread(target=task, args=(1,))  # 生成一个线程实例
    t2 = threading.Thread(target=task, args=(2,))  # 再生成一个实例
    t1.start()  # 启动线程
    t2.start()  # 再启动一个
    print(t1.getName())  # 获取线程名
    print(t2.getName())

参数注意:函数名和参数要分开写。而且参数要写成元组的形式,这里只有一个参数,因此也必须用括号括起来后面加个逗号,表示这是一个元组。多线程

继承式调用

上面是直接实例化了 threading.Thread这个类,咱们也能够像下面这样先继承这个类,而后重构它的run方法。并发

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, num):
        super(MyThread, self).__init__()  # 继承父类的构造函数
        self.num = num
    def run(self):
        "每一个线程要运行的函数,必须写到run方法里"
        print("running on task", self.num)
        time.sleep(3)
        print("task over", self.num)

if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()  # 这里就会执行run方法
    t2.start()
    print(t1.getName())
    print(t2.getName())

在直接调用中,就是将你的函数名和运行参数,在实例化的时候,经过类的构造函数传递给了threading.Thread类的run方法。而这里咱们是重构了这个run方法app

使用for循环调用多个线程

若是须要一次调用多个线程,就不能再像上面那样一个一个写了。能够用一个for,来循环调用启动ssh

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        t.start()
    print("所有运行结束?")  # 注意这句print执行的时间

程序主函数的线程

在上面的例子中,最后的print并无等待以前的sleep运行结束,而是直接执行了。这里主函数是一个线程,其余使用threading启动的都是这个主线程启动的子线程。全部线程都是独立执行的,主线程启动了子线程以后二者就相互独立了,相互独立并行执行。
咱们能够经过 threading.current_thread() 获取到当前的线程名:ide

import threading
import time

def task(num):
    print("running on task", num, threading.current_thread())
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    print("running on Main", threading.current_thread())
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        t.start()
    print("所有运行结束?")  # 注意这句print执行的时间

能够看到,主函数是MainThread。每个线程都有线程名和线程号。函数

join方法

若是咱们但愿全部的子线程都是并行的,可是主函数须要等待全部子线程都执行完毕后再统一继续执行,就须要join方法。
join方法,是等待这个线程执行完毕后才会继续执行以后的语句。每个线程都要有一个join,不然就不会等待这个线程执行完毕。oop

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    t1 = threading.Thread(target=task, args=(1,))  # 生成一个线程实例
    t2 = threading.Thread(target=task, args=(2,))  # 再生成一个实例
    t1.start()  # 启动线程
    t2.start()  # 再启动一个
    t1.join()  # 为每一个线程加一个join
    t2.join()
    print(t1.getName())
    print(t2.getName())

上面是2个子线程的状况,若是是以前那样的50个子线程,那么还须要再写一个for循环来执行join。此次能够来计算一下程序的运行时间。学习

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    t_objs = []  # 先定义一个空列表
    start_time = time.time()
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        t.start()
        t_objs.append(t)  # 保存每个实例,不然跳出当前for循环后没法调用
    for j in t_objs:
        j.join()
    print("总共运行时间:", time.time()-start_time)  # 计算总共的运行时间
    print("所有运行结束?")  # 注意这句print执行的时间

活动的线程个数

经过 threading.active_count() 能够获取到活动的线程个数

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)
    print("task over", num)

if __name__ == '__main__':
    print(threading.active_count())  # 运行前是1,由于有一个主函数的线程
    t_objs = []  # 先定义一个空列表
    start_time = time.time()
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        t.start()
        t_objs.append(t)  # 保存没一个实例,不然跳出当前for循环后没法调用
    print(threading.active_count())  # 全部子线程都起来了,一个50个,再加上一个主函数
    for j in t_objs:
        j.join()
    print(threading.active_count())  # 全部子线程都结束了,又只剩一个主函数了
    print("总共运行时间:", time.time()-start_time)  # 计算总共的运行时间

守护线程

以前的进程互相之间都是独立的,虽然有父线程建立子线程,可是建立完以后,这2个线程也就互相独立了。这个是默认的设置。
守护线程,在建立完线程尚未运行以前,能够将线程设置为守护线程。守护线程依赖主线程而存在,主线程一旦运行完毕,守护线程不管是什么状况,都会中止。
二者的差异就是,以前的状况,全部线程都是独立运行的。若是没有使用join,全部的线程包括主线程都是独立运行,当全部线程所有运行结束后,咱们的程序才会结束。若是将线程设为守护线程后,那么当主线程和其余线程运行完以后,不会等待守护线程运行结束,程序会直接结束。

import threading
import time

def task(num):
    print("running on task", num)
    time.sleep(3)  # 设为守护线程后,不会等待守护线程运行结束了
    print("task over", num)  # 因此这里也不会打印了

if __name__ == '__main__':
    for i in range(50):
        t = threading.Thread(target=task, args=('t%s' % i,))
        # 将线程设置为守护线程,当主线程退出时,守护线程也会退出
        # 而且由这个守护线程启动的其它子线程也是守护线程,也会会同时退出,无论是否执行完任务
        t.setDaemon(True)  # 必须在建立线程后,可是在运行前才能将线程设置为守护线程
        t.start()
    print("运行结束,进程数量", threading.active_count())  # 这里不会等待子线程运行完毕

小结

线程类的方法:

  • start :线程准备就绪,等待CPU调度
  • setName :为线程设置名称
  • getName :获取线程名称
  • setDaemon:设置守护线程,设为True就是守护线程。默认False
  • join :逐个执行每一个线程,执行完毕后继续往下执行
  • run :线程被cpu调度后自动执行线程对象的run方法
  • threading.current_thread : 查看当前的线程名
  • threading.active_count : 查看活动线程的数量

Python GIL

Python GIL(Global Interpreter Lock),全程解释器锁。不管你启多少个线程,你的cpu是多少核,Python在执行的时候只能是单核运行。这个是使用Python解析器(CPython)时会有的状况。Python除了(CPython)还能够经过CPython,PyPy,Psyco等不一样的Python执行环境来执行,可是CPython是大部分环境下默认的Python执行环境。
因此咱们用CPython就会有GIL,有GIL就不是真正的多线程,只能单核运行。咱们仍是会继续在CPython下学习和运行Python,GIL仍是会继续存在。目前只要知道这么多就好了,怎么利用多核是下次的内容。

线程锁

这段上课演砸了,不过不要紧,我大概搞明白了。
一个进程下能够启动多个线程,多个线程共享父进程的内存空间,也就意味着每一个线程能够访问同一份数据,此时,若是2个线程同时要修改同一份数据,这是数据就混乱了。下面模拟没有线程锁形成数据混乱的状况。

import threading
import time

gl_num = 0
def show():
    global gl_num  # 声明全局变量
    gl_num += 1  # 先执行自增1,而后停顿一会
    # 把下面的sleep注释掉,多是数据处理到输出之间几乎没有间隔,看不到数据混乱的状况
    time.sleep(0.1)  # sleep的这段时间,其余线程也会操做这个变量
    print(gl_num)  # 最后输出的时候,就是全部线程操做后的结果

for i in range(10):
    t = threading.Thread(target=show)
    t.start()
print("运行结束")

因此,出现了线程锁,同一时刻只容许一个线程修改数据。

import threading
import time

gl_num = 0
lock = threading.Lock()  # 申请一把锁,生成一个实例
def show():
    global gl_num
    lock.acquire()  # 修改数据前加锁,此时别的线程就没法操做了。
    gl_num += 1
    time.sleep(0.1)
    print(gl_num)
    lock.release()  # 上面打印出结果了,释放锁容许别的线程继续操做

for i in range(10):
    t = threading.Thread(target=show)
    t.start()
print("运行结束")

上面加了锁之间的内容其实就变成了串行执行了。

递归锁

在使用线程锁的时候,若是你须要用到多把锁嵌套使用,可能会致使程序锁死,永远没法release。下面演示一个会出现锁死的状况。说白了,就是大锁中还要再包含子锁。

import threading

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num
def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2
def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)

if __name__ == '__main__':
    num, num2 = 0, 0
    # 下面使用了递归锁RLock,能够正常执行,若是换成以前的Lock,就会出现锁死的状况
    lock = threading.RLock()
    # 其实这里都不是线程数量的问题,起一个子线程就会锁死了
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()
while threading.active_count() != 1:  # 这里没用join,而是经过判断活跃线程数来确认子线程是否执行完毕
    print("活动线程数量:", threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

在上面的代码中,程序会先进入run3,run3中有第一道锁。而后在run3中会分别去执行run1和run2,而这里面又都会有第二道锁。此时多是程序吧两道锁搞混了,因此致使再也release不出来了。使用RLock来代替Lock就避免了这种状况。
也没讲RLock和Lock的其余区别,既然RLock没问题,貌似不用Lock就行了。简单的场景都OK,复杂的场景下必定要用递归锁避免程序被锁死。

信号量

线程锁,同时只容许一个线程更改数据。
信号量(Semaphore),是同时容许必定数量的线程更改数据。好比厕全部3个坑,那最多只容许3我的上厕所,后面的人只能等里面有人出来了才能再进去。
信号量的使用就和线程锁同样,实例化的时候类名变了,多一个int参数。以后都是同样的acquire上锁和release释放。

import threading,time

# 通常应该是写到if里的,不过这里演示,从上到下按顺序执行逻辑比较清晰
semaphore  = threading.BoundedSemaphore(5)  # 最多容许5个线程同时运行
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s" % n)
    semaphore.release()

if __name__ == '__main__':
    for i in range(30):
        t = threading.Thread(target=run, args=(i,))
        t.start()
    while threading.active_count() != 1:
        pass  # 继续用活动进程数量等待子程序结束
    else:
        print("全部进程执行完毕")

上面代码执行后的效果就是,每次只蹦5条结果出来。由于经过信号量限制了同一时间只容许必定数量的线程操做数据。
这里的代码比较简单,并且都是同样的,因此看上去是每次执行5个。实际是这里面每个都是独立的。一旦有一个执行完释放了以后,就会让下一个继续执行。就是线程都是一个一个放行的,一旦一个执行完毕就放行下一个,而不是一批一批放行的。

定时器 timer

从线程被start方法调用开始,定时器开始计时。计时完毕后才会开始执行

import threading
def hello(name):
    print("你好,%s" % name)
t = threading.Timer(2, hello, args=("世界",))
t.start()  # 须要等待上面指定的秒数后才会真正执行

事件 event

事件(event),用于线程之间数据同步的。经过 event 来实现两个或多个线程间的交互。
下面把用于控制的线程称为服务端,被控制的线程称为客户端。
服务端,设置 event 的状态,只有 set 和 clear 两个方法
客户端,检查 event 的状态,若是是 set 就继续执行,不然就阻塞等待 set

  • event = threading.Event() :使用前,先生成一个实例
  • event.set() :服务端线程,将 event 设为 set ,客户端就能够继续执行
  • event.clear() :服务端线程,将 event 的 set 清除,客户端会阻塞直到再次设为 set
  • event.wait() :客户端线程,等待 event 变成 set ,若是 set 就继续,不然就阻塞直到 set
  • event.is_set() :布尔值,当前event的状态。客户端线程也能够用它来作控制。可是若是不是 set 和 clear时都须要执行的话,仍是用 wait 来控制比较好。wait是用阻塞来控制的,而这里是每次都要作一下判断。

课上举例了一个红绿灯的例子,起一个红绿灯的线程,若是绿灯就 set ,红灯就 clear。而后能够起几个车的线程,判断event,只在set的时候执行:

import threading, time

event = threading.Event()  # 这句应该写到if __name__里面,先放这里看看清楚
def light():
    "模拟红绿灯"
    count = 0
    while True:
        if count < 10:
            event.set()  # 绿灯将标志位设为 set
            print("\033[42;1m**\033[0m绿灯", 10-count, 'event状态:', event.is_set())
        elif count < 20:
            event.clear()  # 红灯清除标志位
            print("\033[41;1m**\033[0m红灯", 20-count, 'event状态:', event.is_set())
        else:
            count = 0
            continue
        time.sleep(1)
        count += 1

def car(name):
    "在红灯的时候wait,绿灯的时候执行"
    while True:
        event.wait()  # 只有在绿灯的时候才会继续执行
        print(name, '正在行驶...')
        time.sleep(1)

if __name__ == '__main__':
    light1 = threading.Thread(target=light)
    light1.start()
    # 你也能够用for循环多起几个车
    car1 = threading.Thread(target=car, args=("特斯拉",))
    car1.start()
    car2 = threading.Thread(target=car, args=("保时捷",))
    car2.start()

虽然不用 event 也能够经过检查某个变量的状态来实现控制。可是因为变量是进程间全部线程共享的,客户端直接访问控制变量也能够修改它,虽然你程序里可能不会这么写,但不是不能够。这里使用了 event 将这个过程封装了,避免客户端直接访问这个变量。

queue模块-队列

队列,能够理解为一个有顺序的容器,里面存放数据。数据来的时候先将数据存入,使用数据的时候再按必定顺序从容器中取出。
其实列表也能实现,用pop方法。简单的话,还真的用列表就行了,不过模块封装了更多高级的设置。
有3种队列:

import queue
# 使用以前也是要先生成一个实例
q1 = queue.Queue()  # 先进先出
q2 = queue.LifoQueue()  # 后进先出,就是堆栈。last in first out (LIFO)
q3 = queue.PriorityQueue()  # 能够设置优先级的队列

上面再实例化的时候,都没有参数。有一个参数 maxsize=0 ,默认的队列大小是没有限制。能够设置 maxsize 来设置队列大小。

存取数据

put(item) :存入一个数据
get(item) :取出一个数据
put_nowait(item) :存入数据的另一个方法,不等待,直接抛出异常
get_nowait(item) :取出数据的另一个方法,不等待,直接抛出异常
可选的参数,用于 put 和 get 方法:
block=False :默认为True,就是阻塞模式。设为False,则直接抛出异常,下面的 timeout 也就无效了。
timeout=1 :默认为None,就是一直等着。设置后为阻塞多少秒,若是这段时间内能够继续了,就立刻继续。不然仍是抛出异常。
put在存入数据的时候,若是队列设置了大小,而且队列已满,就会阻塞。直到队列里有数据被取出空出了位置,那么再将这个数据存入继续。可使用 put_nowait 存入,那么满的时候,就不直接抛出一个错误。
也能够设置参数 block=False 也是同样直接抛出错误,默认是True。
还能够设置参数 timeout=1 这个是等待的时间,好比这里等待1秒,若是1秒内队列空出来,就存入,不然仍是抛出异常。若是 block 已经设置了 False 这个 timeout 就没有用了,会直接抛出错误。
get也是同样,用上面的两个参数控制,在队列为空的状况下继续要取数据,是阻塞,仍是抛出异常,或者阻塞多久后再抛出异常。
下面的列子,生成2个线程,一个存入数据,一个用户控制来取出数据,演示put和get的用法:

import threading, queue

q1 = queue.Queue(maxsize=2)  # 队列大小只有2
def put_item():
    "存入数据的线程"
    for i in range(10):
        q1.put(i, timeout=2)  # 若是队列已满,最长等待2秒钟,不然抛出异常
        print("存入数据:", i)  # 存入数据的时候,你会看到
    print("数据已经所有存入...")
def control():
    "用户控制取出数据"
    while True:
        input("按回车取出一个数据,若是等待超过2秒会抛出异常... ")
        print("取出数据:", q1.get_nowait())  # 若是队列空了,再要取就抛出异常

if __name__ == '__main__':
    put1 = threading.Thread(target=put_item)
    put1.start()
    control1 = threading.Thread(target=control)
    control1.start()

其余方法

qsize() :队列中元素的个数,0就是空队列,等于maxsize就是队列满了
empty() :布尔值,队列是否为空。空的话就get不到数据了
full() :布尔值,队列是否已满。满的话就put不进数据了

import queue
q = queue.LifoQueue(3)  # 来一个堆栈,大小只有3
def show():
    "每次打印这3个数据"
    print('队列中的数据数量:', q.qsize(),
          '队列是否为空:', q.empty(),
          '队列是否满了', q.full())
show()  # 什么都尚未存入,大小是0,而且是空的
q.put(1)
show()  # 存入数据后,大小就变了,已经不是空堆栈了,可是还没满
q.put(2)
show()
q.put(3)
show()  # 如今已经满了
print(q.get())  # 看看取出数据的顺序,是否是最后存入的最早取出
print(q.get())
print(q.get())
show()  # 都取完了,如今又是一个空堆栈了

优先级队列

优先级队列例子1:

import queue

q1 = queue.PriorityQueue()

q1.put('Jack')
q1.put('Perter')
q1.put('Alice')
q1.put('1Zoey')

while not q1.empty():  # empty队列是否为空
    print(q1.get())

从上面能够看到,最后数据的结果给按ASCII顺序取出来的。通常可能不这么用,而是会单独设置一个优先级,那么就把优先级和数据已元组的形式存入
优先级队列例子2:

import queue

q1 = queue.PriorityQueue()

q1.put((10, 'Jack'))  # 注意数据只占1个参数位置,因此这里得2层括号,存入一个元组
q1.put((-1, 'Zoey'))
q1.put((10, 'Perter'))
q1.put((-1, 'Alice'))
q1.put((-1, 'Bob'))

while not q1.empty():
    print(q1.get())

重构put方法,优化排序的规则

这里还有一个问题,虽然将数据写成了元组,可是其实排序的时候是按整个元组的内容来排序的。可是咱们须要的是按照元组的第一个元素来排序,而一样优先级的数据,仍然按照进入的顺序输出。貌似模块自己并无提供这样的方法。咱们重构一个本身的优先级排序。
源码中调用put方法,最终会调用_put方法将数据导入;
而后在_put方法中调用了heapq模块(import queque的时候导入的)的heappush方法将数据传递给_siftdown方法;
最终在_siftdown方法中从新排列存储数据的列表。问题就是这个方法,直接从列表中取出元素进行比较,而咱们须要的是取出的元素是个元组,用元组的第一个元素进行比较。
因此只要修改上面3个方法就行了。

import queue

# 继承优先级排序的类,只重构咱们须要修改的方法
class MyPriorityQueue(queue.PriorityQueue):
    "根据元组的第一个元素进行优先级排序"
    def _put(self, item):
        "源码是调用heapq模块里的方法,这里咱们就在下面重构了,调用本身的类里的同名方法"
        self.heappush(self.queue, item)

    # 下面的2个方法是heapq模块里的,抄过来小改一下就行了
    def heappush(self, heap, item):
        "这里不用改实现的逻辑,让它继续调用咱们下面的_siftdown方法就行了"
        heap.append(item)
        self._siftdown(heap, 0, len(heap) - 1)

    def _siftdown(self, heap, startpos, pos):
        """主要就是修改这里
        源码是从heap里取出数据进行比较:if newitem < parent:
        这里改为比较数据里的第一个元素:if newitem[0] < parent[0]:
        这里写的比较简单。若是存入的不是元素应该就会报错了,不过这不是重点了,须要能够再改
        """
        newitem = heap[pos]
        while pos > startpos:
            parentpos = (pos - 1) >> 1
            parent = heap[parentpos]
            if newitem[0] < parent[0]:
                heap[pos] = parent
                pos = parentpos
                continue
            break
        heap[pos] = newitem

q1 = MyPriorityQueue()
q1.put((10, 'Jack'))  # 注意数据只占1个参数位置,因此这里得2层括号,存入一个元组
q1.put((-1, 'Zoey'))
q1.put((10, 'Perter'))
q1.put((-1, 'Alice'))
q1.put((-1, 'Bob'))
while not q1.empty():
    print(q1.get())

上面好长一段,其实我只须要改一行,该怎么作最好呢?

task_done 方法和 join 方法

task_done :每次从队列中取出数据而且处理好以后,调用一下这个方法,来提示 join 方法,是否中止阻塞。
join :阻塞,直到队列清空,再继续向日后执行。
具体例子看下面的生产者消费者模型,举例一

生产者消费者模型

回顾,以前讲迭代器的时候,经过yield生成器实现过一个单线程下的有并行效果的吃包子的函数。看以前 的笔记:Python自动化开发学习4-2
如今,学习了多线程和队列以后,咱们能够用多线程来实现了,把数据存放到队列之中。

概念

在并发编程中使用生产者和消费者模式可以解决绝大多数并发问题。该模式经过平衡生产线程和消费线程的工做能力来提升程序的总体处理数据的速度。
在线程里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,若是生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。一样的道理,若是消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题因而引入了生产者和消费者模式。
生产者消费者模式是经过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者之间不直接通讯,而是经过队列来传递数据。因此生产者生产完数据以后不用等待消费者处理,直接放入队列;消费者不找生产者要数据,而是直接从队列里取,队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力,这样就实现了解耦。
用以前学习的多线程和队列的知识,就能够实现这样的模型了。

举例一

举例说明。写一个生产者的函数,生产包子。再写一个消费者的函数,消费包子。用上面的 task_done 和 join 的方法。
生产者一次生产10个包子,放入队列,而后执行 join 阻塞,等到10个包子所有被取完了,才会继续。
消费者取包子,不是直接找生产者,并且从队列里去取。取了以后执行一下 task_done 通知如下生产者看看包子有没有被取完。

import queue, threading, time

def producer():
    while True:
        for i in range(10):
            q.put("包子 %s" % i)
        print("已经放入了10个包子")
        q.join()  # 阻塞,知道包子被取完
        print("包子已经被取完了...")
def consumer(name):
    while True:
        # tmp = q.get()
        print("%s 吃了一个包子 %s" % (name, q.get()))
        q.task_done()  # 通知join方法,取了一个数据了
        time.sleep(1)

if __name__ == '__main__':
    q = queue.Queue()
    p = threading.Thread(target=producer,)
    p.start()
    c1 = threading.Thread(target=consumer, args=('Eric',))
    c1.start()
    c2 = threading.Thread(target=consumer, args=('Lassie',))
    c2.start()
    c3 = threading.Thread(target=consumer, args=('Snoopy',))
    c3.start()

上面的例子中生产数据爽哦速度远远高于消费数据的速度,咱们用阻塞来控制,防止生产了过多的数据来不及消费

举例二

队列自己就有本身的阻塞模式,因为使用了生产者消费者模型实现了解耦。咱们没必要关心生产和消费数据的速度。若是数据消费的快,在取空数据的时候就会进入阻塞,直到生产者把数据加入队列。数据生产过快的状况也是同样,队列满了天然进入阻塞,直到消费者消费了。

import queue, threading, time

# 随意调整一次生产的数量,以及每次sleep的时间间隔
def producer():
    while True:
        for i in range(5):
            q.put("包子 %s" % i)
        print("已经放入了5个包子")
        time.sleep(2)

def consumer(name):
    while True:
        print("%s 吃了一个包子 %s" % (name, q.get()))
        time.sleep(1)

# 这里也是能够多开几个生产者和消费者
if __name__ == '__main__':
    q = queue.Queue(10)  # 此次设定一下队列的大小,若是生产过快,也会阻塞等待消费
    p = threading.Thread(target=producer,)
    p.start()
    c1 = threading.Thread(target=consumer, args=('Eric',))
    c1.start()
    c2 = threading.Thread(target=consumer, args=('Lassie',))
    c2.start()
    c3 = threading.Thread(target=consumer, args=('Snoopy',))
    c3.start()

做业

类 Fabric 主机管理程序开发:

  1. 运行程序列出主机组或者主机列表
  2. 选择指定主机或主机组
  3. 选择让主机或者主机组执行命令或者向其传输文件(上传/下载)
  4. 充分使用多线程或多进程
  5. 不一样主机的用户名密码、端口能够不一样

补充-线程池-concurrent.futures模块

上面讲线程的内容里只有信号量,并无线程池。信号量只是限制同事运行的线程,可是全部线程应该是所有都建立好的。线程占用的资源比较少,这也没太大问题。不过若是要限制同一时间建立的线程的数量,就须要线程池。

从Python3.2开始,标准库为咱们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。

例子:

from concurrent.futures import ThreadPoolExecutor

def ssh_cmd(obj):
    pass

# 执行者:建立线程池
executor = ThreadPoolExecutor(5)
for obj in objs:
    executor.submit(ssh_cmd, obj)  # 第一个参数是方法,以后的参数都是变量
executor.shutdown(wait=True)

项目里只用到这么多。

相关文章
相关标签/搜索