Reading the Gunicorn Source Code

Gunicorn ("Green Unicorn"), which grew out of Unicorn from the Ruby community, is a WSGI HTTP server. Once we understand Gunicorn, we can properly deploy the Bottle application from the earlier articles. As usual, this article is split into the following parts:

  • Overview of the gunicorn project structure
  • Using gunicorn
  • The gunicorn Application implementation
  • The Arbiter implementation
  • The sync worker implementation
  • Summary
  • Tips

Overview of the gunicorn project structure

The source version read here is gunicorn 20.0.0. The main files and packages are:

File              Description
app package       Gunicorn's Application classes (not the application defined by WSGI)
http package      Gunicorn's handling of the HTTP protocol
workers package   Gunicorn's worker implementations: the synchronous sync worker, the thread-based gthread worker, and the asynchronous geventlet / gevent workers
arbiter.py        Gunicorn's master (Arbiter) implementation

Gunicorn's design is described as follows:

Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.

That is, Gunicorn uses a pre-fork worker model: the master forks a predefined number of workers up front and manages that set of workers. All requests and responses are handled entirely by the worker processes.

Our focus will be on how Gunicorn implements the server, and on how the master and the workers are implemented and cooperate.

Using gunicorn

First, write a test app; as you can see, it is an application that follows the WSGI spec:

# myapp.py

def app(environ, start_response):  # the environ dict, plus the callback that sets the HTTP status and headers
    data = b"Hello, World!\n"
    start_response("200 OK", [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(len(data)))
    ])
    return iter([data])  # return the response body

Start the service with 4 workers at log level debug, loading myapp:app:

# gunicorn -w 4 --log-level debug  myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration:  # dump the configuration
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0  # start gunicorn
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted  # the master has booted
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462)  # listening on the port
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464 # booting a worker
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers

Test the service with curl:

# curl http://127.0.0.1:8000
Hello, World!

Meanwhile, the gunicorn log shows that worker 50465 handled this HTTP request:

[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /

While the service is running, you can also scale up the number of workers manually by sending a signal:

# kill -TTIN 50462

Watching the service log, you can see that the master process (50462) handled the ttin signal and grew the worker count to 5:

...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers

Shut the service down with Ctrl+C: again the master process (50462) handles the int signal, stopping the workers and then itself:

^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master

If you are not familiar with Gunicorn's options, you can view the help with:

# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]

optional arguments:
  -h, --help            show this help message and exit
  ...
  -w INT, --workers INT
                        The number of worker processes for handling requests. [1]

The help is implemented with our old friend argparse:

class Setting(object):

    def add_option(self, parser):
        args = tuple(self.cli)

        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")

        kwargs = {
            "dest": self.name,
            "action": self.action or "store",
            "type": self.type or str,
            "default": None,
            "help": help_txt
        }
        ...
        parser.add_argument(*args, **kwargs)  # register this option on the parser

class Workers(Setting):  # the Setting class behind --workers
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = int
    default = int(os.environ.get("WEB_CONCURRENCY", 1))
    desc = """\
        The number of worker processes for handling requests.

        A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
        You'll want to vary this a bit to find the best for your particular
        application's work load.

        By default, the value of the ``WEB_CONCURRENCY`` environment variable.
        If it is not defined, the default is ``1``.
        """

def parser(self):
    kwargs = {
        "usage": self.usage,
        "prog": self.prog
    }
    parser = argparse.ArgumentParser(**kwargs)
    parser.add_argument("-v", "--version",
            action="version", default=argparse.SUPPRESS,
            version="%(prog)s (version " + __version__ + ")\n",
            help="show program's version number and exit")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)

    keys = sorted(self.settings, key=self.settings.__getitem__)  # add every Setting as a CLI option dynamically
    for k in keys:
        self.settings[k].add_option(parser)

    return parser
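
To isolate the pattern, here is a minimal sketch with hypothetical names (Option, Workers and build_parser are mine, not gunicorn's) of option classes that register themselves on an argparse parser dynamically:

# options_sketch.py -- illustrative only
import argparse

class Option:
    registry = []

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        Option.registry.append(cls)          # every subclass registers itself

    def add_option(self, parser):
        parser.add_argument(*self.cli, dest=self.name, type=self.type,
                            default=self.default, help=self.desc)

class Workers(Option):
    name = "workers"
    cli = ["-w", "--workers"]
    type = int
    default = 1
    desc = "number of worker processes"

def build_parser():
    parser = argparse.ArgumentParser(prog="demo")
    for cls in Option.registry:              # add every registered option dynamically
        cls().add_option(parser)
    return parser

print(build_parser().parse_args(["-w", "4"]))  # Namespace(workers=4)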

The gunicorn Application implementation

Gunicorn's application side is built mainly around the three classes below. Note that 'application' here means the web server's application; what bottle/flask/django implement is the web framework's application. The former dynamically loads the latter: the former runs the HTTP service, while the latter handles each individual HTTP request.

  • BaseApplication
    • Application
      • WSGIApplication

After untangling the three Application classes, the rough code template looks like this:

class WSGIApplication(Application):

    def __init__(self, usage=None, prog=None):
        self.do_load_config()  # load the configuration

    def do_load_config(self):
        ...
        cfg = self.init(parser, args, args.args)  # initialize the configuration
        ...

    def init(...):
        ...
        self.app_uri = args[0]  # remember the wsgi-application argument

    def load(...):
        util.import_app(self.app_uri)  # dynamically load the wsgi application
        ...

    def run(...):
        self.load()
        Arbiter(self).run()  # start the master, i.e. the Arbiter

def run():  # run the service
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()

if __name__ == '__main__':
    run()

The application part of the implementation is relatively simple, so we will not dwell on it further.
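
Incidentally, this Application layer is also what makes it possible to embed gunicorn in your own program. The sketch below follows the custom-application example from the gunicorn docs; the handler and the option values are illustrative, and gunicorn must of course be installed:

import gunicorn.app.base

def handler_app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'Hello from an embedded gunicorn\n']

class StandaloneApplication(gunicorn.app.base.BaseApplication):

    def __init__(self, app, options=None):
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        # copy the known settings from the options dict into self.cfg
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key.lower(), value)

    def load(self):
        return self.application

if __name__ == '__main__':
    StandaloneApplication(handler_app, {'bind': '127.0.0.1:8080', 'workers': 2}).run()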

The Arbiter implementation

The Arbiter, literally the 'referee', is in effect the core of the master process. The tidied-up code template looks like this:

class Arbiter(object):

    def __init__(self, app):
        self.worker_class = self.cfg.worker_class  # the worker class
        self.num_workers = self.cfg.workers  # the number of workers
        ...

    def start(self):
        self.init_signals()  # install the signal handlers
        ...
        sock.create_sockets(...)  # create the listening sockets

    def run(self):
        self.start()
        try:
            self.manage_workers()  # spawn the workers

            while True:  # the master loop
                ...
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()  # sleep until woken up
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue

                # dispatch the signal to its handle_* method
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                ...
                handler()
                self.wakeup()  # wake the loop back up
        except (StopIteration, KeyboardInterrupt):
            ...

Before looking at how the Arbiter works, a quick word about signals. On Linux you can list them with:

# kill -l
 1) SIGHUP	 2) SIGINT	 3) SIGQUIT	 4) SIGILL	 5) SIGTRAP
 6) SIGABRT	 7) SIGBUS	 8) SIGFPE	 9) SIGKILL	10) SIGUSR1
 11) SIGSEGV	12) SIGUSR2	13) SIGPIPE	14) SIGALRM	15) SIGTERM
...
  • 1 (SIGHUP): terminate a connection, or reload the configuration for daemons
  • 2 (SIGINT): interrupt the session from the controlling terminal
  • 3 (SIGQUIT): terminate the session from the controlling terminal
  • 4 (SIGILL): illegal instruction was executed
  • 5 (SIGTRAP): do a single instruction (trap)
  • 6 (SIGABRT): abnormal termination
  • 7 (SIGBUS): error on the system bus
  • 8 (SIGFPE): floating point error
  • 9 (SIGKILL): immediately terminate the process
  • 10 (SIGUSR1): user-defined signal
  • 11 (SIGSEGV): segmentation fault due to illegal access of a memory segment
  • 12 (SIGUSR2): user-defined signal
  • 13 (SIGPIPE): writing into a pipe, and nobody is reading from it
  • 14 (SIGALRM): the timer terminated (alarm)
  • 15 (SIGTERM): terminate the process in a soft way

Signals are events provided by the operating system and can be used for communication across processes. Arbiter.init_signals does the following:

SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
               
def init_signals(self):
    ...

    # initialize all signals
    for s in self.SIGNALS:
        signal.signal(s, self.signal)
    signal.signal(signal.SIGCHLD, self.handle_chld)  # SIGCHLD gets its own dedicated handler

def signal(self, sig, frame):
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
        self.wakeup()

The TTIN signal we used earlier to scale up is handled like this:

def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1  # one more worker
    self.manage_workers()  # reconcile the actual workers with the new count

The Arbiter's sleep and wakeup are implemented like this:

self.PIPE = pair = os.pipe()  # create the wakeup pipe

def sleep(self):
    """\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)  # wait until the pipe becomes readable, at most 1s
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):  # drain the pipe
            pass
    except (select.error, OSError) as e:
        ...

def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')  # write one byte into the pipe
    except IOError as e:
        ...
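
This is the classic self-pipe trick, and it can be demonstrated on its own. In the minimal standalone sketch below (not gunicorn code), the handler only queues the signal and writes one byte, and the main loop sleeps in select() until that byte arrives:

import os
import select
import signal

SIG_QUEUE = []
PIPE = os.pipe()

def on_signal(sig, frame):
    SIG_QUEUE.append(sig)          # defer the real work to the main loop
    os.write(PIPE[1], b'.')        # wake up the sleeping main loop

signal.signal(signal.SIGUSR1, on_signal)
os.kill(os.getpid(), signal.SIGUSR1)            # simulate an external `kill -USR1 <pid>`

ready = select.select([PIPE[0]], [], [], 1.0)   # sleep until the pipe is readable
if ready[0]:
    os.read(PIPE[0], 1)                         # drain the wakeup byte
    print("got signal:", SIG_QUEUE.pop(0))      # handle it outside the signal handler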

Note that the Arbiter creates the sockets with sock.create_sockets, binding and listening on them, and then hands them over to the child process when forking a worker:

worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                           self.app, self.timeout / 2.0,
                           self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
    worker.pid = pid  # parent: record the worker's pid
    self.WORKERS[pid] = worker  # and add it to the worker set
    return pid
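
The fork itself is the standard pre-fork pattern. Below is a minimal sketch, independent of gunicorn's classes, of a master that forks a fixed number of children up front and then reaps them as they exit:

import os
import sys
import time

WORKERS = {}

def spawn_worker():
    pid = os.fork()
    if pid != 0:                     # parent (master): remember the child's pid
        WORKERS[pid] = time.time()
        return pid
    # child (worker): do the work, then exit instead of returning to the master loop
    print("worker started, pid:", os.getpid())
    time.sleep(1)
    sys.exit(0)

for _ in range(2):                   # pre-fork a fixed number of workers
    spawn_worker()

while WORKERS:                       # master: wait for children and reap them
    pid, status = os.wait()
    WORKERS.pop(pid, None)
    print("worker exited, pid:", pid)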

Workers are destroyed by sending them signals:

def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    worker_pids = list(self.WORKERS.keys())
    for pid in worker_pids:
        os.kill(pid, sig)

The sync worker implementation

Next let's look at the workers, mainly the sync worker. The worker class hierarchy looks like this:

  • Worker: handles signals
    • SyncWorker: handles HTTP requests synchronously
    • ThreadWorker: handles HTTP requests with threads

Continuing from the Arbiter's fork-worker code above, the newly created worker enters init_process:

# Process Child
worker.pid = os.getpid()
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)

The worker's init_process template is:

def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """
    # For waking ourselves up
    self.PIPE = os.pipe()  # create the wakeup pipe
    ...
    self.wait_fds = self.sockets + [self.PIPE[0]]  # watch both the listening sockets and the pipe
    ...
    self.init_signals()  # install the signal handlers
    ...
    self.load_wsgi()  # load the wsgi application
    ...
    # Enter main run loop
    self.booted = True
    self.run()  # the worker loop

The worker likewise installs its own signal handlers:

SIGNALS = [getattr(signal, "SIG%s" % x)
           for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]

def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
        signal.signal(s, signal.SIG_DFL)
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    ...

    if hasattr(signal, 'set_wakeup_fd'):
        signal.set_wakeup_fd(self.PIPE[1])  # a delivered signal also wakes up select() via the pipe
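
signal.set_wakeup_fd deserves a quick demo of its own: when a signal is delivered, the interpreter writes the signal number as a single byte to that fd, so a select() on the pipe wakes up even if the Python handler does nothing. A minimal standalone sketch (not gunicorn code):

import os
import select
import signal

r, w = os.pipe()
os.set_blocking(w, False)                          # the wakeup fd must be non-blocking
signal.set_wakeup_fd(w)                            # delivered signal numbers get written here
signal.signal(signal.SIGUSR1, lambda s, f: None)   # a Python handler still has to be installed

os.kill(os.getpid(), signal.SIGUSR1)               # simulate an external signal

ready = select.select([r], [], [], 1.0)            # wakes up because of the signal
if ready[0]:
    print("woken, byte value:", os.read(r, 1))     # one byte holding the signal number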

The worker's most important piece is its run loop:

def run(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        ...
        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)  # accept a client connection
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue

        except EnvironmentError as e:
            ...

        try:
            self.wait(timeout)  # sleep until there is something to do
        except StopWaiting:
            return

Handling the client connection is similar to the HTTP handling introduced before, so we will not repeat it here.

def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)

After finishing a request, the worker goes back to waiting:

def wait(self, timeout):
    try:
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]

    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise
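
Putting accept() and wait() together, the sync worker is essentially a blocking accept loop guarded by select. Below is a minimal standalone sketch of that shape, using plain sockets rather than gunicorn's classes (the address and the response are illustrative; try it with curl http://127.0.0.1:8001):

import select
import socket

listener = socket.socket()
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(("127.0.0.1", 8001))
listener.listen(5)
listener.setblocking(False)                 # accept() raises instead of blocking

def handle(client):
    client.setblocking(True)
    client.recv(4096)                       # read the request (ignored here)
    client.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nok\n")
    client.close()

while True:
    try:
        client, addr = listener.accept()    # keep accepting while clients are waiting
        handle(client)
        continue
    except BlockingIOError:
        pass                                # nobody is waiting right now
    select.select([listener], [], [], 1.0)  # sleep until a connection arrives (or timeout)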

Summary

Gunicorn's workflow can be summed up with the picture below:

[Figure] Request flow of Django with Gunicorn and Nginx as a reverse proxy.

Tips

A thread can be used to implement a simple timer; gunicorn's reloader polls for file changes this way:

# reloader.py

class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super().__init__()
        self.setDaemon(True)
        self._interval = interval
        self._callback = callback

    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)
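
Using it is straightforward. A small usage sketch: get_files() is omitted from the excerpt above, so the sketch supplies one, and the watched file name is illustrative. In gunicorn the callback makes the worker exit so the master can replace it.

class MyReloader(Reloader):
    def get_files(self):                    # the excerpt omits get_files, so provide one
        return ["myapp.py"]

def restart(filename):
    print("changed:", filename)             # gunicorn logs this and exits the worker instead

MyReloader(callback=restart, interval=1).start()   # daemon thread polling in the background
# ... the main program keeps doing its own work here ...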

When you run gunicorn myapp:app, myapp:app is not imported statically; it is loaded dynamically, roughly like this:

# util.py

klass = components.pop(-1)

mod = importlib.import_module('.'.join(components))

return getattr(mod, klass)
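
The essence of that dynamic loading fits in a few lines. Here is a simplified stand-in (not gunicorn's exact import_app) showing how a "module:attribute" string such as myapp:app is resolved:

import importlib

def load_app(spec, default_attr="application"):
    module_name, _, attr = spec.partition(":")   # "myapp:app" -> ("myapp", "app")
    mod = importlib.import_module(module_name)   # import myapp
    return getattr(mod, attr or default_attr)    # fetch myapp.app

app = load_app("myapp:app")   # works when the myapp.py from the beginning of this article is importable
print(app)                    # <function app at 0x...>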

