本篇文章給大家帶來的內容是關于laravel框架中隊列的用法介紹(附代碼) ,有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。
在實際的項目開發中,我們經常會遇到需要輕量級隊列的情形,例如發短信、發郵件等,這些任務不足以使用 kafka、RabbitMQ 等重量級的消息隊列,但是又的確需要異步、重試、并發控制等功能。通常來說,我們經常會使用 redis、Beanstalk、Amazon SQS 來實現相關功能,laravel 為此對不同的后臺隊列服務提供統一的 API,本文將會介紹應用最為廣泛的 redis 隊列。
在講解 laravel 的隊列服務之前,我們要先說說基于 redis 的隊列服務。首先,redis設計用來做緩存的,但是由于它自身的某種特性使得它可以用來做消息隊列
redis 隊列的數據結構
List 鏈表
redis 做消息隊列的特性例如FIFO(先入先出)很容易實現,只需要一個 list 對象從頭取數據,從尾部塞數據即可。
相關的命令:(1)左側入右側出:lpush/rpop;(2)右側入左側出:rpush/lpop。
這個簡單的消息隊列很容易實現。
Zset 有序集合
有些任務場景,并不需要任務立刻執行,而是需要延遲執行;有些任務很重要,需要在任務失敗的時候重新嘗試。這些功能僅僅依靠 list 是無法完成的。這個時候,就需要 redis 的有序集合。
Redis 有序集合和 Redis 集合類似,是不包含相同字符串的合集。它們的差別是,每個有序集合的成員都關聯著一個評分 score,這個評分用于把有序集合中的成員按最低分到最高分排列。
單看有序集合和延遲任務并無關系,但是可以將有序集合的評分 score 設置為延時任務開啟的時間,之后輪詢這個有序集合,將到期的任務拿出來進行處理,這樣就實現了延遲任務的功能。
對于重要的需要重試的任務,在任務執行之前,會將該任務放入有序集合中,設置任務最長的執行時間。若任務順利執行完畢,該任務會在有序集合中刪除。如果任務沒有在規定時間內完成,那么該有序集合的任務將會被重新放入隊列中。
相關命令:
(1) ZADD 添加一個或多個成員到有序集合,或者如果它已經存在更新其分數。
(2) ZRANGEBYSCORE 按分數返回一個成員范圍的有序集合。
(3) ZREMRANGEBYRANK 在給定的索引之內刪除所有成員的有序集合。
laravel 隊列服務的任務調度
隊列服務的任務調度過程如下:
laravel 的隊列服務由兩個進程控制,一個是生產者,一個是消費者。這兩個進程操縱了 redis 三個隊列,其中一個 List,負責即時任務,兩個 Zset,負責延時任務與待處理任務。
生產者負責向 redis 推送任務,如果是即時任務,默認就會向 queue:default 推送;如果是延時任務,就會向 queue:default:delayed 推送。
消費者輪詢兩個隊列,不斷的從隊列中取出任務,先把任務放入 queue:default:reserved 中,再執行相關任務。如果任務執行成功,就會刪除 queue:default:reserved 中的任務,否則會被重新放入 queue:default:delayed 隊列中。
laravel 隊列服務的總體流程
任務分發流程:
任務處理器運作:
創建任務
queue 設置
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'retry_after' => 90, ],
在config/queue.php中進行配置
一般來說,默認的 redis 配置如上,connection 是 database 中 redis 的連接名稱;queue 是 redis 中的隊列名稱,值得注意的是,如果使用的是 redis 集群的話,這個需要使用 key hash tag,也就是 {default};當任務運行超過 retry_after 這個時間后,該任務會被重新放入隊列當中。
任務類的創建
任務類的結構很簡單,一般來說只會包含一個讓隊列用來調用此任務的 handle 方法。
如果想要使得任務被推送到隊列中,而不是同步執行,那么需要實現 IlluminateContractsQueueShouldQueue 接口。
如果想要讓任務推送到特定的連接中,例如 redis 或者 sqs,那么需要設置 conneciton 變量。
如果想要讓任務推送到特定的隊列中去,可以設置 queue 變量。
如果想要讓任務延遲推送,那么需要設置 delay 變量。
如果想要設置任務至多重試的次數,可以使用 tries 變量;
如果想要設置任務可以運行的最大秒數,那么可以使用 timeout 參數。
如果想要手動訪問隊列,可以使用 trait : IlluminateQueueInteractsWithQueue。
任務的分發
分發服務
寫好任務類后,就能通過 dispatch 輔助函數來分發它了。唯一需要傳遞給 dispatch 的參數是這個任務類的實例:
class PodcastController extends Controller { public function store(Request $request) { // 創建播客... ProcessPodcast::dispatch($podcast); } }
如果想延遲執行一個隊列中的任務,可以用任務實例的 delay 方法。
ProcessPodcast::dispatch($podcast) ->delay(Carbon::now()->addMinutes(10));
通過推送任務到不同的隊列,可以給隊列任務分類,甚至可以控制給不同的隊列分配多少任務。要指定隊列的話,就調用任務實例的 onQueue 方法:
ProcessPodcast::dispatch($podcast)->onQueue('processing');
如果使用了多個隊列連接,可以將任務推到指定連接。要指定連接的話,可以在分發任務的時候使用 onConnection 方法:
ProcessPodcast::dispatch($podcast)->onConnection('redis ');