帶你學習swoole_process

帶你學習swoole_process

推薦(免費):swoole

swoole 的進程之間有兩種通信方式,一種是消息隊列(queue),另一種是管道(pipe),對swoole_process 的研究在swoole中顯得尤為重要。

預備知識

IO多路復用

swoole 中的io多路復用表現為底層的 epoll進程模型,在c語言中表現為 epoll 函數。

epoll 模型下會持續監聽自己名下的素有socket 描述符 fd
當觸發了 socket 監聽的事件時,epoll 函數才會響應,并返回所有監聽該時間的 socket 集合
epoll 的本質是阻塞IO,它的優點在于能同事處理大量socket連接
Event loop 事件循環

swoole 對 epoll 實現了一個Reactor線程模型封裝,設置了read事件和write事件的監聽回調函數。(詳見swoole_event_add)

Event loop 是一個Reactor線程,其中運行了一個epoll實例。
通過swoole_event_add將socket描述符的一個事件添加到epoll監聽中,事件發生時將執行回調函數
不可用于fpm環境下,因為fpm在任務結束時可能會關掉進程。

swoole_process

基于C語言封裝的進程管理模塊,方便php來調用
內置管道、消息隊列接口,方便實現進程間通信
我們在php-fpm.conf配置文件中發現,php-fpm中有兩種進程池管理設置。

靜態模式 即初始化固定的進程數,當來了一個請求時,從中選取一個進程來處理。
動態模式 指定最小、最大進程數,當請求量過大,進程數不超過最大限制時,新增線程去處理請求

接下來用swoole代碼來實現,這里只是為理解swoole_process、進程間通信、定時器等使用,實際情況使用封裝好的swoole_server來實現task任務隊列池會更方便。

假如有個定時投遞的任務隊列:

<?php /**  * 動態進程池,類似fpm  * 動態新建進程  * 有初始進程數,最小進程數,進程不夠處理時候新建進程,不超過最大進程數  */// 一個進程定時投遞任務/**  * 1. tick  * 2. process及其管道通訊  * 3. event loop 事件循環  */class processPool{   private $pool;    /**    * @var swoole_process[] 記錄所有worker的process對象    */   private $workers = [];    /**    * @var array 記錄worker工作狀態    */   private $used_workers = [];    /**    * @var int 最小進程數    */   private $min_woker_num = 5;    /**    * @var int 初始進程數    */   private $start_worker_num = 10;    /**    * @var int 最大進程數    */   private $max_woker_num = 20;    /**    * 進程閑置銷毀秒數    * @var int    */   private $idle_seconds = 5;    /**    * @var int 當前進程數    */   private $curr_num;    /**    * 閑置進程時間戳    * @var array    */   private $active_time = [];    public function __construct()   {     $this->pool?=?new?swoole_process(function?()?{ ??????//?循環建立worker進程 ??????for?($i?=?0;?$i?start_worker_num;?$i++)?{ ????????$this-&gt;createWorker(); ??????} ??????echo?'初始化進程數:'?.?$this-&gt;curr_num?.?PHP_EOL; ??????//?每秒定時往閑置的worker的管道中投遞任務 ??????swoole_timer_tick(1000,?function?($timer_id)?{ ????????static?$count?=?0; ????????$count++; ????????$need_create?=?true; ????????foreach?($this-&gt;used_workers?as?$pid?=&gt;?$used)?{ ??????????if?($used?==?0)?{ ????????????$need_create?=?false; ????????????$this-&gt;workers[$pid]-&gt;write($count?.?'?job'); ????????????//?標記使用中 ????????????$this-&gt;used_workers[$pid]?=?1; ????????????$this-&gt;active_time[$pid]?=?time(); ????????????break; ??????????} ????????} ????????foreach?($this-&gt;used_workers?as?$pid?=&gt;?$used) ??????????//?如果所有worker隊列都沒有閑置的,則新建一個worker來處理 ??????????if?($need_create?&amp;&amp;?$this-&gt;curr_num?max_woker_num)?{ ????????????$new_pid?=?$this-&gt;createWorker(); ????????????$this-&gt;workers[$new_pid]-&gt;write($count?.?'?job'); ????????????$this-&gt;used_workers[$new_pid]?=?1; ????????????$this-&gt;active_time[$new_pid]?=?time(); ??????????}  ????????//?閑置超過一段時間則銷毀進程 ????????foreach?($this-&gt;active_time?as?$pid?=&gt;?$timestamp)?{ ??????????if?((time()?-?$timestamp)?&gt;?$this-&gt;idle_seconds?&amp;&amp;?$this-&gt;curr_num?&gt;?$this-&gt;min_woker_num)?{ ????????????//?銷毀該進程 ????????????if?(isset($this-&gt;workers[$pid])?&amp;&amp;?$this-&gt;workers[$pid]?instanceof?swoole_process)?{ ??????????????$this-&gt;workers[$pid]-&gt;write('exit'); ??????????????unset($this-&gt;workers[$pid]); ??????????????$this-&gt;curr_num?=?count($this-&gt;workers); ??????????????unset($this-&gt;used_workers[$pid]); ??????????????unset($this-&gt;active_time[$pid]); ??????????????echo?"{$pid}?destroyedn"; ??????????????break; ????????????} ??????????} ????????}  ????????echo?"任務{$count}/{$this-&gt;curr_num}n";  ????????if?($count?==?20)?{ ??????????foreach?($this-&gt;workers?as?$pid?=&gt;?$worker)?{ ????????????$worker-&gt;write('exit'); ??????????} ??????????//?關閉定時器 ??????????swoole_timer_clear($timer_id); ??????????//?退出進程池 ??????????$this-&gt;pool-&gt;exit(0); ??????????exit(); ????????} ??????});  ????});  ????$master_pid?=?$this-&gt;pool-&gt;start(); ????echo?"Master?$master_pid?startn";  ????while?($ret?=?swoole_process::wait())?{ ??????$pid?=?$ret['pid']; ??????echo?"process?{$pid}?existedn"; ????} ??}  ??/** ???*?創建一個新進程 ???*?@return?int?新進程的pid ???*/ ??public?function?createWorker() ??{ ????$worker_process?=?new?swoole_process(function?(swoole_process?$worker)?{ ??????//?給子進程管道綁定事件 ??????swoole_event_add($worker-&gt;pipe,?function?($pipe)?use?($worker)?{ ????????$data?=?trim($worker-&gt;read()); ????????if?($data?==?'exit')?{ ??????????$worker-&gt;exit(0); ??????????exit(); ????????} ????????echo?"{$worker-&gt;pid}?正在處理?{$data}n"; ????????sleep(5); ????????//?返回結果,表示空閑 ????????$worker-&gt;write("complete"); ??????}); ????});  ????$worker_pid?=?$worker_process-&gt;start();  ????//?給父進程管道綁定事件 ????swoole_event_add($worker_process-&gt;pipe,?function?($pipe)?use?($worker_process)?{ ??????$data?=?trim($worker_process-&gt;read()); ??????if?($data?==?'complete')?{ ????????//?標記為空閑//????????echo?"{$worker_process-&gt;pid}?空閑了n"; ????????$this-&gt;used_workers[$worker_process-&gt;pid]?=?0; ??????} ????});  ????//?保存process對象 ????$this-&gt;workers[$worker_pid]?=?$worker_process; ????//?標記為空閑 ????$this-&gt;used_workers[$worker_pid]?=?0; ????$this-&gt;active_time[$worker_pid]?=?time(); ????$this-&gt;curr_num?=?count($this-&gt;workers); ????return?$worker_pid; ??}}new?processPool();

以上內容希望幫助到大家

? 版權聲明
THE END
喜歡就支持一下吧
點贊5 分享