目录:1.C/S架构 2.TCP/IP协议 3.socket套接字 4.粘包与解决办法 5.操做系统 6.进程理论 7.开启进程 8.join方法 9.守护进程 10.互斥锁 11.队列Queue 12.生产者消费者模型 13.线程理论 14.开启线程 15.守护线程 16.GIL 17.进程池与线程池 18.死锁与递归锁 19.信号量 20.Event 21.定时器 22.线程queue 23.协程 24.greenlet 25.gevent 26.IO模型 27.IO多路复用与selectors模块 28.socketserverpython
C/S架构linux
C指的是client(客户端软件),S指的是server(服务端软件)web
客户端软件想要基于网络发送一条消息给服务端软件,流程以下:算法
1.客户端产生数据,存放于客户端软件的内存中,调用接口将内存中的数据发送/拷贝到操做系统内存shell
2.客户端操做系统收到数据后,按照客户端软件指定的规则(即协议),调用网卡发送数据编程
3.网络传输数据json
4.服务端调用系统接口,想要将数据从操做系统内存拷贝到本身的内存中windows
5.服务端操做系统收到4的指令后,使用与客户端相同的规则(即协议)从网卡接收到数据,拷贝给服务端软件缓存
TCP/IP协议安全
Transmission Control Protocol/Internet Protocol
传输控制协议/因特网互联协议
五层模型:
1.物理层,光缆、电缆、双绞线、无线电波等,功能:主要是基于电器特性
发送高(1)低(0)电压电信号
2.数据链路层,功能:规定了电信号的分组方式
Ethernet以太网协议,规定了一组电信号构成一个数据包,叫作‘帧’
每个数据帧分为报头(head)和数据(data)
报头 固定由6个字节的发送者/原地址+6个字节的接受者/目标地址+6个字节的数据类型组成,固定18个字节
数据包含最短46个字节,最长1500个字节
因此一个帧最小64字节,最大1518字节,超过限度就分片发送
但凡接入互联网的机器就必须有一块网卡,每块网卡都有独一无二的一个mac地址
mac地址:每块网卡在出厂时都会烧制上世界上惟一的一个mac地址
3.网络层,功能:引入一套新的地址用来区分不一样的广播域/子网,这套地址即网络地址
IP协议,是规定网络地址的协议,ipv4规定网络地址由32位2进制表示,范围是0.0.0.0到255.255.255.255
这一层发出的数据就是:IP头+data,这样一种结构
ip地址+mac地址就标示了全世界独一无二的一台机器
子网掩码,就是表示子网络特征的一个参数,形式上与IP地址同样,由32位2进制组成
其网络部分所有为1,主机部分所有为0,子网掩码是用来标识一个ip地址的哪些位表明网络位,
哪些位表明主机位,区分网络为和主机位是为了划分子网,避免广播风暴和地址浪费
A类ip地址:1个网络位3个主机位1.0.0.0-126.0.0.0
B类ip地址:2个网络位2个主机位128.0.0.0-191.255.255.255
C类ip地址:3个网络位1个主机位192.0.0.0-223.255.255.255
0.0.0.0对应当前主机,255.255.255.255是当前子网的广播地址
4.传输层,功能:创建端口到端口的通讯
网络层的ip帮咱们区分子网,数据链路层的mac地址帮咱们找到主机
端口即应用程序与网卡关联的编号,端口范围0-65535,其中0-1023为系统占用端口
找到端口就找到软件了,IP+端口就能找到全世界惟一一个软件
tcp协议,可靠传输,理论上tcp数据包没有长度限制,但为了保证网络效率,一般不超过ip数据包长度
流式协议,它的数据是数据流,该协议没有封包,因此极可能出现粘包,由于消息边界不明确
以太网头-ip头-tcp头-数据
tcp协议是可靠协议,由于在数据传输过程当中,只要不获得确认,就从新发送数据,直到获得确认
tcp要先创建双向通路,tcp是流式协议,双向通路就像管道,建好双向通路才能够发数据
tcp创建双向通路与断开双向通路须要三次握手四次挥手
为何是四次挥手呢,二、3可能不是同时的,2是确认客户端断链接的请求的,这个能够直接回复
可是3是服务端发起断链接的请求,须要服务端到客户端的数据发完才能断链接,因此二、3可能不是同时的,
那么若是这两步合并就会出现数据没传完的状况
udp协议,不可靠传输
udp不须要创建通路,直接就发送数据
udp协议自带封包,就算内容是空的,也会在外面包一层,这样的话udp的数据包就是有边界的,不会粘包
以太网头-ip头-udp头-数据
tcp协议虽然安全性高,可是网络开销大,而udp虽然没有提供安全机制,但网络开销小
在如今这个网络安全已经相对较高的状况下,为了保证传输速率,优先考虑udp协议
5.应用层
通过层层包装后的数据结构是:Ethernet头 + ip头 + tcp/udp头 + data
socket:就是套接字
什么是socket?socket是应用层与tcp/ip协议族通讯的中间软件抽象层,它是一组接口
基于tcp的套接字实例:
server端:
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)#买手机
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)#重用端口,加在bind前面
phone.bind(('127.0.0.1',8080)) #绑定手机卡 注意这里,传进去的是一个由ip和端口组成的元组
这个地方127.0.0.1就限制了客户端服务端必须都在本机上才能用
phone.listen(5) #开机 5是挂起数量,ip、端口和挂起数量都应该写在配置文件中,不该该写死在这里
while True: #连接循环,是为了能够循环服务,一个客户端终止了就等着再服务新的客户端
conn,addr=phone.accept() #等待电话链接 这里拿到的是一个由两个元素组成的元组,分别赋值给conn,addr
conn就是客户端与服务端之间创建的通路,addr就是客户端的地址
print('电话线路是',conn)
print('客户端的手机号是',addr)
while True: #通讯循环,对应客户端的通讯循环,发数据收数据
try: #应对windows系统
data=conn.recv(1024) #收消息 从本身的缓存中收最大1024个字节的数据
if not data:break #若是一直收到空的内容,linux系统会进入死循环,这样应对
具体讲是这样的,正常状况下不会发空过来,若是客户端单方面给终止了才会一直发空过来,
那么windows系统上会直接报错,同try...except处理
而linux系统上不会报错会进入死循环,死循环对cpu的占用率极高,因此用if判断这句来处理
print('客户端发来的消息是',data)
conn.send(data.upper())
except Exception:
break
conn.close()
phone.close()
client端:
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080)) 注意这里,传进去的是一个由ip和端口组成的元组
while True:#通讯循环
msg=input('>>:').strip()
if not msg:continue #判断msg是否为空,为空的话就continue
phone.send(msg.encode('utf-8')) #要指定编码方式,转成bytes格式才能进行网络传输
data=phone.recv(1024)
print(data.decode('utf-8'))
phone.close()
socket套接字方法
socket实例类:socket.socket(family=AF_INET,type=SOCK_STREAM,proto=0,fileno=None)
family(socket家族) 这两个内容能够忽略
socket.AF_INET 用于网络编程,大部分时候都用这个
socket.AF_UNIX 用于本机进程间通信
socket type类型
socket.SOCK_STREAM 用于tcp
socket.SOCK_DGRAM 用于udp
socket.SOCK_RAW 原始套接字
socket.SOCK_RDM 是一种可靠的udp形式,即保证交付数据报但不保证顺序
服务端套接字函数
s.bind() 绑定(主机,端口号)到套接字
s.listen() 开始tcp监听
s.accept() 被动接受tcp客户端的链接,(阻塞式)等待链接的到来
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)#重用端口,加在bind前面
客户端套接字函数
s.connect() 主动初始化tcp服务器链接
s.connect_ex() connect()函数的扩展版本,出错时返回出错码,而不是抛出异常
公共用途的套接字函数
s.recv() 接收数据
s.send() 发送数据,若是待发送数据量大于己端缓存区剩余空间,数据会丢失
s.sendall() 发送完整的tcp数据,本质上就是循环调用send(),直到发完为止
s.recvfrom() 从套接字接收数据,返回值是(bytes,address)
s.close() 关闭套接字
socket.setblocking(flag),设置socket为非阻塞模式
基于udp的套接字实例: 了解
udp服务端
import socket
udpserver=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
udpserver.bind(('127.0.0.1',8080))
while True:#通信循环 udp协议没有连接,因此没有连接循环
data,client_addr=udpserver.recvfrom(1024)
print(data.decode('utf-8'))
print(client_addr)
msg=input('>>:')
udpserver.sendto(msg.encode('utf-8'),client_addr)
udp客户端
import socket
udpclient=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
server_ip_port=('127.0.0.1',8080)
while True:#通信循环 udp协议没有连接,因此没有连接循环
inp=input('>>:')
udpclient.sendto(inp.encode('utf-8'),server_ip_port)
data,server_addr=udpclient.recvfrom(1024)
print(data.decode('utf-8'))
send与recv
1.这二者都不是直接接收对方的数据,而是操做本身的操做系统内存,
就是send把内容发给本身的操做系统的内存,由操做系统来发,数据到了后先存在操做系统的内存中,而recv是去操做系统的内存中拿
这二者没有一一对应关系,不须要一个send对应一个recv
2.recv有两个阶段,wait for data阶段和copy data阶段,时间都耗费在第一阶段
而send只有一个copy data阶段
粘包现象与解决办法
tcp协议是面向流的协议,容易出现粘包,而udp是面向消息的协议,每一个udp字段都是一条消息
应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,因此udp不会粘包
粘包问题的发生主要仍是接收方不知道消息之间的界限,不知道一次性提取多少数据
在数据量比较小而且时间间隔很是短的状况下,或数据量很是大超出了接受范围时才可能发生粘包问题
总结:
1.tcp(transport control protocol 传输控制协议)
是面向链接的,面向流的,提供高可靠性服务,收发两端一一成对,为了更有效地收发,
使用了优化算法(Ngale算法),将屡次间隔较短且数据量小的数据合并成一个大的数据块,
而后封包,这样一来,接收方难以分辨数据边界,必须提供科学的拆包机制,
即面向流的通讯没有消息保护边界
2.udp(user datagram protocol 用户数据包协议)
是无链接的,面向消息的,提供高效率的服务,不会使用块的合并优化算法
因为udp支持一对多的模式,因此接收端的skbuff(套接字缓冲区)采用链式结构来记录每一个到达的udp包
每一个udp包有消息头,对接收端来讲,就容易区分处理了
即面向消息的通讯是有消息保护边界的
3.tcp基于数据流,因而收发的消息不能为空,这就须要在客户端服务端都添加空消息处理机制
udp基于数据报,便是输入空内容,那也不是空消息,udp协议会封装上消息头
用json和struct解决粘包实例:
server端
import socket,subprocess,struct,json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)#重用端口,加在bind前面
ip_port=('127.0.0.1',8080)
phone.bind(ip_port)
phone.listen(5)
while True:
conn,client_addr=phone.accept()
while True:
try:
# 1.收命令
cmd=conn.recv(1024)
if not cmd:break
# 2. 执行命令,拿到结果,使用os.system是拿不到结果的,须要用subprocess模块,使用管道
res=subprocess.Popen(cmd.decode('utf-8'),
shell=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
# err=res.stderr.read()
# if err:
# cmd_res=err
# else:
# cmd_res=res.stdout.read()
# conn.send(cmd_res)
out_res=res.stdout.read()
err_res=res.stderr.read()
data_size=len(out_res)+len(err_res) #数据的长度
head_dic={'data_size':data_size} #作一个字典,数据长度为value
head_json=json.dumps(head_dic) #把包含数据长度的字典转成字符串格式
head_bytes=head_json.encode('utf-8') #把字符串格式的字典转成二进制格式用于网络传输,这个就是报头
#先发送报头长度
head_len=len(head_bytes) #提取报头长度
conn.send(struct.pack('i',head_len)) 使用struct将报头长度转成固定的4个字节发送给客户端,这个i是整型的意思,后面必须跟数字
#发送报头内容
conn.send(head_bytes) #把报头发过去
#最后发送数据部分
#3.把结果返回给客户端
conn.send(out_res)
conn.send(err_res)
except Exception:
break
conn.close()
phone.close()
client端
import socket,struct,json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
ip_port=('127.0.0.1',8080)
phone.connect(ip_port)
while True:
cmd=input('>>:').strip()
if not cmd:continue
phone.send(bytes(cmd,encoding='utf-8'))
#收报头长度
head_struct=phone.recv(4)
head_len=struct.unpack('i',head_struct)[0] 提取报头长度,解包出来是一个元组的格式,索引值为0的位置就是要的报头长度
#收报头内容
head_bytes=phone.recv(head_len) #按照报头长度接收,拿到报头部分,是二进制格式的
head_json=head_bytes.decode('utf-8') #把二进制格式的报头解码成json字符串格式的
head_dic=json.loads(head_json)提取报头内容,把字符串格式的报头内容反序列化为字典格式
data_size=head_dic['data_size'] 获取数据长度
#收数据
recv_size=0
recv_data=b''
while recv_size < data_size:
data=phone.recv(1024)
recv_size+=len(data)
recv_data+=data
print(recv_data.decode('gbk')) #这个地方是个坑,虽然输入cmd的时候用的utf-8,服务端那边解码也是utf-8
可是由于pycharm是运行在windows系统上,系统的编码是gbk,因此要看到显示
须要用gbk解码
phone.close()
是否是能够这样理解,先拿到数据长度,再把数据长度作成字典,再把字典先序列化后编码成二进制格式,这个通过加工处理的‘字典’就是报头,包含数据的长度信息
而后,len拿到报头的长度,把这个长度struck.pack转成固定4个字节,而后分三次发送,第一次发4个字节,再发报头内容,也就是那个特殊的‘字典’,
最后发送数据部分
接收的时候,先接收固定4个字节,把4个字节的内容unpack解包,拿到的就是报头长度的信息,而后根据这个报头长度的信息区接收,
接收到的内容是二进制格式,先解码成字符串格式,再反序列化成本来的字典格式,再用key取值拿到字典里面关于数据长度的信息,
最后根据这个信息去循环收取数据
struck能够有两种模式,i与l,i是整型,l是长整形,l的范围大于i
操做系统
操做系统的做用
1.隐藏丑陋复杂的硬件接口,提供良好的抽象接口
2.管理、调度进程,而且将多个进程对硬件的竞争变得有序
多道技术
1.产生背景:针对单核,实现并发。如今的主机都是多核,每一个核都会利用多道技术
2.空间上的多路复用:如内存中同时有多道程序
3.时间上的多路复用:复用一个cpu的时间片
遇到io切,占用cpu时间过长也切,核心在于切以前将进程的状态保存下来
这样才能保证下次切回来时,能基于上次切走的位置继续运行
4.空间上的复用最大的问题是:
程序之间的内存必须分割,这种分割须要在硬件层面实现,由操做系统控制。若是内存彼此不分割,则一个程序能够访问另一个程序的内存,
首先丧失的是安全性,好比你的qq程序能够访问操做系统的内存,这意味着你的qq能够拿到操做系统的全部权限。
其次丧失的是稳定性,某个程序崩溃时有可能把别的程序的内存也给回收了,比方说把操做系统的内存给回收了,则操做系统崩溃。
第一代计算机:真空管和穿孔卡片
第二代计算机:晶体管和批处理系统,现代操做系统的前身在这里出现
第三代计算机:集成电路芯片和多道程序设计,出现了多道技术,可是其操做系统仍是批处理系统,并无使用多道技术,由于尚未解决上面的4中的问题
第四代计算机:我的计算机
进程理论
进程是资源单位,线程是执行单位
进程:
进程就是正在进行的一个过程或者说一个任务。负责执行任务的是cpu
程序仅仅只是一堆代码而已,进程指的是程序的运行过程
同一个程序执行两次那也是两个进程
并发与并行
不管是并发仍是并行,在用户看来都是‘同时’运行的,无论是进程仍是线程,
都只是一个任务而已,真正干活的是cpu,而一个cpu同一时刻只能执行一个任务
并发:伪并行,即看起来是同时运行,单个cpu+多道技术就能够实现
即在一个时间段内有不少任务要作,但cpu同一时刻只能作一个任务,那就先作一会1,
再作一会2,再作一会3....这就保证了每一个人任务都在进行中
并行:同时运行,只有具有多个cpu才能实现并行
开启进程
python中的多线程没法利用多核优点,若是想要充分使用cpu的多核资源python中大部分状况下须要使用多进程
python提供multiprocessing模块,用来开启子进程
与线程不一样,进程间没有任何共享状态,进程修改的数据,改动仅限于该进程内,内存空间是隔离的
Process类
Process([group[,target[,name[,args[,kwargs]]]]])
由该类实例化获得的对象,可用来开启一个子进程
须要使用关键字的方式来指定参数
args指定的为传给target函数的位置参数,是一个元祖形式,必须有逗号
注意:在windows中,Process()必须放在 if __name__ == '__main__'下
参数介绍:
group 参数未使用,值始终未None
target 表示调用对象,即子进程要执行的任务
name 为子进程的名称
args 表示调用对象的位置参数元组,args=(1,2,'egon')
kwargs 表示调用对象的字典,kwargs={'name':'egon','egon':18}
方法介绍:
p.start() 开启进程,并调用该子进程中的p.run()
p.run() 进程启动时运行的方法,正是它去调用target指定的函数,自定义类时必须实现run()
p.terminate() 强制终止进程p,不会进行任何清理操做,也是给操做系统发信号,由操做系统进行清理操做
若是p还建立了子进程,那这样操做后其子进程就变成了僵尸进程
若是p还保存了一个锁,那么将不会被释放,进而致使死锁
p.is_alive() 若是p仍然运行,返回True
p.join([timeout]) 主线程等待p终止,是主线程处于等待状态,p处于运行状态
timeout是可选的超时时间
属性介绍:
p.daemon 默认值是False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,
p也随之终止,而且设为True后不能建立本身的新进程,必须在p.start()以前设置
p.name 进程的名称
p.pid 进程的pid
建立并开启子进程
方式一:调用Process类
import time,random
from multiprocessing import Process # 这个Process必定要是大写开头的才对
def piao(name):
print('%s is piaoing'%name)
time.sleep(random.randrange(1,5))
print('%s piao end'%name)
if __name__ == '__main__':
#实例化获得四个对象
p1 = Process(target=piao,args=('egon',)) #注意这里args须要传的值是元组,因此,很重要
p2 = Process(target=piao,args=('alex',)) # 也能够用kwargs传字典进去
p3 = Process(target=piao,args=('wupeiqi',))
p4 = Process(target=piao,args=('yuanhao',))
#调用对象下的方法,开启四个进程
p1.start() # 仅仅只是给操做系统发送了一个信号
p2.start() # 并非先运行p1再运行p2再运行p3最后运行p4这样的顺序
p3.start() # 只是发一个信号,开进程由操做系统来执行
p4.start()
print('主进程')
方式二:继承Process类,必须本身写run()方法,本身写一个类继承Process类
import time,random
from multiprocessing import Process 这个Process必定要是大写开头的才对
class Piao(Process):
def __init__(self,name):
super().__init__() 或 Process.__init__(self) super方法不用写self
self.name=name
def run(self): # 必定要是run()
print('%s is piaoing'%self.name)
time.sleep(random.randrange(1,5))
print('%s piao end'%self.name)
if __name__ == '__main__':
p1=Piao('egon')
p2=Piao('alex')
p3=Piao('wupeiqi')
p4=Piao('yuanhao')
p1.start()
p2.start()
p3.start()
p4.start()
print('主进程')
查看pid:
1.用os模块,os.getpid()查看本身的pid,os.getppid()查看父进程的pid
2.p.pid
全部的子进程都要经历僵尸进程这个状态,就是子进程执行完毕了后要清理掉的时候,成为僵尸进程,
保留一点点子进程的消息,以供父进程查看,父进程终结掉时会清理掉僵尸进程,有害
孤儿进程:子进程没终结掉,而父进程终结了,由init进程接收,无害
进程间内存空间是隔离的
基于多进程实现并发的套接字通讯,实例:
server:
import socket
from multiprocessing import Process
def talk(conn):
while True:
try:
data = conn.recv(1024)
if not data:
continue
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
p = Process(target=talk, args=(conn,))
p.start()
server.close()
if __name__ == '__main__':
server('127.0.0.1', 8800)
client:
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8800))
while True:
msg = input('>>').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
join方法
在主进程运行过程当中,若是想并发的执行其余的任务,咱们能够开启子进程,
此时主进程任务与子进程任务分两种状况:
1.在主进程任务与子进程任务彼此独立的状况下,主进程的任务先执行完毕后,
主进程还须要等待子进程执行完毕,而后统一回收资源
2.若是主进程任务执行到某一个阶段时,须要等待子进程执行完毕后才能继续执行,
此时就须要一种机制可以让主进程检测子进程是否执行完毕,执行完毕就继续执行主进程,
未执行完毕,主进程一直阻塞,这种机制就是join方法的做用
join方法实例:
import time,random
from multiprocessing import Process
def task(name):
print('%s is piaoing'%name)
time.sleep(random.randrange(1,5))
print('%s piao end'%name)
if __name__ == '__main__':
p1 = Process(target=task,args=('egon',))
p2 = Process(target=task,args=('alex',))
p3 = Process(target=task,args=('wupeiqi',))
p4 = Process(target=task,args=('yuanhao',))
p1.start() 进程只要start就会开始运行,这里start4个,那么系统中就有4个并发的进程
p2.start()
p3.start()
p4.start()
p1.join() join方法是让主进程阻塞等待,p1-p4仍然并发执行
p2.join() 因此4个join花费的总时间就是耗费时间最长的那个进程运行的时间
p3.join()
p4.join()
#上面start与join能够简写以下
p_l = [p1,p2,p3,p4]
for i in p_l:
i.start()
for v in p_l:
v.join()
print('主进程')
守护进程
主进程建立子进程,而后将该子进程设置成守护进程
守护进程会在 主进程代码 执行结束后就终止
守护进程内没法再开启子进程,不然抛异常
若是咱们有两个任务须要并发执行,那么开一个主进程和一个子进程就能够了
若是子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成
守护进程,主进程任务执行完,守护进程随即结束
代码实例:
import time,random
from multiprocessing import Process
def task(name):
print('%s is piaoing'%name)
time.sleep(random.randrange(1,5))
print('%s piao end'%name)
if __name__ == '__main__':
p = Process(target=task,args=('egon',))
p.daemon = True 设置守护进程,必定要放在p.start()以前,否则会报错
p.start()
print('主进程') 这是主进程,只要终端中打印了这一行,守护进程也随之终止
这个函数的执行结果是只打印’主进程‘这三个字
互斥锁
针对的是对共享数据的修改,只要对修改共享数据的代码加锁
原理上就是把并发改为串行,下降了效率,可是保证了数据安全不错乱
实例:
from multiprocessing import Process,Lock
import time,os
def work(lock):
lock.acquire() #加锁
print('%s is running'%os.getpid())
time.sleep(2)
print('%s is done'%os.getpid())
lock.release() #释放锁
if __name__ == '__main__':
lock = Lock()
for i in range(3):
p = Process(target=work,args=(lock,))
p.start()
模拟抢票,实例:
db.txt >>: {"count": 1} 这个count必定要用双引号,否则用json读不出来
from multiprocessing import Process, Lock
import json
import time
def search(name):
time.sleep(1)
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
print('<%s> check number:[%s]' % (name, dic.get('count')))
def get(name):
time.sleep(1)
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
if dic.get('count') > 0:
dic['count'] -= 1
time.sleep(1)
json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
print('<%s> successful' % name)
else:
print('sorry<%s>,no more' % name)
def task(name, lock):
search(name)
lock.acquire()
get(name)
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(1, 10):
t = Process(target=task, args=('egon %s' % i, lock))
t.start()
join是将一个任务总体串行,多个子进程仍是并发的,子进程与主进程改为了串行,而互斥锁则是将一个任务中的某一段代码串行
队列Queue
进程彼此之间隔离,要实现进程间通讯(IPC),multiprocessing提供两种方式进行消息传递
队列和管道,都是使用内存空间
建立队列的类(底层就是以管道和锁定的方式实现),队列就是管道+锁
Queue([maxsize]) 建立共享的进程队列
maxsize是队列中容许最大项数,省略则无大小限制
队列里面存放的是消息,而非数据,因此不要放大的,要放小的
队列占用的是内存空间,因此maxsize即便没有大小限制也受到内存大小的限制
主要方法:
q.put() 在队列中插入数据
q.get() 能够从队列中读取并删除一个元素
q.full() 判断队列满了没有
q.empty() 判断队列空了没有
实例:
from multiprocessing import Process, Queue
q = Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.full())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
生产者消费者模型
生产者指的是生产数据的任务,消费者指的是处理数据的任务
在并发编程中,可能会出现生产者跟消费者效率不协调的状况,这时候就须要生产者消费者模型
生产者消费者模型:
是经过一个容器来解决生产者和消费者的强耦合问题。
生产者与消费者之间不直接通信,而是经过一个阻塞队列进行通信
生产者把数据交给阻塞队列,消费者去阻塞队列那里拿数据
阻塞队列至关于一个缓冲区,来平衡生产者与消费者的处理能力,解耦合
用队列queue来实现
一种用于生产者消费者模型的队列的机制
JoinableQueue([maxsize])
方法与queue相似,q.get() q.put(),其特有的方法:
q.task_done() 消费者使用此方法发出信号,表示q.get()的返回项目已经被处理
q.join() 生产者调用此方法阻塞,直到队列中全部项目均被处理
实例:
from multiprocessing import Process,JoinableQueue
import time
def producer(q,name,food):
for i in range(1,10):
time.sleep(2)
res = '%s%s'%(food,i)
q.put(res)
print('----->%s produce %s'%(name,res))
q.join() 这里是为了阻塞住子进程,等待q.task_done()发消息
def consumer(q,name):
while True:
res = q.get()
time.sleep(1)
print('--->%s eat %s'%(name,res))
q.task_done()
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer,args=(q,'egon1','baozi'))
p2 = Process(target=producer,args=(q,'egon2','gutou'))
c1 = Process(target=consumer,args=(q,'alex1',))
c2 = Process(target=consumer,args=(q,'alex2',))
c1.daemon = True 设置守护进程
c2.daemon = True 设置守护进程
p1.start()
p2.start()
c1.start()
c2.start()
p1.join() 这里是为了阻塞主进程,主进程等待p1 p2结束才会执行 而p1 p2会被q.join阻塞,要等待c1 c2里面的
p2.join() q.task_done()执行,把所有取完的信号发回后才会把p1 p2结束掉,而p1 p2结束了,
print('主程序') c1 c2做为消费者也不必存在了,设置成守护进程
这样的话,p1,p2执行完了执行主程序,主程序执行完了c1 c2一块跟着结束
线程理论
进程是资源单位,线程是执行单位,线程才是cpu上的执行单位
进程比如是上海地铁,线程就是2号线,3号线,4号线
多线程就是一个进程中存在多个线程,多个线程共享该进程的地址空间和全部资源
建立线程的开销要远小于建立进程的开销
开启线程
方式1:Thread类直接建立
import threading
def countNum(n):
print('running on number:%s'%n)
if __name__=='__main__':
t1=threading.Thread(target=countNum,args=(23,))
t2=threading.Thread(target=countNum,args=(34,))
t1.start()
t2.start()
print('ending')
或者from threading import Thread,这样的话在建立对象的时候就能够直接Thread(),我的感受用这个好一点,占内存少,调用也方便
方式2:Thread类继承式建立
import threading
class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num=num
def run(self):
print('running on number:%s'%self.num)
t1=MyThread(56)
t2=MyThread(78)
t1.start() 调用执行run()方法
t2.start()
print('ending')
进程与线程的区别:
1.建立进程的开销大于建立线程的开销
2.在主进程下开启多个线程,每一个线程的pid都跟主进程同样,而开多个进程,每一个进程的pid各不相同
3.进程之间的地址空间是隔离的,同一个进程内的多个线程地址空间是共享的,一个线程内的修改会反映到全部的线程中
Thread对象的其余属性或方法,这个是Thread下的方法
Thread.isAlive() 返回线程是否活动
Thread.getName() 返回线程名
Thread.setName() 设置线程名
threading模块提供的一些方法,这是threading下的方法,与Thread一个级别
threading.currentThread() 返回当前的线程变量,能够用这个拿到当前线程变量
threading.enumerate() 返回一个包含正在运行的线程的list
threading.activeCount() 返回正在运行的线程数量
与len(threading.enumerate())结果同样
代码示例:
join方法阻塞主线程
import threading
import time
def tingge():
print('tingge')
time.sleep(3)
def xieboke():
print('xieboke')
time.sleep(5)
t1=threading.Thread(target=tingge)
t2=threading.Thread(target=xieboke)
t1.start()
t2.start()
t1.join() #join在子线程完成运行以前,这个子线程的父线程将一直被阻塞
t2.join()
print('ending')
守护线程
不管是进程仍是线程,都遵循:守护xxx会等待主xxx运行完毕后就被销毁
须要强调的是,运行完毕并不是终止运行
1.对主进程而言,运行完毕指的是主进程代码运行完毕
2.对主线程而言,运行完毕指的是主线程所在的进程内全部非守护线程通通运行完毕
主线程才算运行完毕
代码实例:
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hi'%name)
if __name__ == '__main__':
t = Thread(target=sayhi,args=('egon',))
t.setDaemon(True) 设置守护线程,放在start以前 或:t.daemon = True
t.start()
print('主线程')
由于sayhi中有sleep(2),因此在主线程print以后,被设置成守护线程的t就终止了
基于多线程实现并发的套接字通讯,实例:
server端:
import socket
from threading import Thread
def communicate(conn):
while True:
try:
data = conn.recv(1024)
if not data:
break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
t = Thread(target=communicate, args=(conn,))
t.start()
server.close()
if __name__ == '__main__':
server('127.0.0.1', 8800)
client端:
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8800))
while True:
msg = input('>>').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
GIL 全局解释器锁
在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,没法利用多核
GIL是加在cpython解释器上的一把互斥锁,只是咱们经常使用的是cpython解释器而已,可是GIL并非python的锁
GIL并非python的特性,而是在实现Cpython时所引进的一个概念,python彻底能够不依赖于GIL
在cpython中,由于GIL的存在没法实现多线程的并行,没法利用多核,可是能够实现并发
GIL本质就是一把互斥锁(mutex),因此就是在把并发改串行
GIL与Lock:
首先,锁的目的是为了保护共享数据,同一时间只能有一个线程来修改共享的数据
而后,保护不一样的数据就应该加不一样的锁
最后,GIL与Lock保护的数据不同,GIL保护的是解释器级别的,好比垃圾回收的数据
Lock保护用户本身开发的应用程序的数据
互斥锁代码示例:
import time,threading
# from threading import Thread,Lock 这样写比较好
def subNum():
global num #100个线程会同时走到这一步
r.acquire() #加上互斥锁以后,acquire与release之间的内容被上锁,
temp=num #待资源访问结束后才会释放,而后让下一个线程进来
time.sleep(0.0001)
num=temp-1
r.release()
num=100
thread_list=[]
r=threading.Lock()
for i in range(100):
t=threading.Thread(target=subNum) #产生100个线程
t.start()
thread_list.append(t)
for t in thread_list:
t.join()
print('Result:',num)
对于计算来讲,cpu越多越好,但对与I/O来讲,再多的cpu也没用
对于运行一个程序来讲,cpu越多执行效率越高,由于一个程序不会是纯计算或纯I/O
对于I/O密集型,不管是多核仍是单核,多线程合适
对于计算密集型,单核时使用多线程,多核时使用多进程
即,多进程适合多核时进行计算密集型任务,其他状况都是多线程更合适
多线程适合于I/O密集型,好比socket,爬虫,web
多进程适合于计算密集型,好比金融分析
如今都是多核了,因此在多核状况下:计算密集型用多进程,这样才能利用多核优点
IO密集型用多线程,由于咱们如今作的大都是IO密集型的操做,因此大部分状况下用多线程
代码实例:
对于计算密集型,多核时的多进程:效率高,8秒多
from multiprocessing import Process
import time
def counter():
i = 0
for i in range(40000000):
i += 1
return True
def main():
l=[]
start_time=time.time()
for i in range(2):
t=Process(target=counter)
t.start()
l.append(t)
for t in l:
t.join()
# counter()
# counter()
end_time=time.time()
print('Total time:{}'.format(end_time-start_time))
if __name__=='__main__':
main()
对于计算密集型,多核时的多线程:12秒多,效率低于多进程
from threading import Thread
import time
def counter():
i = 0
for i in range(40000000):
i += 1
return True
def main():
l=[]
start_time=time.time()
for i in range(2):
t=Thread(target=counter)
t.start()
l.append(t)
t.join()
end_time=time.time()
print('Total time:{}'.format(end_time-start_time))
if __name__=='__main__':
main()
进程池与线程池
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor 线程池,提供异步调用
ProcessPoolExecutor 进程池,提供异步调用
进程池与线程池的用法彻底同样
提交任务的两种方式:同步调用与异步调用
同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果再执行下一行代码
异步调用:提交完任务后,不在原地等待任务执行完毕,直接执行下面的代码
实例:
1.同步调用,不等于阻塞,阻塞是IO阻塞,同步调用是在等待任务执行结果,跟IO没有必然联系
import time
import random
from concurrent.futures import ThreadPoolExecutor
def la(name):
print('%s is laing' % name)
time.sleep(random.randint(1, 3))
res = random.randint(7, 13) * '#'
return {'name': name, 'res': res}
def weight(shit):
name = shit.get('name')
size = len(shit.get('res'))
print('%s lale <%s>kg' % (name, size))
if __name__ == '__main__':
pool = ThreadPoolExecutor(13)
shit1 = pool.submit(la, 'alex').result() # 等待执行结果,拿到结果后再往下走
weight(shit1)
shit2 = pool.submit(la, 'wusir').result()
weight(shit2)
2.异步调用
import time
import random
from concurrent.futures import ThreadPoolExecutor
def la(name):
print('%s is laing' % name)
time.sleep(random.randint(1, 3))
res = random.randint(7, 13) * '#'
weight({'name': name, 'res': res})
def weight(shit):
name = shit.get('name')
size = len(shit.get('res'))
print('%s lale <%s>kg' % (name, size))
if __name__ == '__main__':
pool = ThreadPoolExecutor(13)
pool.submit(la, 'alex') # 不拿到结果直接往下走
pool.submit(la, 'wusir')
pool.submit(la, 'yuan')
异步调用+回调函数:
import time
import random
from concurrent.futures import ThreadPoolExecutor
def la(name):
print('%s is laing' % name)
time.sleep(random.randint(1, 3))
res = random.randint(7, 13) * '#'
return {'name': name, 'res': res}
def weight(shit):
shit = shit.result() 使用回调函数后传进来的shit是一个obj对象,要用obj.result()拿到结果
name = shit.get('name')
size = len(shit.get('res'))
print('%s lale <%s>kg' % (name, size))
if __name__ == '__main__':
pool = ThreadPoolExecutor(13)
pool.submit(la, 'alex').add_done_callback(weight) weight拿到的是一个对象obj,因此须要用obj.result()拿到结果
pool.submit(la, 'wusir').add_done_callback(weight)
pool.submit(la, 'yuan').add_done_callback(weight)
基本方法:
submit(func, *args, **kwargs) 异步提交任务
map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操做
shutdown(wait=True) 至关于进程池的pool.close()+pool.join()的操做
wait=True 等待池内全部任务执行完毕回收完资源后才继续
wait=False 当即返回,并不会等待池内的任务执行完毕
可是无论wait的参数是什么,整个程序都会等到全部任务执行完毕
submit与map必须在shutdown以前
result(timeout=None) 取得结果
add_done_callback(func) 回调函数
为进程池或线程池内的每个进程或线程绑定一个函数func,该函数func在进程或线程的任务执行完毕后自动触发
并接受任务的返回值做为参数传给func
实例:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
import time
import random
def task(name):
print('name:%s pid:%s run' % (name, os.getpid()))
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 进程池 指定进程池最大数目就是4个,这样的话最多就是开4个进程,多余的任务等着有进程结束才能执行
pool = ThreadPoolExecutor(4) # 线程池,10个线程的pid都是同样的,而上面进程池的时候10个进程的pid是固定的4个pid
for i in range(10):
pool.submit(task, 'egon%s' % i)
pool.shutdown(wait=True) # 会等待全部10个进程都走完才走下面的print
print('zhu')
练习:
from concurrent.futures import ThreadPoolExecutor
import time
def get(url):
print('GET %s' % url)
response = requests.get(url)
time.sleep(2)
return {'url': url, 'content': response.text}
def parse(res):
res = res.result()
print('%s res is %s' % (res.get('url'), len(res.get('content'))))
if __name__ == '__main__':
urls = [
'http://www.baidu.com',
'http://www.bilibili.com',
'http://www.zhihu.com',
'http://www.acfun.cn',
]
pool = ThreadPoolExecutor(3)
for url in urls:
pool.submit(get, url).add_done_callback(parse)
基于线程池实现并发的套接字通讯
client:
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8800))
while True:
msg = input('>>').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
server:
import socket
from concurrent.futures import ThreadPoolExecutor
def communicate(conn):
while True:
try:
data = conn.recv(1024)
if not data:
break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
pool.submit(communicate, conn)
server.close()
if __name__ == '__main__':
pool = ThreadPoolExecutor(2)
server('127.0.0.1', 8800)
死锁与递归锁
死锁:两个及以上的进程或线程在执行过程当中,因争夺资源而形成的相互等待的现象
解决办法就是递归锁,为了支持在同一线程中屡次请求同一资源,可重入锁RLock
递归锁能够acquire屡次,而互斥锁只能acquire一次
递归锁内部有一个count变量,记录acquire的计数,只要count的计数不是0,其余线程就抢不到,直到全部的acquire都被release,count为0时才能被其余线程抢到
代码实例:能够多用RLock
import time,threading
Rlock=threading.RLock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.fun1()
self.fun2()
def fun1(self):
Rlock.acquire()
print('i am %s,get res:%s---%s'%(self.name,'ResA',time.time()))
Rlock.acquire()
print('i am %s,get res:%s---%s'%(self.name,'ResB',time.time()))
Rlock.release()
Rlock.release()
def fun2(self):
Rlock.acquire()
print('i am %s,get res:%s---%s' % (self.name, 'ResB', time.time()))
time.sleep(0.2)
Rlock.acquire()
print('i am %s,get res:%s---%s' % (self.name, 'ResA', time.time()))
Rlock.release()
Rlock.release()
if __name__=='__main__':
print('start------------------------%s'%time.time())
for i in range(0,10):
my_thread=MyThread()
my_thread.start()
信号量
也是一把锁,能够指定信号量为5,互斥锁使得同一时间只能有一个任务抢到锁去执行,
那么信号量同一时间能够有5个任务拿到锁去执行,同一时间能够有多个任务抢到锁去执行
代码实例:
from threading import Thread,Semaphore
import threading,time
def func():
sm.acquire()
print('%s get sm'%threading.current_thread().getName())
time.sleep(3)
sm.release()
if __name__ == '__main__':
sm = Semaphore(5)
for i in range(23):
t = Thread(target=func)
t.start()
Event
线程的一个关键特性是每一个线程都是独立运行且状态不可预测的
若是程序中的线程须要判断某个线程的状态来肯定本身下一步的操做,那就用到Event方法
from threading import Event
event.isSet() 返回event的状态值
event.wait() 若是状态值为False将阻塞线程
event.set() 设置状态值为Ture,因此阻塞池的线程激活进入就绪状态,等待系统调用
event.clear() 恢复状态值为False
实例:
1.
from threading import Thread, Event
import time
event = Event()
def student(name):
print('%s 正在听课' % name)
event.wait(2) # 设置超时时间,超过这个时间没有拿到set的True就不等了继续往下执行
print('%s 课间活动' % name)
def teacher(name):
print('%s 正在授课' % name)
time.sleep(7)
event.set()
if __name__ == '__main__':
stu1 = Thread(target=student, args=('alex',))
stu2 = Thread(target=student, args=('wusir',))
stu3 = Thread(target=student, args=('yuan',))
t1 = Thread(target=teacher, args=('egon',))
stu1.start()
stu2.start()
stu3.start()
t1.start()
2.
from threading import Thread, Event, currentThread
import time
event = Event()
def conn():
n = 0
while not event.is_set():
if n == 3:
print('%s try too many times' % currentThread().getName())
return
print('%s try %s' % (currentThread().getName(), n ))
event.wait(0.5)
n += 1
print('%s is connected' % currentThread().getName())
def check():
print('%s is checking' % currentThread().getName())
time.sleep(5)
event.set()
if __name__ == '__main__':
for i in range(3):
t = Thread(target=conn)
t.start()
t = Thread(target=check)
t.start()
定时器
指定n秒后执行某操做
代码实例:
1.
from threading import Timer
def hello():
print('hello world')
t = Timer(1,hello) 一秒后,执行hello函数
t.start()
2. 作一个4位数的验证码,有效期是5秒,超过5秒不输入就刷新出新的验证码
from threading import Timer
import random
class Code:
def __init__(self):
self.make_cache()
def make_cache(self, interval=5):
self.cache = self.make_code()
print(self.cache)
self.t = Timer(interval, self.make_cache)
self.t.start()
def make_code(self, n=4):
res = ''
for i in range(n):
s1 = str(random.randint(0, 9))
s2 = chr(random.randint(65, 90))
res += random.choice([s1, s2])
return res
def check(self):
while True:
code = input('>>:').strip()
if code.upper() == self.cache:
print('right')
self.t.cancel()
break
obj = Code()
obj.check()
线程queue
实例:
import queue
#最经常使用
q=queue.Queue() #默认先进先出(FIFO)括号里面能够设定最大值
q.put(111)
q.put('hello')
q.put(222)
print(q.get())
print(q.get())
print(q.get())
print(q.get()) 这里由于只有三个数据,第四次get没有数据,因此会一直阻塞住
#堆栈,后进先出
q1=queue.LifoQueue() last in first out
q1.put(111)
q1.put(222)
q1.put(333)
print(q1.get()) 取到的是333
#优先级队列,按照优先级
q2=queue.PriorityQueue()
q2.put([4,'hello4']) put进去的元素,我目前知道列表或元组,第一个元素规定优先级
q2.put([1,'hello']) 一般是数字,数字越小,优先级越高
q2.put([2,'hello2'])
print(q2.get()) 取到的是[1,'hello']
print(q2.get()) 取到的是[2,'hello2']
协程
并发的本质是切换+保存状态
协程:是单线程下的并发,又称为微线程、纤程Coroutine
协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的
python的线程属于内核级别的,即由操做系统控制调度
单线程内开启协程,一旦遇到I/O就会从应用程序级别控制切换
优势:
1.协程的切换开销小,属于程序级别的切换,操做系统彻底感知不到,于是更加轻量级
2.单线程内就能够实现并发的效果,最大限度利用cpu
缺点:
1.协程的本质是单线程下,没法利用多核,能够是一个程序开启多个进程,每一个进程内开启多个线程,每一个线程内开启协程
2.协程指的是单个线程,于是一旦协程出现阻塞,将会阻塞整个线程
协程必须在只有一个单线程里实现并发
修改共享数据不需加锁
用户程序里本身保存多个控制流的上下文栈
一个协程遇到IO操做会自动切换到其余协程
实例:
import time
def consumer():
r=''
while True:
n=yield r
if not n:
return
print('[CONSUMER]<<Consuming%s...'%n)
time.sleep(1)
r='200 OK'
def produce(c):
next(c)
n=0
while n<5:
n+=1
print('[PRODUCER]>>Producing %s...'%n)
cr=c.send(n)
print('[PRODUCER] Consumer return:%s'%cr)
c.close()
if __name__=='__main__':
c=consumer()
produce(c)
greenlet
在单个线程内有多个任务,须要在多个任务之间切换,yield太麻烦,就用greenlet模块
代码实例:
from greenlet import greenlet
def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1=greenlet(test1)
gr2=greenlet(test2)
gr1.switch() # 这里能够传参数,只有在第一次switch时能够传参数
可是greenlet不能处理I/O阻塞操做,能够用gevent模块解决
gevent
异步提交任务,就由于是异步提交的,因此才须要用jion方法阻塞住
协程只有一个单线程,异步提交任务,不join的话,spawn提交了任务就走完了,线程就结束了,可能提交的任务还没开始,就会直接死掉
因此必定要用join阻塞住主线程,保证线程不死
用法:
g1 = gevent.spawn(func1,1,2,3,4,x=5,y=6) 建立一个协程对象g1,第一个参数放函数名
后面的所有都是给该函数名传值的
g2 = gevent.spawn(func2)
g1.join() 等待g1的结果
g2.join()
或者上面两个合并:gevent.joinall([gevent.spawn(func1),gevent.spawn(func2)])
g1.value 拿到g1的返回值
代码实例:
import gevent,time
def foo():
print('running in foo')
gevent.sleep(2) 用来模拟gevent能够识别的I/O阻塞,time.sleep()不能识别
print('switch to foo again')
def bar():
print('switch to bar')
gevent.sleep(5)
print('switch to bar again')
start=time.time()
print(start)
#g1 = gevent.spawn(foo)
#g2 = gevent.spawn(bar)
#g1.join()
#g2.join() # 或用下面的方法
gevent.joinall(
[gevent.spawn(foo),
gevent.spawn(bar)] 里面放列表的形式
)
print(time.time()-start)
切记:须要加上下面这句代码,gevent才能识别正常的I/O阻塞,否则不能识别time.sleep(1)
from gevent import monkey;monkey.patch_all()
干脆就直接记住,要用gevent,就在文件的开头加上这句话!开头!import socket的前面
#爬虫
import time
from gevent import monkey;monkey.patch_all()
import gevent
from urllib import request
def f(url):
print('GET:%s'%url)
resp=request.urlopen(url)
data=resp.read()
print('%d bytes received from %s.'%(len(data),url))
start=time.time()
gevent.joinall([
gevent.spawn(f,'https://nba.hupu.com/'),
gevent.spawn(f,'https://www.zhihu.com/'),
gevent.spawn(f,'https://www.bilibili.com/'),
])
print(time.time()-start)
基于gevent模块实现并发的套接字通讯
client:
import socket
from threading import Thread, currentThread
def client():
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8800))
while True:
client.send(('%s say hello' % currentThread().getName()).encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
if __name__ == '__main__':
for i in range(500): # 开500个线程,这样的话,就是服务端单线程处理500个并发
t = Thread(target=client)
t.start()
server:
from gevent import monkey; monkey.patch_all()
import socket
import gevent
def communicate(conn):
while True:
try:
data = conn.recv(1024)
if not data:
break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
gevent.spawn(communicate, conn)
server.close()
if __name__ == '__main__':
g1 = gevent.spawn(server, '127.0.0.1', 8800)
g1.join()
其实使用yield、greenl和gevent都是在实现协程,单线程下的并发,yield最麻烦,greenlet稍好一点,这二者都不能处理IO操做
gevent最好,其底层封装的仍是greenlet,能够处理IO操做
IO模型
同步:在发出一个功能调用时,在没有获得结果以前,该调用就不会返回,好比打电话
异步:在发出一个功能调用时,调用者不能马上获得结果,好比发短信,一般跟回调机制结合到一块儿
阻塞:调用结果返回前,当前线程会被挂起,函数只有获得结果以后才会将阻塞的线程激活
非阻塞:在不能马上获得结果以前也会马上返回,同时该函数不会阻塞当前线程
当一个read操做发生时,该操做会经历两个阶段:
等待数据准备 waiting for the data to be ready
将数据从内核拷贝到进程中 copying the data from the kernel to the process
阻塞IO(blocking IO)
在linux中,默认状况下,全部的socket都是blocking
特色是在IO执行的两个阶段都被block
能够用多线程模型来解决小规模的服务请求,大规模的要用的非阻塞IO
非阻塞IO(non-blocking IO)
屡次发送系统调用,wait for data阶段,数据没准备好会返回error,而后不停的发
循环往复的进行recvfrom系统调用,这称之为轮询,轮询检查内核数据,直到数据准备好
拷贝数据的过程,进程处于阻塞状态
优势:wait for data无阻塞 copy data阻塞
缺点:系统调用发送太多,数据不能实时接收到
可是,非阻塞IO毫不被推荐
异步IO
全程无阻塞
IO多路复用(IO multiplexing)
监听多个连接 select/epoll,优点在于处理多个链接
特色:全程阻塞,比阻塞IO多一次系统调用
可以监听多个文件描述符(套接字对象),进而实现并发
代码实例:
这是server端
import socket,time,select
sock=socket.socket()
sock.bind(('127.0.0.1',8800))
sock.listen(5)
sock.setblocking(False)
inputs=[sock,] #这个sock是在有一个clent链接的时候才会有变化
while 1:
r,w,e=select.select(inputs,[],[]) #监听的是有变化的套接字
for obj in r:
if obj==sock:
conn,addr=obj.accept() #这个conn是在clent发送数据的时候发生变化
print('conn',conn) #conn是连接的客户端的套接字对象
inputs.append(conn)
else:
try:
data = obj.recv(1024)
print(data.decode('utf-8'))
send_data = input('>>:')
obj.send(send_data.encode('utf-8'))
except Exception:
inputs.remove(obj)
#在linux上这样写,上面的写法是应用于windows上
# if not data:
# inputs.remove(obj)
# continue
在全部的IO模型里面,阻塞IO、非阻塞IO、IO多路复用以及没学的驱动信号,都是同步IO
只有异步IO是异步的。
关于同步异步的判断:不管是wait for data 仍是 copy data 只要出现阻塞,那就是同步的
IO多路复用
对于windows而言,只有select,对于linux有select poll epoll三种方式
epoll>poll>select
select缺点:
1.每次调用select都须要把全部的文件描述符fd拷贝到内核空间,致使效率降低;
2.每次调用,都遍历全部的fd是否有数据访问,这个过程效率过低,这个问题最重要;
3.最大链接数有上限(1024)
poll:跟select基本一致,可是最大链接数没有限制
epoll:
第一个函数:建立epoll句柄,将全部的文件描述符fd拷贝到内存空间,可是只须要拷一次
第二个函数:回调函数,是某一个函数或动做成功完成后会触发的函数,
为全部的fd绑定一个回调函数,一旦有数据访问,就会触发该回调函数,
回调函数将fd放到一个链表中
第三个函数:判断链表是否为空
epoll的最大链接数没有上限(相对的)
selectors模块
三种IO多路复用模型在不一样平台有不一样支持,
使用seletors帮咱们默认选择当前平台下合适的IO多路复用模型
代码实例:
server端
import selectors #基于select模块实现的IO多路复用,建议使用
import socket
sock=socket.socket()
sock.bind(('127.0.0.1',8800))
sock.listen(5)
sock.setblocking(False)
=selectors.DefaultSelector() #根据具体平台选择最佳IO多路复用机制,好比linux上选epoll
def read(conn,mask):
try:
data=conn.recv(1024)
print(data.decode('utf-8'))
send_data=input('>>:')
conn.send(send_data.encode('utf-8'))
except Exception:
sel.unregister(conn) #解除绑定,避免出现一个客户端认为关闭服务端崩掉的状况
def accept(sock,mask):
conn,addr=sock.accept()
#print(conn)
sel.register(conn,selectors.EVENT_READ,read)
sel.register(sock,selectors.EVENT_READ,accept) #注册事件
while 1:
#print('waiting...')
events=sel.select() #监听
for key,mask in events:
#print(key.data) #当前绑定的方法,好比accept,read
#print(key.fileobj) #当前活动的文件描述符,好比sock,conn
func=key.data
obj=key.fileobj
func(obj,mask)
#官方的列子以下:
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
socketserver
服务端的特色:
1.一直运行提供服务,即连接循环,通讯循环是基于一个连接的
2.绑定一个惟一的地址
socketserver模块分为两大类:server类解决连接问题,request解决通讯问题
代码实例:
server端:
import socketserver
class FTPserver(socketserver.BaseRequestHandler): #通信
def handle(self): 这个是固定死的,必须定义handle方法,上面的继承也是固定的
print(self)
print(self.request) 独有的request方法,其实就是conn,一个套接字对象
while True:
data=self.request.recv(1024)
print(data)
self.request.send(data.upper())
if __name__=='__main__':
obj=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FTPserver) 基于多线程实现并发
obj.serve_forever() #连接循环
client端:
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))
while True:#通讯循环
msg=input('>>:').strip()
if not msg:continue #判断msg是否为空,为空的话就continue
phone.send(msg.encode('utf-8'))
data=phone.recv(1024)
print(data)
phone.close()