python并发编程(并发与并行,同步和异步,阻塞与非阻塞)

  最近在学python的网络编程,学了socket通讯,并利用socket实现了一个具备用户验证功能,能够上传下载文件、能够实现命令行功能,建立和删除文件夹,能够实现的断点续传等功能的FTP服务器。但在这当中,发现一些概念区分起来很难,好比并发和并行,同步和异步,阻塞和非阻塞,可是这些概念却很重要。所以在此把它总结下来。python

 1. 并发 & 并行

  并发:在操做系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行。简言之,是指系统具备处理多个任务的能力。shell

  并行:当系统有一个以上CPU时,则线程的操做有可能非并发。当一个CPU执行一个线程时,另外一个CPU能够执行另外一个线程,两个线程互不抢占CPU资源,能够同时进行,这种方式咱们称之为并行(Parallel)。简言之,是指系统具备同时处理多个任务的能力。数据库

  下面咱们来两个例子:编程

import threading #线程
import time


def music():
    print('begin to listen music {}'.format(time.ctime()))
    time.sleep(3)
    print('stop to listen music {}'.format(time.ctime()))


def game():
    print('begin to play game {}'.format(time.ctime()))
    time.sleep(5)
    print('stop to play game {}'.format(time.ctime()))


if __name__ == '__main__':
    music()
    game()
    print('ending.....')
View Code

music的时间为3秒,game的时间为5秒,若是按照咱们正常的执行,直接执行函数,那么将按顺序顺序执行,整个过程8秒。服务器

import threading #线程
import time


def music():
    print('begin to listen music {}'.format(time.ctime()))
    time.sleep(3)
    print('stop to listen music {}'.format(time.ctime()))


def game():
    print('begin to play game {}'.format(time.ctime()))
    time.sleep(5)
    print('stop to play game {}'.format(time.ctime()))


if __name__ == '__main__':
    t1 = threading.Thread(target=music) #建立一个线程对象t1 子线程
    t2 = threading.Thread(target=game) #建立一个线程对象t2 子线程

    t1.start()
    t2.start()

    # t1.join() #等待子线程执行完 t1不执行完,谁也不许往下走
    t2.join()

    print('ending.......') #主线程
    print(time.ctime())
View Code

  在这个例子中,咱们开了两个线程,将music和game两个函数分别经过线程执行,运行结果显示两个线程同时开始,因为听音乐时间3秒,玩游戏时间5秒,因此整个过程完成时间为5秒。咱们发现,经过开启多个线程,本来8秒的时间缩短为5秒,本来顺序执行如今是否是看起来好像是并行执行的?看起来好像是这样,听音乐的同时在玩游戏,整个过程的时间随最长的任务时间变化。但真的是这样吗?那么下面我来提出一个GIL锁的概念。网络

GIL(全局解释器锁):不管你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只容许一个线程运行。
import time
from threading import Thread


def add():
    sum = 0
    i = 1
    while i<=1000000:
        sum += i
        i += 1
    print('sum:',sum)


def mul():
    sum2 = 1
    i = 1
    while i<=100000:
        sum2 = sum2 * i
        i += 1
    print('sum2:',sum2)


start = time.time()

add()
mul() #串行比多线程还快

print('cost time %s'%(time.time()-start))
View Code

 

import time
from threading import Thread


def add():
    sum = 0
    i = 1
    while i<=1000000:
        sum += i
        i += 1
    print('sum:',sum)


def mul():
    sum2 = 1
    i = 1
    while i<=100000:
        sum2 = sum2 * i
        i += 1
    print('sum2:',sum2)


start = time.time()
t1 = Thread(target=add)
t2 = Thread(target=mul)

l = []
l.append(t1)
l.append(t2)

for t in l:
   t.start()

for t in l:
    t.join()

print('cost time %s'%(time.time()-start))
View Code

 

  哎吆,这是怎么回事,串行执行比多线程还快?不符合常理呀。是否是颠覆了你的人生观,这个就和GIL锁有关,同一时刻,系统只容许一个线程执行,那么,就是说,本质上咱们以前理解的多线程的并行是不存在的,那么以前的例子为何时间确实缩短了呢?这里有涉及到一个任务的类型。多线程

--任务: 1.IO密集型(会有cpu空闲的时间)  注:sleep等同于IO操做, socket通讯也是IO  
2.计算密集型
  而以前那个例子刚好是IO密集型的例子,后面这个因为涉及到了加法和乘法,属于计算密集型操做,那么,就产生了一个结论,多线程对于IO密集型任务有做用,
而计算密集型任务不推荐使用多线程。
 
 而其中咱们还能够获得一个结论:因为GIL锁,多线程不可能真正实现并行,所谓的并行也只是宏观上并行微观上并发,本质上是因为遇到io操做不断的cpu切换
所形成并行的现象。因为cpu切换速度极快,因此看起来就像是在同时执行。
  --问题:没有利用多核的优点
    --这就形成了多线程不能同时执行,而且增长了切换的开销,串行的效率可能更高。

 2. 同步 & 异步

  对于一次IO访问(以read举例),数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。因此说,当一个read操做发生时,它会经历两个阶段:
     1. 等待数据准备 (Waiting for the data to be ready)
     2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
同步:当进程执行IO(等待外部数据)的时候,-----等。同步(例如打电话的时候必须等)
异步:当进程执行IO(等待外部数据)的时候,-----不等,去执行其余任务,一直等到数据接收成功,再回来处理。异步(例如发短信)
当咱们去爬取一个网页的时候,要爬取多个网站,有些人可能会发起多个请求,而后经过函数顺序调用。执行顺序也是先调用先执行。效率很是低。
下面咱们看一下异步的一个例子:
import socket
import select

"""
########http请求本质,IO阻塞########
sk = socket.socket()
#1.链接
sk.connect(('www.baidu.com',80,)) #阻塞
print('链接成功了')
#2.链接成功后发送消息
sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")

#3.等待服务端响应
data = sk.recv(8096)#阻塞
print(data) #\r\n\r\n区分响应头和影响体

#关闭链接
sk.close()
"""
"""
########http请求本质,IO非阻塞########
sk = socket.socket()
sk.setblocking(False)
#1.链接
try:
    sk.connect(('www.baidu.com',80,)) #非阻塞,但会报错
    print('链接成功了')
except BlockingIOError as e:
    print(e)

#2.链接成功后发送消息
sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")

#3.等待服务端响应
data = sk.recv(8096)#阻塞
print(data) #\r\n\r\n区分响应头和影响体

#关闭链接
sk.close()
"""


class HttpRequest:
    def __init__(self,sk,host,callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        return self.socket.fileno()


class HttpResponse:
    def __init__(self,recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None

        self.initialize()

    def initialize(self):
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h,encoding='utf-8')
            v = h_str.split(':',1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]


class AsyncRequest:
    def __init__(self):
        self.conn = []
        self.connection = [] # 用于检测是否已经链接成功

    def add_request(self,host,callback):
        try:
            sk = socket.socket()
            sk.setblocking(0)
            sk.connect((host,80))
        except BlockingIOError as e:
            pass
        request = HttpRequest(sk,host,callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):

        while True:
            rlist,wlist,elist = select.select(self.conn,self.connection,self.conn,0.05)
            for w in wlist:
                print(w.host,'链接成功...')
                # 只要能循环到,表示socket和服务器端已经链接成功
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n"  %(w.host,)
                w.socket.send(bytes(tpl,encoding='utf-8'))
                self.connection.remove(w)
            for r in rlist:
                # r,是HttpRequest
                recv_data = bytes()
                while True:
                    try:
                        chunck = r.socket.recv(8096)
                        recv_data += chunck
                    except Exception as e:
                        break
                response = HttpResponse(recv_data)
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break


def f1(response):
    print('保存到文件',response.header_dict)


def f2(response):
    print('保存到数据库', response.header_dict)


url_list = [
    {'host':'www.youku.com','callback': f1},
    {'host':'v.qq.com','callback': f2},
    {'host':'www.cnblogs.com','callback': f2},
]

req = AsyncRequest()
for item in url_list:
    req.add_request(item['host'],item['callback'])

req.run()
View Code

  咱们能够看到,三个请求发送顺序与返回顺序,并不同,这样就体现了异步请求。即我同时将请求发送出去,哪一个先回来我先处理哪一个。并发

  即咱们能够理解为:我打电话的时候只容许和一我的通讯,和这我的通讯结束以后才容许和另外一我的开始。这就是同步。app

           咱们发短信的时候发完能够不去等待,去处理其余事情,当他回复以后咱们再去处理,这样就大大解放了咱们的时间。这就是异步。异步

  体如今网页请求上面就是我请求一个网页时候等待他回复,不然不接收其它请求,这就是同步。另外一种就是我发送请求以后不去等待他是否回复,而去处理其它请求,当处理完其余请求以后,某个请求说,个人回复了,而后程序转而去处理他的回复数据。这就是异步请求。因此,异步能够充分cpu的效率。

 

  3. 阻塞 & 非阻塞

  调用blocking IO会一直block住对应的进程直到操做完成,而non-blocking IO在kernel还准备数据的状况下会马上返回。
下面咱们经过socket实现一个命令行功能来感觉一下。
  
#服务端
from socket import *
import subprocess
import struct

ip_port = ('127.0.0.1', 8000)
buffer_size = 1024
backlog = 5

tcp_server = socket(AF_INET, SOCK_STREAM)
tcp_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加
tcp_server.bind(ip_port)
tcp_server.listen(backlog)

while True:
    conn, addr = tcp_server.accept()
    print('新的客户端连接:', addr)
    while True:
        try:
            cmd = conn.recv(buffer_size)
            print('收到客户端命令:', cmd.decode('utf-8'))

            #执行命令cmd,获得命令的结果cmd_res
            res = subprocess.Popen(cmd.decode('utf-8'),shell=True,
                                   stderr=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stdin=subprocess.PIPE,
                                   )
            err = res.stderr.read()
            if err:
                cmd_res = err
            else:
                cmd_res = res.stdout.read()
            if not cmd_res:
                cmd_res = '执行成功'.encode('gbk')
            #解决粘包
            length = len(cmd_res)
            data_length = struct.pack('i',length)
            conn.send(data_length)
            conn.send(cmd_res)
        except Exception as e:
            print(e)
            break
    conn.close()


#客户端
from socket import *

ip_port = ('127.0.0.1',8000)
buffer_size = 1024
backlog = 5

tcp_client = socket(AF_INET,SOCK_STREAM)
tcp_client.connect(ip_port)

while True:
    cmd = input('>>:').strip()
    if not cmd:
        continue
    if cmd == 'quit':
        break
    tcp_client.send(cmd.encode('utf-8'))

    #解决粘包
    length = tcp_client.recv(4)
    length = struct.unpack('i',length)[0]

    recv_size = 0
    recv_msg = b''
    while recv_size < length:
        recv_msg += tcp_client.recv(buffer_size)
        recv_size = len(recv_msg)

    print(recv_msg.decode('gbk'))
View Code

开启了服务器和一个客户端以后,咱们在客户端输入一些命令,而后正确显示,功能实现。这是在我再打开一个客户端,输入命令,发现服务器迟迟没有响应。

这个就是当一个客户端在请求的时候,当这个客户端没有结束的时候,服务器不会去处理其余客户端的请求。这时候就阻塞了。

如何让服务器同时处理多个客户端请求呢?

#服务端
import socketserver


class Myserver(socketserver.BaseRequestHandler):
    """socketserver内置的通讯方法"""
    def handle(self):
        print('conn is:',self.request)  #conn
        print('addr is:',self.client_address)  #addr

        while True:
            try:
                #发消息
                data = self.request.recv(1024)
                if not data:break
                print('收到的客户端消息是:',data.decode('utf-8'),self.client_address)

                #发消息
                self.request.sendall(data.upper())
            except Exception as e:
                print(e)
                break


if __name__ == '__main__':
    s = socketserver.ThreadingTCPServer(('127.0.0.1',8000), Myserver)  #通讯循环
    # s = socketserver.ForkingTCPServer(('127.0.0.1',8000), Myserver)  #通讯循环
    print(s.server_address)
    print(s.RequestHandlerClass)
    print(Myserver)
    print(s.socket)
    s.serve_forever()

#客户端

from socket import *

ip_port = ('127.0.0.1',8000)
buffer_size = 1024
backlog = 5

tcp_client = socket(AF_INET,SOCK_STREAM)
tcp_client.connect(ip_port)

while True:
    msg = input('>>:').strip()
    if not msg:continue
    if msg == 'quit':break

    tcp_client.send(msg.encode('utf-8'))

    data = tcp_client.recv(buffer_size)
    print(data.decode('utf-8'))

tcp_client.close()
View Code

这段代码经过socketserver模块实现了socket的并发。这个过程当中,当一个客户端在向服务器请求的时候,另外一个客户端也能够正常请求。服务器在处理一个客户端请求的时候,另外一个请求没有被阻塞。

总结:只要有一丁点阻塞,就是阻塞IO。

   异步IO的特色就是全程无阻塞。

   有些人常把同步阻塞和异步非阻塞联系起来,但实际上通过分析,阻塞与同步,非阻塞和异步的定义是不同的。同步和异步的区别是遇到IO请求是否等待。阻塞和非阻塞的区别是数据没准备好的状况下是否当即返回。同步多是阻塞的,也多是非阻塞的,而非阻塞的有多是同步的,也有多是异步的。

  这里面其实涉及到的知识点还不少,这里只是凭个人记忆简单总结了一下,之后会补充更多。

相关文章
相关标签/搜索