成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

白話RabbitMQ(六): RPC

KevinYan / 2968人閱讀

摘要:因?yàn)橄M(fèi)消息是在另外一個(gè)進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊(duì)列是一種非常好的方式,這里我們使用了長(zhǎng)度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。

推廣
RabbitMQ專題講座

https://segmentfault.com/l/15...

CoolMQ開源項(xiàng)目

我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀。可以參考源碼:https://github.com/vvsuperman…,項(xiàng)目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會(huì)更新在上面

聲明RPC接口

為了闡述RPC我們先建立一個(gè)客戶端接口,它有一個(gè)方法,會(huì)發(fā)起一個(gè)RPC請(qǐng)求,而且會(huì)一直阻塞直到有結(jié)果返回

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

留意RPC
雖然RPC很常見,但一定要非常小心的使用它,假設(shè)rpc調(diào)用的是一個(gè)非常慢的程序,將導(dǎo)致結(jié)果不可預(yù)料,而且非常難以調(diào)試。

使用RPC時(shí)你可以參考下列一些規(guī)范

系統(tǒng)設(shè)計(jì)上要有詳細(xì)的文檔描述,使組件間的依賴講清晰,做到有據(jù)可查

做好錯(cuò)誤的異常處理,特別是當(dāng)RPC服務(wù)掛掉或很長(zhǎng)時(shí)間沒(méi)有響應(yīng)時(shí)

盡量少用RPC,而使用異步管道,而非阻塞式的RPC,降低系統(tǒng)間的耦合

回調(diào)隊(duì)列(Callback queue)

用RabbitMQ實(shí)現(xiàn)RPC比較簡(jiǎn)單,客戶端發(fā)起請(qǐng)求,服務(wù)端返回對(duì)這個(gè)請(qǐng)求的響應(yīng)。為了實(shí)現(xiàn)這個(gè)功能我們需要一個(gè)能夠"回調(diào)"的隊(duì)列,我們直接用默認(rèn)的隊(duì)列即可

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the     callback_queue ...
消息屬性(Message properties)

AMQP 0-9-1 協(xié)議為每個(gè)消息定義了14個(gè)屬性,很多屬性很少會(huì)被用到,但我們要特別留意如下幾個(gè)

分發(fā)模式(deliveryMode): 標(biāo)記一個(gè)消息是否需要持久化(persistent)或者是需要事務(wù)(transient)等,在第二章中有描述

消息體類型(contentType): 描述消息中傳遞具體內(nèi)容的編碼方式,比如我們經(jīng)常使用的JSON可以設(shè)置成:application/json

消息回應(yīng)(replyTo):用于回調(diào)隊(duì)列

關(guān)系Id(correlationId): 用于將RPC的返回值關(guān)聯(lián)到對(duì)應(yīng)的請(qǐng)求。

我們需要引入相應(yīng)的包

import com.rabbitmq.client.AMQP.BasicProperties;
關(guān)系Id(Correlation Id)

在前面的方法中我們?yōu)槊恳粋€(gè)RPC請(qǐng)求都生成了一個(gè)隊(duì)列,這是完全沒(méi)有必要的,我們?yōu)槊恳粋€(gè)客戶端建立一個(gè)隊(duì)列就可以了。

這會(huì)引起一個(gè)新的問(wèn)題,因?yàn)樗械腞PC都是用一個(gè)隊(duì)列,一旦有消息返回,你怎么知道返回的消息對(duì)應(yīng)的是哪個(gè)請(qǐng)求呢?所以我們就用到了Correlation Id,作為每個(gè)請(qǐng)求獨(dú)一無(wú)二的標(biāo)識(shí),當(dāng)我們收到返回值后,會(huì)檢查這個(gè)Id,匹配對(duì)應(yīng)的響應(yīng)。如果找不到Id所對(duì)應(yīng)的請(qǐng)求,會(huì)直接拋棄它。

這里你可能會(huì)有疑問(wèn),為什么要拋棄掉未知消息呢?而不是拋出異常啥的。這跟我們服務(wù)端的競(jìng)態(tài)條件(possibility of a race condition )會(huì)有關(guān)系。比如假設(shè)我們RabbitMQ服務(wù)掛掉了,它剛給我們回復(fù)消息,還沒(méi)等到回應(yīng),服務(wù)器就掛掉了,那么當(dāng)RabbitMQ服務(wù)重啟時(shí),會(huì)重發(fā)消息,客戶端會(huì)收到一條重復(fù)的消息,為了冥等性的考慮,我們需要仔細(xì)的處理返回后的處理方式。

小結(jié)

RPC工作過(guò)程如下

當(dāng)客戶端啟動(dòng)時(shí),它會(huì)創(chuàng)建一個(gè)獨(dú)立的匿名回調(diào)隊(duì)列,然后發(fā)送RPC請(qǐng)求,這個(gè)RPC
請(qǐng)求會(huì)帶兩個(gè)屬性:replyTo - RPC調(diào)用成功后需要返回的隊(duì)列名稱;correlationId - 每個(gè)請(qǐng)求獨(dú)一無(wú)二的標(biāo)識(shí)。RPC服務(wù)提供者會(huì)等在隊(duì)列上,一旦有請(qǐng)求到達(dá),它會(huì)立即響應(yīng),把自己的活干完,然后返回一個(gè)結(jié)果,根據(jù)replyTo返回到對(duì)應(yīng)的隊(duì)列。而客戶端也會(huì)等著隊(duì)列中的信息返回,一旦有一個(gè)消息出現(xiàn),會(huì)檢查correlationId,將結(jié)果返回給響應(yīng)的請(qǐng)求發(fā)起者

整合

Fibonacci級(jí)數(shù)

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

我們定義個(gè)一個(gè)fibonacci級(jí)數(shù),只能接受正整數(shù),而且是效率不怎么高的那種。
rpc.java如下所示

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] argv) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = null;
        try {
            connection      = factory.newConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();

                    String response = "";

                    try {
                        String message = new String(body,"UTF-8");
                        int n = Integer.parseInt(message);

                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    }
                    catch (RuntimeException e){
                        System.out.println(" [.] " + e.toString());
                    }
                    finally {
                        channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

                        channel.basicAck(envelope.getDeliveryTag(), false);

            // RabbitMq consumer worker thread notifies the RPC server owner thread 
                    synchronized(this) {
                        this.notify();
                    }
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

            // Wait and be prepared to consume the message from RPC client.
        while (true) {
            synchronized(consumer) {
        try {
              consumer.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();          
            }
            }
         }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
        try {
                connection.close();
             } catch (IOException _ignore) {}
         }
    }
}

服務(wù)端的代碼比較直接,首先建立連接,建立channel以及聲明隊(duì)列。我們之后可能會(huì)建立多個(gè)消費(fèi)者,為了更好的負(fù)載均衡,需要在channel.basicQos中設(shè)置prefetchCount,然后設(shè)置一個(gè)basicConsume監(jiān)聽隊(duì)列,提供一個(gè)回調(diào)函數(shù)來(lái)處理請(qǐng)求以及返回值

RPCClient.java

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
    }

    public String call(String message) throws IOException, InterruptedException {
        String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue response = new ArrayBlockingQueue(1);

        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(corrId)) {
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });

        return response.take();
    }

    public void close() throws IOException {
        connection.close();
    }

    //...
}

客戶端代碼如下,我們建立一個(gè)連接,聲明一個(gè)"callback"隊(duì)列,我們將會(huì)往"callback"隊(duì)列提交消息,并接收RPC的返回值,具體步驟如下:

我們首先生成一個(gè)唯一的correlation Id,并保存,我們將會(huì)使用它來(lái)區(qū)分之后所接受到的信息。然后發(fā)出這個(gè)消息,消息會(huì)包含兩個(gè)屬性: replyTo以及collelationId。因?yàn)橄M(fèi)消息是在另外一個(gè)進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊(duì)列BlockingQueue是一種非常好的方式,這里我們使用了長(zhǎng)度為1的ArrayBlockQueue,handleDelivery的功能是檢查消息的的correlationId是不是我們之前所發(fā)送的,如果是,將返回值返回到BlockingQueue。此時(shí)主線程會(huì)等待返回并從ArrayBlockQueue取到返回值

從客戶端發(fā)起請(qǐng)求

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got "" + response + """);

fibonacciRpc.close();

源代碼參考RPCClient.java 和 RPCServer.java
編譯

javac -cp $CP RPCClient.java RPCServer.java

我們的rpc服務(wù)端好了,啟動(dòng)服務(wù)

java -cp $CP RPCServer
# => [x] Awaiting RPC requests

為了獲取fibonacci級(jí)數(shù)我們只需要運(yùn)行客戶端:

java -cp $CP RPCClient
# => [x] Requesting fib(30)

以上的實(shí)現(xiàn)方式并非建立RPC請(qǐng)求唯一的方式,但是它有很多優(yōu)點(diǎn):如果一個(gè)RPC服務(wù)過(guò)于緩慢,你可以非常方便的水平擴(kuò)展,只需要增加消費(fèi)者的個(gè)數(shù)即可,我們的代碼還是比較簡(jiǎn)單的,有些負(fù)責(zé)的問(wèn)題并未解決,比如

如果服務(wù)全部掛了,客戶端要如何處理

如果服務(wù)超時(shí)該如何處理

非法信息該如何處理

基礎(chǔ)章節(jié)的內(nèi)容到此就結(jié)束了,到這里,你就能夠基本明白消息隊(duì)列的基本用法,接下來(lái)我們可以進(jìn)入中級(jí)內(nèi)容內(nèi)容的學(xué)習(xí)了。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/68133.html

相關(guān)文章

  • 【譯】RabbitMQ系列()-RPC模式

    摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請(qǐng)求,并阻塞知道結(jié)果返回。當(dāng)有消息時(shí),進(jìn)行計(jì)算并通過(guò)指定的發(fā)送給客戶端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進(jìn)行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個(gè)worker之間派發(fā)時(shí)間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...

    894974231 評(píng)論0 收藏0
  • RabbitMQ+PHP 教程RPC

    摘要:有助于將響應(yīng)與請(qǐng)求關(guān)聯(lián)起來(lái)。如果發(fā)生這種情況,重新啟動(dòng)的服務(wù)器將再次處理請(qǐng)求。又名服務(wù)器正在等待該隊(duì)列上的請(qǐng)求。當(dāng)消息出現(xiàn)時(shí),它檢查屬性。然后,我們進(jìn)入循環(huán),在其中等待請(qǐng)求消息,完成工作并發(fā)送響應(yīng)。 (using php-amqplib) 前提必讀 本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運(yùn)行(5672)。如果您使用不同的主機(jī)、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 如果您在本教程中遇到...

    anquan 評(píng)論0 收藏0
  • 白話rabbitmq(一): HelloWorld

    摘要:作為消息隊(duì)列的一個(gè)典型實(shí)踐,完全實(shí)現(xiàn)了標(biāo)準(zhǔn),與的快快快不同,它追求的穩(wěn)定可靠。同一個(gè)隊(duì)列不僅可以綁定多個(gè)生產(chǎn)者,而且能夠發(fā)送消息到多個(gè)消費(fèi)者。消費(fèi)者接受并消費(fèi)消息。幾乎于完全類似是一個(gè)繼承了接口的類,方便我們來(lái)存儲(chǔ)消息隊(duì)列來(lái)的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的...

    garfileo 評(píng)論0 收藏0
  • 白話RabbitMQ(三):發(fā)布/訂閱

    摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀。因此一旦有有消息,消息會(huì)廣播到所有的消費(fèi)者。如此一來(lái)路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請(qǐng)大家圍觀??梢詤⒖荚创a:https...

    Ververica 評(píng)論0 收藏0
  • 白話RabbitMQ(四): 建立路由

    摘要:可以參考源碼,項(xiàng)目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會(huì)更新在上面前言在訂閱發(fā)布中我們建立了一個(gè)簡(jiǎn)單的日志系統(tǒng),從而將消息廣播給一些消費(fèi)者。因此,發(fā)送到路由鍵的消息會(huì)發(fā)送給隊(duì)列,發(fā)送到路由鍵或者的消息會(huì)發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決...

    CoderStudy 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<