回顾:同步、异步、阻塞、非阻塞html
同步:python
所谓同步,就是在发出一个功能调用时,在没有获得结果以前,该调用就不会返回。按照这个定义,其实绝大多数函数都是同步调用。可是通常而言,咱们在说同步、异步的时候,特指那些须要其余部件协做或者须要必定时间完成的任务。程序员
举例:编程
1. multiprocessing.Pool下的apply #发起同步调用后,就在原地等着任务结束,根本不考虑任务是在计算仍是在io阻塞,总之就是一股脑地等任务结束windows
2. concurrent.futures.ProcessPoolExecutor().submit(func,).result()安全
3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()服务器
异步:网络
异步的概念和同步相对。当一个异步功能调用发出后,调用者不能马上获得结果。当该异步功能完成后,经过状态、通知或回调来通知调用者。若是异步功能用状态来通知,那么调用者就须要每隔必定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这实际上是一 种很严重的错误)。若是是使用通知的方式,效率则很高,由于异步功能几乎不须要作额外的操做。至于回调函数,其实和通知没太多区别。多线程
举例:并发
1. multiprocessing.Pool().apply_async() #发起异步调用后,并不会等待任务结束才返回,相反,会当即获取一个临时结果(并非最终的结果,多是封装好的一个对象)。
2. concurrent.futures.ProcessPoolExecutor(3).submit(func,)
3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)
阻塞:
阻塞调用是指调用结果返回以前,当前线程会被挂起(如遇到io操做)。函数只有在获得结果以后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不一样的。对于同步调用来讲,不少时候当前线程仍是激活的,只是从逻辑上当前函数没有返回而已。
举例:
1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即使是被抢走cpu的执行权限,那也是处于就绪态);
2. 阻塞调用:当socket工做在阻塞模式的时候,若是没有数据的状况下调用recv函数,则当前线程就会被挂起,直到有数据为止。
非阻塞:
非阻塞和阻塞的概念相对应,指在不能马上获得结果以前也会马上返回,同时该函数不会阻塞当前线程。
对于一个network IO (这里咱们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另外一个就是系统内核(kernel)。当一个read操做发生时,该操做会经历两个阶段:
1)等待数据准备 (Waiting for the data to be ready)
2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
记住这两点很重要,由于这些IO模型的区别就是在两个阶段上各有不一样的状况。
一、输入操做:read、readv、recv、recvfrom、recvmsg共5个函数,若是会阻塞状态,则会经理wait data和copy data两个阶段,若是设置为非阻塞则在wait 不到data时抛出异常
二、输出操做:write、writev、send、sendto、sendmsg共5个函数,在发送缓冲区满了会阻塞在原地,若是设置为非阻塞,则会抛出异常
三、接收外来连接:accept,与输入操做相似
四、发起外出连接:connect,与输出操做相似
在UI编程中,经常要对鼠标点击进行相应,首先如何得到鼠标点击呢?
方式一:建立一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有如下几个缺点:
1. CPU资源浪费,可能鼠标点击的频率很是小,可是扫描线程仍是会一直循环检测,这会形成不少的CPU资源浪费;若是扫描鼠标点击的接口是阻塞的呢?
2. 若是是堵塞的,又会出现下面这样的问题,若是咱们不但要扫描鼠标点击,还要扫描键盘是否按下,因为扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
3. 若是一个循环须要扫描的设备很是多,这又会引来响应时间的问题;
因此,该方式是很是很差的。
方式二:就是事件驱动模型
目前大部分的UI编程都是事件驱动模型,如不少UI平台都会提供onClick()事件,这个事件就表明鼠标按下事件。事件驱动模型大致思路以下:
1. 有一个事件(消息)队列;
2. 鼠标按下时,往这个队列中增长一个点击事件(消息);
3. 有个循环,不断从队列取出事件,根据不一样的事件,调用不一样的函数,如onClick()、onKeyDown()等;
4. 事件(消息)通常都各自保存各自的处理函数指针,这样,每一个消息都有独立的处理函数;
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特色是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
让咱们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展现了随着时间的推移,这三种模式下程序所作的工做。这个程序有3个任务须要完成,每一个任务都在等待I/O操做时阻塞自身。阻塞在I/O操做上所花费的时间已经用灰色框标示出来了。
在单线程同步模型中,任务按照顺序执行。若是某个任务由于I/O而阻塞,其余全部的任务都必须等待,直到它完成以后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。若是任务之间并无互相依赖的关系,但仍然须要互相等待的话这就使得程序没必要要的下降了运行速度。
在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操做系统来管理,在多处理器系统上能够并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其余线程得以继续执行。与完成相似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,由于这类程序不得不经过线程同步机制如锁、可重入函数、线程局部存储或者其余机制来处理线程安全问题,若是实现不当就会致使出现微妙且使人痛不欲生的bug。
在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其余昂贵的操做时,注册一个回调到事件循环中,而后当I/O操做完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询全部的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽量的得以执行而不须要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,由于程序员不须要关心线程安全问题。
当咱们面对以下的环境时,事件驱动模型一般是一个好的选择:
一、程序中有许多任务,并且…
二、任务之间高度独立(所以它们不须要互相通讯,或者等待彼此)并且…
三、在等待事件到来时,某些任务会阻塞。
当应用程序须要在任务间共享可变的数据时,这也是一个不错的选择,由于这里不须要采用同步处理。
网络应用程序一般都有上述这些特色,这使得它们可以很好的契合事件驱动编程模型。
此处要提出一个问题,就是,上面的事件驱动模型中,只要一遇到IO就注册一个事件,而后主程序就能够继续干其它的事情了,只到io处理完毕后,继续恢复以前中断的任务,这本质上是怎么实现的呢?哈哈,下面咱们就来一块儿揭开这神秘的面纱。。。。
select,poll,epoll都是IO多路复用的机制。I/O多路复用就是经过一种机制,一个进程能够监视多个描述符,一旦某个描述符就绪(通常是读就绪或者写就绪),可以通知程序进行相应的读写操做。但select,poll,epoll本质上都是同步I/O,由于他们都须要在读写事件就绪后本身负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需本身负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
select
select(rlist, wlist, xlist, timeout=None)
函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,若是当即返回设为null便可),函数返回。当select函数返回后,能够 经过遍历fdset,来找到就绪的描述符。
select目前几乎在全部的平台上支持,其良好跨平台支持也是它的一个优势。select的一 个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024,能够经过修改宏定义甚至从新编译内核的方式提高这一限制,可是这样也会形成效率的下降。
select 多并发socket 例子
#select socket server
#_*_coding:utf-8_*_
__author__ = 'YL'
import select import socket import sys import queue
server = socket.socket() server.setblocking(0) server_addr = ('localhost',10000) print('starting up on %s port %s' % server_addr) server.bind(server_addr) server.listen(5) inputs = [server, ] #本身也要监测呀,由于server自己也是个fd
outputs = [] message_queues = {} while True: print("waiting for next event...") readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是没有任何fd就绪,那程序就会一直阻塞在这里
for s in readable: #每一个s就是一个socket
if s is server: #别忘记,上面咱们server本身也当作一个fd放在了inputs列表里,传给了select,若是这个s是server,表明server这个fd就绪了,
#就是有活动了, 什么状况下它才有活动? 固然 是有新链接进来的时候 呀
#新链接进来了,接受这个链接
conn, client_addr = s.accept() print("new connection from",client_addr) conn.setblocking(0) inputs.append(conn) #为了避免阻塞整个程序,咱们不会马上在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新链接
#就会被交给select去监听,若是这个链接的客户端发来了数据 ,那这个链接的fd在server端就会变成就续的,select就会把这个链接返回,返回到
#readable 列表里,而后你就能够loop readable列表,取出这个链接,开始接收数据了, 下面就是这么干 的
message_queues[conn] = queue.Queue() #接收到客户端的数据后,不马上返回 ,暂存在队列里,之后发送
else: #s不是server的话,那就只能是一个 与客户端创建的链接的fd了
#客户端的数据过来了,在这接收
data = s.recv(1024) if data: print("收到来自[%s]的数据:" % s.getpeername()[0], data) message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
if s not in outputs: outputs.append(s) #为了避免影响处理与其它客户端的链接 , 这里不马上返回数据给客户端
else:#若是收不到data表明什么呢? 表明客户端断开了呀
print("客户端断开了",s) if s in outputs: outputs.remove(s) #清理已断开的链接
inputs.remove(s) #清理已断开的链接
del message_queues[s] ##清理已断开的链接
for s in writeable: try : next_msg = message_queues[s].get_nowait() except queue.Empty: print("client [%s]" %s.getpeername()[0], "queue is empty..") outputs.remove(s) else: print("sending msg to [%s]"%s.getpeername()[0], next_msg) s.send(next_msg.upper()) for s in exeptional: print("handling exception for ",s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]
#select socket client
#_*_coding:utf-8_*_
__author__ = 'YL'
import socket import sys messages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] server_address = ('localhost', 10000) # Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ] # Connect the socket to the port where the server is listening
print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for message in messages: # Send messages on both sockets
for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets
for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print(sys.stderr, 'closing socket', s.getsockname() )
安装:http://www.rabbitmq.com/install-windows.html
安装 python rabbitMQ module :pip install pika
实现最简单的队列通讯:
send端
#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #声明queue
channel.queue_declare(queue='hello') #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()