成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

RabbitMQ+PHP 教程六(RPC)

anquan / 2879人閱讀

摘要:有助于將響應(yīng)與請求關(guān)聯(lián)起來。如果發(fā)生這種情況,重新啟動的服務(wù)器將再次處理請求。又名服務(wù)器正在等待該隊列上的請求。當(dāng)消息出現(xiàn)時,它檢查屬性。然后,我們進入循環(huán),在其中等待請求消息,完成工作并發(fā)送響應(yīng)。

(using php-amqplib)

前提必讀

本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運行(5672)。如果您使用不同的主機、端口或憑據(jù),則連接設(shè)置需要調(diào)整。

如果您在本教程中遇到困難,可以通過郵件列表與我們聯(lián)系。

開始

在第二個教程中,我們學(xué)習(xí)了如何使用工作隊列在多個工人之間分配耗時的任務(wù)。

但是如果我們需要在遠程計算機上運行一個函數(shù)并等待結(jié)果呢?嗯,那是另一回事了。這種模式通常稱為遠程過程調(diào)用或RPC。

在本教程中我們將使用RabbitMQ搭建一個RPC系統(tǒng):一個客戶端和一個可擴展的RPC服務(wù)器。由于我們沒有任何值得分配的耗時的任務(wù),所以我們將創(chuàng)建一個返回Fibonacci數(shù)的模擬一個RPC服務(wù)。

Client interface

為了說明如何使用RPC服務(wù),我們將創(chuàng)建一個簡單的客戶類。它將公開一個名為調(diào)用的方法,該方法發(fā)送一個RPC請求并阻塞直到接收到結(jié)果為止:

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "
";
關(guān)于RPC的一些建議

雖然RPC是計算中非常常見的模式,但它經(jīng)常遭到批評。當(dāng)程序員不知道函數(shù)調(diào)用是本地的,或者它是一個緩慢的RPC時,問題就出現(xiàn)了。這樣的混亂導(dǎo)致了不可預(yù)知的系統(tǒng),并給調(diào)試增加了不必要的復(fù)雜性。而簡化軟件,濫用會導(dǎo)致難以維護的RPC代碼。

考慮到這一點,請考慮以下建議:

確保很明顯哪個函數(shù)調(diào)用是本地調(diào)用,并且它是遠程的。
記錄系統(tǒng)。使組件之間的依賴關(guān)系清晰。
處理錯誤案例。RPC服務(wù)器長時間處于下行狀態(tài)時,客戶端應(yīng)如何響應(yīng)?
有疑問時避免RPC。如果可以,則應(yīng)該使用異步管道,而不是像阻塞這樣的RPC,結(jié)果被異步推送到下一個計算階段。

回調(diào)隊列(Callback queue)

一般在RabbitMQ做RPC是容易的。客戶端發(fā)送一條請求消息和一個響應(yīng)消息的服務(wù)器回復(fù)。為了接收響應(yīng),我們需要向請求發(fā)送一個“回調(diào)”隊列地址。我們可以使用默認隊列。讓我們試試看:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$msg = new AMQPMessage(
    $payload,
    array("reply_to" => $queue_name));

$channel->basic_publish($msg, "", "rpc_queue");
# ... then code to read a response message from the callback_queue ...
消息屬性
AMQP協(xié)議(0-9-1 protocol)預(yù)定義了一套14個屬性,去一個消息。大多數(shù)屬性很少使用,除了以下內(nèi)容:

delivery_mode: 將消息標(biāo)記為持久性。 (with a value of 2) or transient (1). 您可能會從第二個教程中記住這個屬性。
content_type:用來描述編碼的MIME類型。例如,對于常用的JSON編碼,將此屬性設(shè)置為應(yīng)用程序/ JSON是一個很好的做法。
reply_to:常用的名字一個回調(diào)隊列。
correlation_id:有助于將RPC響應(yīng)與請求關(guān)聯(lián)起來。

Correlation Id

在上面介紹的方法中,我們建議為每個RPC請求創(chuàng)建一個回調(diào)隊列。這是非常低效的,但幸運的是有一個更好的方法——讓我們?yōu)槊總€客戶機創(chuàng)建一個回調(diào)隊列。

這引發(fā)了一個新問題,在隊列中收到了響應(yīng),不清楚響應(yīng)的請求屬于哪個。那時候correlation_id屬性用于。我們將把它設(shè)置為每個請求的唯一值。稍后,當(dāng)我們在回調(diào)隊列中接收消息時,我們將查看這個屬性,并在此基礎(chǔ)上,我們將能夠?qū)㈨憫?yīng)與請求匹配。如果我們看到一個未知的correlation_id值,我們可以安全地忽略信息-它不屬于我們的請求。

您可能會問,為什么我們應(yīng)該忽略回調(diào)隊列中的未知消息,而不是失敗出錯呢?這是由于服務(wù)器端可能出現(xiàn)競爭情況。雖然不太可能,RPC服務(wù)器可能在發(fā)送完答案后死亡,但在發(fā)出請求的確認消息之前。如果發(fā)生這種情況,重新啟動的RPC服務(wù)器將再次處理請求。這就是為什么在客戶機上我們必須優(yōu)雅地處理重復(fù)響應(yīng),而RPC應(yīng)該理想地是冪等的。

總結(jié)

我們的RPC會像這樣工作:

當(dāng)客戶端啟動時,它創(chuàng)建一個匿名的獨占回調(diào)隊列。

一個RPC請求,客戶端發(fā)送消息,兩個屬性:reply_to,設(shè)置回調(diào)隊列和correlation_id,它被設(shè)置為每個請求的唯一值。

請求被發(fā)送到一個rpc_queue隊列。

RPC worker(又名:服務(wù)器)正在等待該隊列上的請求。當(dāng)一個請求時,它的工作和發(fā)送消息的結(jié)果返回給客戶端,使用從reply_to隊列。

客戶機等待回調(diào)隊列上的數(shù)據(jù)。當(dāng)消息出現(xiàn)時,它檢查correlation_id屬性。如果它與請求的值匹配,則返回對應(yīng)用程序的響應(yīng)。

匯總

Fibonacci 遞歸源碼:

function fib($n) {
    if ($n == 0)
        return 0;
    if ($n == 1)
        return 1;
    return fib($n-1) + fib($n-2);
}
``
我們聲明fibonacci(斐波那契)函數(shù)。它只假設(shè)有效的正整數(shù)輸入。(不要指望這一個能為大數(shù)字工作,而且這可能是最慢的遞歸實現(xiàn))。

我們的RPC服務(wù)器rpc_server.php代碼看起來像這樣:

require_once DIR . "/vendor/autoload.php";
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest");
$channel = $connection->channel();

$channel->queue_declare("rpc_queue", false, false, false, false);

function fib($n) {

if ($n == 0)
    return 0;
if ($n == 1)
    return 1;
return fib($n-1) + fib($n-2);

}

echo " [x] Awaiting RPC requestsn";
$callback = function($req) {

$n = intval($req->body);
echo " [.] fib(", $n, ")
";

$msg = new AMQPMessage(
    (string) fib($n),
    array("correlation_id" => $req->get("correlation_id"))
    );

$req->delivery_info["channel"]->basic_publish(
    $msg, "", $req->get("reply_to"));
$req->delivery_info["channel"]->basic_ack(
    $req->delivery_info["delivery_tag"]);

};

$channel->basic_qos(null, 1, null);
$channel->basic_consume("rpc_queue", "", false, false, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();
$connection->close();

?>

服務(wù)器代碼相當(dāng)簡單:

像往常一樣,我們從建立連接、通道和聲明隊列開始。

我們可能需要運行多個服務(wù)器進程。為了分散負載同樣多的服務(wù)器需要設(shè)置`prefetch_count`, 設(shè)置`$channel.basic_qos`美元。

我們用`basic_consume`訪問隊列。然后,我們進入while循環(huán),在其中等待請求消息,完成工作并發(fā)送響應(yīng)。

我們rpc_client.php RPC客戶端代碼:

require_once DIR . "/vendor/autoload.php";
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

class FibonacciRpcClient {

private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;

public function __construct() {
    $this->connection = new AMQPStreamConnection(
        "localhost", 5672, "guest", "guest");
    $this->channel = $this->connection->channel();
    list($this->callback_queue, ,) = $this->channel->queue_declare(
        "", false, false, true, false);
    $this->channel->basic_consume(
        $this->callback_queue, "", false, false, false, false,
        array($this, "on_response"));
}
public function on_response($rep) {
    if($rep->get("correlation_id") == $this->corr_id) {
        $this->response = $rep->body;
    }
}

public function call($n) {
    $this->response = null;
    $this->corr_id = uniqid();

    $msg = new AMQPMessage(
        (string) $n,
        array("correlation_id" => $this->corr_id,
              "reply_to" => $this->callback_queue)
        );
    $this->channel->basic_publish($msg, "", "rpc_queue");
    while(!$this->response) {
        $this->channel->wait();
    }
    return intval($this->response);
}

};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";

?>

現(xiàn)在是一個很好的時間來讓我們完整的示例源代碼rpc_client.php和rpc_server.php。

我們的RPC服務(wù)現(xiàn)在準(zhǔn)備好了。我們可以啟動服務(wù)器:

php rpc_server.php
# => [x] Awaiting RPC requests

請求斐波那契數(shù)運行客戶機:

php rpc_client.php
# => [x] Requesting fib(30)
``
這里介紹的設(shè)計并不是RPC服務(wù)的唯一實現(xiàn),但它有一些重要的要點:

如果RPC服務(wù)器太慢,您可以通過運行另一個服務(wù)器來擴展。試著在一個新的控制臺再運行第一個:rpc_server.php。

在客戶端,RPC只需要發(fā)送和接收一條消息。不喜歡queue_declare需要同步調(diào)用。因此,RPC客戶機只需要一次RPC請求的一次網(wǎng)絡(luò)往返。

我們的代碼仍然非常簡單,并沒有試圖解決更復(fù)雜(但重要)的問題,例如:

如果沒有服務(wù)器運行,客戶端應(yīng)該如何反應(yīng)?

客戶端應(yīng)該對RPC有某種超時嗎?

如果服務(wù)器發(fā)生故障并引發(fā)異常,是否應(yīng)該轉(zhuǎn)發(fā)給客戶端?

在處理前防止無效傳入消息(如檢查邊界、類型)。

如果您想進行實驗,您可能會發(fā)現(xiàn)management UI對于查看隊列非常有用。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/28314.html

相關(guān)文章

  • RabbitMQ+PHP 教程五(Topics)

    摘要:前提必讀本教程假設(shè)是安裝在標(biāo)準(zhǔn)端口上運行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個或四個詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運行(5672)。如果您使用不同的主機、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 在哪里得到幫助...

    nemo 評論0 收藏0
  • 【譯】RabbitMQ系列()-RPC模式

    摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請求,并阻塞知道結(jié)果返回。當(dāng)有消息時,進行計算并通過指定的發(fā)送給客戶端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個worker之間派發(fā)時間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...

    894974231 評論0 收藏0
  • 白話RabbitMQ(): RPC

    摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結(jié)果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖?..

    KevinYan 評論0 收藏0
  • rabbitmq中文教程python版 - 遠程過程調(diào)用

    摘要:通常用于命名回調(diào)隊列。對每個響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個非常簡單的工作,對于每個響應(yīng)消息它檢查是否是我們正在尋找的。在這個方法中,首先我們生成一個唯一的數(shù)并保存回調(diào)函數(shù)將使用這個值來捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...

    chuyao 評論0 收藏0
  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個隊列的消息緩沖區(qū),保持代表的消費。本教程介紹,這是一個開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個郵局:當(dāng)你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...

    silencezwm 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<