一起聊聊thinkphp6使用think-queue實現普通隊列和延遲隊列

本篇文章給大家帶來了關于thinkphp的相關知識,其中主要介紹了關于使用think-queue來實現普通隊列和延遲隊列的相關內容,think-queue是thinkphp官方提供的一個消息隊列服務,下面一起來看一下,希望對大家有幫助。

一起聊聊thinkphp6使用think-queue實現普通隊列和延遲隊列

推薦學習:《thinkphp

###TP6 隊列

TP6 中使用 think-queue 可以實現普通隊列和延遲隊列。

立即學習PHP免費學習筆記(深入)”;

think-queue 是thinkphp 官方提供的一個消息隊列服務,它支持消息隊列的一些基本特性:

  • 消息的發布,獲取,執行,刪除,重發,失敗處理,延遲執行,超時控制等
  • 隊列的多隊列, 內存限制 ,啟動,停止,守護等
  • 消息隊列可降級為同步執行

消息隊列實現過程

1、通過生產者推送消息到消息隊列服務中

2、消息隊列服務將收到的消息存入redis隊列中(zset)

3、消費者進行監聽隊列,當監聽到隊列有新的消息時,獲取隊列第一條

4、處理獲取下來的消息調用業務類進行處理相關業務

5、業務處理后,需要從隊列中刪除消息

composer 安裝 think-queue

composer require topthink/think-queue

配置文件

安裝完 think-queue 后會在 config 目錄中生成 queue.php,這個文件是隊列的配置文件。

tp6中提供了多種消息隊列的實現方式,默認使用sync,我這里選擇使用Redis。

return [     'default'     => 'redis',     'connections' => [         'sync'     => [             'type' => 'sync',         ],         'database' => [             'type'       => 'database',             'queue'      => 'default',             'table'      => 'jobs',             'connection' => null,         ],         'redis'    => [             'type'       => 'redis',             'queue'      => 'default',             'host'       => env('redis.host', '127.0.0.1'),             'port'       => env('redis.port', '6379'),             'password'   => env('redis.password','123456'),             'select'     => 0,             'timeout'    => 0,             'persistent' => false,         ],     ],     'failed'      => [         'type'  => 'none',         'table' => 'failed_jobs',     ], ];

創建目錄及隊列消費類文件

在 app 目錄下創建 queue 目錄,然后在該目錄下新建一個抽象類 Queue.php 文件,作為基礎類

<?phpnamespace appqueue;use thinkfacadeCache;use thinkqueueJob;use thinkfacadeLog;/**  * Class Queue 隊列消費基礎類  * @package appqueue  */abstract class Queue{     /**      * @describe:fire是消息隊列默認調用的方法      * @param thinkqueueJob $job      * @param $message      */     public function fire(Job $job, $data)     {         if (empty($data)) {             Log::error(sprintf('[%s][%s] 隊列無消息', __CLASS__, __FUNCTION__));             return ;         }          $jobId = $job->getJobId(); // 隊列的數據庫id或者redis key         // $jobClassName = $job->getName(); // 隊列對象類         // $queueName = $job->getQueue(); // 隊列名稱          // 如果已經執行中或者執行完成就不再執行了         if (!$this->checkJob($jobId, $data)) {             $job->delete();             Cache::store('redis')->delete($jobId);             return ;         }          // 執行業務處理         if ($this->execute($data)) {             Log::record(sprintf('[%s][%s] 隊列執行成功', __CLASS__, __FUNCTION__));             $job->delete(); // 任務執行成功后刪除             Cache::store('redis')->delete($jobId); // 刪除redis中的緩存         } else {             // 檢查任務重試次數             if ($job->attempts() > 3) {                 Log::error(sprintf('[%s][%s] 隊列執行重試次數超過3次,執行失敗', __CLASS__, __FUNCTION__));                  // 第1種處理方式:重新發布任務,該任務延遲10秒后再執行;也可以不指定秒數立即執行                 //$job->release(10);                  // 第2種處理方式:原任務的基礎上1分鐘執行一次并增加嘗試次數                 //$job->failed();                    // 第3種處理方式:刪除任務                 $job->delete(); // 任務執行后刪除                 Cache::store('redis')->delete($jobId); // 刪除redis中的緩存             }         }     }      /**      * 消息在到達消費者時可能已經不需要執行了      * @param  string  $jobId      * @param $message      * @return bool 任務執行的結果      * @throws PsrSimpleCacheInvalidArgumentException      */     protected function checkJob(string $jobId, $message): bool     {         // 查詢redis         $data = Cache::store('redis')->get($jobId);         if (!empty($data)) {             return false;         }         Cache::store('redis')->set($jobId, $message);         return true;     }      /**      * @describe: 根據消息中的數據進行實際的業務處理      * @param $data 數據      * @return bool 返回結果      */     abstract protected function execute($data): bool;}

所有真正的消費類繼承基礎抽象類

<?phpnamespace appqueuetest;use appqueueQueue;class Test extends Queue{     protected function execute($data): bool     {        // 具體消費業務邏輯     }}

生產者邏輯

use thinkfacadeQueue;  // 普通隊列生成調用方式 Queue::push($job, $data, $queueName); // 例: Queue::push(Test::class, $data, $queueName);  // 延時隊列生成調用方式 Queue::later($delay, $job, $data, $queueName); // 例如使用延時隊列 10 秒后執行: Queue::later(10 , Test::class, $data, $queueName);

開啟進程監聽任務并執行

php think queue:listen php think queue:work

命令模式介紹

命令模式

  • queue:work 命令

    work 命令: 該命令將啟動一個 work 進程來處理消息隊列。

    php think queue:work --queue TestQueue
  • queue:listen 命令

    listen 命令: 該命令將會創建一個 listen 父進程 ,然后由父進程通過 proc_open(‘php think queue:work’) 的方式來創建一個work 子 進程來處理消息隊列,且限制該work進程的執行時間。

    php think queue:listen --queue TestQueue

命令行參數

  • Work 模式

    php think queue:work  --daemon            //是否循環執行,如果不加該參數,則該命令處理完下一個消息就退出 --queue  helloJobQueue  //要處理的隊列的名稱 --delay  0         //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0 --force            //系統處于維護狀態時是否仍然處理任務,并未找到相關說明 --memory 128       //該進程允許使用的內存上限,以 M 為單位 --sleep  3         //如果隊列中無任務,則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式) --tries  2          //如果任務已經超過嘗試次數上限,則觸發‘任務嘗試次數超限’事件,默認為0
  • Listen 模式

    php think queue:listen  --queue  helloJobQueue    //監聽的隊列的名稱 --delay  0          //如果本次任務執行拋出異常且任務未被刪除時,設置其下次執行前延遲多少秒,默認為0 --memory 128        //該進程允許使用的內存上限,以 M 為單位 --sleep  3          //如果隊列中無任務,則多長時間后重新檢查,daemon模式下有效 --tries  0          //如果任務已經超過重發次數上限,則進入失敗處理邏輯,默認為0 --timeout 60         //創建的work子進程的允許執行的最長時間,以秒為單位

    可以看到 listen 模式下,不包含 –deamon 參數,原因下面會說明

  • 消息隊列的開始,停止與重啟

    • 開始一個消息隊列:

      php think queue:work
    • 停止所有的消息隊列:

      php think queue:restart
    • 重啟所有的消息隊列:

      php think queue:restart  php think queue:work

推薦學習:《thinkphp

以上就是一起聊聊

? 版權聲明
THE END
喜歡就支持一下吧
點贊13 分享