进程池线程池及协程

进程池和线程池

什么是池?程序员

在保证计算机硬件安全的状况下最大限度的利用计算机,池实际上是下降了程序的运行效率,可是保证了计算机硬件的安全(硬件的发展跟不上软件的速度)安全

进程池和线程池就是咱们能够提早在池子里放上固定数量的进程或者线程,等到任务来了,就直接从池子里拿一个进程或线程来处理任务,等任务处理完毕以后,进程或线程也不关闭,而是继续放回池子中等待任务,若是有不少任务须要执行,池中的进程数量不够,任务就要等待以前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增长操做系统的调度难度,还节省了开闭进程的时间,也必定程度上可以实现并发效果。多线程

#进程池
from concurrent.futures import ProcessPoolExecutor
import time
import os

pool = ProcessPoolExecutor() #能够传参数,若是不传默认是计算机CPU的个数
def task(n):
    print(n)
    time.sleep(2)

if __name__ == '__main__':
    for i in range(20):
        pool.submit(task,i)  # 朝进程池提交任务

#线程池
rom concurrent.futures import ThreadPoolExecutor
import time
import os

pool = ThreadPoolExecutor() #能够传参数指定线程池内的线程个数,若是不传默认是计算机CPU的个数乘5
def task(n):
    print(n)
    time.sleep(2)

if __name__ == '__main__':
    for i in range(20):
        pool.submit(task,i)  # 朝进程池提交任务

提交任务的方式:同步和异步

同步:提交任务以后 原地等待任务的返回结果 期间不作任何事
异步:提交任务以后 不等待任务的返回结果(异步的结果怎么拿???) 直接执行下一行代码并发

from concurrent.futures import ProcessPoolExecutor
import time
import os

pool = ProcessPoolExecutor(5)
def task(n):
    print(n)
    time.sleep(2)
    return n**2

t_list = []
if __name__ == '__main__':
    for i in range(20):
        res = pool.submit(task,i)
        #print(res.result())  # 原地等待任务的结果(将并发变成了串行)
        t_list.append(res)

pool.shutdown()  # 关闭池子,等待池子中全部的任务执行完毕才能往下执行代码
for p in t_list:
    print('>>>:',p.result())

异步回调机制:当异步提交的任务有返回结果以后,会自动触发回调函数的执行app

from concurrent.futures import ProcessPoolExecutor
import time
import os

pool = ProcessPoolExecutor(5)
def task(n):
    print(n)
    time.sleep(2)
    return n**2

def call_back(n):
    print('拿到了异步提交的结果:',n.result())

t_list = []
if __name__ == '__main__':
    for i in range(20):
        res = pool.submit(task,i).add_done_callback(call_back)  # # 提交任务的时候 绑定一个回调函数 一旦该任务有结果 马上执行对于的回调函数
        t_list.append(res)

协程

进程:资源单位异步

线程:执行单位socket

协程:单线程下实现并发函数

并发就是切换+保留状态(看起来像同时执行的,就能够称之为并发)工具

协程:彻底 是程序员想象出来的名词,单线程下实现并发spa

程序员本身经过代码本身检测程序中的IO,一旦遇到IO本身经过代码切换,给操做系统的感受就是你这个线程没有任何IO状态。(就是欺骗操做系统,让它觉得你这个程序一直没有IO,从而保证程序在运行态和就绪态来回切换,能够提高代码的运行效率)

切换+保存状态就必定可以提高效率吗???

  当你的任务是iO密集型的状况下 提高效率
  若是你的任务是计算密集型的 下降效率

#经过代码进行验证
#串行并发 0.12008428573608398
import time
def func1():
    for i in range(10000):
        i+1

def func2():
    for i in range(10000):
        i+1
start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)

#基于yield保存状态的特色来并发 0.2261803150177002
import time
def func1():
    while True:
        10000+1
        yield

def func2():
    g=func1()
    for i in range(1000000):
        i+1
        next(g)
start = time.time()
func2()
stop=time.time()
print(stop-start)

咱们也能够经过time.sleep来模拟IO,然而yield并不会捕捉到并自动切换,因此咱们找一个可以识别IO操做的工具,因而咱们须要了解一个新的模块,gevent模块。

gevent模块

可是gevent模块并不能自动识别time.sleep等IO状况,因此要手动再配置一个参数

from gevent import monkey
monkey.patch_all() 
# 因为该模块常常被使用 因此建议写成一行,以下
from gevent import monkey;monkey.patch_all()
#验证
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import time

def heng():
    print('')
    time.sleep(1)
    print('')

def ha():
    print('')
    time.sleep(2)
    print('')

def hei():
    print('')
    time.sleep(3)
    print('')

start = time.time()
g1 = spawn(heng) # spawn会检测全部任务
g2 = spawn(ha)
g3 = spawn(hei)
g1.join()
g2.join()
g3.join()

print(time.time()-start)

spawn就像一个列表,第一个任务来了就把它放进去,第二个任务来了也把它放进去,当第一个任务遇到IO操做时,立刻去执行第二个任务,若是第二个任务遇到IO的时候又切换回第一个任务。

利用协程实现TCP单线程的并发

#客户端
import socket
from threading import Thread,current_thread

def client():
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    n = 0
    while True:

        data = '%s %s'%(current_thread().name,n)
        client.send(data.encode('utf-8'))
        res = client.recv(1024)
        print(res.decode('utf-8'))
        n += 1
#经过开启多线程来完成多个客户端的并发
for i in range(400):
    t = Thread(target=client)
    t.start()

#服务端
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn

server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):  # 把通讯循环拿出来作成一个函数
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()

def ser():  # 把链接循环拿出来作一个函数
    while True:
        conn,addr = server.accept()
        spawn(talk,conn)  # 检测talk

if __name__ == '__main__':
    g1 = spawn(ser)  # 检测 ser
    g1.join()  # 让主线程不会直接结束

I/O模型介绍

阻塞IO

非阻塞IO

 

 IO多路复用

 异步IO

相关文章
相关标签/搜索