摘要:基礎(chǔ)教程注本文是對(duì)眾多博客的學(xué)習(xí)和總結(jié),可能存在理解錯(cuò)誤。消息的應(yīng)答現(xiàn)在存在這樣一種場(chǎng)景,消費(fèi)者取到消息,然后創(chuàng)建任務(wù)開(kāi)始執(zhí)行。如果處理失敗,也就是沒(méi)有收到應(yīng)答,那么就將這條消息重新發(fā)送給該隊(duì)列的其他消費(fèi)者。造成了負(fù)載不均衡。
RabbitMQ 基礎(chǔ)教程(2) - Work Queue
注:本文是對(duì)眾多博客的學(xué)習(xí)和總結(jié),可能存在理解錯(cuò)誤。請(qǐng)帶著懷疑的眼光,同時(shí)如果有錯(cuò)誤希望能指出。
如果你喜歡我的文章,可以關(guān)注我的私人博客:http://blog-qeesung.rhcloud.com/
在上一篇文章 RabbitMQ 基礎(chǔ)教程(1) - Hello World 中,我們已經(jīng)簡(jiǎn)單的介紹了RabbitMQ以及如何發(fā)送和接收一個(gè)消息。接下來(lái)我們將繼續(xù)深入RabbitMQ,研究一下消息隊(duì)列(Work Queue)
消息隊(duì)列消息的發(fā)布者發(fā)布一個(gè)消息到消息隊(duì)列中,然后信息的消費(fèi)者取出消息進(jìn)行消費(fèi)。
queue +-------------+ +--+--+--+--+--+--+ +-------------+ | producer |----->|m1|m2| ... | | |---->| consumer | +-------------+ +--+--+--+--+--+--+ +-------------+
但是實(shí)際情況往往比這個(gè)要復(fù)雜,假如我們有多個(gè)信息的發(fā)布者和多個(gè)信息的消費(fèi)者,那RabbitMQ又將會(huì)是怎么工作呢?
+--------------+ +--------------+ | producer1 +- / | consumer1 | +--------------+ - queue /- +--------------+ +--------------+ - +---+---+---+----+ /- +--------------+ | producer2 +---->X|m1 |m2 |m3 |... |---->| consumer2 | +--------------+ /- +---+---+---+----+ - +--------------+ +--------------+ /- - +--------------+ | ... |/ | ... | +--------------+ +--------------+Round-robin 分發(fā)算法
RabbitMQ中,如果有多個(gè)消費(fèi)者同時(shí)消費(fèi)同一個(gè)消息隊(duì)列,那么就通過(guò)Round-robin算法將消息隊(duì)列中的消息均勻的分配給每一個(gè)消費(fèi)者。
這個(gè)算法其實(shí)很簡(jiǎn)單,每收到一個(gè)新的消息,就將這個(gè)消息分發(fā)給上下一個(gè)消費(fèi)者。比如上一個(gè)消費(fèi)者是consumer-n,那么有新消息來(lái)的時(shí)候就將這個(gè)新的消息發(fā)布到consumer-n+1,以此類推,如果到了最后一個(gè)消費(fèi)者,那么就又從第一個(gè)開(kāi)始。即:consumer-index = (consumer-index + 1) mod consumer-number
為了演示,首先來(lái)做幾項(xiàng)準(zhǔn)備工作。
定義任務(wù) task.js
/** * 創(chuàng)建一個(gè)任務(wù) * @param taskName 任務(wù)名字 * @param costTime 任務(wù)話費(fèi)的時(shí)間 * @param callback 任務(wù)結(jié)束以后的回調(diào)函數(shù) * @constructor */ function Task(taskName ,costTime , callback){ if(typeof(costTime) !== "number") costTime = 0; // no delay there setTimeout(function () { console.log(taskName+" finished"); if(callback && typeof (callback) === "function") callback(); } , 1000*costTime); };
串行化的消息任務(wù)結(jié)構(gòu)
任務(wù)發(fā)布者負(fù)責(zé)將該結(jié)構(gòu)發(fā)布到隊(duì)列中,然后消費(fèi)者取出消息,新建任務(wù)開(kāi)始執(zhí)行。
{ taskName : "taskname", costTime : 1 }
創(chuàng)建任務(wù)消息 task-producer.js
var amqp = require("amqplib/callback_api"); // 連接上RabbitMQ服務(wù)器 amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; // 得到發(fā)送消息的數(shù)目,默認(rèn)發(fā)送4個(gè) var name; var cost; (function () { if(process.argv.length < 4 ) { console.error("ERROR : usage - node rabbit-producer"); process.exit(-1); } name = process.argv[2]; cost = +process.argv[3]; })(); // 新建隊(duì)列,然后將隊(duì)列中的消息持久化取消 ch.assertQueue(q, {durable: true}); // 將任務(wù)串行化存入Buffer中,并推入隊(duì)列 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true}); console.log(" [x] Sent "+name); setTimeout(function () { process.exit(0); },500); }); });
消費(fèi)任務(wù)消息 task-consumer.js
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 監(jiān)聽(tīng)隊(duì)列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定義新的任務(wù) new Task(obj.taskName,obj.costTime); }, {noAck: true}); }); });
現(xiàn)在開(kāi)啟兩個(gè)消費(fèi)者進(jìn)程來(lái)等待消費(fèi)tasks隊(duì)列中的消息
# shell1 node task-consumer.js # shell2 node task-consumer.js
然后向隊(duì)列中推入三個(gè)消息
# shell3 node task-producer.js task1 0 node task-producer.js task2 0 node task-producer.js task3 0
運(yùn)行結(jié)果
# shell1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished # shell2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 已經(jīng)通過(guò)Round-robin算法將消息隊(duì)列中的消息分配到連接的消費(fèi)者中了.消息,隊(duì)列持久化
細(xì)心的讀者可能已經(jīng)發(fā)現(xiàn)了我們?cè)?strong>聲明隊(duì)列和發(fā)送消息的代碼塊中改動(dòng)了一小部分的代碼,那就是
// 聲明隊(duì)列 ch.assertQueue(q, {durable: true}); // 發(fā)送信息 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});
通過(guò)將隊(duì)列的durable配置參數(shù)生命為true可以保證在RabbitMQ服務(wù)器退出或者異常終止的情況下不會(huì)丟失消息隊(duì)列,注意這里只是不會(huì)丟失消息隊(duì)列,并不是消息隊(duì)列中沒(méi)有被消費(fèi)的消息不會(huì)丟失。
為了保證消息隊(duì)列中的消息不會(huì)丟失,就需要在發(fā)送消息時(shí)指定persistent選項(xiàng),這里并不能百分之百的保證消息不會(huì)丟失,因?yàn)閺年?duì)列中有新的消息,到將隊(duì)列中消息持久化到磁盤這一段時(shí)間之內(nèi)是無(wú)法保證的。
消息的應(yīng)答現(xiàn)在存在這樣一種場(chǎng)景,消費(fèi)者取到消息,然后創(chuàng)建任務(wù)開(kāi)始執(zhí)行。但是任務(wù)執(zhí)行到一半就拋出異常,那么這個(gè)任務(wù)算是沒(méi)有被成功執(zhí)行的。
在我們之前的代碼實(shí)現(xiàn)中,都是消息隊(duì)列中有新的消息,馬上就這個(gè)消息分配給消費(fèi)者消費(fèi),不管消費(fèi)者對(duì)消息處理結(jié)果如何,消息隊(duì)列會(huì)馬上將已經(jīng)分配的消息從消息隊(duì)列中刪除。如果這個(gè)任務(wù)非常重要,或者一定要執(zhí)行成功,那么一旦任務(wù)在執(zhí)行過(guò)程中拋出異常,那么這個(gè)任務(wù)就再也找不回來(lái)了,這是非??膳碌氖虑?。
還好在RabbitMQ中我們可以為已經(jīng)分配的消息和消息隊(duì)列之間創(chuàng)建一個(gè)應(yīng)答關(guān)系:
如果消息處理成功,那么就發(fā)送一個(gè)答復(fù)給消息隊(duì)列,告訴它:我已經(jīng)成功處理消息,不再需要這條消息了,你可以刪除了,于是消息隊(duì)列就將已經(jīng)應(yīng)答的消息從消息隊(duì)列中刪除。
如果處理失敗,也就是沒(méi)有收到應(yīng)答,那么就將這條消息重新發(fā)送給該隊(duì)列的其他消費(fèi)者。
要在消費(fèi)者和消息隊(duì)列之間建立這種應(yīng)答關(guān)系我們只需要將channel的consume函數(shù)的noAck參數(shù)設(shè)成false就可以了。
ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定義新的任務(wù) new Task(obj.taskName,obj.costTime); }, {noAck: false}); // 這里設(shè)置成false
下面我們就模擬一下消息處理失敗的場(chǎng)景:
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 監(jiān)聽(tīng)隊(duì)列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定義新的任務(wù) new Task(obj.taskName,obj.costTime,function(){ if(obj.taskName === "task2") throw new Error("Test error"); else ch.ack(msg); }); // 如果是任務(wù)二,那么就拋出異常。 }, {noAck: false}); }); });
按照上面的腳本執(zhí)行順序,我們?cè)趫?zhí)行一遍腳本: consumer2得到執(zhí)行task2消息,然后馬上拋出異常退出進(jìn)行,然后消息隊(duì)列再將這個(gè)消息分配給cosumer1,接著也執(zhí)行失敗了,退出進(jìn)程,最終消息隊(duì)列中將只會(huì)有一個(gè)task2的消息存在。
啟動(dòng)消費(fèi)者等待消息
# shell1 開(kāi)啟消費(fèi)者1 node rabbit-consumer.js # shell2 開(kāi)啟消費(fèi)者2 node rabbit-consumer.js
創(chuàng)建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 10 node rabbit-producer.js task3 0
我們能來(lái)看一下結(jié)果:
# shell2 消費(fèi)者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 消費(fèi)者2執(zhí)行任務(wù)2的時(shí)候拋出異常,task2將會(huì)重新發(fā)送給消費(fèi)者1 ... throw new Error("Error test"); # shell1 消費(fèi)者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished Get the task task2 # 消費(fèi)者1接收到任何2 task2 finished ... throw new Error("Error test"); # 也拋出異常了
最終會(huì)在消息隊(duì)列中剩下一條未消費(fèi)的信息。
更加均衡的負(fù)載這里有一點(diǎn)需要注意,如果你將noAck選項(xiàng)設(shè)置成了false,那么如果消息處理成功,一定要進(jìn)行應(yīng)答,負(fù)責(zé)消息隊(duì)列中的消息會(huì)越來(lái)越多,直到撐爆內(nèi)存。
在上文中我們聽(tīng)到過(guò)消息隊(duì)列通過(guò)Round-robin算法來(lái)將消息分配給消費(fèi)者,但是這個(gè)分配過(guò)程是盲目的。比如現(xiàn)在有兩個(gè)消費(fèi)者,consumer1和consumer2,按照Round-robin算法就會(huì)將奇數(shù)編號(hào)的任務(wù)發(fā)配給consumer1,將偶數(shù)編號(hào)的任務(wù)分配給consumer2,但是這些任務(wù)恰好有一個(gè)特性,奇數(shù)編號(hào)的任務(wù)比較繁重,而偶數(shù)編號(hào)的任務(wù)就比較簡(jiǎn)單。
那么這就會(huì)造成一個(gè)問(wèn)題,那就是consumer1會(huì)被累死,而consumer2會(huì)被閑死。造成了負(fù)載不均衡。要是每一個(gè)消息都被成功消費(fèi)以后告訴消息隊(duì)列,然后消息隊(duì)列再將新的消息分配給空閑下來(lái)的消費(fèi)者不就好了。
RabbitMQ中的確有這樣的一個(gè)配置選項(xiàng)。那就是ch.prefetch(1);
我們現(xiàn)在就來(lái)模擬一下
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 監(jiān)聽(tīng)隊(duì)列上面的消息 ch.prefetch(1); // 添加這一行 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); new Task(obj.taskName,obj.costTime ,function () { ch.ack(msg); }); }, {noAck: false}); }); });
啟動(dòng)消費(fèi)者等待消息
# shell1 開(kāi)啟消費(fèi)者1 node rabbit-consumer.js # shell2 開(kāi)啟消費(fèi)者2 node rabbit-consumer.js
創(chuàng)建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 20 node rabbit-producer.js task3 0 node rabbit-producer.js task4 20
# shell1 開(kāi)啟消費(fèi)者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 # 任務(wù)馬上結(jié)束 task1 finished Get the task task3 # 任務(wù)馬上結(jié)束 task3 finished Get the task task4 # 任務(wù)四被分配到consumer1中了 task4 finished # shell2 開(kāi)啟消費(fèi)者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/86315.html
摘要:平均每個(gè)消費(fèi)者將得到相同數(shù)量的消息。消息確認(rèn)完成任務(wù)可能需要幾秒鐘。為了確保消息不會(huì)丟失,支持消息確認(rèn)。沒(méi)有任何消息超時(shí)當(dāng)這個(gè)消費(fèi)者中止了,將會(huì)重新分配消息時(shí)。這是因?yàn)橹皇钦{(diào)度消息時(shí),消息進(jìn)入隊(duì)列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介紹 在上一個(gè) Hello World 教程中,我們編寫了從指定隊(duì)列發(fā)送...
摘要:每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的。為了確保不會(huì)丟失,采用確認(rèn)機(jī)制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒(méi)有發(fā)送,會(huì)認(rèn)為該消息沒(méi)有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的。當(dāng)消費(fèi)者中斷退出,會(huì)重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過(guò)一個(gè)...
摘要:在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個(gè)開(kāi)放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。 介紹 RabbitMQ是一個(gè)消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個(gè)郵局:當(dāng)你把郵件放在信箱里時(shí),你可以肯定郵差先生最終會(huì)把郵件送到你的收件人那里。在這個(gè)比喻中,RabbitMQ就...
摘要:這樣的消息分發(fā)機(jī)制稱作輪詢。在進(jìn)程掛了之后,所有的未被確認(rèn)的消息會(huì)被重新分發(fā)。忘記確認(rèn)這是一個(gè)普遍的錯(cuò)誤,丟失。為了使消息不會(huì)丟失,兩件事情需要確保,我們需要持久化隊(duì)列和消息。 工作隊(duì)列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個(gè)程序從已經(jīng)聲明的隊(duì)列中收發(fā)...
摘要:消息持久化控制的屬性就是消息的持久化。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時(shí),兩個(gè)消費(fèi)者都會(huì)收到消息并處理當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時(shí),只有消費(fèi)者可以接收到消息。八的消息確認(rèn)機(jī)制在中,可以通過(guò)持久化數(shù)據(jù)解決服務(wù)器異常的數(shù)據(jù)丟失問(wèn)題。 一、內(nèi)容大綱&使用場(chǎng)景 1. 消息隊(duì)列解決了什么問(wèn)題? 異步處理 應(yīng)用解耦 流量削鋒 日志處理 ...... 2. rabbitMQ安裝與配置 3. Java操...
閱讀 3232·2021-10-08 10:04
閱讀 1190·2021-09-30 09:48
閱讀 3567·2021-09-22 10:53
閱讀 1777·2021-09-10 11:22
閱讀 1784·2021-09-06 15:00
閱讀 2255·2019-08-30 15:56
閱讀 774·2019-08-30 15:53
閱讀 2367·2019-08-30 13:04