Python说文解字_Python之多任务_02

  第三部分:Semaphore控制进入数量的锁html

   有时候可能须要运行多个工做线程同时访问一个资源,但要限制总数。例如,链接池支持同时链接,可是数目多是固定的,或者一个网络应用可能支持固定数据的并发下载。这些链接就可使用semaphore来进行管理。网络

import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self,url):
        super().__init__()
        self.url = url

    def run(self):
        time.sleep(2)
        print("got html text success")

class UrlProducer(threading.Thread):
    def run(self):
        for i in range(20): # 好比抓取20个网站信息
            html_thread = HtmlSpider("http://baidu.com/{}".format(i))
            html_thread.start()

if __name__ == '__main__':
    url_producer = UrlProducer()
    url_producer.start()

  咱们能够看到结果是20个并发去执行的,若是咱们想一次并发3个线程如何处理呢?多线程

   更改代码以下:并发

import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self,url,sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()

class UrlProducer(threading.Thread):
    def __init__(self,sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20): # 好比抓取20个网站信息
            self.sem.acquire()
            html_thread = HtmlSpider("http://baidu.com/{}".format(i),self.sem)
            html_thread.start()

if __name__ == '__main__':
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()

 

   其实semaphore内部是调用了一个condition。咱们注意semaphore也是必须有acquire方法和release方法。框架

  另外,咱们发现Queue内部也是调用了不少condition的方法。ide

 

问:前面介绍了不少同步的方法,其余一些软件都有池的概念,Python也具有吗?函数

答:固然,Python有两个池子,一个叫线程池,一个叫进程池,后续咱们讲到进程的时候回搠进程池。如今先说线程池。网站

  线程池就是concurrent模块包,是在Python3.2时候引入的。这个池是很是顶层的,对于咱们进行线程和进程池编码是非好的。并且接口会高度的一致。ui

  一个问题:为何要有线程池?编码

  目的很简单:就是很是容易的管理线程,线程池本身去调度新的线程去使用,线程池过大的时候会阻塞,知道最新的线程空出来。它不只仅起到了数量控制,若是咱们在主线程当中能够获取某一个线程的状态,或者某一个任务的状态,或者返回值,这样就会变得很是简单。另外,当一个线程完成的时候咱们主线程就会立马知道。futures可让多线程和多进程编码接口一直。

from concurrent.futures import ThreadPoolExecutor
import time


def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times


executor = ThreadPoolExecutor(max_workers=2)
# 经过submit函数提交执行的函数到线程池中,submit是当即返回
task1 = executor.submit(get_html,(3))
task2 = executor.submit(get_html,(2))

# done方法用于断定某我的物是否完成
print(task1.done()) # 断定咱们的函数是否执行成功的
print(task2.cancel()) # 如我咱们执行的状态是执行中是cancel不了的
time.sleep(3)
print(task1.done()) # 断定咱们的函数是否执行成功的

# result方法能够获取task的执行结果
print(task1.result())

  运行结果:

False
False
get page 2 success
get page 3 success
True
3

 

  这里面咱们用到了ThreadPoolExecutor的类,其中规定了运行的线程数量。

  done()方法:断定咱们的函数是否执行成功

  result()方法:返回函数是否成功执行

  cancel()方法:取消一个线程任务(可是若是咱们的任务是在执行中,是没法cancel掉的)

 

  另外,咱们在想一下,咱们想批量的进行提交而且知道提交是否成功怎么写。

  这个时候咱们须要导入as_completed的模块,as_completed是一个生成器(咱们知道生成器最好的方式就用for循环提取出来),咱们再用比较高端的推导式的方式进行提交。

from concurrent.futures import ThreadPoolExecutor,as_completed
import time

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3,2,4]
all_task = [executor.submit(get_html,(url)) for url in urls]
for future in as_completed(all_task):
    data = future.result()  #
    print("get {} page success".format(data))

 

  另外,咱们还能够经过executor自己的map方法来完成task

# 经过executor获取已经完成的task
for data in executor.map(get_html,urls):
    print("get {} page success".format(data))
    
# get page 2 success
# get page 3 success
# get 3 page success
# get 2 page success
# get page 4 success
# get 4 page success

  可是,略有有点儿差异,上面是完成一个打印一个。

 

  再加一个wait等待。这个命令其实也是很是经常使用并且也是很是好的模块。wait模块是等待某一个函数结束再执行下面的内容。另外wait模块有有个一传参return_when=后面有四种方式:

  FIRST_COMPLETED 当地一个执行完毕

  FIRST_EXCEPTION 

  ALL_COMPLETED

  _AS_COMPLETED

executor = ThreadPoolExecutor(max_workers=2)
urls = [3,2,4]
all_task = [executor.submit(get_html,(url)) for url in urls]
wait(all_task,return_when='FIRST_EXCEPTION')
print("main over")

 

  

  小结一下:

  * 这样关于线程池,咱们知道最经常使用的三个模块:ThreadPoolExecutor, as_completed(注意是一个迭代器), wait。其中方法有submit,result,cancel,done等方法。wait也是能够传递参数的。

  * concurrent.futures 中的Future对象咱们通常叫作将来对象,但实际上呢,更形象的说叫task返回容器,task执行结果都会放入里面。

 

问:线程的内容真是很多,功能也是很多,可是仍是挺有规律的。

答:其实线程这块儿,还有几个内容,都很是简单,讲解完毕咱们最线程进行总结,而后进入进程方面的讲解。

  补充1(threadLocal模块):咱们发现若是两个线程同时操做一个函数的时候,会形成函数中的变量混乱的状况。咱们能够经过threadLocal的方法,也叫作线程特定数据。给每个线程单独去分配一个本地变量能够防止这个问题:代码以下。

import threading

num = 0
local = threading.local()

def run(x,n):
    x = x + n
    x = x - n

def func(n):
    local.value = num
    for i in range(1000000):
        run(local.value,n)
    print("%s--%d" %(threading.current_thread().getName(),local.value))

if __name__ == '__main__':
    t1 = threading.Thread(target=func,args=(6,))
    t2 = threading.Thread(target=func,args=(9,))

    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # 
    # Thread - 1 - -0
    # Thread - 2 - -0

 

  

  补充2(barrier模块):这个单词是障碍的意思,也就是说像是一个“班车”凑够了多少个“人”才发车。这里就是凑够了多少个线程再进行线程计算,不过这个方法有一个维内托,若是数量不够时候,会一直停在那里等待线程。这种方法平时用的也不是不少。它的方法也是wait,代码以下:

import threading,time

bar = threading.Barrier(4)

def run():
    print("{} -- start".format(threading.current_thread().getName()))
    time.sleep(1)
    bar.wait()
    print("{} -- end".format(threading.current_thread().getName()))

if __name__ == '__main__':
    for i in range(6):
        threading.Thread(target=run).start()

   咱们发现:分配6个线程,其实给的是4个,线程6个并发了以后,等待2个结束并发,一直等不到,就停在那里了

 

  补充3(Timer模块):这个模块很好理解,就是控制线程并发的事件,这是一个定时器,这个定时的事件结束的时候再去开启。

import threading


def run():
    print("Thomas is running")

t = threading.Timer(5,run)

print("父线程开始......")
t.start()
t.join()
print("父线程结束......")

 

  

  补充4(Event模块):这个模块很是简单,咱们使用手工的方式进行线程之间上锁解锁的方式进行通信,咱们能够调用线程的事件(由于线程行动自己就是一个事件),让上一个线程事件等待时间触发。和Condition模块很是的相似。

import threading,time


def func():
    event = threading.Event()
    def run():
        for i in range(5):
            event.wait()
            event.clear()
            print("Thomas is running")
    threading.Thread(target=run).start()

    return event

e = func()
for i in range(5):
    e.set()
    time.sleep(2)

  分析代码咱们能够看出.wait是阻塞等待时间的触发。clear是重置的意思。set是设定的内容。

 

  补充5(enumerate模块):略

 

  总结:如今咱们能够对Python的进程进行一下总结了。

  第一:进程是运行程序最小的操做单元。在IO操做的时候会常常用到。

  第二:Python自己具有GIL(全局解释器锁),因此在CPython的解释下,一个线程放入一个CPU下,在诸如PyPy的Python解释器下,就是一种去GIL话的解释器。

  第三:进程在上面的框架解释下,是线程交替来进行多线程操做的,系统没法自动的调配多核。

  第四:因为线程自己设计的缘由,线程在运行程序后会按照自有的规则释放空间,因为这个释放空间的时间很是短暂,形成程序和程序,数据和数据之间可能产生混乱的状况。所以咱们引入了锁、event、condition等方式进行控制。

  第五:线程有一些概念是成对出现的,正是因为第四条的状况。好比守护和阻塞(daemon和join),wait和clear(event事件),wait和notify(Condition条件),done和wait等。

  第六:线程分主线程和子线程这么一说,wait这个模块实际上是属于小而精的一种阻塞操做方式,另外咱们还能够用with语句来简化代码,用推导式直接进行推送任务到进程中。

  第七:平时咱们也经常使用线程池来让Python自动推送任务到线程当中,submit就是这个动做。

  第八:诸如像ThreadLocal,Event,Timer,semaphore,barrier等小技巧也须要了解。

相关文章
相关标签/搜索