并发编程知识点剖析

 

  并发编程知识点剖析python

一.  mysql

  进程(Process):是系统进行资源分配和调度的基本单位,是操做系统结构的基础,进程是线程的容器。linux

  线程(Threading): 一条流水线的工做过程,cpu最小执行单位git

  线程与进程的区别能够概括为如下4点:github

      1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见, 进程之间是空间隔离的,一个 进程里的线程之间共享内存
      2)通讯:进程间通讯IPC,线程间能够直接读写进程数据段(如全局变量)来进行通讯——须要进程同步和 互斥手段的辅助,以保证数据的一致性。(就相似进程中的锁的做用)
      3)调度和切换:线程上下文切换比进程上下文切换要快得多。
      4)在多线程操做系统中(如今我们用的系统基本都是多线程的操做系统),进程不是一个可执行的实体,正去执行程序的不是进程,是线程,你能够理解进程就是一个线程的容器。

  并行:同时运行,只有具有多个cpu才能实现并行,利用了多核,利用了多核,多个任务真正的在同时运行sql

     将多个cpu必须成高速公路上的多个车道,进程就比如每一个车道上行驶的车辆,并行就是说,你们在本身的车道上行驶,会不影响,同时在开车。这就是并行编程

  并发:伪并行,也提升了效率,遇到IO就切换,充分的利用了IO时间json

       即看起来是同时运行。单个cpu+多道技术就能够实现并发,(并行也属于并发)windows

  同步 : 要等待任务执行结果,才能进行下一个任务,其实就是一个程序结束才执行另一个程序,串行的,不必定两个程序就有依赖关系。数组

  异步 : 不须要等待任务的执行结果,继续执行本身的任务,不须要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工做,依赖的任务也当即执行,只要本身完成了整个任务就算完成了。

  阻塞 : 等待某个事件的发生而没法继续执行,阻塞的方法:input、time.sleep,socket中的recv、accept等等。 

  非阻塞 : 不等待

 

二.建立方式

#当前文件名称为test.py
# from multiprocessing import Process
#
# def func():
#     print(12345)
#
# if __name__ == '__main__': #windows 下才须要写这个,这和系统建立进程的机制有关系,不用深究,记着windows下要写就好啦
#     #首先我运行当前这个test.py文件,运行这个文件的程序,那么就产生了进程,这个进程咱们称为主进程
#
#     p = Process(target=func,) #将函数注册到一个进程中,p是一个进程对象,此时尚未启动进程,只是建立了一个进程对象。而且func是不加括号的,由于加上括号这个函数就直接运行了对吧。
#     p.start() #告诉操做系统,给我开启一个进程,func这个函数就被咱们新开的这个进程执行了,而这个进程是我主进程运行过程当中建立出来的,因此称这个新建立的进程为主进程的子进程,而主进程又能够称为这个新进程的父进程。
          #而这个子进程中执行的程序,至关于将如今这个test.py文件中的程序copy到一个你看不到的python文件中去执行了,就至关于当前这个文件,被另一个py文件import过去并执行了。
          #start并非直接就去执行了,咱们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,而且在这个三个状态之间不断的转换,等待cpu执行时间片到了。
#     print('*' * 10) #这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,咱们称为异步
进程的建立方式一
class MyProcess(Process): #本身写一个类,继承Process类
    #咱们经过init方法能够传参数,若是只写一个run方法,那么无法传参数,由于建立对象的是传参就是在init方法里面,面向对象的时候,咱们是否是学过
    def __init__(self,person):
        super().__init__()
        self.person=person
    def run(self):
        print(os.getpid())
        print(self.pid)
        print(self.pid)
        print('%s 正在和女主播聊天' %self.person)
    # def start(self):
    #     #若是你非要写一个start方法,能够这样写,而且在run方法先后,能够写一些其余的逻辑
    #     self.run()
if __name__ == '__main__':
    p1=MyProcess('Jedan')
    p2=MyProcess('太白')
    p3=MyProcess('alexDSB')

    p1.start() #start内部会自动调用run方法
    p2.start()
    # p2.run()
    p3.start()


    p1.join()
    p2.join()
    p3.join()
进程的建立方式二

 

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('太白',))
    t.start()
    print('主线程')
线程的建立方式一
import time
from threading import Thread
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('太白')
    t.start()
    print('主线程'
线程的建立方式二

 

三.守护进程,守护线程

  必定要在p.start()前设置,设置p为守护进程(守护线程),禁止p建立子进程(子线程),而且父进程代码执行结束,p即终止运行

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)
        time.sleep(3)
if __name__ == '__main__':
    p=Myprocess('太白')
    p.daemon=True #必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p即终止运行
    p.start()
    # time.sleep(1) # 在sleep时linux下查看进程id对应的进程ps -ef|grep id
    print('')
进程设置守护进程
from threading import Thread
from multiprocessing import Process
import time
def func1():
    while True:
        print(666)
        time.sleep(0.5)
def func2():
    print('hello')
    time.sleep(3)

if __name__ == '__main__':
    # t = Thread(target=func1,)
    # t.daemon = True  #主线程结束,守护线程随之结束
    # # t.setDaemon(True) #两种方式,和上面设置守护线程是同样的
    # t.start()
    # t2 = Thread(target=func2,) #这个子线程要执行3秒,主线程的代码虽然执行完了,可是一直等着子线程的任务执行完毕,主线程才算完毕,由于经过结果你会发现我主线程虽然代码执行完毕了,\
    # 可是主线程的的守护线程t1还在执行,说明什么,说明个人主线程尚未完毕,只不过是代码执行完了,一直等着子线程t2执行完毕,我主线程的守护线程才中止,说明子线程执行完毕以后,个人主线程才执行完毕
    # t2.start()
    # print('主线程代码执行完啦!')
    p = Process(target=func1,)
    p.daemon = True
    p.start()

    p2 = Process(target=func2,)
    p2.start()
    time.sleep(1) #让主进程等1秒,为了能看到func1的打印效果
    print('主进程代码执行完啦!') #经过结果你会发现,若是主进程的代码运行完毕了,那么主进程就结束了,由于主进程的守护进程p随着主进程的代码结束而结束了,守护进程被回收了,这和线程是不同的,主线程的代码完了并不表明主线程运行完毕了,须要等着全部其余的非守护的子线程执行完毕才算完毕
线程设置守护线程

  信号量(Semaphore)

  Semaphore管理一个内置的计数器,
  每当调用acquire()时内置计数器-1;
  调用release() 时内置计数器+1;
  计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其余线程调用release()。

from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 占到一间ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模拟每一个人在ktv中待的时间不一样
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')
进程信号量
from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()
线程信号量

  事件

  事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。
  event.isSet():返回event的状态值;
  event.wait():若是 event.isSet()==False将阻塞线程;
  event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度;
  event.clear():恢复event的状态值为False。

from multiprocessing import Process,Semaphore,Event
import time,random

e = Event() #建立一个事件对象
print(e.is_set())  #is_set()查看一个事件的状态,默认为False,可经过set方法改成True
print('look here!')
# e.set()          #将is_set()的状态改成True。
# print(e.is_set())#is_set()查看一个事件的状态,默认为False,可经过set方法改成Tr
# e.clear()        #将is_set()的状态改成False
# print(e.is_set())#is_set()查看一个事件的状态,默认为False,可经过set方法改成Tr
e.wait()           #根据is_set()的状态结果来决定是否在这阻塞住,is_set()=False那么就阻塞,is_set()=True就不阻塞
print('give me!!')

#set和clear  修改事件的状态 set-->True   clear-->False
#is_set     用来查看一个事件的状态
#wait       依据事件的状态来决定是否阻塞 False-->阻塞  True-->不阻塞
进程 事件应用
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('连接超时') #本身发起错误
        print('<%s>第%s次尝试连接' % (threading.current_thread().getName(), count))
        event.wait(0.5) #
        count+=1
    print('<%s>连接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
    t1 = random.randint(0,3)
    print('>>>>',t1)
    time.sleep(t1)
    event.set()
if __name__ == '__main__':
    event=Event()
    check = Thread(target=check_mysql)
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)

    check.start()
    conn1.start()
    conn2.start()
线程 事件应用

  数据共享(Manager模块)

from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加锁而操做共享的数据,确定会出现数据错乱
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
进程之间数据不共享
from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n  #修改全局变量的值
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫无疑问子进程p已经将本身的全局的n改为了0,但改的仅仅是它本身的,查看父进程的n仍然为100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()   #必须加join,由于主线程和子线程不必定谁快,通常都是主线程快一些,全部咱们要等子线程执行完毕才能看出效果
    print('',n) #查看结果为0,由于同一进程内的线程之间共享进程内的数据
# 经过一个global就实现了全局变量的使用,不须要进程的IPC通讯方法
线程之间数据共享

  队列(queue)

 

q = Queue([maxsize]) 
建立共享的进程队列。maxsize是队列中容许的最大项数。若是省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还须要运行支持线程以便队列中的数据传输到底层管道中。 
Queue的实例q具备如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一个项目。若是q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 若是设置为False,将引起Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。若是在制定的时间间隔内没有项目变为可用,将引起Queue.Empty异常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
将item放入队列。若是队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。若是设置为False,将引起Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引起Queue.Full异常。

q.qsize() 
返回队列中目前项目的正确数量。此函数的结果并不可靠,由于在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引起NotImplementedError异常。


q.empty() 
若是调用此方法时 q为空,返回True。若是其余进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full() 
若是q已满,返回为True. 因为线程的存在,结果也多是不可靠的(参考q.empty()方法)。。
队列的方法介绍

 

#看下面的队列的时候,按照编号看注释
import time
from multiprocessing import Process, Queue

# 8. q = Queue(2) #建立一个Queue对象,若是写在这里,那么在windows还子进程去执行的时候,咱们知道子进程中还会执行这个代码,可是子进程中不可以再次建立了,也就是这个q就是你主进程中建立的那个q,经过咱们下面在主进程中先添加了一个字符串以后,在去开启子进程,你会发现,小鬼这个字符串还在队列中,也就是说,咱们使用的仍是主进程中建立的这个队列。
def f(q):
    # q = Queue() #9. 咱们在主进程中开启了一个q,若是咱们在子进程中的函数里面再开一个q,那么你下面q.put('姑娘,多少钱~')添加到了新建立的这q里里面了
    q.put('姑娘,多少钱~')  #4.调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。
    # print(q.qsize()) #6.查看队列中有多少条数据了

def f2(q):
    print('》》》》》》》》')
    print(q.get())  #5.取数据
if __name__ == '__main__':
    q = Queue() #1.建立一个Queue对象
    q.put('小鬼')

    p = Process(target=f, args=(q,)) #2.建立一个进程
    p2 = Process(target=f2, args=(q,)) #3.建立一个进程
    p.start()
    p2.start()
    time.sleep(1) #7.若是阻塞一点时间,就会出现主进程运行太快,致使咱们在子进程中查看qsize为1个。
    # print(q.get()) #结果:小鬼
    print(q.get()) #结果:姑娘,多少钱~
    p.join()
进程队列用法

class queue.Queue(maxsize=0) #先进先出

先进先出示例代码

  class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue q=queue.LifoQueue() #队列,相似于栈,栈咱们提过吗,是否是先进后出的顺序啊 q.put('first') q.put('second') q.put('third') # q.put_nowait() print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() ''' 结果(后进先出): third second first '''
先进后出示例代码

  class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

import queue q=queue.PriorityQueue() #put进入一个元组,元组的第一个元素是优先级(一般是数字,也能够是非数字之间的比较),数字越小优先级越高 q.put((-10,'a')) q.put((-5,'a')) #负数也能够 # q.put((20,'ws')) #若是两个值的优先级同样,那么按照后面的值的acsii码顺序来排序,若是字符串第一个数元素相同,比较第二个元素的acsii码顺序 # q.put((20,'wd')) # q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,('w',1))) #优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,能够是元祖,也是经过元素的ascii码顺序来排序  q.put((20,'b')) q.put((20,'a')) q.put((0,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ''' 结果(数字越小优先级越高,优先级高的优先出队): '''
优先级队列示例代码


  管道

  会致使数据不安全

from multiprocessing import Process, Pipe

def f(conn):
    conn.send("Hello 妹妹") #子进程发送了消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() #创建管道,拿到管道的两端,双工通讯方式,两端均可以收发消息
    p = Process(target=f, args=(child_conn,)) #将管道的一段给子进程
    p.start() #开启子进程
    print(parent_conn.recv()) #主进程接受了消息
    p.join()
View Code


四.生产者消费者模型

    #程序中有两类角色
        一类负责生产数据(生产者)
        一类负责处理数据(消费者)    
    #引入生产者消费者模型为了解决的问题是:
        平衡生产者与消费者之间的工做能力,从而提升程序总体处理数据的速度     
    #如何实现:
        生产者<-->队列<——>消费者
    #生产者消费者模型实现类程序的解耦和

 

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))



if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨头',q))
    p3=Process(target=producer,args=('泔水',q))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必须保证生产者所有生产完毕,才应该发送结束信号
    p2.join()
    p3.join()
    q.put(None) #有几个消费者就应该发送几回结束信号None
    q.put(None) #发送结束信号
    print('')
多个生产者消费者实现

 

五.锁 GIL lock(同步锁\互斥锁) 递归锁 (RLock),

 

  GIL 与Lock是两把锁,保护的数据不同,前者是解释器级别的(固然保护的就是解释器级别的数据,好比垃圾回收的数据),后者是保护用户本身开发的应用程序的数据,很明显GIL不负责     这件事,只能用户自定义加锁处理,即Lock

#注意:首先在当前文件目录下建立一个名为db的文件
#文件db的内容为:{"count":1},只有这一行数据,而且注意,每次运行完了以后,文件中的1变成了0,你须要手动将0改成1,而后在去运行代码。
#注意必定要用双引号,否则json没法识别
#加锁保证数据安全,不出现混乱
from multiprocessing import Process,Lock
import time,json,random

#查看剩余票数
def search():
    dic=json.load(open('db')) #打开文件,直接load文件中的内容,拿到文件中的包含剩余票数的字典
    print('\033[43m剩余票数%s\033[0m' %dic['count'])

#抢票
def get():
    dic=json.load(open('db'))
    time.sleep(0.1)       #模拟读数据的网络延迟,那么进程之间的切换,致使全部人拿到的字典都是{"count": 1},也就是每一个人都拿到了这一票。
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2)   #模拟写数据的网络延迟
        json.dump(dic,open('db','w'))
        #最终结果致使,每一个人显示都抢到了票,这就出现了问题~
        print('\033[43m购票成功\033[0m')
    else:
        print('sorry,没票了亲!')
def task(lock):
    search()
    #由于抢票的时候是发生数据变化的时候,全部咱们将锁加加到这里
    lock.acquire()
    get()
    lock.release()
if __name__ == '__main__':
    lock = Lock() #建立一个锁
    for i in range(3): #模拟并发100个客户端抢票
        p=Process(target=task,args=(lock,)) #将锁做为参数传给task函数
        p.start()

#看结果分析:只有一我的抢到了票
# 剩余票数1
# 剩余票数1
# 剩余票数1
# 购票成功   #幸运的人儿
# sorry,没票了亲!
# sorry,没票了亲!
多进程加锁 抢票系统实现

 

import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 抢到了面条'%name)
    fork_lock.acquire()
    print('%s 抢到了叉子'%name)
    print('%s 吃面'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 抢到了叉子' % name)
    time.sleep(1) 
    noodle_lock.acquire()
    print('%s 抢到了面条' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['taibai','wulaoban']:
    t1 = Thread(target=eat1,args=(name,))
    t1.start()
for name in ['alex','peiqi']:
    t2 = Thread(target=eat2,args=(name,))
    t2.start()
递归锁解决死锁现象


六.进程池,线程池

p.apply(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
'''须要强调的是:此操做并不会在全部池工做进程中并执行func函数。若是要经过不一样参数并发地执行func函数,必须从不一样线程调用p.apply()函数或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一个池工做进程中执行func(*args,**kwargs),而后返回结果。
'''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操做,不然将接收其余异步操做中的结果。'''
    
p.close():关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成

P.jion():等待全部工做进程退出。此方法只能在close()或teminate()以后调用

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具备如下方法
obj.get():返回结果,若是有必要则等待结果到达。timeout是可选的。若是在指定时间内尚未到达,将引起一场。若是远程操做中引起了异常,它将在调用此方法时再次被引起。
obj.ready():若是调用完成,返回True
obj.successful():若是调用完成且没有引起异常,返回True,若是在结果就绪以前调用此方法,引起异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():当即终止全部工做进程,同时不执行任何清理或结束任何挂起工做。若是p被垃圾回收,将自动调用此函数
方法
#一:使用进程池(异步调用,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res)
        # s = res.get() #若是直接用res这个结果对象调用get方法获取结果的话,这个程序就变成了同步,由于get方法直接就在这里等着你建立的进程的结果,第一个进程建立了,而且去执行了,那么get就会等着第一个进程的结果,没有结果就一直等着,那么主进程的for循环是没法继续的,因此你会发现变成了同步的效果
    print("==============================>") #没有后面的join,或get,则程序总体结束,进程池中的任务还没来得及所有执行完也都跟着主进程一块儿结束了

    pool.close() #关闭进程池,防止进一步操做。若是全部操做持续挂起,它们将在工做进程终止前完成
    pool.join()   #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证实结果已经计算完毕,剩下的事情就是调用每一个对象下的get方法去获取结果
    for i in res_l:
        print(i.get()) #使用get来获取apply_aync的结果,若是是apply,则没有get方法,由于apply是同步执行,马上获取结果,也根本无需get

#二:使用进程池(同步调用,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另一个
    print("==============================>")
    pool.close()
    pool.join()   #调用join以前,先调用close函数,不然会出错。执行完close后不会有新的进程加入到pool,join函数等待全部子进程结束

    print(res_l) #看到的就是最终的结果组成的列表
    for i in res_l: #apply是同步的,因此直接获得结果,没有get()方法
        print(i)
进程池 同步 异步 操做

 

from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))
爬虫实例
import os
from multiprocessing import Pool

def func1(n):
    print('func1>>',os.getpid())
    print('func1')
    return n*n

def func2(nn):
    print('func2>>',os.getpid())
    print('func2')
    print(nn)
    # import time
    # time.sleep(0.5)
if __name__ == '__main__':
    print('主进程:',os.getpid())
    p = Pool(5)
    #args里面的10给了func1,func1的返回值做为回调函数的参数给了callback对应的函数,不能直接给回调函数直接传参数,他只能是你任务函数func1的函数的返回值
    # for i in range(10,20): #若是是多个进程来执行任务,那么当全部子进程将结果给了回调函数以后,回调函数又是在主进程上执行的,那么就会出现打印结果是同步的效果。咱们上面func2里面注销的时间模块打开看看
    #     p.apply_async(func1,args=(i,),callback=func2)
    p.apply_async(func1,args=(10,),callback=func2)

    p.close()
    p.join()

#结果
# 主进程: 11852  #发现回调函数是在主进程中完成的,其实若是是在子进程中完成的,那咱们直接将代码写在子进程的任务函数func1里面就好了,对不对,这也是为何称为回调函数的缘由。
# func1>> 17332
# func1
# func2>> 11852
# func2
# 100
进程池回调函数的使用
#server>>>>>>>>>>

#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
#开启6个客户端,会发现2个客户端处于等待状态
#在每一个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('进程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

#client.>>>>>>>>>>>>>.

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
进程池版socket聊天
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    print('%s打印的:'%(threading.get_ident()),n)
    return n*n
tpool = ThreadPoolExecutor(max_workers=5) #默认通常起线程的数据不超过CPU个数*5
# tpool = ProcessPoolExecutor(max_workers=5) #进程池的使用只须要将上面的ThreadPoolExecutor改成ProcessPoolExecutor就好了,其余都不用改
#异步执行
t_lst = []
for i in range(5):
    t = tpool.submit(func,i) #提交执行函数,返回一个结果对象,i做为任务函数的参数 def submit(self, fn, *args, **kwargs):  能够传任意形式的参数
    t_lst.append(t)  #
    # print(t.result())
    #这个返回的结果对象t,不能直接去拿结果,否则又变成串行了,能够理解为拿到一个号码,等全部线程的结果都出来以后,咱们再去经过结果对象t获取结果
tpool.shutdown() #起到原来的close阻止新任务进来 + join的做用,等待全部的线程执行完毕
print('主线程')
for ti in t_lst:
    print('>>>>',ti.result())

# 咱们还能够不用shutdown(),用下面这种方式
# while 1:
#     for n,ti in enumerate(t_lst):
#         print('>>>>', ti.result(),n)
#     time.sleep(2) #每一个两秒去去一次结果,哪一个有结果了,就能够取出哪个,想表达的意思就是说不用等到全部的结果都出来再去取,能够轮询着去取结果,由于你的任务须要执行的时间很长,那么你须要等好久才能拿到结果,经过这样的方式能够将快速出来的结果先拿出来。若是有的结果对象里面尚未执行结果,那么你什么也取不到,这一点要注意,不是空的,是什么也取不到,那怎么判断我已经取出了哪个的结果,能够经过枚举enumerate来搞,记录你是哪个位置的结果对象的结果已经被取过了,取过的就再也不取了

#结果分析: 打印的结果是没有顺序的,由于到了func函数中的sleep的时候线程会切换,谁先打印就没准儿了,可是最后的咱们经过结果对象取结果的时候拿到的是有序的,由于咱们主线程进行for循环的时候,咱们是按顺序将结果对象添加到列表中的。
# 37220打印的: 0
# 32292打印的: 4
# 33444打印的: 1
# 30068打印的: 2
# 29884打印的: 3
# 主线程
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16
ThreadPoolExecutor的简单使用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,须要用obj.result()拿到结果
进程池回调函数的使用


七. 协程

  协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的。、

   总结协程特色:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里本身保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操做自动切换到其它协程(如何实现检测IO,yield、greenlet都没法实现,就用到了gevent模块(select机制))

   Greenlet

  若是咱们在单个线程内有20个任务,要想实如今多个任务之间切换,使用yield生成器的方式过于麻烦(须要先获得初始化一次的生成器,而后再调用     send。。。很是麻烦),而使用greenlet模块能够很是简单地实现这20个任务直接的切换

#安装 pip3 install greenlet
复制代码
复制代码
 
  #真正的协程模块就是使用greenlet完成的切换
from greenlet import greenlet def eat(name): print('%s eat 1' %name) #2 g2.switch('taibai') #3 print('%s eat 2' %name) #6 g2.switch() #7 def play(name): print('%s play 1' %name) #4 g1.switch() #5 print('%s play 2' %name) #8 g1=greenlet(eat) g2=greenlet(play) g1.switch('taibai')#能够在第一次switch时传入参数,之后都不须要 1

   Gevent

   #安装
  pip3 install gevent

 

用法
g1=gevent.spawn(func,1,2,3,x=4,y=5)建立一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数eat的

g2=gevent.spawn(func2)

g1.join() #等待g1结束,上面只是建立协程对象,这个join才是去执行

g2.join() #等待g2结束  有人测试的时候会发现,不写第二个join也能执行g2,是的,协程帮你切换执行了,可是你会发现,若是g2里面的任务执行的时间长,可是不写join的话,就不会执行完等到g2剩下的任务了


#或者上述两步合做一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

 

from gevent import monkey;monkey.patch_all() #必须写在最上面,这句话后面的全部阻塞所有可以识别了

import gevent  #直接导入便可
import time
def eat():
    #print()  
    print('eat food 1')
    time.sleep(2)  #加上mokey就可以识别到time模块的sleep了
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)  #来回切换,直到一个I/O的时间结束,这里都是咱们个gevent作得,再也不是控制不了的操做系统了。
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')
gevent使用
from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
#上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。
gevent的同步异步
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
协程在爬虫中的应用

 

八.IO多路复用

    同步:提交一个任务以后要等待这个任务执行完毕

    异步:只管提交任务,不等待这个任务执行完毕就能够去作其余的事情

    阻塞:recv、recvfrom、accept,线程阶段  运行状态-->阻塞状态-->就绪

    非阻塞:没有阻塞状态

  在一个线程的IO模型中,咱们recv的地方阻塞,咱们就开启多线程,可是无论你开启多少个线程,这个recv的时间是否是没有被规避掉,无论是多线程仍是多进程都没有规避掉这个IO时间。

  selectors模块

 

#服务端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #至关于网select的读列表里append了一个文件句柄server_fileobj,而且绑定了一个回调函数accept

while True:
    events=sel.select() #检测全部的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
selectors模块代码实例
相关文章
相关标签/搜索