摘要:推廣專(zhuān)題講座開(kāi)源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀。主題交換機(jī)也可以當(dāng)成其它交換機(jī)來(lái)使用,假如隊(duì)列綁定到了那么它會(huì)接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達(dá)路由器一樣了。
推廣
https://segmentfault.com/l/15...
我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項(xiàng)目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會(huì)更新在上面
前言在之前的建立路由中我們改進(jìn)了日志系統(tǒng)。我們摒棄無(wú)腦發(fā)送消息的廣播路由器,而使用能夠根據(jù)綁定鍵(binding key)來(lái)發(fā)送消息的,從而能有有選擇的后去logs.
盡管使用直達(dá)路由器大大的改進(jìn)了我們系統(tǒng),但也存在局限性 - 無(wú)法加入更多條件。比如我們希望能夠加入更多的維度,我們希望不僅是基于嚴(yán)重程度,而且是基于來(lái)源,如果你對(duì)linux tool工具有了解的話,它不僅僅是基于嚴(yán)重程度(info/warn/crit...) 而且有來(lái)源(auth/cron/kern...),這個(gè)給到我們更大的靈活性-我們需要監(jiān)聽(tīng)所有來(lái)自"cron"的errors消息,以及來(lái)自"kern"的所有l(wèi)og。所以我們需要的是一個(gè)更復(fù)雜的主題交換機(jī)
主題交換機(jī)發(fā)送到主題交換機(jī)的消息并不會(huì)有一個(gè)確定的路由鍵-而是一長(zhǎng)串字符列表,以"."來(lái)分割,而這個(gè)字符串列表表明了路由信息,比如"stock.usd.nyse","nyse.vmw","quick.orange.rabbit",字符串的最大長(zhǎng)度限制在255bytes。
同時(shí),在隊(duì)列綁定交換機(jī)時(shí)也需要指定模式,而符合模式的消息將會(huì)被發(fā)送至該隊(duì)列,模式可以由通配符組成:
"*" 可以表示一個(gè)詞
"#" 表示0個(gè)或多個(gè)詞
可以通過(guò)如下的例子來(lái)說(shuō)明
請(qǐng)看例子,以發(fā)送動(dòng)物的消息為例,我們會(huì)發(fā)送包含三個(gè)詞的路由鍵(兩個(gè)".")。第一個(gè)是速度,第二個(gè)是顏色,而第三個(gè)是種族
同時(shí),我們建立了三個(gè)綁定,Q1綁定了鍵".orange.",Q2綁定了鍵"..rabbit"以及"lazy.#"??梢宰鋈缦碌慕忉?zhuān)琎1用來(lái)接受所有orange的動(dòng)物,Q2用來(lái)接受所有rabbits,以及l(fā)azy的動(dòng)物
一個(gè)路由為"quick.orange.rabbit"的消息將會(huì)被同時(shí)發(fā)送給這兩個(gè)隊(duì)列,消息"lazy.orange.elephant"也會(huì)被同時(shí)發(fā)給它們;"quick.orange.fox"只會(huì)發(fā)給第一個(gè)隊(duì)列;"lazy.brown.fox"會(huì)發(fā)到第二個(gè);"lazy.pink.rabbit"將只會(huì)發(fā)送給第二個(gè);"quick.brown.fox"會(huì)被丟棄因?yàn)槠ヅ洳簧先魏我粋€(gè)。
如果我們發(fā)送四個(gè)詞的呢?比如"oragne"或者"quick.orange.male.rabbit"?這些沒(méi)有任何匹配的隊(duì)列將會(huì)丟失。但比如"quick.orange.male.rabbit"會(huì)匹配到第二個(gè)隊(duì)列。
主題交換機(jī)也可以當(dāng)成其它交換機(jī)來(lái)使用,假如隊(duì)列綁定到了 "#",那么它會(huì)接收所有的消息,就像廣播路由器一樣;而如果未使用"*","#",那么就跟直達(dá)路由器一樣了。
整合所有的代碼我們用主題交換機(jī)替換掉之前的直達(dá)交換機(jī),用如同"
import com.rabbitmq.client.*; import java.io.IOException; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent "" + routingKey + "":"" + message + """); connection.close(); } //... }
ReceiveLogsTopic.java的代碼片段
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + envelope.getRoutingKey() + "":"" + message + """); } }; channel.basicConsume(queueName, true, consumer); } }
編譯這段代碼
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接受所有的logs
java -cp $CP ReceiveLogsTopic "#"
接受來(lái)自"kern"的消息
java -cp $CP ReceiveLogsTopic "kern.*"
接受來(lái)自"critical"的消息
java -cp $CP ReceiveLogsTopic "*.critical"
創(chuàng)建多個(gè)綁定
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
發(fā)送消息
java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
你可以嘗試更多的參數(shù),以此來(lái)熟悉這個(gè)知識(shí)
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/68117.html
摘要:推廣專(zhuān)題講座開(kāi)源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀。因此一旦有有消息,消息會(huì)廣播到所有的消費(fèi)者。如此一來(lái)路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。 推廣 RabbitMQ專(zhuān)題講座 https://segmentfault.com/l/15... CoolMQ開(kāi)源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀??梢詤⒖荚创a:https...
摘要:前提必讀本教程假設(shè)是安裝在標(biāo)準(zhǔn)端口上運(yùn)行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個(gè)或四個(gè)詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運(yùn)行(5672)。如果您使用不同的主機(jī)、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 在哪里得到幫助...
摘要:主題模式在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用我們只能簡(jiǎn)單進(jìn)行廣播,而使用則允許消費(fèi)者可以進(jìn)行一定程度的選擇。為的會(huì)同時(shí)發(fā)布到這兩個(gè)。當(dāng)為時(shí),會(huì)接收所有的。當(dāng)中沒(méi)有使用通配符和時(shí),的行為和一致。 主題模式 在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用fanout我們只能簡(jiǎn)單進(jìn)行廣播,而使用direct則允許消費(fèi)者可以進(jìn)行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個(gè)...
摘要:概述概述消息隊(duì)列,是分布式系統(tǒng)中重要的組件,是一種進(jìn)程間通信或者是同一進(jìn)程的不同線程的通信方式。消息隊(duì)列的使用場(chǎng)景消息隊(duì)列的使用場(chǎng)景異步處理流量控制應(yīng)用解耦應(yīng)用解耦應(yīng)用解耦消息隊(duì)列的一個(gè)作用就是實(shí)現(xiàn)系統(tǒng)應(yīng)用之間的解耦。概述消息隊(duì)列(Message Queue),是分布式系統(tǒng)中重要的組件,是一種進(jìn)程間通信或者是同一進(jìn)程的不同線程的通信方式。和 http 同步協(xié)議不同的是,消息隊(duì)列是一種異步的通...
摘要:消息隊(duì)列,用于存儲(chǔ)還未被消費(fèi)者消費(fèi)的消息。由在與時(shí)指定,而由發(fā)送時(shí)指定,兩者的匹配方式由決定。需要為每一個(gè)創(chuàng)建,協(xié)議規(guī)定只有通過(guò)才能執(zhí)行的命令。建議客戶端線程之間不要共用,至少要保證共用的線程發(fā)送消息必須是串行的,但是建議盡量共用。 安裝 rabbitmq 在 mac 下可以直接用 brew 安裝默認(rèn)安裝在 /usr/local/Cellar/下命令被軟連接加入到了/usr/local...
閱讀 3213·2021-11-22 12:01
閱讀 3833·2021-08-30 09:46
閱讀 834·2019-08-30 13:48
閱讀 3275·2019-08-29 16:43
閱讀 1735·2019-08-29 16:33
閱讀 1916·2019-08-29 13:44
閱讀 1479·2019-08-26 13:45
閱讀 2287·2019-08-26 11:44