Let's observe Python's threads through a simple web server program. First, write a small, deliberately slow function:
```python
def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
```
Then write a fib web server. The program is straightforward, so it needs little explanation.
```python
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))
```
Running a shell command shows the computed result:

```
nc localhost 25002
10
55
```
Because the server is single-threaded, a second connection opened at the same time gets no result:

```
nc localhost 25002
10
```
To let our server handle multiple requests, we add multithreading support to the server code.
```python
# server.py - server code
from socket import *
from fib import fib
from threading import Thread

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        # fib_handler(client)
        Thread(target=fib_handler, args=(client,), daemon=True).start()  # must run under Python 3

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))  # start the server on port 25002
```
Running the shell command shows the computed result:

```
nc localhost 25002
10
55
```
Because the server is now multithreaded, a new connection also gets its result:

```
nc localhost 25002
10
55
```
Now let's add some performance-test code:
```python
# perf1.py
from socket import *
from threading import Thread
import time

sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25002))

n = 0

def monitor():
    global n
    while True:
        time.sleep(1)
        print(n, 'reqs/sec')
        n = 0

Thread(target=monitor).start()

while True:
    start = time.time()
    sock.send(b'1')
    resp = sock.recv(100)
    end = time.time()
    n += 1
```

The code is very simple: a global counter n tracks qps (requests per second).
Running perf1.py in a shell shows the result: roughly 100,000 requests per second on average.
If we start a second perf1.py process, we can see the impact Python's GIL has on threads:

```
python3 perf1.py
```

The qps in the original shell drops to a similar figure.
If we then run

```
nc localhost 25002
40
```

to fully occupy the server's CPU for a while, the reqs/sec shown in the shell windows drops off sharply.
This also reflects a characteristic of Python's GIL: it appears to favor the task hogging the CPU. I don't know the exact reason; finding out would probably require reading the GIL's implementation source.
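The GIL's basic effect is easy to reproduce offline, without a server. The sketch below (a minimal, self-contained experiment; the argument 25 and the two-way split are arbitrary choices) times two CPU-bound fib calls run sequentially versus in two threads. Because the GIL lets only one thread execute Python bytecode at a time, the threaded version is typically no faster, and often slightly slower due to lock contention:

```python
import time
from threading import Thread

def fib(n):
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)

def timed(fn):
    # Run fn once and return elapsed wall-clock seconds.
    start = time.time()
    fn()
    return time.time() - start

def sequential():
    fib(25)
    fib(25)

def threaded():
    threads = [Thread(target=fib, args=(25,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print('sequential: %.3fs' % timed(sequential))
print('threaded:   %.3fs' % timed(threaded))
```

On a multi-core machine a language without a GIL would finish the threaded version in about half the sequential time; here the two numbers come out about the same.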
Python has a library called CherryPy, which I used recently. Skimming its source shows that its core uses a Python thread pool. CherryPy maintains the pool with Python's thread-safe queue; the implementation looks like this:
```python
class ThreadPool(object):
    """A Request Queue for an HTTPServer which pools threads.

    ThreadPool objects must provide min, get(), put(obj),
    start() and stop(timeout) attributes.
    """

    def __init__(self, server, min=10, max=-1,
                 accepted_queue_size=-1, accepted_queue_timeout=10):
        self.server = server
        self.min = min
        self.max = max
        self._threads = []
        self._queue = queue.Queue(maxsize=accepted_queue_size)
        self._queue_put_timeout = accepted_queue_timeout
        self.get = self._queue.get

    def start(self):
        """Start the pool of threads."""
        for i in range(self.min):
            self._threads.append(WorkerThread(self.server))
        for worker in self._threads:
            worker.setName('CP Server ' + worker.getName())
            worker.start()
        for worker in self._threads:
            while not worker.ready:
                time.sleep(.1)

    ....

    def put(self, obj):
        self._queue.put(obj, block=True, timeout=self._queue_put_timeout)
        if obj is _SHUTDOWNREQUEST:
            return

    def grow(self, amount):
        """Spawn new worker threads (not above self.max)."""
        if self.max > 0:
            budget = max(self.max - len(self._threads), 0)
        else:
            # self.max <= 0 indicates no maximum
            budget = float('inf')
        n_new = min(amount, budget)
        workers = [self._spawn_worker() for i in range(n_new)]
        while not all(worker.ready for worker in workers):
            time.sleep(.1)
        self._threads.extend(workers)

    ....

    def shrink(self, amount):
        """Kill off worker threads (not below self.min)."""
        [...]

    def stop(self, timeout=5):
        # Must shut down threads here so the code that calls
        # this method can know when all threads are stopped.
        [...]
```
As you can see, CherryPy initializes the pool to a size of 10. Whenever an HTTP connection comes in, it is put into the task queue, and the WorkerThread instances continuously take tasks off the queue and execute them. This is a very standard thread pool model.
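The pattern boils down to a few lines of standard-library code. Below is a minimal sketch of the same model (the names task_queue and _SHUTDOWN are illustrative, not CherryPy's): workers block on a thread-safe queue and run whatever callables arrive, until a shutdown sentinel tells each one to exit:

```python
import queue
import threading

task_queue = queue.Queue()
_SHUTDOWN = object()  # sentinel, analogous to CherryPy's _SHUTDOWNREQUEST

def worker():
    # Each worker blocks on the queue and runs tasks until the sentinel arrives.
    while True:
        task = task_queue.get()
        if task is _SHUTDOWN:
            break
        task()

threads = [threading.Thread(target=worker, daemon=True) for _ in range(4)]
for t in threads:
    t.start()

results = []
for i in range(8):
    task_queue.put(lambda i=i: results.append(i * i))
for _ in threads:
    task_queue.put(_SHUTDOWN)  # one sentinel per worker
for t in threads:
    t.join()

print(sorted(results))  # → [0, 1, 4, 9, 16, 25, 36, 49]
```

CherryPy's WorkerThread does essentially this, except the queued items are accepted HTTP connections rather than plain callables.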
Since Python threads cannot exploit multiple cores, Python can use multiple processes instead to improve concurrent performance on multi-core CPUs. A Python process is relatively expensive; think of it as starting another Python interpreter.
```python
# server_pool.py
from socket import *
from fib import fib
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool  # Python 3; Python 2 would need a third-party pool

pool = Pool(4)  # start a process pool of size 4

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        Thread(target=fib_handler, args=(client,), daemon=True).start()

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))
```
The new server's qps comes out far below the earlier 100k. The main reasons are that processes are slow to start and the process pool's internals are fairly complex, involving data transfer, queues, and so on.
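That overhead is easy to see by timing the round trip through the pool for a trivial task (a minimal sketch; the iteration count and pool size are arbitrary). Each submit() pickles the arguments, hands them to a worker process, and pickles the result back, which dwarfs the cost of calling fib(1) directly:

```python
import time
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)

def bench(fn, reps=200):
    # Time `reps` calls of fn and return total elapsed seconds.
    start = time.time()
    for _ in range(reps):
        fn()
    return time.time() - start

if __name__ == '__main__':
    pool = ProcessPoolExecutor(2)
    direct = bench(lambda: fib(1))
    pooled = bench(lambda: pool.submit(fib, 1).result())
    print('direct: %.6fs  pooled: %.6fs' % (direct, pooled))
    pool.shutdown()
```

The pooled figure is typically orders of magnitude larger per call, which is exactly the gap between the two servers' qps.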
However, with multiple processes we can ensure that each connection stays relatively independent and is largely shielded from other requests.
Even issuing the following expensive request does not affect the performance test:

```
nc localhost 25002
40
```
Coroutines are an old concept; they first appeared in early operating systems, predating even threads and processes. Coroutines are also a relatively difficult concurrency style to understand and apply, and coroutine code can be hard to read.
In Python, yield and next() are used to implement coroutine control.
```python
def count(n):
    while n > 0:
        yield n  # yield suspends execution here, producing a generator that is driven by next()
        n -= 1

for i in count(5):
    print(i)

# Output:
# 5
# 4
# 3
# 2
# 1
```
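Beyond next(), a generator can also receive values, which is what makes it a true coroutine. The following sketch is the classic running-average coroutine: send() both resumes the generator and delivers a value to the paused yield expression:

```python
def averager():
    # A coroutine that keeps a running average of the values sent to it.
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # pause here until .send() delivers a value
        total += value
        count += 1
        average = total / count

avg = averager()
next(avg)            # prime the coroutine: advance to the first yield
print(avg.send(10))  # → 10.0
print(avg.send(20))  # → 15.0
print(avg.send(30))  # → 20.0
```

The scheduler we build below relies on exactly this suspend-and-resume behavior, just driven by next() instead of send().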
Now let's walk through writing coroutine code with an example, returning to the earlier server. First, recall why we used threads at all: to avoid blocking. Blocking here comes from two sources, socket I/O and CPU usage. Coroutines are likewise introduced to avoid blocking, so let's start by marking the blocking points in the code.
```python
# server.py - server code
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()  # blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        req = client.recv(100)  # blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)  # blocking
    print('Closed')

fib_server(('', 25002))  # start the server on port 25002
```
Three socket I/O blocking points are marked above; for now we ignore CPU usage. Next, we announce each blocking point with a yield just before it:
```python
# server.py - server code, with blocking points announced via yield
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()  # blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)  # blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)  # blocking
    print('Closed')

fib_server(('', 25002))  # start the server on port 25002
```
Finally, we drive these generators with a small select()-based scheduler that resumes whichever task's socket is ready:

```python
from socket import *
from fib import fib
from collections import deque
from select import select

tasks = deque()
recv_wait = {}
send_wait = {}

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()
        print('Connection', addr)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)
    print('Closed')

tasks.append(fib_server(('', 25003)))
run()
```
The new server's qps comes out fairly close to the earlier thread model's. The advantage of coroutines is that they are asynchronous, but a coroutine server can still use only one CPU. When we make the server compute fib(40) and saturate the CPU, the qps quickly drops to 0.
Tornado is an asynchronous web framework released by Facebook. Using coroutines in Tornado is fairly simple: the gen.coroutine decorator registers your asynchronous function with Tornado's IOLoop. A typical Tornado asynchronous method is written like this:
```python
@gen.coroutine
def post(self):
    resp = yield GetUser()
    self.write(resp)
```
```python
def start(self):
    """Starts the I/O loop.

    The loop will run until one of the I/O handlers calls stop(),
    which will make the loop stop after the current event
    iteration completes.
    """
    self._running = True
    while True:
        [ ... ]
        if not self._running:
            break
        [ ... ]
        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            if e.args == (4, "Interrupted system call"):
                logging.warning("Interrupted system call", exc_info=1)
                continue
            else:
                raise

        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except KeyboardInterrupt:
                raise
            except OSError, e:
                if e[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",
                              fd, exc_info=True)
```
This is the core main loop of Tornado's asynchronous scheduling. poll() returns (fd: events) pairs, which are assigned to the event_pairs variable; in the inner while loop, the contents of event_pairs are taken out one at a time and the corresponding handler is invoked. Tornado registers sockets with epoll via the function below. On Linux, Tornado defaults to epoll; on Windows it defaults to select (the only option there).
```python
def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = handler
    self._impl.register(fd, events | self.ERROR)
```
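The same register-then-poll mechanism is available directly in Python's standard library via the selectors module. Here is a minimal sketch on a socketpair (the string 'handler-for-a' stands in for a real handler; DefaultSelector picks epoll on Linux and falls back to kqueue or select elsewhere):

```python
import selectors
import socket

sel = selectors.DefaultSelector()  # epoll on Linux, kqueue/select elsewhere

a, b = socket.socketpair()
sel.register(a, selectors.EVENT_READ, data='handler-for-a')  # like add_handler(fd, handler, events)

b.send(b'ping')  # make `a` readable

for key, mask in sel.select(timeout=1):  # like poll(): returns the ready (fd, events) pairs
    print(key.data, key.fileobj.recv(100))  # → handler-for-a b'ping'

sel.unregister(a)
a.close()
b.close()
```

Tornado's IOLoop is, at its core, this same loop with handler dispatch, timeouts, and error handling layered on top.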
We compare performance by running the simplest possible program for each framework on a single machine. The test command is:

```
ab -c 100 -n 1000 -k localhost:8080/ | grep "Time taken for tests:"
```
CherryPy's result:

```
Time taken for tests:   10.773 seconds
```

Tornado's result:

```
Time taken for tests:   0.377 seconds
```
Tornado's performance is quite impressive; when an application involves asynchronous I/O, Tornado should be the preferred choice.
This article introduced Python's threads, processes, and coroutines along with their applications, and gave a simple performance analysis of each model. Because of the GIL, neither threads nor coroutines in Python can take advantage of multiple cores.