RabbitMQ的應用場景以及基本原理介紹

RabbitMQ的應用場景以及基本原理介紹

rabbitmq是一個由erlang開發的AMQP(Advanced Message Queuing Protocol)的開源實現。

AMQP :高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。 RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:pythonruby、.NET、Java、JMS、C、phpactionscript、XMPP、STOMP等,支持ajax。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

一、應用場景

  1. 異步處理
  2. 應用解耦
  3. 流量削峰

二、RabbitMQ 特性

RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括:

 - 可靠性(Reliability)  RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。   - 靈活的路由(Flexible Routing)  在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。   - 消息集群(Clustering)  多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。   - 高可用(Highly Available Queues)  隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。   - 多種協議(Multi-protocol)  RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。   - 多語言客戶端(Many Clients)  RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。   - 管理界面(Management UI)  RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。   - 跟蹤機制(Tracing)  如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。   - 插件機制(Plugin System)  RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。

三、RabbitMQ 基本概念

 - Message  消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。   - Publisher  消息的生產者,也是一個向交換器發布消息的客戶端應用程序。   - Exchange  交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。   - Routing Key  路由關鍵字,exchange根據這個關鍵字進行消息投遞。   - Binding  綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。   - Queue  消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。   - Connection  網絡連接,比如一個TCP連接。   - channel  信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。   - Consumer  消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。   - Virtual Host  虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。   - Broker  表示消息隊列服務器實體。它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,

四、Exchange 類型

Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:

  • direct

消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的模式。

  • fanout

每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。

  • topic

topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“”。#匹配0個或多個單詞,匹配不多不少一個單詞。

五、ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。

  • Connection是RabbitMQ的socket鏈接,它封裝了socket協議相關部分邏輯。
  • ConnectionFactory為Connection的制造工廠。
  • Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。

六、任務分發機制

1、Round-robin dispathching循環分發

RabbbitMQ的分發機制非常適合擴展,而且它是專門為并發程序設計的,如果現在load加重,那么只需要創建更多的Consumer來進行任務處理

2、Message acknowledgment 消息 確認

在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致消息丟失。為了避免這種情況發生,我們可以要求消費者在消費完消息后發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)后才將該消息從Queue中移除;如果RabbitMQ沒有收到回執并檢測到消費者的RabbitMQ連接斷開,則RabbitMQ會將該消息發送給其他消費者(如果存在多個消費者)進行處理。這里不存在timeout概念,一個消費者處理消息時間再長也不會導致該消息被發送給其他消費者,除非它的RabbitMQ連接斷開。 這里會產生另外一個問題,如果我們的開發人員在處理完業務邏輯后,忘記發送回執給RabbitMQ,這將會導致嚴重的bug——Queue中積的消息會越來越多;消費者重啟后會重復消費這些消息并重復執行業務邏輯…
另外pub message是沒有ack的。

3、Message durability 消息持久化

如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),如果我們需要對這種小概率事件也要管理起來,那么我們要用到事務。由于這里僅為RabbitMQ的簡單介紹,所以這里將不講解RabbitMQ相關的事務。
要持久化隊列queue的持久化需要在聲明時指定durable=True;
這里要注意,隊列的名字一定要是Broker中不存在的,不然不能改變此隊列的任何屬性.
隊列和交換機有一個創建時候指定的標志durable,durable的唯一含義就是具有這個標志的隊列和交換機會在重啟之后重新建立,它不表示說在隊列中的消息會在重啟后恢復
消息持久化包括3部分

 - 1.exchange持久化,在聲明時指定durable => true hannel.ExchangeDeclare(ExchangeName,"direct",durable:true,autoDelete:false,arguments:null);//聲明消息隊列,且為可持久化的   - 2.queue持久化,在聲明時指定durable => true channel.QueueDeclare(QueueName,durable:true,exclusive:false,autoDelete:false,arguments:null);//聲明消息隊列,且為可持久化的   - 3.消息持久化,在投遞時指定delivery_mode => 2(1是非持久化). channel.basic_publish(exchange='',                       routing_key="task_queue",                       body=message,                       properties=pika.BasicProperties(                          delivery_mode = 2, # make message persistent                       ))

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的,如果exchange和queue兩者之間有一個持久化,一個非持久化,則不允許建立綁定.
注意:一旦創建了隊列和交換機,就不能修改其標志了,例如,創建了一個non-durable的隊列,然后想把它改變成durable的,唯一的辦法就是刪除這個隊列然后重現創建。

關于持久化的進一步討論: 為了數據不丟失,我們采用了: 在數據處理結束后發送ack,這樣RabbitMQ Server會認為Message Deliver 成功。 持久化queue,可以防止RabbitMQ Server 重啟或者crash引起的數據丟失。 持久化Message,理由同上。  但是這樣能保證數據100%不丟失嗎?答案是否定的。問題就在與RabbitMQ需要時間去把這些信息存到磁盤上,這個time window雖然短,但是它的確還是有。在這個時間窗口內如果數據沒有保存,數據還會丟失。還有另一個原因就是RabbitMQ并不是為每個Message都做fsync:它可能僅僅是把它保存到Cache里,還沒來得及保存到物理磁盤上。因此這個持久化還是有問題。但是對于大多數應用來說,這已經足夠了。當然為了保持一致性,你可以把每次的publish放到一個transaction中。這個transaction的實現需要user defined codes。那么商業系統會做什么呢?一種可能的方案是在系統panic時或者異常重啟時或者斷電時,應該給各個應用留出時間去flash cache,保證每個應用都能exit gracefully。

4、Fair dispath 公平分發

你可能也注意到了,分發機制不是那么優雅,默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。n是取余后的,它不管Consumer是否還有unacked Message,只是按照這個默認的機制進行分發.
那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻毫無休息的機會,那么,Rabbit是如何處理這種問題呢?

前面我們講到如果有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時如果每個消息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作并一直空閑的情況。我們可以通過設置prefetchCount來限制Queue每次發送給每個消費者的消息數,比如我們設置prefetchCount=1,則Queue每次給每個消費者發送一條消息;消費者處理完這條消息后Queue會再給該消費者發送一條消息。

通過basic.qos方法設置prefetch_count=1,這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message,換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它

1
channel.basic_qos(prefetch_count=1)

注意,這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。

七、消息序列化

RabbitMQ使用ProtoBuf序列化消息,它可作為RabbitMQ的Message的數據格式進行傳輸,由于是結構化的數據,這樣就極大的方便了Consumer的數據高效處理,當然也可以使用xml,與XML相比, ProtoBuf有以下優勢:
1.簡單
2.size小了3-10倍
3.速度快了20-100倍
4.易于編程
6.減少了語義的歧義.
,ProtoBuf具有速度和空間的優勢,使得它現在應用非常廣泛

八、rpc

MQ本身是基于異步的消息處理,前面的示例中所有的生產者(P)將消息發送到RabbitMQ后不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。 但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的消息處理完成后再進行下一步處理。這相當于RPC(Remote Procedure Call,遠程過程調用)。在RabbitMQ中也支持RPC。
RabbitMQ ?中實現RPC 的機制是:
客戶端發送請求(消息)時,在消息的屬性(MessageProperties ,在AMQP 協議中定義了14中properties ,這些屬性會隨著消息一起發送)中設置兩個值replyTo (一個Queue 名稱,用于告訴服務器處理完成后將通知我的消息發送到這個Queue 中)和correlationId (此次請求的標識號,服務器處理完成后需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行了或執行失敗)
服務器端收到消息并處理
服務器端處理完消息后,將生成一條應答消息到replyTo 指定的Queue ,同時帶上correlationId 屬性
客戶端之前已訂閱replyTo 指定的Queue ,從中收到服務器的應答消息后,根據其中的correlationId 屬性分析哪條請求被執行了,根據執行結果進行后續業務處理

九、RabbitMQ 選型和對比

1.從社區活躍度

按照目前網絡上的資料,RabbitMQ 、activeM 、ZeroMQ 三者中,綜合來看,RabbitMQ 是首選。

2.持久化消息比較

ZeroMq 不支持,activemq 和RabbitMq 都支持。持久化消息主要是指我們機器在不可抗力因素等情況下掛掉了,消息不會丟失的機制。

3.綜合技術實現

可靠性、靈活的路由、集群、事務、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統等等。
RabbitMq / kafka 最好,ActiveMq 次之,ZeroMq 最差。當然ZeroMq 也可以做到,不過自己必須手動寫代碼實現,代碼量不小。尤其是可靠性中的:持久性、投遞確認、發布者證實和高可用性。

4.高并發

毋庸置疑,RabbitMQ 最高,原因是它的實現語言是天生具備高并發高可用的erlang 語言。

5.比較關注的比較, RabbitMQ 和 Kafka

RabbitMq 比Kafka 成熟,在可用性上,穩定性上,可靠性上, ?RabbitMq ?勝于 ?Kafka ?(理論上)。
另外,Kafka 的定位主要在日志等方面, 因為Kafka 設計的初衷就是處理日志的,可以看做是一個日志(消息)系統一個重要組件,針對性很強,所以 如果業務方面還是建議選擇 RabbitMq 。
還有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出來很多。
選型最后總結:
如果我們系統中已經有選擇 ?Kafka ?,或者 ? RabbitMq ?,并且完全可以滿足現在的業務,建議就不用重復去增加和造輪子。
可以在 ?Kafka ?和 ? RabbitMq ?中選擇一個適合自己團隊和業務的,這個才是最重要的。但是毋庸置疑現階段,綜合考慮沒有第三選擇。

更多laravel相關技術文章,請訪問Laravel教程欄目進行學習!

? 版權聲明
THE END
喜歡就支持一下吧
點贊7 分享