成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

RabbitMQ快速入門

Moxmi / 530人閱讀

摘要:就是交換機(jī)生產(chǎn)者發(fā)送消息給交換機(jī),然后由交換機(jī)將消息轉(zhuǎn)發(fā)給隊(duì)列。對(duì)應(yīng)于中則是發(fā)送一個(gè),處理完成之后將其返回給。這樣來說一個(gè)是級(jí)別而不是級(jí)別的了。當(dāng)然這些也都是官網(wǎng)的入門例子,后續(xù)有機(jī)會(huì)的話再深入研究。

一、前言

RabbitMQ其實(shí)是我最早接觸的一個(gè)MQ框架,我記得當(dāng)時(shí)是在大學(xué)的時(shí)候跑到圖書館一個(gè)人去看,由于RabbitMQ官網(wǎng)的英文還不算太難,因此也是參考官網(wǎng)學(xué)習(xí)的,一共有6章,當(dāng)時(shí)是用Node來開發(fā)的,當(dāng)時(shí)花了一下午看完了,也理解了。而現(xiàn)在回過頭來再看,發(fā)現(xiàn)已經(jīng)忘記了個(gè)差不多了,現(xiàn)在再回過頭來繼續(xù)看看,然乎記之。以防再忘,讀者看時(shí)最好有一定的MQ基礎(chǔ)。

二、RabbitMQ

首先我們需要知道的是RabbitMQ它是基于高級(jí)隊(duì)列協(xié)議(AMQP)的,它是Elang編寫的,下面將圍繞RabbitMQ隊(duì)列、交換機(jī)、RPC三個(gè)重點(diǎn)進(jìn)行展開。

2.1、隊(duì)列

存儲(chǔ)消息的地方,多個(gè)生產(chǎn)者可以將消息發(fā)送到一個(gè)隊(duì)列,多個(gè)消費(fèi)者也可以消費(fèi)同一個(gè)隊(duì)列的消息。

注意:當(dāng)多個(gè)消費(fèi)者監(jiān)聽一個(gè)隊(duì)列,此時(shí)生產(chǎn)者發(fā)送消息到隊(duì)列只有一個(gè)消費(fèi)者被消費(fèi),并且消費(fèi)端的消費(fèi)方式是按照消費(fèi)端在內(nèi)部啟動(dòng)的順序輪詢(round-robin)。
2.2、消費(fèi)者

消費(fèi)消息的一方

public class Send {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername("admin");
        factory.setPassword("admin");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent "" + message + """);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
public class Recv {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";

    public static void main(String[] args) {
        try {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP);
            factory.setUsername("admin");
            factory.setPassword("admin");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received "" + message + """);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
2.3、小結(jié)

1、Rabbit是如何保證消息被消費(fèi)的?
答:通過ack機(jī)制。每當(dāng)一個(gè)消息被消費(fèi)端消費(fèi)的時(shí)候,消費(fèi)端可以發(fā)送一個(gè)ack給RabbitMQ,這樣RabbitMQ就知道了該條消息已經(jīng)被完整消費(fèi)并且可以被delete了。;如果一條消息被消費(fèi)但是沒有發(fā)送ack,那么此時(shí)RabbitMQ將會(huì)認(rèn)為需要重新消費(fèi)該消息,如果此時(shí)還有其它的消費(fèi)者,那么此時(shí)RabbitMQ將會(huì)把這條消息交給它處理。

注意:開啟ack機(jī)制的是autoAck=false;

2、消息如何進(jìn)行持久化?

將queue持久化,即設(shè)置 channel.queueDeclare(QUEUE_NAME, true, false, false, null);第二個(gè)參數(shù)durable為true

設(shè)置消息持久化,即設(shè)置MessageProperties.PERSISTENT_TEXT_PLAIN

注意:消息持久化并不一定保證消息不會(huì)被丟失

3、RabbitMQ如何避免兩個(gè)消費(fèi)者一個(gè)非常忙一個(gè)非常閑的情況?
通過如下設(shè)置,保證一個(gè)消費(fèi)者一次只能消費(fèi)一個(gè)消息,只有當(dāng)它消費(fèi)完成并且返回ack給RabbitMQ之后才給它派發(fā)新的消息。

int prefetchCount = 1 ;
channel.basicQos(prefetchCount)

4、RabbitMQ異常情況下如何保證消息不會(huì)被重復(fù)消費(fèi)?
需要業(yè)務(wù)自身實(shí)現(xiàn)密等性,RabbitMQ沒有提供比較好的方式去保證。

2.2、交換機(jī)

在RabbitMQ中,生產(chǎn)者其實(shí)從來不會(huì)發(fā)送消息到隊(duì)列,甚至,它不知道消息被發(fā)送到了哪個(gè)隊(duì)列。那它被發(fā)送到了哪里呢?就是本節(jié)的重點(diǎn):交換機(jī),下面就是它在RabbitMQ中的介紹圖。(X就是交換機(jī))生產(chǎn)者發(fā)送消息給交換機(jī),然后由交換機(jī)將消息轉(zhuǎn)發(fā)給隊(duì)列。

從上圖就產(chǎn)生一個(gè)問題:X怎么將消息發(fā)給queue呢?它是把消息發(fā)給所有queue還是發(fā)給一個(gè)指定的queue或者丟棄消息呢?這就是看交換機(jī)的類型了。下面一起談?wù)勥@幾種類型

2.2.1、fanout

fanout:廣播模式,這個(gè)比較好理解,就是所有的隊(duì)列都能收到交換機(jī)的消息。

如上面,兩個(gè)隊(duì)列都能收到交換機(jī)的消息。

2.2.2、direct

這個(gè)模式相當(dāng)于發(fā)布/訂閱模式的一種,當(dāng)交換機(jī)類型為direct的時(shí)候,此時(shí)我們需要設(shè)置兩個(gè)參數(shù):

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));第二個(gè)參數(shù),我們可以把它稱呼為routeKey

channel.queueBind(queueName, EXCHANGE_NAME, "");第三個(gè)參數(shù),我們把它稱呼為bindKey

有了這兩個(gè)參數(shù),我們就可以指定我們訂閱哪些消息了。


如圖,Q1訂閱了orange的消息,Q2訂閱了black、green的消息。

2.2.3、topic

其實(shí)topic和direct有一點(diǎn)類似,它相當(dāng)于對(duì)direct作了增強(qiáng)。在direct中,我們上面所說的bind routeKey為black、green的它是有限制的,它只能絕對(duì)的等于routeKey,但是有時(shí)候我們的需求不是這樣,我們可能想要的是正則匹配即可,那么Topic就派上用場(chǎng)了。

當(dāng)類型為topic時(shí),它的bindKey對(duì)應(yīng)字符串需要是以“.”分割,同時(shí)RabbitMQ還提供了兩個(gè)符號(hào):

星號(hào)(*):表示1個(gè)單詞

井號(hào)(#):表示0、多個(gè)單詞

上圖的意思是:所有第二個(gè)單詞為orange的消息發(fā)送個(gè)Q1,所有最后一個(gè)單詞為rabbit或者第一個(gè)單詞為lazy的消息發(fā)送給Q2。

2.2.4、header

這一種類型官方demo沒有過多解釋,這里也不研究了。

2.3、RPC

RabbitMQ 還可以實(shí)現(xiàn)RPC(遠(yuǎn)程過程調(diào)用)。什么是RPC,簡(jiǎn)單來說就是local調(diào)用remote方法。對(duì)應(yīng)于RabbitMQ中則是Client發(fā)送一個(gè)request message,Server處理完成之后將其返回給Client。這里就有了一個(gè)疑問?Server是如何將response返回給Client的,這里RabbitMQ定義了一個(gè)概念:Callback Queue。
Callback Queue
注意這個(gè)隊(duì)列是獨(dú)一無二的String replyQueueName = channel.queueDeclare().getQueue();。
首先我們需要明白一點(diǎn)的是為什么需要這個(gè)queue?我們知道在RabbitMQ作消息隊(duì)列的時(shí)候,Client只需要將消息投放到queue中,然后Server從queue去取就可以了。但是在RabbitMQ作為RPC的時(shí)候多了一點(diǎn)就是,Client還需要返回結(jié)果,這時(shí)Server端怎么知道把消息發(fā)送給Client,這就是Callback Queue的用處了。
Correlation Id
在上面我們知道Server返回?cái)?shù)據(jù)給Client是通過Callback Queue的,那么是為每一個(gè)request都創(chuàng)建一個(gè)queue嗎?這未免太過浪費(fèi)資源,RabbitMQ有更好的方案。在我們發(fā)送request,綁定一個(gè)唯一ID(correlationId),然后在消息被處理返回的時(shí)候取出這個(gè)ID和發(fā)出去的ID進(jìn)行匹配。這樣來說一個(gè)Callback Queue是Client級(jí)別而不是request級(jí)別的了。

實(shí)現(xiàn)
上面介紹了RabbitMQ實(shí)現(xiàn)RPC最重要的兩個(gè)概念,具體代碼比較簡(jiǎn)單還是貼下把。
client 端

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) throws Exception{
        RPCClient fibonacciRpc = new RPCClient();
        try {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got "" + response + """);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

服務(wù)端

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
三、總結(jié)

這次回頭再看RabbitMQ,再次重新理解了以下RabbitMQ,有些東西還是要慢慢嚼的。當(dāng)然這些也都是官網(wǎng)的入門例子,后續(xù)有機(jī)會(huì)的話再深入研究。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/74445.html

相關(guān)文章

  • 快速入門spring-amqp

    摘要:它還為具有偵聽器容器的消息驅(qū)動(dòng)的提供支持。接收消息當(dāng)存在基礎(chǔ)結(jié)構(gòu)時(shí),可以使用任何來注釋以創(chuàng)建偵聽器端點(diǎn)。默認(rèn)情況下,如果禁用重試并且偵聽器拋出異常,則會(huì)無限期地重試傳遞。 Spring-amqp-tutorial Spring AMQP項(xiàng)目將核心Spring概念應(yīng)用于基于AMQP的消息傳遞解決方案的開發(fā)。它提供了一個(gè)模板作為發(fā)送和接收消息的高級(jí)抽象。它還為具有偵聽器容器的消息驅(qū)動(dòng)的PO...

    鄒強(qiáng) 評(píng)論0 收藏0
  • Spring Cloud構(gòu)建微服務(wù)架構(gòu):消息驅(qū)動(dòng)的微服務(wù)(入門)【Dalston版】

    摘要:它通過使用來連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)的微服務(wù)應(yīng)用。該示例主要目標(biāo)是構(gòu)建一個(gè)基于的微服務(wù)應(yīng)用,這個(gè)微服務(wù)應(yīng)用將通過使用消息中間件來接收消息并將消息打印到日志中。下面我們通過編寫生產(chǎn)消息的單元測(cè)試用例來完善我們的入門內(nèi)容。 之前在寫Spring Boot基礎(chǔ)教程的時(shí)候?qū)戇^一篇《Spring Boot中使用RabbitMQ》。在該文中,我們通過簡(jiǎn)單的配置和注解就能實(shí)現(xiàn)向Rabbi...

    smallStone 評(píng)論0 收藏0
  • RabbitMQ 快速入門 python

    摘要:為了預(yù)防消息丟失,提供了,即工作進(jìn)程在收到消息并處理后,發(fā)送給,告知這時(shí)候可以把該消息從隊(duì)列中刪除了。如果工作進(jìn)程掛掉了,沒有收到,那么會(huì)把該消息重新分發(fā)給其他工作進(jìn)程。之前在發(fā)布消息時(shí),的值為即使用。 HelloWorld 簡(jiǎn)介 RabbitMQ:接受消息再傳遞消息,可以視為一個(gè)郵局。發(fā)送者和接受者通過隊(duì)列來進(jìn)行交互,隊(duì)列的大小可以視為無限的,多個(gè)發(fā)送者可以發(fā)生給一個(gè)隊(duì)列,多個(gè)接收者...

    wenshi11019 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<