协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。html
协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:python
协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。程序员
协程的好处:编程
缺点:数组
使用yield实现协程操做例子 缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import
time
import
queue
def
consumer(name):
print
(
"--->starting eating baozi..."
)
while
True
:
new_baozi
=
yield
print
(
"[%s] is eating baozi %s"
%
(name,new_baozi))
#time.sleep(1)
def
producer():
r
=
con.__next__()
r
=
con2.__next__()
n
=
0
while
n <
5
:
n
+
=
1
con.send(n)
con2.send(n)
print
(
"\033[32;1m[producer]\033[0m is making baozi %s"
%
n )
if
__name__
=
=
'__main__'
:
con
=
consumer(
"c1"
)
con2
=
consumer(
"c2"
)
p
=
producer()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from
greenlet
import
greenlet
def
test1():
print
12
gr2.switch()
print
34
gr2.switch()
def
test2():
print
56
gr1.switch()
print
78
gr1
=
greenlet(test1)
gr2
=
greenlet(test2)
gr1.switch()
|
Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。安全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import
gevent
def
foo():
print
(
'Running in foo'
)
gevent.sleep(
0
)
print
(
'Explicit context switch to foo again'
)
def
bar():
print
(
'Explicit context to bar'
)
gevent.sleep(
0
)
print
(
'Implicit context switch back to bar'
)
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
|
输出:服务器
Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
同步与异步的性能区别 网络
上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn
。 初始化的greenlet列表存放在数组threads
中,此数组被传给gevent.joinall
函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。 数据结构
遇到IO阻塞时会自动切换任务
经过gevent实现单线程下的多socket并发
server side
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import
sys
import
socket
import
time
import
gevent
from
gevent
import
socket,monkey
monkey.patch_all()
def
server(port):
s
=
socket.socket()
s.bind((
'0.0.0.0'
, port))
s.listen(
500
)
while
True
:
cli, addr
=
s.accept()
gevent.spawn(handle_request, cli)
def
handle_request(conn):
try
:
while
True
:
data
=
conn.recv(
1024
)
print
(
"recv:"
, data)
conn.send(data)
if
not
data:
conn.shutdown(socket.SHUT_WR)
except
Exception as ex:
print
(ex)
finally
:
conn.close()
if
__name__
=
=
'__main__'
:
server(
8001
)
|
在UI编程中,经常要对鼠标点击进行相应,首先如何得到鼠标点击呢?
方式一:建立一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有如下几个缺点:
1. CPU资源浪费,可能鼠标点击的频率很是小,可是扫描线程仍是会一直循环检测,这会形成不少的CPU资源浪费;若是扫描鼠标点击的接口是阻塞的呢?
2. 若是是堵塞的,又会出现下面这样的问题,若是咱们不但要扫描鼠标点击,还要扫描键盘是否按下,因为扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
3. 若是一个循环须要扫描的设备很是多,这又会引来响应时间的问题;
因此,该方式是很是很差的。
方式二:就是事件驱动模型
目前大部分的UI编程都是事件驱动模型,如不少UI平台都会提供onClick()事件,这个事件就表明鼠标按下事件。事件驱动模型大致思路以下:
1. 有一个事件(消息)队列;
2. 鼠标按下时,往这个队列中增长一个点击事件(消息);
3. 有个循环,不断从队列取出事件,根据不一样的事件,调用不一样的函数,如onClick()、onKeyDown()等;
4. 事件(消息)通常都各自保存各自的处理函数指针,这样,每一个消息都有独立的处理函数;
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特色是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
让咱们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展现了随着时间的推移,这三种模式下程序所作的工做。这个程序有3个任务须要完成,每一个任务都在等待I/O操做时阻塞自身。阻塞在I/O操做上所花费的时间已经用灰色框标示出来了。
在单线程同步模型中,任务按照顺序执行。若是某个任务由于I/O而阻塞,其余全部的任务都必须等待,直到它完成以后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。若是任务之间并无互相依赖的关系,但仍然须要互相等待的话这就使得程序没必要要的下降了运行速度。
在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操做系统来管理,在多处理器系统上能够并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其余线程得以继续执行。与完成相似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,由于这类程序不得不经过线程同步机制如锁、可重入函数、线程局部存储或者其余机制来处理线程安全问题,若是实现不当就会致使出现微妙且使人痛不欲生的bug。
在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其余昂贵的操做时,注册一个回调到事件循环中,而后当I/O操做完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询全部的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽量的得以执行而不须要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,由于程序员不须要关心线程安全问题。
当咱们面对以下的环境时,事件驱动模型一般是一个好的选择:
当应用程序须要在任务间共享可变的数据时,这也是一个不错的选择,由于这里不须要采用同步处理。
网络应用程序一般都有上述这些特色,这使得它们可以很好的契合事件驱动编程模型。
首先列一下,sellect、poll、epoll三者的区别
select
select最先于1983年出如今4.2BSD中,它经过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程能够得到这些文件描述符从而进行后续的读写操做。
select目前几乎在全部的平台上支持,其良好跨平台支持也是它的一个优势,事实上从如今看来,这也是它所剩很少的优势之一。
select的一个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024,不过能够经过修改宏定义甚至从新编译内核的方式提高这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增加。同时,因为网络响应时间的延迟使得大量TCP链接处于非活跃状态,但调用select()会对全部socket进行一次线性扫描,因此这也浪费了必定的开销。
poll
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差异,可是poll没有最大文件描述符数量的限制。
poll和select一样存在一个缺点就是,包含大量文件描述符的数组被总体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增长而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用select()和poll()的时候将再次报告这些文件描述符,因此它们通常不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
epoll
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具有了以前所说的一切优势,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll能够同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,可是代码实现至关复杂。
epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。
另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。
Python select
Python的select()方法直接调用操做系统的IO接口,它监控sockets,open files, and pipes(全部带fileno()方法的文件句柄)什么时候变成readable 和writeable, 或者通讯错误,select()使得同时监控多个链接变的简单,而且这比写一个长循环来等待和监控多客户端链接要高效,由于select直接经过操做系统提供的C的网络接口进行操做,而不是经过Python的解释器。
注意:Using Python’s file objects with select() works for Unix, but is not supported under Windows.
接下来经过echo server例子要以了解select 是如何经过单进程实现同时处理多个非阻塞的socket链接的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import
select
import
socket
import
sys
import
Queue
# Create a TCP/IP socket
server
=
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(
0
)
# Bind the socket to the port
server_address
=
(
'localhost'
,
10000
)
print
>>sys.stderr,
'starting up on %s port %s'
%
server_address
server.bind(server_address)
# Listen for incoming connections
server.listen(
5
)
|
select()方法接收并监控3个通讯列表, 第一个是全部的输入的data,就是指外部发过来的数据,第2个是监控和接收全部要发出去的data(outgoing data),第3个监控错误信息,接下来咱们须要建立2个列表来包含输入和输出信息来传给select().
# Sockets from which we expect to read
inputs = [ server ] # Sockets to which we expect to write outputs = [ ]
全部客户端的进来的链接和数据将会被server的主循环程序放在上面的list中处理,咱们如今的server端须要等待链接可写(writable)以后才能过来,而后接收数据并返回(所以不是在接收到数据以后就马上返回),由于每一个链接要把输入或输出的数据先缓存到queue里,而后再由select取出来再发出去。
Connections are added to and removed from these lists by the server main loop. Since this version of the server is going to wait for a socket to become writable before sending any data (instead of immediately sending the reply), each output connection needs a queue to act as a buffer for the data to be sent through it.
# Outgoing message queues (socket:Queue)
message_queues = {}
The main portion of the server program loops, calling select() to block and wait for network activity.
下面是此程序的主循环,调用select()时会阻塞和等待直到新的链接和数据进来
while inputs: # Wait for at least one of the sockets to be ready for processing print >>sys.stderr, '\nwaiting for the next event' readable, writable, exceptional = select.select(inputs, outputs, inputs)
当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,咱们上面将他们分别赋值为readable,writable,exceptional, 全部在readable list中的socket链接表明有数据可接收(recv),全部在writable list中的存放着你能够对其进行发送(send)操做的socket链接,当链接通讯出现error时会把error写到exceptional列表中。
select() returns three new lists, containing subsets of the contents of the lists passed in. All of the sockets in the readable list have incoming data buffered and available to be read. All of the sockets in the writable list have free space in their buffer and can be written to. The sockets returned in exceptional have had an error (the actual definition of “exceptional condition” depends on the platform).
Readable list 中的socket 能够有3种可能状态,第一种是若是这个socket是main "server" socket,它负责监听客户端的链接,若是这个main server socket出如今readable里,那表明这是server端已经ready来接收一个新的链接进来了,为了让这个main server能同时处理多个链接,在下面的代码里,咱们把这个main server的socket设置为非阻塞模式。
The “readable” sockets represent three possible cases. If the socket is the main “server” socket, the one being used to listen for connections, then the “readable” condition means it is ready to accept another incoming connection. In addition to adding the new connection to the list of inputs to monitor, this section sets the client socket to not block.
1
2
3
4
5
6
7
8
9
10
11
12
|
# Handle inputs
for
s
in
readable:
if
s
is
server:
# A "readable" server socket is ready to accept a connection
connection, client_address
=
s.accept()
print
>>sys.stderr,
'new connection from'
, client_address
connection.setblocking(
0
)
inputs.append(connection)
# Give the connection a queue for data we want to send
message_queues[connection]
=
Queue.Queue()
|
第二种状况是这个socket是已经创建了的链接,它把数据发了过来,这个时候你就能够经过recv()来接收它发过来的数据,而后把接收到的数据放到queue里,这样你就能够把接收到的数据再传回给客户端了。
The next case is an established connection with a client that has sent data. The data is read with recv(), then placed on the queue so it can be sent through the socket and back to the client.
1
2
3
4
5
6
7
8
9
|
else
:
data
=
s.recv(
1024
)
if
data:
# A readable client socket has data
print
>>sys.stderr,
'received "%s" from %s'
%
(data, s.getpeername())
message_queues[s].put(data)
# Add output channel for response
if
s
not
in
outputs:
outputs.append(s)
|
第三种状况就是这个客户端已经断开了,因此你再经过recv()接收到的数据就为空了,因此这个时候你就能够把这个跟客户端的链接关闭了。
A readable socket without data available is from a client that has disconnected, and the stream is ready to be closed.
1
2
3
4
5
6
7
8
9
10
11
|
else
:
# Interpret empty result as closed connection
print
>>sys.stderr,
'closing'
, client_address,
'after reading no data'
# Stop listening for input on the connection
if
s
in
outputs:
outputs.remove(s)
#既然客户端都断开了,我就不用再给它返回数据了,因此这时候若是这个客户端的链接对象还在outputs列表中,就把它删掉
inputs.remove(s)
#inputs中也删除掉
s.close()
#把这个链接关闭掉
# Remove message queue
del
message_queues[s]
|
对于writable list中的socket,也有几种状态,若是这个客户端链接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,不然就把这个链接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个链接,那就会认为这个链接还处于非活动状态
There are fewer cases for the writable connections. If there is data in the queue for a connection, the next message is sent. Otherwise, the connection is removed from the list of output connections so that the next time through the loop select() does not indicate that the socket is ready to send data.
1
2
3
4
5
6
7
8
9
10
11
|
# Handle outputs
for
s
in
writable:
try
:
next_msg
=
message_queues[s].get_nowait()
except
Queue.Empty:
# No messages waiting so stop checking for writability.
print
>>sys.stderr,
'output queue for'
, s.getpeername(),
'is empty'
outputs.remove(s)
else
:
print
>>sys.stderr,
'sending "%s" to %s'
%
(next_msg, s.getpeername())
s.send(next_msg)
|
最后,若是在跟某个socket链接通讯过程当中出了错误,就把这个链接对象在inputs\outputs\message_queue中都删除,再把链接关闭掉
1
2
3
4
5
6
7
8
9
10
11
|
# Handle "exceptional conditions"
for
s
in
exceptional:
print
>>sys.stderr,
'handling exceptional condition for'
, s.getpeername()
# Stop listening for input on the connection
inputs.remove(s)
if
s
in
outputs:
outputs.remove(s)
s.close()
# Remove message queue
del
message_queues[s]
|
客户端
下面的这个是客户端程序展现了如何经过select()对socket进行管理并与多个链接同时进行交互,
The example client program uses two sockets to demonstrate how the server with select() manages multiple connections at the same time. The client starts by connecting each TCP/IP socket to the server.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import
socket
import
sys
messages
=
[
'This is the message. '
,
'It will be sent '
,
'in parts.'
,
]
server_address
=
(
'localhost'
,
10000
)
# Create a TCP/IP socket
socks
=
[ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]
# Connect the socket to the port where the server is listening
print
>>sys.stderr,
'connecting to %s port %s'
%
server_address
for
s
in
socks:
s.connect(server_address)
|
接下来经过循环经过每一个socket链接给server发送和接收数据。
Then it sends one pieces of the message at a time via each socket, and reads all responses available after writing new data.
1
2
3
4
5
6
7
8
9
10
11
12
13
|
for
message
in
messages:
# Send messages on both sockets
for
s
in
socks:
print
>>sys.stderr,
'%s: sending "%s"'
%
(s.getsockname(), message)
s.send(message)
# Read responses on both sockets
for
s
in
socks:
data
=
s.recv(
1024
)
print
>>sys.stderr,
'%s: received "%s"'
%
(s.getsockname(), data)
if
not
data:
print
>>sys.stderr,
'closing socket'
, s.getsockname()
|
最后服务器端的完整代码以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
#_*_coding:utf-8_*_
__author__
=
'Alex Li'
import
select
import
socket
import
sys
import
queue
# Create a TCP/IP socket
server
=
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(
False
)
# Bind the socket to the port
server_address
=
(
'localhost'
,
10000
)
print
(sys.stderr,
'starting up on %s port %s'
%
server_address)
server.bind(server_address)
# Listen for incoming connections
server.listen(
5
)
# Sockets from which we expect to read
inputs
=
[ server ]
# Sockets to which we expect to write
outputs
=
[ ]
message_queues
=
{}
while
inputs:
# Wait for at least one of the sockets to be ready for processing
print
(
'\nwaiting for the next event'
)
readable, writable, exceptional
=
select.select(inputs, outputs, inputs)
# Handle inputs
for
s
in
readable:
if
s
is
server:
# A "readable" server socket is ready to accept a connection
connection, client_address
=
s.accept()
print
(
'new connection from'
, client_address)
connection.setblocking(
False
)
inputs.append(connection)
# Give the connection a queue for data we want to send
message_queues[connection]
=
queue.Queue()
else
:
data
=
s.recv(
1024
)
if
data:
# A readable client socket has data
print
(sys.stderr,
'received "%s" from %s'
%
(data, s.getpeername()) )
message_queues[s].put(data)
# Add output channel for response
if
s
not
in
outputs:
outputs.append(s)
else
:
# Interpret empty result as closed connection
print
(
'closing'
, client_address,
'after reading no data'
)
# Stop listening for input on the connection
if
s
in
outputs:
outputs.remove(s)
#既然客户端都断开了,我就不用再给它返回数据了,因此这时候若是这个客户端的链接对象还在outputs列表中,就把它删掉
inputs.remove(s)
#inputs中也删除掉
s.close()
#把这个链接关闭掉
# Remove message queue
del
message_queues[s]
# Handle outputs
for
s
in
writable:
try
:
next_msg
=
message_queues[s].get_nowait()
except
queue.Empty:
# No messages waiting so stop checking for writability.
print
(
'output queue for'
, s.getpeername(),
'is empty'
)
outputs.remove(s)
else
:
print
(
'sending "%s" to %s'
%
(next_msg, s.getpeername()))
s.send(next_msg)
# Handle "exceptional conditions"
for
s
in
exceptional:
print
(
'handling exceptional condition for'
, s.getpeername() )
# Stop listening for input on the connection
inputs.remove(s)
if
s
in
outputs:
outputs.remove(s)
s.close()
# Remove message queue
del
message_queues[s]
|
selectors模块
This module allows high-level and efficient I/O multiplexing, built upon the select
module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import
selectors
import
socket
sel
=
selectors.DefaultSelector()
def
accept(sock, mask):
conn, addr
=
sock.accept()
# Should be ready
print
(
'accepted'
, conn,
'from'
, addr)
conn.setblocking(
False
)
sel.register(conn, selectors.EVENT_READ, read)
def
read(conn, mask):
data
=
conn.recv(
1000
)
# Should be ready
if
data:
print
(
'echoing'
,
repr
(data),
'to'
, conn)
conn.send(data)
# Hope it won't block
else
:
print
(
'closing'
, conn)
sel.unregister(conn)
conn.close()
sock
=
socket.socket()
sock.bind((
'localhost'
,
10000
))
sock.listen(
100
)
sock.setblocking(
False
)
sel.register(sock, selectors.EVENT_READ, accept)
while
True
:
events
=
sel.select()
for
key, mask
in
events:
callback
=
key.data
callback(key.fileobj, mask)
|