摘要:消費(fèi)端弄丟了數(shù)據(jù)關(guān)閉自動(dòng)提交,在自己處理完畢之后手動(dòng)提交,這樣就不會(huì)丟失數(shù)據(jù)。弄丟了數(shù)據(jù)一般要求設(shè)置個(gè)參數(shù)來(lái)保證消息不丟失給設(shè)置參數(shù)這個(gè)值必須大于,表示要求每個(gè)必須至少有個(gè)副本。上一篇如何保證消息不重復(fù)消費(fèi)下一篇如何保證消息按順序執(zhí)行
1.mq原則數(shù)據(jù)不能多,也不能少,不能多是說(shuō)消息不能重復(fù)消費(fèi),這個(gè)我們上一節(jié)已解決;不能少,就是說(shuō)不能丟失數(shù)據(jù)。如果mq傳遞的是非常核心的消息,支撐核心的業(yè)務(wù),那么這種場(chǎng)景是一定不能丟失數(shù)據(jù)的。
2.丟失數(shù)據(jù)場(chǎng)景丟數(shù)據(jù)一般分為兩種,一種是mq把消息丟了,一種就是消費(fèi)時(shí)將消息丟了。下面從rabbitmq和kafka分別說(shuō)一下,丟失數(shù)據(jù)的場(chǎng)景,
(1)rabbitmq
A:生產(chǎn)者弄丟了數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到rabbitmq的時(shí)候,可能在傳輸過(guò)程中因?yàn)榫W(wǎng)絡(luò)等問(wèn)題而將數(shù)據(jù)弄丟了。
B:rabbitmq自己丟了數(shù)據(jù)
如果沒(méi)有開(kāi)啟rabbitmq的持久化,那么rabbitmq一旦重啟,那么數(shù)據(jù)就丟了。所依必須開(kāi)啟持久化將消息持久化到磁盤(pán),這樣就算rabbitmq掛了,恢復(fù)之后會(huì)自動(dòng)讀取之前存儲(chǔ)的數(shù)據(jù),一般數(shù)據(jù)不會(huì)丟失。除非極其罕見(jiàn)的情況,rabbitmq還沒(méi)來(lái)得及持久化自己就掛了,這樣可能導(dǎo)致一部分?jǐn)?shù)據(jù)丟失。
C:消費(fèi)端弄丟了數(shù)據(jù)
主要是因?yàn)橄M(fèi)者消費(fèi)時(shí),剛消費(fèi)到,還沒(méi)有處理,結(jié)果消費(fèi)者就掛了,這樣你重啟之后,rabbitmq就認(rèn)為你已經(jīng)消費(fèi)過(guò)了,然后就丟了數(shù)據(jù)。
3.如何防止消息丟失
(1)rabbitmq
A:生產(chǎn)者丟失消息
①:可以選擇使用rabbitmq提供是事物功能,就是生產(chǎn)者在發(fā)送數(shù)據(jù)之前開(kāi)啟事物,然后發(fā)送消息,如果消息沒(méi)有成功被rabbitmq接收到,那么生產(chǎn)者會(huì)受到異常報(bào)錯(cuò),這時(shí)就可以回滾事物,然后嘗試重新發(fā)送;如果收到了消息,那么就可以提交事物。
channel.txSelect();//開(kāi)啟事物 try{ //發(fā)送消息 }catch(Exection e){ channel.txRollback();//回滾事物 //重新提交 }
缺點(diǎn): rabbitmq事物已開(kāi)啟,就會(huì)變?yōu)橥阶枞僮鳎a(chǎn)者會(huì)阻塞等待是否發(fā)送成功,太耗性能會(huì)造成吞吐量的下降。
②:可以開(kāi)啟confirm模式。在生產(chǎn)者哪里設(shè)置開(kāi)啟了confirm模式之后,每次寫(xiě)的消息都會(huì)分配一個(gè)唯一的id,然后如何寫(xiě)入了rabbitmq之中,rabbitmq會(huì)給你回傳一個(gè)ack消息,告訴你這個(gè)消息發(fā)送OK了;如果rabbitmq沒(méi)能處理這個(gè)消息,會(huì)回調(diào)你一個(gè)nack接口,告訴你這個(gè)消息失敗了,你可以進(jìn)行重試。而且你可以結(jié)合這個(gè)機(jī)制知道自己在內(nèi)存里維護(hù)每個(gè)消息的id,如果超過(guò)一定時(shí)間還沒(méi)接收到這個(gè)消息的回調(diào),那么你可以進(jìn)行重發(fā)。
//開(kāi)啟confirm channel.confirm(); //發(fā)送成功回調(diào) public void ack(String messageId){ } // 發(fā)送失敗回調(diào) public void nack(String messageId){ //重發(fā)該消息 }
二者不同
事務(wù)機(jī)制是同步的,你提交了一個(gè)事物之后會(huì)阻塞住,但是confirm機(jī)制是異步的,發(fā)送消息之后可以接著發(fā)送下一個(gè)消息,然后rabbitmq會(huì)回調(diào)告知成功與否。
一般在生產(chǎn)者這塊避免丟失,都是用confirm機(jī)制。
B:rabbitmq自己弄丟了數(shù)據(jù)
設(shè)置消息持久化到磁盤(pán)。設(shè)置持久化有兩個(gè)步驟:
①創(chuàng)建queue的時(shí)候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證rabbitmq持久化queue的元數(shù)據(jù),但是不會(huì)持久化queue里面的數(shù)據(jù)。
②發(fā)送消息的時(shí)候講消息的deliveryMode設(shè)置為2,這樣消息就會(huì)被設(shè)為持久化方式,此時(shí)rabbitmq就會(huì)將消息持久化到磁盤(pán)上。
必須要同時(shí)開(kāi)啟這兩個(gè)才可以。
而且持久化可以跟生產(chǎn)的confirm機(jī)制配合起來(lái),只有消息持久化到了磁盤(pán)之后,才會(huì)通知生產(chǎn)者ack,這樣就算是在持久化之前rabbitmq掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到ack回調(diào)也會(huì)進(jìn)行消息重發(fā)。
C:消費(fèi)者弄丟了數(shù)據(jù)
使用rabbitmq提供的ack機(jī)制,首先關(guān)閉rabbitmq的自動(dòng)ack,然后每次在確保處理完這個(gè)消息之后,在代碼里手動(dòng)調(diào)用ack。這樣就可以避免消息還沒(méi)有處理完就ack。
(2)kafka
A:消費(fèi)端弄丟了數(shù)據(jù)
關(guān)閉自動(dòng)提交offset,在自己處理完畢之后手動(dòng)提交offset,這樣就不會(huì)丟失數(shù)據(jù)。
B:kafka弄丟了數(shù)據(jù)
一般要求設(shè)置4個(gè)參數(shù)來(lái)保證消息不丟失:
①給topic設(shè)置 replication.factor參數(shù):這個(gè)值必須大于1,表示要求每個(gè)partition必須至少有2個(gè)副本。
②在kafka服務(wù)端設(shè)置min.isync.replicas參數(shù):這個(gè)值必須大于1,表示 要求一個(gè)leader至少感知到有至少一個(gè)follower在跟自己保持聯(lián)系正常同步數(shù)據(jù),這樣才能保證leader掛了之后還有一個(gè)follower。
③在生產(chǎn)者端設(shè)置acks=all:表示 要求每條每條數(shù)據(jù),必須是寫(xiě)入所有replica副本之后,才能認(rèn)為是寫(xiě)入成功了
④在生產(chǎn)者端設(shè)置retries=MAX(很大的一個(gè)值,表示無(wú)限重試):表示 這個(gè)是要求一旦寫(xiě)入事變,就無(wú)限重試
C:生產(chǎn)者弄丟了數(shù)據(jù) 如果按照上面設(shè)置了ack=all,則一定不會(huì)丟失數(shù)據(jù),要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認(rèn)為本次寫(xiě)成功了。如果沒(méi)滿(mǎn)足這個(gè)條件,生產(chǎn)者會(huì)自動(dòng)不斷的重試,重試無(wú)限次。
上一篇《如何保證消息不重復(fù)消費(fèi)》
下一篇《如何保證消息按順序執(zhí)行》
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/7233.html
摘要:能不能支持?jǐn)?shù)據(jù)丟失啊可以的,參考我們之前說(shuō)的那個(gè)數(shù)據(jù)零丟失方案其實(shí)一個(gè)肯定是很復(fù)雜的,其實(shí)這是個(gè)開(kāi)放題,就是看看你有沒(méi)有從架構(gòu)角度整體構(gòu)思和設(shè)計(jì)的思維以及能力。其實(shí)回答這類(lèi)問(wèn)題,說(shuō)白了,起碼不求你看過(guò)那技術(shù)的源碼,起碼你大概知道那個(gè)技術(shù)的基本原理,核心組成部分,基本架構(gòu)構(gòu)成,然后參照一些開(kāi)源的技術(shù)把一個(gè)系統(tǒng)設(shè)計(jì)出來(lái)的思路說(shuō)一下就好 比如說(shuō)這個(gè)消息隊(duì)列系統(tǒng),我們來(lái)從以下幾個(gè)角度來(lái)考慮一下 (1...
摘要:數(shù)量對(duì)吞吐量的影響可以達(dá)到幾百幾千個(gè)的級(jí)別,吞吐量會(huì)有小幅度的下降。這是的一大優(yōu)勢(shì),可在同等數(shù)量機(jī)器下支撐大量的從幾十個(gè)到幾百個(gè)的時(shí)候,吞吐量會(huì)大幅下降。下一篇如何保證消息隊(duì)列的高可用 1.為什么使用消息隊(duì)列? (1)解耦:可以在多個(gè)系統(tǒng)之間進(jìn)行解耦,將原本通過(guò)網(wǎng)絡(luò)之間的調(diào)用的方式改為使用MQ進(jìn)行消息的異步通訊,只要該操作不是需要同步的,就可以改為使用MQ進(jìn)行不同系統(tǒng)之間的聯(lián)系,這樣項(xiàng)目之間...
摘要:一個(gè)對(duì)應(yīng)一個(gè),但是里面進(jìn)行了多線(xiàn)程消費(fèi),這樣也會(huì)造成消息消費(fèi)順序錯(cuò)誤。保證消息的消費(fèi)順序拆分多個(gè),每個(gè)一個(gè),就是多一些而已,確實(shí)是麻煩點(diǎn)這樣也會(huì)造成吞吐量下降,可以在消費(fèi)者內(nèi)部采用多線(xiàn)程的方式取消費(fèi)。 1.為什么要保證順序 消息隊(duì)列中的若干消息如果是對(duì)同一個(gè)數(shù)據(jù)進(jìn)行操作,這些操作具有前后的關(guān)系,必須要按前后的順序執(zhí)行,否則就會(huì)造成數(shù)據(jù)異常。舉例: 比如通過(guò)mysql binlog進(jìn)行兩個(gè)數(shù)據(jù)...
摘要:緊接著征用倍的機(jī)器來(lái)部署,每一批消費(fèi)一個(gè)臨時(shí)的消息。這種做法相當(dāng)于臨時(shí)將資源和資源擴(kuò)大倍,以正常速度的倍來(lái)消費(fèi)消息。解決方案這種情況下,實(shí)際上沒(méi)有什么消息擠壓,而是丟了大量的消息。 1.大量消息在mq里積壓了幾個(gè)小時(shí)了還沒(méi)解決 場(chǎng)景: 幾千萬(wàn)條數(shù)據(jù)在MQ里積壓了七八個(gè)小時(shí),從下午4點(diǎn)多,積壓到了晚上很晚,10點(diǎn)多,11點(diǎn)多。線(xiàn)上故障了,這個(gè)時(shí)候要不然就是修復(fù)consumer的問(wèn)題,讓他恢復(fù)消...
摘要:的過(guò)期策略是什么樣的采用了定期刪除惰性刪除的過(guò)期策略。定期刪除原理定期刪除指的是默認(rèn)每隔就隨機(jī)抽取一些設(shè)置了過(guò)期時(shí)間的,檢測(cè)這些是否過(guò)期,如果過(guò)期了就將其刪掉。所有只會(huì)抽取一部分而不會(huì)全部檢查。 1.數(shù)據(jù)為什么會(huì)過(guò)期? 首先,要明白redis是用來(lái)做數(shù)據(jù)緩存的,不是用來(lái)做數(shù)據(jù)存儲(chǔ)的(當(dāng)然也可以當(dāng)數(shù)據(jù)庫(kù)用),所以數(shù)據(jù)時(shí)候過(guò)期的,過(guò)期的數(shù)據(jù)就不見(jiàn)了,過(guò)期主要有兩種情況, ①在設(shè)置緩存數(shù)據(jù)時(shí)制定了...
閱讀 919·2021-11-24 10:44
閱讀 2866·2021-11-11 16:54
閱讀 3371·2021-10-08 10:21
閱讀 2242·2021-08-25 09:39
閱讀 3011·2019-08-30 15:56
閱讀 3524·2019-08-30 13:46
閱讀 3555·2019-08-23 18:09
閱讀 2206·2019-08-23 17:05