swoole如何實(shí)現(xiàn)實(shí)時推送

swoole如何實(shí)現(xiàn)實(shí)時推送

?swoole+redis實(shí)現(xiàn)實(shí)時數(shù)據(jù)推送 ? ? ? ? ? (推薦學(xué)習(xí): swoole視頻教程

<?php /**  * ***************************************  *            單進(jìn)程保護(hù)                 *  * ***************************************  */ $phpSelf 			= realpath($_SERVER[&#39;PHP_SELF&#39;]); $lockFile			= $phpSelf.&#39;.lock&#39;; $lockFileHandle 	= fopen($lockFile, "w"); if ($lockFileHandle == false) { 	exit("Can not create lock file $lockFilen"); } if (!flock($lockFileHandle, LOCK_EX + LOCK_NB)) { 	exit(date("Y-m-d H:i:s")."Process already exist.n"); }   /**  * ***************************************  *     進(jìn)入程序,定義相關(guān)配置            *  * ***************************************  */ set_time_limit(0); //socket會話的超時時間,根據(jù)業(yè)務(wù)場景設(shè)置,這里設(shè)置為永不超時 //如果設(shè)置了時間,則從socket建立=>傳輸=&gt;關(guān)閉整個過程必須在定義的時間內(nèi)完成,否則自動close該socket并拋出warning ini_set('default_socket_timeout',?-1); $conf?=?array( 	'listen'??=&gt;?array('host'?=&gt;?'0.0.0.0','port'?=&gt;?'8008'), 	'setting'?=&gt;?array( 		//程序允許的最大連接數(shù),用以設(shè)置server最大允許維持多少個TCP連接,超過該數(shù)量后,新連接將被拒絕,默認(rèn)為ulimit?-n的值,如果設(shè)置大于ulimit?-n則強(qiáng)制重置為ulimit-?n,如果確實(shí)需要設(shè)置超過ulimit?-n的值,請修改系統(tǒng)值?vim?/etc/security/limits.conf?修改nofile的值 		"max_conn"			=&gt;?1024, 		//啟用CPU親和設(shè)置(在全異步非阻塞是可啟用),在多核的服務(wù)器中,啟用此特性會將swoole的reactor線程/worker進(jìn)程綁定到固定的一個核上。可以避免進(jìn)程/線程的運(yùn)行時在多個核之間互相切換,提高CPU?Cache的命中率,如何確定綁定在了哪個核上,請參考文檔,?查看命令:?taskset?-p?進(jìn)程id 		'open_cpu_affinity'	=&gt;?0, 		//配置task進(jìn)程數(shù)量,配置此參數(shù)后將會啟用task功能。所以Server務(wù)必要注冊onTask、onFinish2個事件回調(diào)函數(shù)。如果沒有注冊,服務(wù)器程序?qū)o法啟動.Task進(jìn)程是同步阻塞的,配置方式與Worker同步模式一致。 		'task_worker_num'	=&gt;?20, 		//設(shè)置task進(jìn)程的最大任務(wù)數(shù)。一個task進(jìn)程在處理完超過此數(shù)值的任務(wù)后將自動退出。這個參數(shù)是為了防止PHP進(jìn)程內(nèi)存溢出。如果不希望進(jìn)程自動退出可以設(shè)置為0,?默認(rèn)是0 		'task_max_request'	=&gt;?1024,? 		//設(shè)置task的數(shù)據(jù)臨時目錄,在swoole_server中,如果投遞的數(shù)據(jù)超過8192字節(jié),將啟用臨時文件來保存數(shù)據(jù)。這里的task_tmpdir就是用來設(shè)置臨時文件保存的位置。 		'task_tmpdir'		=&gt;?'/tmp/', 		//worker進(jìn)程數(shù)量,根據(jù)業(yè)務(wù)代碼的模式作調(diào)整,全異步非阻塞可設(shè)置為CPU核數(shù)的1-4倍;同步阻塞,請參考文檔調(diào)整 		'worker_num'		=&gt;?8, 		//指定swoole錯誤日志文件 		'log_file'?			=&gt;?'/tmp/log/log.txt', 		//SSL公鑰和私鑰的位置,啟用wss必須在編譯swoole時加入--enable-openssl選項(xiàng) 		'ssl_cert_file'		=&gt;?'/usr/local/nginx/conf/server.cer', 		'ssl_key_file'		=&gt;?'/usr/local/nginx/conf/server.key', 	), ); ? /** ?*?*************************************** ?*???????初始化Redis連接?????????????????* ?*?*************************************** ?*/ $redis?=?null; $redis?=?new?Redis(); $redis-&gt;connect(REDIS_HOST,?REDIS_PORT); $redis-&gt;auth(REDIS_PWD); $GLOBALS['redis']=$redis; ? /** ?*?*************************************** ?*????????腳本重啟時,清除歷史的數(shù)據(jù)?????* ?*?*************************************** ?*/ $sArr?=?$redis-&gt;sMembers(REDIS_S_KEY); if?(!empty($sArr))?{ 	foreach?((array)$sArr?as?$key?=&gt;?$sc)?{ 		$fdArr?=?$redis-&gt;sMembers(REDIS_S_FD.$sc); 		foreach?((array)$fdArr?as?$k?=&gt;?$fd)?{ 			$res1?=?$redis-&gt;del(REDIS_FD_S.$fd); 		} 		$res2?=?$redis-&gt;del(REDIS_S_FD.$sc); 	} 	$redis-&gt;del(REDIS_S_KEY); } $redis-&gt;del(REDIS_ZS_KEY); ? /** ?*?*************************************** ?*???????????綁定回調(diào)事件????????????????* ?*?*************************************** ?*/ $ws?=?null; //wss服務(wù) $ws?=?new?swoole_websocket_server($conf['listen']['host'],?$conf['listen']['port'],?SWOOLE_PROCESS,?SWOOLE_SOCK_TCP?|?SWOOLE_SSL); $ws-&gt;set($conf['setting']); ? /** ?*?Server啟動在主進(jìn)程的主線程回調(diào)此函數(shù) ?*?在此事件之前Swoole?Server已進(jìn)行了如下操作 ?*?已創(chuàng)建了manager進(jìn)程 ?*?已創(chuàng)建了worker子進(jìn)程 ?*?已監(jiān)聽所有TCP/UDP端口 ?*?已監(jiān)聽了定時器 ?*?在onStart中創(chuàng)建的全局資源對象不能在worker進(jìn)程中被使用,因?yàn)榘l(fā)生onStart調(diào)用時,worker進(jìn)程已經(jīng)創(chuàng)建好了。新創(chuàng)建的對象在主進(jìn)程內(nèi),worker進(jìn)程無法訪問到此內(nèi)存區(qū)域。因此全局對象創(chuàng)建的代碼需要放置在swoole_server_start之前 ?*/ $ws-&gt;on('start',?function?($ws)?{ 	swoole_set_process_name(PROCESS_NAME.'_master'); }); ? /** ?*?與onStart回調(diào)在不同進(jìn)程中并行執(zhí)行的回調(diào)函數(shù)(不存在先后順序) ?*?@param:?$ws?swoole_websocket_server?object ?*?@param:?$wid?創(chuàng)建該進(jìn)程時swoole分配的id(不是進(jìn)程id) ?*?注意點(diǎn): ?*?1.?此事件在worker進(jìn)程/task進(jìn)程啟動時發(fā)生。onWorkerStart/onStart是并發(fā)執(zhí)行的,沒有先后順序,這里創(chuàng)建的對象可以在進(jìn)程生命周期內(nèi)使用 ?*?2.?swoole1.6.11之后task_worker中也會觸發(fā)onWorkerStart,故而在下面的處理中,加入了判斷業(yè)務(wù)類型$jobType是task還是work,如果是task則命名為****_Tasker_$id,如果是worker則命名為****_Worker_$id ?*?3.?發(fā)生PHP致命錯誤或者代碼中主動調(diào)用exit時,Worker/Task進(jìn)程會退出,管理進(jìn)程會重新創(chuàng)建新的進(jìn)程 ?*?5.?如果想使用swoole_server_reload實(shí)現(xiàn)代碼重載入,必須在workerStart中require你的業(yè)務(wù)文件,而不是在文件頭部。在onWorkerStart調(diào)用之前已包含的文件,不會重新載入代碼。 ?*?6.?可以將公用的,不易變的php文件放置到onWorkerStart之前(例如上面的redis配置)。這樣雖然不能重載入代碼,但所有worker是共享的,不需要額外的內(nèi)存來保存這些數(shù)據(jù)。 ?*?7.?onWorkerStart之后的代碼每個worker都需要在內(nèi)存中保存一份 ?*/ $ws-&gt;on('workerstart',?function?($ws,?$wid)?{ 	$jobType?=?$ws-&gt;taskworker???'Tasker'?:?'Worker'; 	swoole_set_process_name(PROCESS_NAME.'_'.$jobType.'_'.$wid); 	$GLOBALS['ws']?=?$ws;?//保存server對象到全局中以待使用 	if?($jobType?==?'Worker')?{?//在某個worker進(jìn)程上綁定redis訂閱進(jìn)程 		if?($wid?===?0)?{ ????????????$dataRedis?=?null; ????????????$dataRedis?=?new?Redis(); ????????????$dataRedis-&gt;connect(REDIS_HOST_DATA,?REDIS_PORT_DATA); ????????????$dataRedis-&gt;auth(REDIS_PWD_DATA); ????????????//使用psubscribe訂閱指定模式的頻道,這里*表示所有頻道 ????????????//請注意,redis訂閱不提供區(qū)分庫(db)的功能,所以多個庫都同時在發(fā)布同一個名字的頻道時,都將被訂閱到 			$dataRedis-&gt;psubscribe(array("*"),?"sendTask"); 		} 	} }); ? /** ?*?管理進(jìn)程啟用時,調(diào)用該回調(diào)函數(shù) ?*?注意manager進(jìn)程中不能添加定時器 ?*?manager進(jìn)程中可以調(diào)用sendMessage接口向其他工作進(jìn)程發(fā)送消息 ?*/ $ws-&gt;on('managerstart',?function?($ws)?{ 	swoole_set_process_name(PROCESS_NAME.'_manage'); }); ? /** ?*?swoole?websocket服務(wù)特有的回調(diào)函數(shù),此函數(shù)在websocket服務(wù)器中必須定義實(shí)現(xiàn),否則websocket服務(wù)將無法啟動 ?*?當(dāng)服務(wù)器收到來自客戶端的數(shù)據(jù)幀時會回調(diào)此函數(shù) ?*?@param:?$ws為swoole_websocket_server對象,其結(jié)構(gòu)在調(diào)試時可var_dump查看 ?*?@param:?$frame為swoole_websocket_frame對象,包含了客戶端發(fā)來的數(shù)據(jù)幀信息,包含以下四個屬性: ?*?@param:?$frame-&gt;fd:?客戶端的socket?id,每個id對應(yīng)一個客戶端,推送消息的時候需要指定 ?*?@param:?$frame-&gt;data:?數(shù)據(jù)內(nèi)容,可以是文本內(nèi)容或者是二進(jìn)制數(shù)據(jù)(圖片等),可以通過opcode的值來判斷。$data?如果是文本類型,編碼格式必然是UTF-8,這是WebSocket協(xié)議規(guī)定的 ?*?@param:?$frame-&gt;opcode:?WebSocket的OpCode類型,可以參考WebSocket協(xié)議標(biāo)準(zhǔn)文檔,?WEBSOCKET_OPCODE_TEXT?=?0x1?,文本數(shù)據(jù);?WEBSOCKET_OPCODE_BINARY?=?0x2?,二進(jìn)制數(shù)據(jù) ?*?@param:?$frame-&gt;finish:?表示數(shù)據(jù)幀是否完整,一個WebSocket請求可能會分成多個數(shù)據(jù)幀進(jìn)行發(fā)送 ?*?注意點(diǎn):?客戶端發(fā)送的ping幀不會觸發(fā)onMessage,底層會自動回復(fù)pong包 ?*/ $ws-&gt;on('message',?function?($ws,?$frame)?{ ????echo?"Server?has?receive?messagen"; ????//接收到客戶端請求,并建立連接之后,進(jìn)行相應(yīng)業(yè)務(wù)的處理 ????handleClientData($ws,?$frame); }); ? /** ?*?在task_worker進(jìn)程內(nèi)被調(diào)用。worker進(jìn)程可以使用swoole_server_task函數(shù)向task_worker進(jìn)程投遞新的任務(wù)(此處使用的是taskwait) ?*?當(dāng)前的Task進(jìn)程在調(diào)用onTask回調(diào)函數(shù)時會將進(jìn)程狀態(tài)切換為忙碌,這時將不再接收新的Task,當(dāng)onTask函數(shù)返回時會將進(jìn)程狀態(tài)切換為空閑然后繼續(xù)接收新的Task。 ?*?@param:?$ws?swoole_websocket_server?object ?*?@param:?$tid?task?process?id ?*?@param:?$wid?from?id?表示來自哪個Worker進(jìn)程。$task_id和$wid組合起來才是全局唯一的,不同的worker進(jìn)程投遞的任務(wù)ID可能會有相同 ?*?@param:?$data?需要執(zhí)行的任務(wù)內(nèi)容 ?*?注意點(diǎn):?onTask函數(shù)執(zhí)行時遇到致命錯誤退出,或者被外部進(jìn)程強(qiáng)制kill,當(dāng)前的任務(wù)會被丟棄,但不會影響其他正在排隊(duì)的Task ?*/ $ws-&gt;on('task',?function?($ws,?$tid,?$wid,?$data)?{ 	switch?($data['cmd'])?{ 		case?'pushToClient':?$ret?=?pushToClientTask($ws,?$data['key'],?$data['val']);?break; 	} 	//1.7.2以上的版本,在onTask函數(shù)中?return字符串,表示將此內(nèi)容返回給worker進(jìn)程。worker進(jìn)程中會觸發(fā)onFinish函數(shù),表示投遞的task已完成。return的變量可以是任意非null的PHP變量 	return?$returnContent; 	//1.7.2以前的版本,需要調(diào)用swoole_server-&gt;finish()函數(shù)將結(jié)果返回給worker進(jìn)程 	//?$ws-&gt;finish($data); }); ? /** ?*?當(dāng)worker進(jìn)程投遞的任務(wù)在task_worker中完成時,task進(jìn)程會通過$ws-&gt;finish()方法將任務(wù)處理的結(jié)果發(fā)送給worker進(jìn)程。 ?*?@param:?$ws?swoole_websocket_server?object ?*?@param:?$tid?task_id ?*?@param:?$data?任務(wù)處理后的結(jié)果內(nèi)容 ?*?注意點(diǎn):?task進(jìn)程的onTask事件中沒有調(diào)用finish方法或者return結(jié)果,worker進(jìn)程不會觸發(fā)onFinish ?*????????執(zhí)行onFinish邏輯的worker進(jìn)程與下發(fā)task任務(wù)的worker進(jìn)程是同一個進(jìn)程 ?*/ $ws-&gt;on('finish',?function($ws,?$tid,?$data)?{ ? }); ? /** ?*?TCP客戶端連接關(guān)閉后,在worker進(jìn)程中回調(diào)此函數(shù) ?*?在函數(shù)中可以做一些類似于刪除業(yè)務(wù)中與每個客戶端交互時存放的數(shù)據(jù)的操作 ?*?@param:?$ws?swoole_websocket_server?object ?*?@param:?$fd?已關(guān)閉的fd?interger ?*?@param:?$rid(可選),來自哪個reactor線程 ?*?注意點(diǎn):? ?*?1.?onClose回調(diào)函數(shù)如果發(fā)生了致命錯誤,會導(dǎo)致連接泄漏。通過netstat命令會看到大量CLOSE_WAIT狀態(tài)的TCP連接 ?*?2.?查看命令netstat?-anopc?|?grep?端口號,可以查看到TCP接收和發(fā)送隊(duì)列是否有堆積以及TCP連接的狀態(tài) ?*?3.?無論由客戶端發(fā)起close還是服務(wù)器端主動調(diào)用$serv-&gt;close()關(guān)閉連接,都會觸發(fā)此事件。因此只要連接關(guān)閉,就一定會回調(diào)此函數(shù) ?*?4.?1.7.7+版本以后onClose中依然可以調(diào)用connection_info方法獲取到連接信息,在onClose回調(diào)函數(shù)執(zhí)行完畢后才會調(diào)用close關(guān)閉TCP連接 ?*?5.?這里回調(diào)onClose時表示客戶端連接已經(jīng)關(guān)閉,所以無需執(zhí)行$server-&gt;close($fd)。代碼中執(zhí)行$serv-&gt;close($fd)會拋出PHP錯誤告警。也就是在onclose中不能再$ws-&gt;close()了. ?*?6.?swoole-1.9.7版本修改了$reactorId參數(shù),當(dāng)服務(wù)器主動關(guān)閉連接時,底層會設(shè)置此參數(shù)為-1,可以通過判斷$reactorId?on('close',?function?($ws,?$fd)?{ 	$redis?=?new?Redis(); 	$redis-&gt;connect(REDIS_HOST,?REDIS_PORT); 	$redis-&gt;auth(REDIS_PWD); 	$sArr?=?$redis-&gt;sMembers(REDIS_FD_S.$fd); 	if?(!empty($sArr))?{ 		foreach?((array)$sArr?as?$key?=&gt;?$sc)?{ 			$res?=?$redis-&gt;sRem(REDIS_S_FD.$sc,?$fd); 			$num?=?$redis-&gt;sCard(REDIS_S_FD.$sc); 			if?($num?==?'0')?{ 				$redis-&gt;sRem(REDIS_S_KEY,?$sc); 				$redis-&gt;hDel(REDIS_ZS_KEY,?$sc); 			} 		} 	} 	$redis-&gt;del(REDIS_FD_S.$fd); 	$redis-&gt;close(); 	echo?"FD?$fd?has?closed.n"; }); ? /** ?*?開啟swoole_websocket_server服務(wù) ?*/ $ws-&gt;start(); ? ? /** ?*?接受到消息以后進(jìn)行響應(yīng)異步任務(wù)的執(zhí)行 ?*?@param:?$ws?swoole_websocket_sever?object ?*?@param:?$frame?swoole_websocket_frame?obejct ?*/ function?handleClientData($ws,?$frame)?{ 	$data?=?$frame-&gt;data; 	$redis?=?new?Redis(); 	$redis-&gt;connect(REDIS_HOST,?REDIS_PORT); 	$redis-&gt;auth(REDIS_PWD); 	$isMembers?=?$redis-&gt;sIsmember(REDIS_S_FD.$sc,?$frame-&gt;fd); 	if?(!$isMembers)?{ 		$res?=?$redis-&gt;sAdd(REDIS_S_FD.$sc,?$frame-&gt;fd); 	} 	$redis-&gt;sAdd(REDIS_FD_S.$frame-&gt;fd,?$sc); 	$isMembers?=?$redis-&gt;sIsmember(REDIS_S_KEY,?$sc); 	if?(!$isMembers)?{? 		$redis-&gt;sAdd(REDIS_S_KEY,?$sc); 	} } ? ? /** ?*?redis訂閱后的回調(diào)函數(shù) ?*?@param:?$ins?instance實(shí)例 ?*?@param:?$pattern?匹配模式 ?*?@param:?$channel?頻道名 ?*?@param:?$data?數(shù)據(jù) ?*?注意點(diǎn):?subscribe和psubscribe兩種不同的訂閱方式的回調(diào)函數(shù)的參數(shù)個數(shù)不一樣,后者多了$pattern參數(shù) ?*/ function?sendTask($ins,?$pattern,?$channel,?$data)?{ 	//滿足一些條件后,投遞到task進(jìn)程中進(jìn)行推送 	$taskData?=?array( 		'cmd'?=&gt;?'pushToClient', 		'key'?=&gt;?$sc, 		'val'?=&gt;?$data, 	); 	//請注意,taskwait是同步阻塞的,所以改腳本并不是全異步非阻塞的 	$GLOBALS['ws']-&gt;taskwait($taskData); } ? /** ?*?推送消息到指定的客戶端 ?*?@param:?$ws?swoole_websocket_server?object ?*?@param:?$sc?股票代碼 ?*?@param:?$data?要推送的數(shù)據(jù) ?*/ function?pushToClientTask($ws,?$sc,?$data)?{ ????$redis?=?new?Redis(); ????$redis-&gt;connect(REDIS_HOST,?REDIS_PORT); ????$redis-&gt;auth(REDIS_PWD); 	$fdList?=?$redis-&gt;sMembers(REDIS_S_FD.$sArr[4]); 	if?(!empty($fdList))?{ 		foreach?((array)$fdList?as?$fd)?{ 			$res?=?$GLOBALS['ws']-&gt;push($fd,?$data); 			echo?"FD:?$fd?push?$res.n"; 			if?(!$res)?{?//推送失敗,即客戶端已經(jīng)斷開連接 				//從該fd訂閱的所有股票中刪除該fd 				$sArrOfFd?=?$redis-&gt;sMembers(REDIS_FD_S.$fd); 				if?(!empty($sArrOfFd))?{ 					foreach?((array)$sArrOfFd?as?$key?=&gt;?$sc)?{ 						$res?=?$redis-&gt;sRem(REDIS_S_FD.$sc,?$fd); 						$num?=?$redis-&gt;sCard(REDIS_S_FD.$sc); 						if?($num?==?'0')?{ 							$redis-&gt;sRem(REDIS_S_KEY,?$sc); 							$redis-&gt;hDel(REDIS_ZS_KEY,?$sc); 						} 					} 				} 				$redis-&gt;del(REDIS_FD_S.$fd); 			} 		} 	} ????$redis-&gt;close(); }

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點(diǎn)贊12 分享