摘要:有助于將響應(yīng)與請求關(guān)聯(lián)起來。如果發(fā)生這種情況,重新啟動的服務(wù)器將再次處理請求。又名服務(wù)器正在等待該隊列上的請求。當(dāng)消息出現(xiàn)時,它檢查屬性。然后,我們進入循環(huán),在其中等待請求消息,完成工作并發(fā)送響應(yīng)。
(using php-amqplib)
前提必讀本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運行(5672)。如果您使用不同的主機、端口或憑據(jù),則連接設(shè)置需要調(diào)整。
如果您在本教程中遇到困難,可以通過郵件列表與我們聯(lián)系。
開始在第二個教程中,我們學(xué)習(xí)了如何使用工作隊列在多個工人之間分配耗時的任務(wù)。
但是如果我們需要在遠程計算機上運行一個函數(shù)并等待結(jié)果呢?嗯,那是另一回事了。這種模式通常稱為遠程過程調(diào)用或RPC。
在本教程中我們將使用RabbitMQ搭建一個RPC系統(tǒng):一個客戶端和一個可擴展的RPC服務(wù)器。由于我們沒有任何值得分配的耗時的任務(wù),所以我們將創(chuàng)建一個返回Fibonacci數(shù)的模擬一個RPC服務(wù)。
Client interface為了說明如何使用RPC服務(wù),我們將創(chuàng)建一個簡單的客戶類。它將公開一個名為調(diào)用的方法,該方法發(fā)送一個RPC請求并阻塞直到接收到結(jié)果為止:
$fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call(30); echo " [.] Got ", $response, " ";
關(guān)于RPC的一些建議回調(diào)隊列(Callback queue)雖然RPC是計算中非常常見的模式,但它經(jīng)常遭到批評。當(dāng)程序員不知道函數(shù)調(diào)用是本地的,或者它是一個緩慢的RPC時,問題就出現(xiàn)了。這樣的混亂導(dǎo)致了不可預(yù)知的系統(tǒng),并給調(diào)試增加了不必要的復(fù)雜性。而簡化軟件,濫用會導(dǎo)致難以維護的RPC代碼。
考慮到這一點,請考慮以下建議:
確保很明顯哪個函數(shù)調(diào)用是本地調(diào)用,并且它是遠程的。
記錄系統(tǒng)。使組件之間的依賴關(guān)系清晰。
處理錯誤案例。RPC服務(wù)器長時間處于下行狀態(tài)時,客戶端應(yīng)如何響應(yīng)?
有疑問時避免RPC。如果可以,則應(yīng)該使用異步管道,而不是像阻塞這樣的RPC,結(jié)果被異步推送到下一個計算階段。
一般在RabbitMQ做RPC是容易的。客戶端發(fā)送一條請求消息和一個響應(yīng)消息的服務(wù)器回復(fù)。為了接收響應(yīng),我們需要向請求發(fā)送一個“回調(diào)”隊列地址。我們可以使用默認隊列。讓我們試試看:
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $msg = new AMQPMessage( $payload, array("reply_to" => $queue_name)); $channel->basic_publish($msg, "", "rpc_queue"); # ... then code to read a response message from the callback_queue ...
消息屬性Correlation Id
AMQP協(xié)議(0-9-1 protocol)預(yù)定義了一套14個屬性,去一個消息。大多數(shù)屬性很少使用,除了以下內(nèi)容:delivery_mode: 將消息標(biāo)記為持久性。 (with a value of 2) or transient (1). 您可能會從第二個教程中記住這個屬性。
content_type:用來描述編碼的MIME類型。例如,對于常用的JSON編碼,將此屬性設(shè)置為應(yīng)用程序/ JSON是一個很好的做法。
reply_to:常用的名字一個回調(diào)隊列。
correlation_id:有助于將RPC響應(yīng)與請求關(guān)聯(lián)起來。
在上面介紹的方法中,我們建議為每個RPC請求創(chuàng)建一個回調(diào)隊列。這是非常低效的,但幸運的是有一個更好的方法——讓我們?yōu)槊總€客戶機創(chuàng)建一個回調(diào)隊列。
這引發(fā)了一個新問題,在隊列中收到了響應(yīng),不清楚響應(yīng)的請求屬于哪個。那時候correlation_id屬性用于。我們將把它設(shè)置為每個請求的唯一值。稍后,當(dāng)我們在回調(diào)隊列中接收消息時,我們將查看這個屬性,并在此基礎(chǔ)上,我們將能夠?qū)㈨憫?yīng)與請求匹配。如果我們看到一個未知的correlation_id值,我們可以安全地忽略信息-它不屬于我們的請求。
您可能會問,為什么我們應(yīng)該忽略回調(diào)隊列中的未知消息,而不是失敗出錯呢?這是由于服務(wù)器端可能出現(xiàn)競爭情況。雖然不太可能,RPC服務(wù)器可能在發(fā)送完答案后死亡,但在發(fā)出請求的確認消息之前。如果發(fā)生這種情況,重新啟動的RPC服務(wù)器將再次處理請求。這就是為什么在客戶機上我們必須優(yōu)雅地處理重復(fù)響應(yīng),而RPC應(yīng)該理想地是冪等的。
總結(jié)我們的RPC會像這樣工作:
當(dāng)客戶端啟動時,它創(chuàng)建一個匿名的獨占回調(diào)隊列。
一個RPC請求,客戶端發(fā)送消息,兩個屬性:reply_to,設(shè)置回調(diào)隊列和correlation_id,它被設(shè)置為每個請求的唯一值。
請求被發(fā)送到一個rpc_queue隊列。
RPC worker(又名:服務(wù)器)正在等待該隊列上的請求。當(dāng)一個請求時,它的工作和發(fā)送消息的結(jié)果返回給客戶端,使用從reply_to隊列。
客戶機等待回調(diào)隊列上的數(shù)據(jù)。當(dāng)消息出現(xiàn)時,它檢查correlation_id屬性。如果它與請求的值匹配,則返回對應(yīng)用程序的響應(yīng)。
匯總Fibonacci 遞歸源碼:
function fib($n) { if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2); } `` 我們聲明fibonacci(斐波那契)函數(shù)。它只假設(shè)有效的正整數(shù)輸入。(不要指望這一個能為大數(shù)字工作,而且這可能是最慢的遞歸實現(xiàn))。 我們的RPC服務(wù)器rpc_server.php代碼看起來像這樣:
require_once DIR . "/vendor/autoload.php";
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest");
$channel = $connection->channel();
$channel->queue_declare("rpc_queue", false, false, false, false);
function fib($n) {
if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requestsn";
$callback = function($req) {
$n = intval($req->body); echo " [.] fib(", $n, ") "; $msg = new AMQPMessage( (string) fib($n), array("correlation_id" => $req->get("correlation_id")) ); $req->delivery_info["channel"]->basic_publish( $msg, "", $req->get("reply_to")); $req->delivery_info["channel"]->basic_ack( $req->delivery_info["delivery_tag"]);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume("rpc_queue", "", false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
服務(wù)器代碼相當(dāng)簡單: 像往常一樣,我們從建立連接、通道和聲明隊列開始。 我們可能需要運行多個服務(wù)器進程。為了分散負載同樣多的服務(wù)器需要設(shè)置`prefetch_count`, 設(shè)置`$channel.basic_qos`美元。 我們用`basic_consume`訪問隊列。然后,我們進入while循環(huán),在其中等待請求消息,完成工作并發(fā)送響應(yīng)。 我們rpc_client.php RPC客戶端代碼:
require_once DIR . "/vendor/autoload.php";
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
class FibonacciRpcClient {
private $connection; private $channel; private $callback_queue; private $response; private $corr_id; public function __construct() { $this->connection = new AMQPStreamConnection( "localhost", 5672, "guest", "guest"); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); $this->channel->basic_consume( $this->callback_queue, "", false, false, false, false, array($this, "on_response")); } public function on_response($rep) { if($rep->get("correlation_id") == $this->corr_id) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array("correlation_id" => $this->corr_id, "reply_to" => $this->callback_queue) ); $this->channel->basic_publish($msg, "", "rpc_queue"); while(!$this->response) { $this->channel->wait(); } return intval($this->response); }
};
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";
?>
現(xiàn)在是一個很好的時間來讓我們完整的示例源代碼rpc_client.php和rpc_server.php。 我們的RPC服務(wù)現(xiàn)在準(zhǔn)備好了。我們可以啟動服務(wù)器:
php rpc_server.php
# => [x] Awaiting RPC requests
請求斐波那契數(shù)運行客戶機:
php rpc_client.php
# => [x] Requesting fib(30)
``
這里介紹的設(shè)計并不是RPC服務(wù)的唯一實現(xiàn),但它有一些重要的要點:
如果RPC服務(wù)器太慢,您可以通過運行另一個服務(wù)器來擴展。試著在一個新的控制臺再運行第一個:rpc_server.php。
在客戶端,RPC只需要發(fā)送和接收一條消息。不喜歡queue_declare需要同步調(diào)用。因此,RPC客戶機只需要一次RPC請求的一次網(wǎng)絡(luò)往返。
我們的代碼仍然非常簡單,并沒有試圖解決更復(fù)雜(但重要)的問題,例如:
如果沒有服務(wù)器運行,客戶端應(yīng)該如何反應(yīng)?
客戶端應(yīng)該對RPC有某種超時嗎?
如果服務(wù)器發(fā)生故障并引發(fā)異常,是否應(yīng)該轉(zhuǎn)發(fā)給客戶端?
在處理前防止無效傳入消息(如檢查邊界、類型)。
如果您想進行實驗,您可能會發(fā)現(xiàn)management UI對于查看隊列非常有用。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/28314.html
摘要:前提必讀本教程假設(shè)是安裝在標(biāo)準(zhǔn)端口上運行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個或四個詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設(shè)RabbitMQ是安裝在標(biāo)準(zhǔn)端口上運行(5672)。如果您使用不同的主機、端口或憑據(jù),則連接設(shè)置需要調(diào)整。 在哪里得到幫助...
摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請求,并阻塞知道結(jié)果返回。當(dāng)有消息時,進行計算并通過指定的發(fā)送給客戶端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個worker之間派發(fā)時間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結(jié)果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖?..
摘要:通常用于命名回調(diào)隊列。對每個響應(yīng)執(zhí)行的回調(diào)函數(shù)做了一個非常簡單的工作,對于每個響應(yīng)消息它檢查是否是我們正在尋找的。在這個方法中,首先我們生成一個唯一的數(shù)并保存回調(diào)函數(shù)將使用這個值來捕獲適當(dāng)?shù)捻憫?yīng)。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 遠程過程調(diào)用(RPC) (using the Pika Python client) 本章節(jié)教程...
摘要:在中間的框是一個隊列的消息緩沖區(qū),保持代表的消費。本教程介紹,這是一個開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個郵局:當(dāng)你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...
閱讀 2011·2023-04-26 01:44
閱讀 1339·2021-11-12 10:34
閱讀 1693·2021-09-09 09:33
閱讀 1814·2019-08-30 15:44
閱讀 2958·2019-08-30 13:49
閱讀 2266·2019-08-29 15:26
閱讀 1001·2019-08-26 13:30
閱讀 1483·2019-08-23 18:15