laravel框架中隊列的用法介紹(附代碼)

本篇文章給大家帶來的內容是關于laravel框架中隊列的用法介紹(附代碼) ,有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。

在實際的項目開發中,我們經常會遇到需要輕量級隊列的情形,例如發短信、發郵件等,這些任務不足以使用 kafka、RabbitMQ 等重量級的消息隊列,但是又的確需要異步、重試、并發控制等功能。通常來說,我們經常會使用 redis、Beanstalk、Amazon SQS 來實現相關功能,laravel 為此對不同的后臺隊列服務提供統一的 API,本文將會介紹應用最為廣泛的 redis 隊列。

在講解 laravel 的隊列服務之前,我們要先說說基于 redis 的隊列服務。首先,redis設計用來做緩存的,但是由于它自身的某種特性使得它可以用來做消息隊列

redis 隊列的數據結構

List 鏈表

redis 做消息隊列的特性例如FIFO(先入先出)很容易實現,只需要一個 list 對象從頭取數據,從尾部塞數據即可。

相關的命令:(1)左側入右側出:lpush/rpop;(2)右側入左側出:rpush/lpop。

這個簡單的消息隊列很容易實現。

Zset 有序集合

有些任務場景,并不需要任務立刻執行,而是需要延遲執行;有些任務很重要,需要在任務失敗的時候重新嘗試。這些功能僅僅依靠 list 是無法完成的。這個時候,就需要 redis 的有序集合。

Redis 有序集合和 Redis 集合類似,是不包含相同字符串的合集。它們的差別是,每個有序集合的成員都關聯著一個評分 score,這個評分用于把有序集合中的成員按最低分到最高分排列。

單看有序集合和延遲任務并無關系,但是可以將有序集合的評分 score 設置為延時任務開啟的時間,之后輪詢這個有序集合,將到期的任務拿出來進行處理,這樣就實現了延遲任務的功能。

對于重要的需要重試的任務,在任務執行之前,會將該任務放入有序集合中,設置任務最長的執行時間。若任務順利執行完畢,該任務會在有序集合中刪除。如果任務沒有在規定時間內完成,那么該有序集合的任務將會被重新放入隊列中。

相關命令:

(1) ZADD 添加一個或多個成員到有序集合,或者如果它已經存在更新其分數。

(2) ZRANGEBYSCORE 按分數返回一個成員范圍的有序集合。

(3) ZREMRANGEBYRANK 在給定的索引之內刪除所有成員的有序集合。

laravel 隊列服務的任務調度

隊列服務的任務調度過程如下:

laravel框架中隊列的用法介紹(附代碼)

laravel 的隊列服務由兩個進程控制,一個是生產者,一個是消費者。這兩個進程操縱了 redis 三個隊列,其中一個 List,負責即時任務,兩個 Zset,負責延時任務與待處理任務。

生產者負責向 redis 推送任務,如果是即時任務,默認就會向 queue:default 推送;如果是延時任務,就會向 queue:default:delayed 推送。

消費者輪詢兩個隊列,不斷的從隊列中取出任務,先把任務放入 queue:default:reserved 中,再執行相關任務。如果任務執行成功,就會刪除 queue:default:reserved 中的任務,否則會被重新放入 queue:default:delayed 隊列中。

laravel 隊列服務的總體流程

任務分發流程:

laravel框架中隊列的用法介紹(附代碼)

任務處理器運作:

laravel框架中隊列的用法介紹(附代碼)

創建任務

queue 設置

'redis' => [         'driver' => 'redis',         'connection' => 'default',         'queue' => 'default',         'retry_after' => 90,     ],

在config/queue.php中進行配置
一般來說,默認的 redis 配置如上,connection 是 database 中 redis 的連接名稱;queue 是 redis 中的隊列名稱,值得注意的是,如果使用的是 redis 集群的話,這個需要使用 key hash tag,也就是 {default};當任務運行超過 retry_after 這個時間后,該任務會被重新放入隊列當中。

任務類的創建

任務類的結構很簡單,一般來說只會包含一個讓隊列用來調用此任務的 handle 方法。

如果想要使得任務被推送到隊列中,而不是同步執行,那么需要實現 IlluminateContractsQueueShouldQueue 接口。

如果想要讓任務推送到特定的連接中,例如 redis 或者 sqs,那么需要設置 conneciton 變量。

如果想要讓任務推送到特定的隊列中去,可以設置 queue 變量。

如果想要讓任務延遲推送,那么需要設置 delay 變量。

如果想要設置任務至多重試的次數,可以使用 tries 變量;

如果想要設置任務可以運行的最大秒數,那么可以使用 timeout 參數。

如果想要手動訪問隊列,可以使用 trait : IlluminateQueueInteractsWithQueue。

任務的分發
分發服務
寫好任務類后,就能通過 dispatch 輔助函數來分發它了。唯一需要傳遞給 dispatch 的參數是這個任務類的實例:

class PodcastController extends Controller {     public function store(Request $request)     {         // 創建播客...          ProcessPodcast::dispatch($podcast);     } }

如果想延遲執行一個隊列中的任務,可以用任務實例的 delay 方法。

 ProcessPodcast::dispatch($podcast)                 ->delay(Carbon::now()->addMinutes(10));

通過推送任務到不同的隊列,可以給隊列任務分類,甚至可以控制給不同的隊列分配多少任務。要指定隊列的話,就調用任務實例的 onQueue 方法:

ProcessPodcast::dispatch($podcast)->onQueue('processing');

如果使用了多個隊列連接,可以將任務推到指定連接。要指定連接的話,可以在分發任務的時候使用 onConnection 方法:

ProcessPodcast::dispatch($podcast)->onConnection('redis ');

? 版權聲明
THE END
喜歡就支持一下吧
點贊7 分享