1 普通消息
消息隊(duì)列最基礎(chǔ)的功能就是生產(chǎn)者發(fā)送消息、Broker 保存消息,消費(fèi)者來消費(fèi)消息,以此實(shí)現(xiàn)系統(tǒng)解耦、削峰填谷的作用。
普通消息是消息隊(duì)列必備的消息類型,也是系統(tǒng)使用場景最多的一種消息。
2 順序消息
順序消息是指生產(chǎn)者發(fā)送消息的順序和消費(fèi)者消費(fèi)消息的順序是一致的。比如在一個電商場景,同一個用戶提交訂單、訂單支付、訂單出庫,這三個消息消費(fèi)者需要按照順序來進(jìn)行消費(fèi)。如下圖:
順序消息的實(shí)現(xiàn)并不容易,原因如下:
生產(chǎn)者集群中,有多個生產(chǎn)者發(fā)送消息,網(wǎng)絡(luò)延遲不一樣,很難保證發(fā)送到 Broker 的消息落盤順序是一致的;
如果 Broker 有多個分區(qū)或隊(duì)列,生產(chǎn)者發(fā)送的消息會進(jìn)入多個分區(qū),也無法保證順序消費(fèi);
如果有多個消費(fèi)者來異步消費(fèi)同一個分區(qū),很難保證消費(fèi)順序跟生產(chǎn)者發(fā)送順序一致。
要保證消息有序,需要滿足兩個條件:
同一個生產(chǎn)者必須同步發(fā)送消息到同一個分區(qū);
一個分區(qū)只能給同一個消費(fèi)者消費(fèi)。
如下圖:
上面第二個條件是比較容易實(shí)現(xiàn)的,一個分區(qū)綁定一個消費(fèi)者就可以,主要是第一個條件。
在主流消息隊(duì)列的實(shí)現(xiàn)中,Kafka 和 Pulsar 的實(shí)現(xiàn)方式類似,生產(chǎn)者給消息賦值一個 key,對 key 做 Hash 運(yùn)算來指定消息發(fā)送到哪一個分區(qū)。比如上面電商的例子,對同一個用戶的一筆訂單,提交訂單、訂單支付、訂單出庫這三個消息賦值同一個 key,就可以把這三條消息發(fā)送到同一個分區(qū)。
對于 RocketMQ,生產(chǎn)者在發(fā)送消息的時候,可以通過 MessageQueueSelector 指定把消息投遞到那個 MessageQueue,如下圖:
示例代碼如下:
publicstaticvoidmain(String[]args)throwsUnsupportedEncodingException{ try{ DefaultMQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name"); producer.start(); String[]tags=newString[]{"TagA","TagB","TagC","TagD","TagE"}; for(inti=0;i100;?i++)?{ ???int?orderId?=?i?%?10; ???Message?msg?= ????new?Message("TopicTestjjj",?tags[i?%?tags.length],?"KEY"?+?i, ?????("Hello?RocketMQ?"?+?i).getBytes(RemotingHelper.DEFAULT_CHARSET)); ???SendResult?sendResult?=?producer.send(msg,?new?MessageQueueSelector()?{ ????@Override ????public?MessageQueue?select(Listmqs,Messagemsg,Objectarg){ Integerid=(Integer)arg; intindex=id%mqs.size(); returnmqs.get(index); } },orderId); System.out.printf("%s%n",sendResult); } producer.shutdown(); }catch(MQClientException|RemotingException|MQBrokerException|InterruptedExceptione){ e.printStackTrace(); } }
RabbitMQ 的實(shí)現(xiàn)是 Exchange 根據(jù)設(shè)置好的 Route Key 將數(shù)據(jù)路由到不同的 Queue 中。示例代碼如下:
@Resource privateAmqpTemplaterabbitTemplate; publicvoidsend1(Stringmessage){ rabbitTemplate.convertAndSend("testExchange","testRoutingKey",message); }
3 延時消息
或者也叫定時消息,是指消息發(fā)送后不會立即被消費(fèi),而是指定一個時間,到時間后再消費(fèi)。經(jīng)典的場景比如電商購物時,30 分鐘未支付訂單,讓訂單自動失效。
3.1 RocketMQ 實(shí)現(xiàn)
RocketMQ 定義了 18 個延時級別,每個延時級別對應(yīng)一個延時時間。下面如果延遲級別是 3,則消息會延遲 10s 才會拉取。
//MessageStoreConfig類 privateStringmessageDelayLevel="1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h";
RocketMQ 的延時消息如下圖:
生產(chǎn)者把消費(fèi)發(fā)送到 Broker 后,Broker 首先把消息保存到 SCHEDULE_TOPIC_XXXX 這個 Topic,然后調(diào)度任務(wù)會判斷是否到期,如果到期,會把消息從 SCHEDULE_TOPIC_XXXX 取出投遞到原始的 queue,這樣消費(fèi)者就可以消費(fèi)到了。
RocketMQ 的延時消息只支持最大兩個小時的延時,不過 RocketMQ5.0 基于時間輪算法實(shí)現(xiàn)了定時消息,解決了這個問題。
3.2 Pulsar 實(shí)現(xiàn)
Pulsar 的實(shí)現(xiàn)如下圖:
Pulsar 的延時消息首先會寫入一個 Delayed Message Tracker 的數(shù)據(jù)結(jié)構(gòu)中,Delayed Message Tracker 根據(jù)延時時間構(gòu)建 delayed index 優(yōu)先級隊(duì)列。消費(fèi)者拉取消息時,首先去 Delayed Message Tracker 檢查是否有到期的消息。如果有則直接拉取進(jìn)行消費(fèi)。
3.3 RabbitMQ 實(shí)現(xiàn)
RabbitMQ 的實(shí)現(xiàn)方式有兩種,一種是投遞到普通隊(duì)列都不消費(fèi),等消息過期后被投遞到死信隊(duì)列,消費(fèi)者消費(fèi)死信隊(duì)列。如下圖:
第二種方式是生產(chǎn)者發(fā)送消息時,先發(fā)送到本地 Mnesia 數(shù)據(jù)庫,消息到期后定時器再將消息投遞到 broker。
3.4 Kafka 實(shí)現(xiàn)
Kafka 本身并沒有延時隊(duì)列,不過可以通過生產(chǎn)者攔截器來實(shí)現(xiàn)消息延時發(fā)送,也可以定義延時 Topic,利用類似 RocketMQ 的方案來實(shí)現(xiàn)延時消息。
4 事務(wù)消息
事務(wù)消息是指生產(chǎn)消息和消費(fèi)消息滿足事務(wù)的特性。
RabbitMQ 和 Kafka 的事務(wù)消息都是只支持生產(chǎn)消息的事務(wù)特性,即一批消息要不全部發(fā)送成功,要不全部發(fā)送失敗。
RabbitMQ 通過 Channel 來開啟事務(wù)消息,代碼如下:
ConnectionFactoryfactory=newConnectionFactory(); connection=factory.newConnection(); Channelchannel=connection.createChannel(); //開啟事務(wù) channel.txSelect(); channel.basicPublish("directTransactionExchange","transactionRoutingKey",null,message.getBytes("utf-8")); //提交事務(wù)或者channel.txRollback()回滾事務(wù) channel.txCommit();
Kafka 可以給多個生產(chǎn)者設(shè)置同一個事務(wù) ID ,從而把多個 Topic 、多個 Partition 放在一個事務(wù)中,實(shí)現(xiàn)原子性寫入。
Pulsar 的事務(wù)消息對于事務(wù)語義的定義是:允許事件流應(yīng)用將消費(fèi)、處理、生產(chǎn)消息整個過程定義為一個原子操作??梢?,Pulsar 的事務(wù)消息可以覆蓋消息流整個過程。
RocketMQ 的事務(wù)消息是通過 half 消息來實(shí)現(xiàn)的。以電商購物場景來看,賬戶服務(wù)扣減賬戶金額后,發(fā)送消息給 Broker,庫存服務(wù)來消費(fèi)這條消息進(jìn)行扣減庫存。如下圖:
可見,RocketMQ 只能保證生產(chǎn)者發(fā)送消息和本地事務(wù)的原子性,并不能保證消費(fèi)消息的原子性。
5 軌跡消息
軌跡消息主要用于跟蹤消息的生命周期,當(dāng)消息丟失時可以很方便地找出原因。
軌跡消息也跟普通消息一樣,也需要存儲和查詢,也會占用消息隊(duì)列的資源,所以選擇軌跡消息要考慮下面幾點(diǎn):
消息生命周期的關(guān)鍵節(jié)點(diǎn)一定要記錄;
不能影響正常消息的發(fā)送和消費(fèi)性能;
不能影響 Broker 的消息存儲性能;
要考慮消息查詢維度和性能。
RabbitMQ Broker 實(shí)現(xiàn)了軌跡消息的功能,打開 Trace 開關(guān),就可以把軌跡消息發(fā)送到 amq.rabbitmq.trace 這個 exchange,但是要考慮軌跡消息會不會給 Broker 造成 壓力進(jìn)而導(dǎo)致消息積壓。RabbitMQ 的生產(chǎn)者和消費(fèi)者都沒有實(shí)現(xiàn)軌跡消息,需要開發(fā)者自己來實(shí)現(xiàn)。
RocketMQ 生產(chǎn)者、Broker 和消費(fèi)者都實(shí)現(xiàn)了軌跡消息,不過默認(rèn)是關(guān)閉的,需要手工開啟。
使用軌跡消息,需要考慮記錄哪些節(jié)點(diǎn)、存儲介質(zhì)、性能、查詢方式等問題。
6 死信隊(duì)列
在消息隊(duì)列中,死信隊(duì)列主要應(yīng)對一些異常的情況,如下圖:
RocketMQ 實(shí)現(xiàn)了消費(fèi)端的死信隊(duì)列,當(dāng)消費(fèi)者消費(fèi)失敗時,會進(jìn)行重試,如果重試 16 次還是失敗,則這條消息會被發(fā)送到死信隊(duì)列。
RabbitMQ 實(shí)現(xiàn)了生產(chǎn)者和 Broker 的死信隊(duì)列,下面三種情況,消息會被發(fā)送到死信隊(duì)列:
生產(chǎn)者發(fā)送消息被拒絕,并且 requeue 參數(shù)設(shè)置為 false;
Broker 消息過期了;
隊(duì)列達(dá)到最大長度。
RabbitMQ 消息變成死信消息后,會被發(fā)送到死信交換機(jī)(Dead-Letter-Exchange)。
7 優(yōu)先級消息
有一些業(yè)務(wù)場景下,我們需要優(yōu)先處理一些消息,比如銀行里面的金卡客戶、銀卡客戶優(yōu)先級高于普通客戶,他們的業(yè)務(wù)需要優(yōu)先處理。如下圖:
主流消息隊(duì)列中,RabbitMQ 是支持優(yōu)先級隊(duì)列的,代碼如下:
ConnectionFactoryfactory=newConnectionFactory(); connection=factory.newConnection(); Channelchannel=connection.createChannel(); Mapargs=newHashMap (); //設(shè)置優(yōu)先級為5 args.put("x-max-priority",5); channel.queueDeclare("my-priority-queue",true,false,false,args);
8 總結(jié)
消息隊(duì)列技術(shù)選型,要考慮的因素很多,本文主要從業(yè)務(wù)場景來分析需要考慮的因素,同時技術(shù)上也需要考慮運(yùn)維復(fù)雜度、業(yè)務(wù)規(guī)模、社區(qū)活躍度、學(xué)習(xí)成本等因素。希望本文對你使用消息隊(duì)列有所幫助。
審核編輯:劉清
-
Hash算法
+關(guān)注
關(guān)注
0文章
43瀏覽量
7382 -
調(diào)度器
+關(guān)注
關(guān)注
0文章
98瀏覽量
5245
原文標(biāo)題:消息隊(duì)列技術(shù)選型的 7 種消息場景
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論