【python之路36】进程、线程、协程相关

线程详细用法请参考:http://www.cnblogs.com/sunshuhai/articles/6618894.html

1、初始多线程html

经过下面两个例子的运行效率,能够得知多线程的速度比单线程的要快不少python

#!usr/bin/env python
# -*- coding:utf-8 -*-
import time
def print_num(arg):
    time.sleep(1)
    print(arg)
#每秒打印一个数字,速度很是慢
for i in range(10):
    print_num(i)
#!usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
def print_num(arg):
    time.sleep(1)
    print(arg)
#多线程打印速度很快
for i in range(10):
    t = threading.Thread(target=print_num, args=(i,))
    t.start()

 线程是电脑处理事务的最小单元,线程属于进程,每一个进程单独开辟的内存空间,进程越多消耗的内存就越大,每一个进程内能够建立多个线程,可是CPU调度的时候有线程锁,每一个进程只能调度一个线程,因此原则上,计算密集型的用进程,IO密集型的用线程。linux

当t.start()后,默认是等待线程运行结束后(即默认t.setDaemon(False)),主程序才结束的,若是使用:t.setDaemon(True)则程序直接结束,后台运行线程。windows

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def f1():
    time.sleep(5)
def f2(f1, num):
    print('开始打印....' + num)
    f1()
    print('结束打印....' + num)
t = threading.Thread(target=f2, args=(f1,'1111'))
t.setDaemon(True) #程序不等待线程运行结束,主程序运行完毕后会直接终止程序
t.start()

t1 = threading.Thread(target=f2, args=(f1,'2222'))
t1.setDaemon(True) #程序不等待线程运行结束,主程序运行完毕后会直接终止程序
t1.start()

t2 = threading.Thread(target=f2, args=(f1,'3333'))
#t2.setDaemon(True) #程序不等待线程运行结束,主程序运行完毕后会直接终止程序
t2.start()

 t.join(2) ,若是参数为空,则默认当改线程执行结束后才继续向后执行,若是参数为2,则最多等待2秒,若是1秒运行完,则1秒后继续运行,若是两秒内未运行完毕,则也再也不等待,主程序继续向下运行,原来的线程后台继续运行。以下面程序运行4秒后会直接结束:数组

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def f1():
    time.sleep(100)
    print('程序结束....')

t = threading.Thread(target=f1)
t.setDaemon(True)
t.start()
t.join(2)  #最多等待2秒,后继续向下运行

t1 = threading.Thread(target=f1)
t1.setDaemon(True)
t1.start()
t1.join(2)  #最多等待2秒,后继续向下运行

2、线程锁,rlock,若是有一个变量num = 0,多个线程去让这个变量自增1,那么可能两个线程同时去拿num的初始值,都自增了1。因此就须要线程锁,只有一条线程能处理这个变量,其余线程等待,等这个线程处理完毕后,其余线程才能处理。缓存

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

lock = threading.RLock() #建立线程锁
num = 0
def func():
    lock.acquire() #得到锁
    global num
    num += 1
    time.sleep(1)
    print(num)
    lock.release() #释放锁


t = threading.Thread(target=func)
t.start()

t1 = threading.Thread(target=func)
t1.start()
#结果1秒钟输出1,第2秒钟输出2

3、线程事件,threading.Event多线程

Event是线程间通讯最间的机制之一:一个线程发送一个event信号,其余的线程则等待这个信号。用于主线程控制其余线程的执行。 Events 管理一个flag,这个flag可使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True以前。flag默认为False。并发

Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(若是提供了参数timeout)。app

Event.set() :将标识位设为Ture异步

Event.clear() : 将标识伴设为False。

Event.isSet() :判断标识位是否为Ture。

其实就至关于Event.wait()就至关于遇到了红灯,当用Event.set()设置为True时全部的线程暂停在这,一旦用Event.clear()设置为False,至关于遇到了绿灯,全部线程继续暂停的地方继续向下运行

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
obj_event = threading.Event()
def func(event):
    print('start')
    event.wait()
    print('end')

obj_event.clear() #标识位设置为True至关于打开红灯
for i in range(10):
    t = threading.Thread(target=func, args=(obj_event,))
    t.start()

inp = input('请输入False改变标识位:')
if inp == 'False':
    obj_event.set()  ##标识位设置为False至关于打开绿灯

 4、生产者消费者模型及队列queue

举例来讲,卖包子,厨师作好包子后推动管道内,顾客从另外一端取走,这个模型叫生产消费者模型,优势是:厨师能够不断生产,至关于缓存,经常使用方法:

put  get  get_nowait不等待

import queue

q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。

q.join()    # 等到队列为kong的时候,在执行别的操做
q.qsize()   # 返回队列的大小 (不可靠,由于队列时刻在发生变化)
q.empty()   # 当队列为空的时候,返回True 不然返回False (不可靠,由于队列时刻在发生变化)
q.full()    # 当队列满的时候,返回True,不然返回False (不可靠,由于队列时刻在发生变化)
q.put(item, block=True, timeout=None) #  将item放入Queue尾部,item必须存在,能够参数block默认为True,表示当队列满时,会等待队列给出可用位置,
                      #为False时为非阻塞,此时若是队列已满,会引起queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,事后,
                      #若是队列没法给出放入item的位置,则引起 queue.Full 异常
q.get(block=True, timeout=None) #   移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,若是队列为空,则阻塞,为False时,不阻塞,
                      若此时队列为空,则引起 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,事后,若是队列为空,则引起Empty异常。
q.put_nowait(item) #   等效于 put(item,block=False)
q.get_nowait() #    等效于 get(item,block=False)
#!usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading

q = queue.Queue(10)
def product(i):
    q.put(i)
    print(i)
def customer(i):
    q.get(i)
    print(i)

for i in range(12):
    t = threading.Thread(target=product, args=(i,))
    t.start()
for i in range(9):
    t1 = threading.Thread(target=customer, args=(i,))
    t1.start()

 之后写程序的时候须要建立多少线程呢?例若有5000个任务,那么就须要建立5000个线程么,答案确定不是的。例如你的电脑开10个线程是最优的,那么每次最多开10个线程。这就引入线程池的概念,最多建立10个线程,若是大于10个线程,那么先处理10个线程,哪一个线程处理完毕后,再建立一个线程继续处理下一个任务。

python内部没有提供线程池,须要咱们自定义线程池,之后使用中不多本身建立线程,通常都使用线程池。

 

5、进程相关multiprocessing

在windows中是不支持进程的,要想使用进程必须加到if __name__ =="__main__":里面,不然会报错,但不可能全部的程序都加这个,因此建议进程程序在linux下进行调试.

代码从上向下解释是主进程中的主线程作的。

一、建立进程

#!usr/bin/env python
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func(i):
    time.sleep(15)
    print(i)

if __name__ == "__main__":
    p = multiprocessing.Process(target=func, args=(11,))
    p.daemon = True #True表示不等待直接结束主程序;默认为False,等待进程中的线程运行完毕才继续运行
    p.start()
    p.join(2) #表示最多等待2秒

二、进程至关因而从新创造一个房间,房间内的资源所有从新建立一份,即在内存中从新开辟一块内存,其全部变量及对象从新建立一份。

#!usr/bin/env python
# -*- coding:utf-8 -*-
import multiprocessing
li = []
def func(i):
    li.append(i)
    print(li)

if __name__ == "__main__":
    for i in range(10):
        p = multiprocessing.Process(target=func, args=(i,))
        p.start()

#结果输出以下:
# [1]
# [0]
# [2]
# [3]
# [4]
# [6]
# [7]
# [5]
# [8]
# [9]

 三、虽然默认进程之间的数据是不共享的,但能够经过建立数据类型,来实现数据共享

(1)数组的Array特性:数组的类型必须指定,数组的个数指定后不能超出范围

      此方法由于限制太多通常不用

#!usr/bin/env python
# -*- coding:utf-8 -*-

import multiprocessing

def Foo(i,temp):
    temp[i] = 100 + i
    print(list(temp))

if __name__ == "__main__":
    temp = multiprocessing.Array('i', [11, 22, 33, 44])
    for i in range(4):
        p = multiprocessing.Process(target=Foo, args=(i,temp))
        p.start()

(2)经过建立特殊的字典Manager,来实现不一样进程之间的数据共享,此字典和python普通的字典几乎是同样的(能取值和赋值),但也有区别

 

#!usr/bin/env python
# -*- coding:utf-8 -*-

import multiprocessing
import time
def Foo(i,dic):
    dic[i] = 100 + i
    print(len(dic))

if __name__ == "__main__":
    m = multiprocessing.Manager()
    dic = m.dict()
    for item in range(3):
        p = multiprocessing.Process(target=Foo, args=(item,dic,))
        p.start()
        #p.join()
    #进程之间数据共享,基层是经过进程之间数据链接实现的,因此若是主进程执行到最后,等待子进程结束,
    #当主进程hold住的时候(即执行到最后),其自进程就没法进行数据共享了,此时用join,或sleep能够解决
    time.sleep(3)

 四、进程池Pool

进程池主要有两个方法:

区别:apply和apply_async,apply是每次只能一个进程进行工做每个任务都是排队进行,至关于进程.join()

        apply_async经过异步的方式,每次进程能够同时工做,并发进行,能够设置回调函数

#!usr/bin/env python
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func(args):
    time.sleep(1)
    print(args + 100)
    return(args + 100)
def f1(args):
    print(args)
if __name__ == "__main__":
    pool = multiprocessing.Pool(5)
    for i in range(50):
        # pool.apply(func, args=(i,))  #逐个进程运行,由于apply只支持一个进程同时工做
        pool.apply_async(func, args=(i,),callback=f1) #同时运行5个进程,异步操做
        #注意上面参数callback是回调函数,调用完func后,再调用f1函数并将func函数的返回值做为参数传给f1
        print('11')  #先打印50个11说明for循环50次是所有执行完毕的,而后写入进程等待
    pool.close() #在进程池.join以前必须加pool.close()-等待进程池全部进程处理完毕或pool.terminate()不等待直接结束
    pool.join()#此句表示进程池中的进程执行完毕后再关闭,若是注释,那么程序直接关闭

6、自定义线程池

一、自定义low版线程池

#!usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import time
from threading import Thread

class Thread_Pool:
    def __init__(self, maxsize):
        self.que = queue.Queue(maxsize)  #定义队列赋值给对象的变量que
        for i in range(maxsize):   #循环将线程类放进队列
            self.que.put(Thread)
    def get_thread(self):
        return self.que.get()
    def put_thread(self):
        self.que.put(Thread)
def func(t,args):
    time.sleep(1)
    print(args)
    t.put_thread()


t = Thread_Pool(5)
for i in range(50):
    Thread_class = t.get_thread()
    thread = Thread_class(target=func, args=(t,i))  #把t线程池对象传给func,以便拿走线程后随时给线程池补充线程
    thread.start()

二、low版的线程池与进程池相比有几个问题:

一、线程池队列中无论用不用,线程类都被建立,可能存在大于5个线程的状况,由于,拿走1个线程,运行任务,而后增长1个线程,可能拿走的线程没有运行完毕

二、须要手动获得线程类,而后再根据线程类建立线程对象,最后还须要运行线程对象

三、没有回调函数

四、线程池补充是手动补回的

绝版高大上线程,能够直接做为线程池模块使用:

#!usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import time
StopEvent = object()
class TreadPool:
    def __init__(self, max_num, max_tast_num = 0):
        self.max_num = max_num  #最大线程数
        if max_tast_num:  #根据是否制定最大任务数来指定队列程度
            self.q = queue.Queue()  #队列不限定长度
        else:
            self.q = queue.Queue(max_tast_num)  #根据用户指定长度建立队列
        self.generate_list = []   #记录生成的线程
        self.free_list = []   #记录空闲的线程
        self.terminal = False
    def run(self, target, args, callback=None):
        """运行该函数,调用线程池"""
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            #没有空闲线程而且当前建立线程数量小于最大容许线程数量时容许建立线程
            self.creat_thread() #调用建立线程函数
        tast = (target, args, callback)  #将任务打包成为元组放入队列
        self.q.put(tast)

    def creat_thread(self):
        """建立线程,而且运行,这时调用call函数,全部实现均在call函数内"""
        thread = threading.Thread(target=self.call)
        thread.start()
    def call(self):
        """线程调用该函数"""
        current_thread = threading.currentThread()  #获取执行该函数的当前线程
        self.generate_list.append(current_thread)  #将线程加入生成的线程列表

        tast = self.q.get()  #从队列中取出一个任务包

        while tast != StopEvent:
            target, args, backcall = tast  #将元组人物包,赋值给变量
            try:
                result = target(*args)  #执行函数,并将返回值赋值给result
            except Exception as e:
                result = None

            if backcall:
                try:
                    backcall(result)  #执行回调函数,并将result做为参数传给回调函数
                except Exception as e:
                    pass

            self.free_list.append(current_thread) #执行完毕,将当前线程对象加入空闲列表
            if self.terminal: #是否强制终止
                tast = StopEvent
            else:
                tast = self.q.get()  #等待那任务,若是有任务直接循环执行,若是没有则等待,一旦run方法中放入任务则继续执行任务,无需再建立线程
            self.free_list.remove(current_thread) #拿到任务后,清除空闲线程
        else:
            self.generate_list.remove(current_thread)
    def close(self):
        """全部线程所有运行完毕后,中止线程
        call函数运行完毕后,全部的线程此时都在等待拿任务,此时,只要给队列里面添加StopEvent对象则线程就会结束"""
        generate_size = len(self.generate_list)
        while generate_size:
            self.q.put(StopEvent)
            generate_size -= 1
    def terminate(self):
        """不等待线程所有运行完毕任务,直接终止"""
        self.terminal = True  #正在运行的线程运行完毕后会终止
        generate_size = len(self.generate_list)
        while generate_size:  #终止已经在等待的那一部分线程
            self.q.put(StopEvent)
            generate_size -= 1


def func(i):
    time.sleep(1)
    print(i)


pool = TreadPool(5)
for i in range(100):
    pool.run(target=func, args=(i,))
pool.close()

7、协程

什么是协程呢,举个例子:

从网站向下抓图片,请求发出后,2秒后可以返回请求,网站一共100张图片,若是用5个线程,那么这5个线程中2秒共请求5张图片。

若是用一个线程,请求后继续请求下张图片,请求返回后接收图片,那么可能0.01秒就能请求一个,这用能够少用线程,而且效率也很高。

一、协程的基础greenlet

首先用 pip3 install greenlet  安装greelet模块,而后测试下面代码

#!usr/bin/env python
# -*- coding:utf-8 -*-

from greenlet import greenlet
def f1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def f2():
    print(45)
    gr1.switch()
    print(56)

gr1 = greenlet(f1)
gr2 = greenlet(f2)

gr1.switch() #遇到swith切换到f1函数

#输出结果为:
# 12
# 45
# 34
# 56

二、gevent是基于greenlet进行封装的协程处理模块,系统自动的进行控制,以下面代码

经过:pip3 install gevent 导入gevent模块。

gevent当遇到运行较慢时自动会执行下一个任务,当执行完毕后会回来继续执行。

 

#!usr/bin/env python
# -*- coding:utf-8 -*-

import gevent
def foo():
    print('Runing in foo')
    gevent.sleep(0) #切换为下一个任务
    print('Explicit context switch to foo again')
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch to bar')

gevent.joinall([gevent.spawn(foo),
                gevent.spawn(bar)])

#运行结果以下:
# Runing in foo
# Explicit context to bar
# Explicit context switch to foo again
# Implicit context switch to bar

 

爬虫应用gevent,最主要的应用场景:

#!usr/bin/env python
# -*- coding:utf-8 -*-
from gevent import monkey;monkey.patch_all()
import gevent
import requests
def f(url):
    print('GET:%s' % url)
    resp = requests.get(url)
    data = resp.text
    print(url, len(data))

gevent.joinall([gevent.spawn(f, 'https://www.baidu.com/'),
                gevent.spawn(f,'https://www.yahoo.com/'),
                gevent.spawn(f,'https://taobao.com/'),
                gevent.spawn(f,'http://www.people.com.cn/')])

#返回结果:
# GET:https://www.baidu.com/
# GET:https://www.yahoo.com/
# GET:https://taobao.com/
# GET:http://www.people.com.cn/
# http://www.people.com.cn/ 135707
# https://www.baidu.com/ 2443
# https://taobao.com/ 134324
# https://www.yahoo.com/ 416613
相关文章
相关标签/搜索