解析think-queue(圍繞redis做分析)

前言

分析之前請大家務必了解消息隊列的實現

tp5的消息隊列是基于database redis 和tp官方自己實現的 Topthink
本章是圍繞redis來做分析

存儲key:

key 類型 描述
queues:queueName list 要執行的任務
think:queue:restart string 重啟隊列時間戳
queues:queueName:delayed zSet 延遲任務
queues:queueName:reserved zSet 執行失敗,等待重新執行

執行命令

work和listen的區別在下面會解釋

命令 描述
php think queue:work 監聽隊列
php think queue:listen 監聽隊列
php think queue:restart 重啟隊列
php think queue:subscribe 暫無,可能是保留的 官方有什么其他想法但是還沒實現

行為標簽

標簽 描述
worker_daemon_start 守護進程開啟
worker_memory_exceeded 內存超出
worker_queue_restart 重啟守護進程
worker_before_process 任務開始執行之前
worker_before_sleep 任務延遲執行
queue_failed 任務執行失敗

命令參數

參數 默認值 可以使用的模式 描述
queue null work,listen 要執行的任務名稱
daemon null work 以守護進程執行任務
delay 0 work,listen 失敗后重新執行的時間
force null work 失敗后重新執行的時間
memory 128M work,listen 限制最大內存
sleep 3 work,listen 沒有任務的時候等待的時間
tries 0 work,listen 任務失敗后最大嘗試次數

模式區別

1: 執行原理不同
work: 單進程的處理模式;
無 daemon 參數 work進程在處理完下一個消息后直接結束當前進程。當不存在新消息時,會sleep一段時間然后退出;
有 daemon 參數 work進程會循環地處理隊列中的消息,直到內存超出參數配置才結束進程。當不存在新消息時,會在每次循環中sleep一段時間;

listen: 父進程 + 子進程 的處理模式;
會在所在的父進程會創建一個單次執行模式的work子進程,并通過該work子進程來處理隊列中的下一個消息,當這個work子進程退出之后;
所在的父進程會監聽到該子進程的退出信號,并重新創建一個新的單次執行的work子進程;

2: 退出時機不同
work: 看上面
listen: 所在的父進程正常情況會一直運行,除非遇到下面兩種情況
01: 創建的某個work子進程的執行時間超過了 listen命令行中的–timeout 參數配置;此時work子進程會被強制結束,listen所在的父進程也會拋出一個 ProcessTimeoutException 異常并退出;

開發者可以選擇捕獲該異常,讓父進程繼續執行;
02: 所在的父進程因某種原因存在內存泄露,則當父進程本身占用的內存超過了命令行中的 –memory 參數配置時,父子進程均會退出。正常情況下,listen進程本身占用的內存是穩定不變的。

3: 性能不同
work: 是在腳本內部做循環,框架腳本在命令執行的初期就已加載完畢;

listen: 是處理完一個任務之后新開一個work進程,此時會重新加載框架腳本;

因此 work 模式的性能會比listen模式高。
注意: 當代碼有更新時,work 模式下需要手動去執行 php think queue:restart 命令重啟隊列來使改動生效;而listen 模式會自動生效,無需其他操作。

4: 超時控制能力
work: 本質上既不能控制進程自身的運行時間,也無法限制執行中的任務的執行時間;
listen: 可以限制其創建的work子進程的超時時間;

可通過 timeout 參數限制work子進程允許運行的最長時間,超過該時間限制仍未結束的子進程會被強制結束;
expire 和time的區別

expire 在配置文件中設置,指任務的過期時間 這個時間是全局的,影響到所有的work進程
timeout 在命令行參數中設置,指work子進程的超時時間,這個時間只對當前執行的listen 命令有效,timeout 針對的對象是 work 子進程;

5: 使用場景不同

work 適用場景是:
01: 任務數量較多
02: 性能要求較高
03: 任務的執行時間較短
04: 消費者類中不存在死循環,sleep() ,exit() ,die() 等容易導致bug的邏輯

listen 適用場景是:

01: 任務數量較少
02: 任務的執行時間較長
03: 任務的執行時間需要有嚴格限制

公有操作

由于我們是根據redis來做分析 所以只需要分析src/queue/connector/redis.php
01: 首先調用 src/Queue.php中的魔術方法 __callStatic
02: 在__callStatic方法中調用了 buildConnector
03: buildConnector 中首先加載配置文件 如果無將是同步執行
04: 根據配置文件去創建連接并且傳入配置

在redis.php類的構造方法中的操作:
01: 檢測redis擴展是否安裝
02: 合并配置
03: 檢測是redis擴展還是 pRedis
04: 創建連接對象

發布過程

發布參數

參數名 默認值 描述 可以使用的方法
$job 要執行任務的類 push,later
$data 任務數據 push,later
$queue default 任務名稱 push,later
$delay null 延遲時間 later

立即執行

    push($job, $data, $queue)     Queue::push(Test::class, ['id' => 1], 'test');

一頓騷操作后返回一個數組 并且序列化后 rPush到redis中 key為 queue:queueName
數組結構:

[     'job' => $job, // 要執行任務的類     'data' => $data, // 任務數據     'id'=>'xxxxx' //任務id ]

寫入 redis并且返回隊列id
至于中間的那頓騷操作太長了就沒寫

延遲發布

    later($delay, $job, $data, $queue)     Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一頓騷操作后返回一個數組 并且序列化后 zAdd 到redis中 key為 queue:queueName:delayed score為當前的時間戳+$delay

執行過程

執行過程有work模式和listen模式 兩種 區別上面已經說了 代碼邏輯由于太多等下回分解;
最后講一下標簽的使用

    //守護進程開啟     'worker_daemon_start' => [         ppindexehaviorWorkerDaemonStart::class     ],     //內存超出     'worker_memory_exceeded' => [         ppindexehaviorWorkerMemoryExceeded::class     ],     //重啟守護進程     'worker_queue_restart' => [         ppindexehaviorWorkerQueueRestart::class     ],     //任務開始執行之前     'worker_before_process' => [         ppindexehaviorWorkerBeforeProcess::class     ],     //任務延遲執行     'worker_before_sleep' => [         ppindexehaviorWorkerBeforeSleep::class     ],     //任務執行失敗     'queue_failed' => [         ppindexehaviorQueueFailed::class     ]

解析think-queue(圍繞redis做分析)

public function run(Output $output)     {         $output->write('<info>任務執行失敗</info>', true);     }

控制臺執行 php think queue:work –queue test –daemon
會在控制臺一次輸出

守護進程開啟 任務延遲執行

失敗的處理 如果有任務執行失敗或者執行次數達到最大值
會觸發 queue_failed

在appindexehavior@run方法里面寫失敗的邏輯 比如郵件通知 寫入日志等

最后我們來說一下如何在其他框架或者項目中給tp的項目推送消息隊列,例如兩個項目是分開的 另一個使用的卻不是tp5的框架

在其他項目中推任務

php版本

<?php  class Index {     private $redis = null;      public function __construct()     {         $this->redis = new Redis();         $this->redis->connect('127.0.0.1', 6379);         $this->redis->select(10);     }      public function push($job, $data, $queue)     {         $payload = $this->createPayload($job, $data);         $this->redis->rPush('queues:' . $queue, $payload);     }      public function later($delay, $job, $data, $queue)     {         $payload = $this->createPayload($job, $data);         $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);     }      private function createPayload($job, $data)     {         $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));         return $this->setMeta($payload, 'attempts', 1);     }      private function setMeta($payload, $key, $value)     {         $payload = json_decode($payload, true);         $payload[$key] = $value;         $payload = json_encode($payload);          if (JSON_ERROR_NONE !== json_last_error()) {             throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());         }          return $payload;     }      private function random(int $length = 16): string     {         $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';         $randomString = '';         for ($i = 0; $i < $length; $i++) {             $randomString .= $str[rand(0, strlen($str) - 1)];         }         return $randomString;     } }  (new Index())->later(10, 'appindexjobsTest', ['id' => 1], 'test');

go版本

package main  import (     "encoding/json"     "github.com/garyburd/redigo/redis"     "math/rand"     "time" )  type Payload struct {     Id       string      `json:"id"`     Job      string      `json:"job"`     Data     interface{} `json:"data"`     Attempts int         `json:"attempts"` }  var RedisClient *redis.Pool  func init() {     RedisClient = &redis.Pool{         MaxIdle:     20,         MaxActive:   500,         IdleTimeout: time.Second * 100,         Dial: func() (conn redis.Conn, e error) {             c, err := redis.Dial("tcp", "127.0.0.1:6379")              if err != nil {                 return nil, err             }              _, _ = c.Do("SELECT", 10)              return c, nil         },     }  }  func main() {      var data = make(map[string]interface{})     data["id"] = "1"      later(10, "appindexjobsTest", data, "test") }  func push(job string, data interface{}, queue string) {     payload := createPayload(job, data)     queueName := "queues:" + queue      _, _ = RedisClient.Get().Do("rPush", queueName, payload) }  func later(delay int, job string, data interface{}, queue string) {      m, _ := time.ParseDuration("+1s")     currentTime := time.Now()     op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()     createPayload(job, data)     payload := createPayload(job, data)     queueName := "queues:" + queue + ":delayed"      _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload) }  // 創建指定格式的數據 func createPayload(job string, data interface{}) (payload string) {     payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}      jsonStr, _ := json.Marshal(payload1)      return string(jsonStr) }  // 創建隨機字符串 func random(n int) string {      var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")      b := make([]rune, n)     for i := range b {         b[i] = str[rand.Intn(len(str))]     }     return string(b) }

更多thinkphp技術知識,請訪問thinkphp教程欄目!

以上就是解析think-queue(圍繞

? 版權聲明
THE END
喜歡就支持一下吧
點贊13 分享