如前文所述,arbiter是gunicorn master進(jìn)程的核心。arbiter主要負(fù)責(zé)管理worker進(jìn)程,包括啟動、監(jiān)控、殺掉worker進(jìn)程;同時,arbiter在某些信號發(fā)生的時候還可以熱更新(reload)app應(yīng)用,或者在線升級gunicorn。arbiter的核心代碼在一個文件里面,代碼量也不大,源碼在此:https://github.com/benoitc/gunicorn。
Arbiter主要有以下方法:
setup:
? ? 處理配置項,最重要的是worker數(shù)量和worker工作模型
init_signal:
? ? 注冊信號處理函數(shù)
handle_xxx:
? ? 各個信號具體的處理函數(shù)
kill_worker,kill_workers:
? ? 向worker進(jìn)程發(fā)信號
spawn_worker,?spawn_workers:
? ? fork出新的worker進(jìn)程
murder_workers:
? ? 殺掉一段時間內(nèi)未響應(yīng)的worker進(jìn)程
manage_workers:
? ??根據(jù)配置文件的worker數(shù)量,以及當(dāng)前active的worker數(shù)量,決定是要fork還是kill worker進(jìn)程
reexec:
? ? 接收到信號SIGUSR2調(diào)用,在線升級gunicorn
reload:
? ? 接收到信號SIGHUP調(diào)用,會根據(jù)新的配置新啟動worker進(jìn)程,并殺掉之前的worker進(jìn)程
sleep:
? ? 在沒有信號處理的時候,利用select的timeout進(jìn)行sleep,可被喚醒
wakeup:
? ? 通過向管道寫消息,喚醒進(jìn)程
run:
? ? 主循環(huán)
Arbiter真正被其他代碼(Application)調(diào)用的函數(shù)只有__init__和run方法,在一句代碼里:
? ? Arbiter(self).run()
上面代碼中的self即為Application實例,其中__init__調(diào)用setup進(jìn)行配置項設(shè)置。下面是run方法偽代碼
def run() self.init_signal() self.LISTENERS = create_sockets(self.cfg, self.log) self.manage_workers() while True: if no signal in SIG_QUEUE self.sleep() else: handle_signal()
?關(guān)于fork子進(jìn)程
fork子進(jìn)程的代碼在 spawn_worker, 源碼如下:
?Arbiter.spawn_worker
主要流程:
? ? (1)加載worker_class并實例化(默認(rèn)為同步模型 SyncWorker)
? ? (2)父進(jìn)程(master進(jìn)程)fork之后return,之后的邏輯都在子進(jìn)程中運行
? ? (3)調(diào)用worker.init_process 進(jìn)入循環(huán),新航道雅思培訓(xùn)的所有工作都在這個循環(huán)中
? ? (4)循環(huán)結(jié)束之后,調(diào)用sys.exit(0)
? ? (5)最后,在finally中,記錄worker進(jìn)程的退出
? ??
? ? 下面是我自己寫的一點代碼,把主要的fork流程簡化了一下
1 # prefork.py 2 import sys 3 import socket 4 import select 5 import os 6 import time 7 8 def do_sub_process(): 9 pid = os.fork()10 if pid < 0:11 print 'fork error'12 sys.exit(-1)13 elif pid > 0:14 print 'fork sub process %d' % pid15 return16 17 # must be child process18 time.sleep(1)19 print 'sub process will exit', os.getpid(), os.getppid()20 sys.exit(0)21 22 def main():23 sub_num = 224 for i in range(sub_num):25 do_sub_process()26 time.sleep(10)27 print 'main process will exit', os.getpid()28 29 if __name__ == '__main__':30 main()
在測試環(huán)境下輸出:
fork?sub?process?9601
fork?sub?process?9602
sub?process?will?exit?9601?9600
sub?process?will?exit?9602?9600
main?process?will?exit?9600
需要注意的是第20行調(diào)用了sys.exit, 保證子進(jìn)程的結(jié)束,否則會繼續(xù)main函數(shù)中for循環(huán),以及之后的邏輯。注釋掉第19行重新運行,看輸出就明白了。
關(guān)于kill子進(jìn)程
master進(jìn)程要kill worker進(jìn)程就很簡單了,直接發(fā)信號,源碼如下:
1 def kill_worker(self, pid, sig): 2 """ 3 Kill a worker 4 5 :attr pid: int, worker pid 6 :attr sig: `signal.SIG*` value 7 """ 8 try: 9 os.kill(pid, sig)10 except OSError as e:11 if e.errno == errno.ESRCH:12 try:13 worker = self.WORKERS.pop(pid)14 worker.tmp.close()15 self.cfg.worker_exit(self, worker)16 return17 except (KeyError, OSError):18 return19 raise
關(guān)于sleep與wakeup
我們再來看看Arbiter的sleep和wakeup。Arbiter在沒有信號需要處理的時候會”sleep”,當(dāng)然,不是真正調(diào)用time.sleep,否則信號來了也不能第一時間處理。這里得實現(xiàn)比較巧妙,利用了管道和select的timeout??创a就知道了
def sleep(self): """ Sleep until PIPE is readable or we timeout. A readable PIPE means a signal occurred. """ ready = select.select([self.PIPE[0]], [], [], 1.0) # self.PIPE = os.pipe() if not ready[0]: return while os.read(self.PIPE[0], 1): pass
代碼里面的注釋寫得非常清楚,要么PIPE可讀立即返回,要么等待超時。管道可讀是因為有信號發(fā)生。這里看看pipe函數(shù)
-
os.pipe()
-
Create a pipe. Return a pair of file descriptors?(r,w)?usable for reading and writing, respectively.
那我們看一下什么時候管道可讀:肯定是往管道寫入的東西,這就是wakeup函數(shù)的功能
def wakeup(self): """ Wake up the arbiter by writing to the PIPE """ os.write(self.PIPE[1], b'.')
最后附上Arbiter的信號處理:
退出,INT:快速關(guān)閉
TERM:?優(yōu)雅關(guān)機(jī)。等待工作人員完成其當(dāng)前請求,直到超時。
HUP:重新加載配置,用新配置啟動新的工作進(jìn)程,并優(yōu)雅地關(guān)閉舊的工作進(jìn)程。如果應(yīng)用程序未預(yù)加載(使用–preload選項),Gunicorn也將加載新版本。
TTIN:將進(jìn)程數(shù)增加一個
TTOU:將進(jìn)程數(shù)減少一個
USR1:重新打開日志文件
USR2:在飛行中升級Gunicorn。應(yīng)使用單獨的術(shù)語信號終止舊進(jìn)程。此信號也可用于使用預(yù)加載應(yīng)用程序的新版本。
絞盤:當(dāng)Gunicorn被守護(hù)時,優(yōu)雅地關(guān)閉工作進(jìn)程。