gunicorn “Green Unicorn”,脱胎于ruby社区的Unicorn,是一个 WSGI HTTP Server。学习gunicorn后,咱们能够把以前的 Bottle 程序正式部署起来。老规矩,本文分下面几个部分:html
gunicorn 源码选择的版本是 20.0.0
,主要的文件及包以下:python
文件 | 描述 |
---|---|
app包 | guincorn 的 Application (不是wsgi定义的applicaton) |
http包 | gunicorn 对 http协议的一些处理 |
workers包 | gunicorn 的工做类实现 ,包括同步sync实现,线程池版本实现gthread,以及异步版本实现 geventlet,gevent等 |
arbiter.py | guicorn 的master实现 |
根据gunicorn的设计特色:linux
Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.nginx
gunicorn使用pre-fork 工做模型,也就是master提早fork出预约数量的work,管理worker集合。全部的request和response都由worker进程处理。web
咱们重点放在:gunicorn的服务实现,master-worker如何实现和协做上。django
编写测试app,能够看到这是一个符合wsgi规范的application:flask
# myapp.py
def app(environ, start_response): # env 和 http 状态及头设定回调
data = b"Hello, World!\n"
start_response("200 OK", [
("Content-Type", "text/plain"),
("Content-Length", str(len(data)))
])
return iter([data]) # 返回数据
复制代码
使用4个work节点,日志级别debug的方式启动服务,加载 myapp:appruby
# gunicorn -w 4 --log-level debug myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration: # 准备配置
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0 # 启动gunicorn
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted # 启动master
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462) # 监听端口
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464 # 启动worker
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers
复制代码
使用 curl
测试服务markdown
# curl http://127.0.0.1:8000
Hello, World!
复制代码
同时gunicorn中能够看到 worker=50465 处理了这个http请求session
[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /
复制代码
运行时候,还能够经过发送信号,手动扩充work节点数
# kill -TTIN 50462
复制代码
观察服务日志,会发现 master=50462 进程处理了 ttin
信号,而且扩展worker节点数到5
...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers
复制代码
使用 Ctrl+C
关闭服务,能够看到也是 master=50462 进程处理了 int
信号,而且在关闭worker节点后关闭本身
^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master
复制代码
若是对gunicon的参数不了解,可使用下面命令查看帮助
# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]
optional arguments:
-h, --help show this help message and exit
...
-w INT, --workers INT
The number of worker processes for handling requests. [1]
复制代码
帮助使用咱们熟悉的 argparse 实现。
class Setting(object):
def add_option(self, parser):
args = tuple(self.cli)
help_txt = "%s [%s]" % (self.short, self.default)
help_txt = help_txt.replace("%", "%%")
kwargs = {
"dest": self.name,
"action": self.action or "store",
"type": self.type or str,
"default": None,
"help": help_txt
}
...
parser.add_argument(*args, **kwargs) # 添加选项
class Workers(Setting): # --workers 的选项类
name = "workers"
section = "Worker Processes"
cli = ["-w", "--workers"]
meta = "INT"
validator = validate_pos_int
type = int
default = int(os.environ.get("WEB_CONCURRENCY", 1))
desc = """\
The number of worker processes for handling requests.
A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
You'll want to vary this a bit to find the best for your particular
application's work load.
By default, the value of the ``WEB_CONCURRENCY`` environment variable.
If it is not defined, the default is ``1``.
"""
def parser(self):
kwargs = {
"usage": self.usage,
"prog": self.prog
}
parser = argparse.ArgumentParser(**kwargs)
parser.add_argument("-v", "--version",
action="version", default=argparse.SUPPRESS,
version="%(prog)s (version " + __version__ + ")\n",
help="show program's version number and exit")
parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)
keys = sorted(self.settings, key=self.settings.__getitem__) # 动态添加参数选项
for k in keys:
self.settings[k].add_option(parser)
return parser
复制代码
gunicorn的application主要是下面三个类实现。须要注意的是这里的application能够理解为web-server的application;bottle/flask/django等实现的是web-framework的applicaiton。前者动态加载后者,前者处理http服务,后者处理单次的http请求。
3个Application梳理后,大概的代码模版以下:
class WSGIApplication(Application)
def __init__(self, usage=None, prog=None):
self.do_load_config() # 加载配置
def do_load_config():
...
cfg = self.init(parser, args, args.args) # 初始化配置
...
def init(...):
...
self.app_uri = args[0] # 获取wsgi-application参数
def load(...):
util.import_app(self.app_uri) # 动态加载wsgi-application
...
def run(...):
self.load()
Arbiter(self).run() # 启动master,也就是Arbiter
def run(): # 运行服务
"""\
The ``gunicorn`` command line runner for launching Gunicorn with
generic WSGI applications.
"""
from gunicorn.app.wsgiapp import WSGIApplication
WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
if __name__ == '__main__':
run()
复制代码
application部分的实现,相对比较简单,就再也不赘述。
Arbiter 仲裁者,事实上的master进程核心,整理后代码模版以下:
class Arbiter(object):
def __init__(self, app):
self.worker_class = self.cfg.worker_class # worker类
self.num_workers = self.cfg.worker # worker数量
...
def start():
self.init_signals() # 初始化信号监听
...
sock.create_socket(...) # 建立socket服务
def run(self):
self.start()
try:
self.manage_workers() # 启动节点
while True: # 无限循环
...
sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
if sig is None:
self.sleep() # 持续休眠
self.murder_workers()
self.manage_workers()
continue
if sig not in self.SIG_NAMES:
self.log.info("Ignoring unknown signal: %s", sig)
continue
# 处理信号
signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
...
handler()
self.wakeup() # 唤醒
except (StopIteration, KeyboardInterrupt):
...
复制代码
在了解Arbiter工做前先了解一下信号, linux 系统可使用下面命令查看信号清单
# kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP
6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
...
复制代码
信号是操做系统提供的事件,能够用来进行跨进程的通讯。Arbiter.init_signals 作的工做以下:
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
def init_signals(self):
...
# initialize all signals
for s in self.SIGNALS:
signal.signal(s, self.signal)
signal.signal(signal.SIGCHLD, self.handle_chld) # 添加信号监听器
def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig)
self.wakeup()
复制代码
以前演示的扩容信号 TTIN
是这样处理的 :
def handle_ttin(self):
"""\
SIGTTIN handling.
Increases the number of workers by one.
"""
self.num_workers += 1 # 扩容
self.manage_workers() # 管理worker
复制代码
Arbiter的sleep和warkeup是这样实现的:
self.PIPE = pair = os.pipe() # 建立管道
def sleep(self):
"""\
Sleep until PIPE is readable or we timeout.
A readable PIPE means a signal occurred.
"""
try:
ready = select.select([self.PIPE[0]], [], [], 1.0) # 使用select监听管道的数据变化
if not ready[0]:
return
while os.read(self.PIPE[0], 1): # 读取管道数据
pass
except (select.error, OSError) as e:
...
def wakeup(self):
"""\
Wake up the arbiter by writing to the PIPE
"""
try:
os.write(self.PIPE[1], b'.') # 管道写入
except IOError as e:
...
复制代码
须要说明的是Arbiter经过 sock.create_sockets
建立了socket,并绑定端口和监听,而后在fork-worker的时候,将socket传递给了子进程。
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
self.app, self.timeout / 2.0,
self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
worker.pid = pid # 记录worker的pid
self.WORKERS[pid] = worker # 添加到worker集合
return pid
复制代码
销毁worker是使用信号:
def kill_workers(self, sig):
"""\
Kill all workers with the signal `sig`
:attr sig: `signal.SIG*` value
"""
worker_pids = list(self.WORKERS.keys())
for pid in worker_pids:
os.kill(pid, sig)
复制代码
接下来,咱们看看worker,主要是sync-worker的实现。worker的关系主要以下:
接以前Arbiter中fork-worker的代码,建立完成的work进入 init_process
# Process Child
worker.pid = os.getpid()
try:
util._setproctitle("worker [%s]" % self.proc_name)
self.log.info("Booting worker with pid: %s", worker.pid)
self.cfg.post_fork(self, worker)
worker.init_process()
sys.exit(0)
复制代码
work的init_process模版以下:
def init_process(self):
"""\
If you override this method in a subclass, the last statement
in the function should be to call this method with
super().init_process() so that the ``run()`` loop is initiated.
"""
# For waking ourselves up
self.PIPE = os.pipe() # 建立管道
...
self.wait_fds = self.sockets + [self.PIPE[0]] # 监听管道和socket
...
self.init_signals() # 初始化信号监听
...
self.load_wsgi() # 加载wsgi的应用
...
# Enter main run loop
self.booted = True
self.run() # 工做循环
复制代码
work同样的进行信号监听:
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
def init_signals(self):
# reset signaling
for s in self.SIGNALS:
signal.signal(s, signal.SIG_DFL)
# init new signaling
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_quit)
...
if hasattr(signal, 'set_wakeup_fd'):
signal.set_wakeup_fd(self.PIPE[1]) # 等待select唤醒
复制代码
work最重要的run循环:
def run(self, timeout):
listener = self.sockets[0]
while self.alive:
...
# Accept a connection. If we get an error telling us
# that no connection is waiting we fall down to the
# select which is where we'll wait for a bit for new
# workers to come give us some love.
try:
self.accept(listener) # 接受客户端连接
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
continue
except EnvironmentError as e:
...
try:
self.wait(timeout) # 休眠等待
except StopWaiting:
return
复制代码
处理客户端链接,这一部分和以前介绍http比较相似,也再也不赘述。
def accept(self, listener):
client, addr = listener.accept()
client.setblocking(1)
util.close_on_exec(client)
self.handle(listener, client, addr)
复制代码
work处理完成请求后进入等待
def wait(self, timeout):
try:
ret = select.select(self.wait_fds, [], [], timeout)
if ret[0]:
if self.PIPE[0] in ret[0]:
os.read(self.PIPE[0], 1)
return ret[0]
except select.error as e:
if e.args[0] == errno.EINTR:
return self.sockets
if e.args[0] == errno.EBADF:
if self.nr < 0:
return self.sockets
else:
raise StopWaiting
raise
复制代码
能够用下面一张图展现gunicorn的工做流程,做为咱们的小结论
可使用thread,实现一个定时器
# reloader.py
class Reloader(threading.Thread):
def __init__(self, extra_files=None, interval=1, callback=None):
super().__init__()
self.setDaemon(True)
self._interval = interval
self._callback = callback
def run(self):
mtimes = {}
while True:
for filename in self.get_files():
try:
mtime = os.stat(filename).st_mtime
except OSError:
continue
old_time = mtimes.get(filename)
if old_time is None:
mtimes[filename] = mtime
continue
elif mtime > old_time:
if self._callback:
self._callback(filename)
time.sleep(self._interval)
复制代码
在使用 gunicorn myapp:app
命令的时候, myapp:app 没有静态的 import ,而是这样动态加载的:
# util.py
klass = components.pop(-1)
mod = importlib.import_module('.'.join(components))
return getattr(mod, klass)
复制代码