摘要:消息隊列,用于存儲還未被消費者消費的消息。由在與時指定,而由發(fā)送時指定,兩者的匹配方式由決定。需要為每一個創(chuàng)建,協(xié)議規(guī)定只有通過才能執(zhí)行的命令。建議客戶端線程之間不要共用,至少要保證共用的線程發(fā)送消息必須是串行的,但是建議盡量共用。
安裝
rabbitmq 在 mac 下可以直接用 brew 安裝
默認安裝在 /usr/local/Cellar/下
命令被軟連接加入到了/usr/local/sbin 下,因此可以把此目錄放到環(huán)境變量中,建議加入到~/.bash_profile 中
rabbitmq-server start 開啟服務
端口5672 默認
端口15672 web 端登錄管理端口127.0.0.1:15672
rabbitmq 默認提供的用戶 guest 密碼 guest
停止服務
rabbitmqctl stop
開啟應用 [服務依舊運行]
rabbitmqctl start_app
停止應用 [服務依舊運行]
rabbitmqctl stop_app
添加用戶
sudo rabbitmqctl add_user username password
刪除用戶
sudo rabbitmqctl delete_user username
修改密碼
sudo rabbitmqctl change_password username newpassword
清除用戶密碼,禁止用戶登錄
sudo rabbitmqctl clear_password
列出所有用戶
sudo rabbitmqctl list_users
設置用戶角色
rabbitmqctl set_user_tags username tag
virtual host只是起到一個命名空間的作用,所以可以多個user共同使用一個virtual host,"/"這個是系統(tǒng)默認的vhost,就是說當我們創(chuàng)建一個到rabbitmq的connection時候,它的命名空間是"/",需要注意的是不同的命名空間之間的資源是不能訪問的,比如 exchang,queue ,bingding等
創(chuàng)建虛擬主機
sudo rabbitmqctl add_vhost vhostpath
刪除虛擬主機
sudo rabbitmqctl delete_vhost vhostpath
列出所有虛擬主機
sudo rabbitmqctl list_vhosts
列出某個 vhost 的所有用戶和權(quán)限
list_permissions [-p vhostpath]
列出某個用戶的所有權(quán)限。
list_user_permissions {username}
清除用戶對某個 vhost 的權(quán)限。
clear_permissions [-p vhostpath] {username}
設置用戶對某個 virtual host 的權(quán)限,如果不指定 vhost,則默認為“/” vhost。
set_permissions [-p vhostpath] {user}
rabbitmqctl set_permissions -p test_host kang “." "." ".*"
添加一個管理員代替 guest
rabbitmqctl add_user admin 123456
指定用戶的角色
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*”
分配給用戶指定虛擬主機的權(quán)限,雖然是administrator角色,但不對所有虛擬主機都有權(quán)限,一樣需要對每個虛擬主機都授權(quán)
顯示信息
rabbitmqctl list_queues [-p
列出某個 vhost 的所有 queue。
rabbitmqctl list_exchanges [-p
列出某個 vhost 的所有 exchange。
rabbitmqctl list_bindings [-p
列出某個 vhost 的所有 binding。
rabbitmqctl list_connections [
列出 RabbitMQ broker 的所有 connection。
rabbitmqctl list_channels [
列出 RabbitMQ broker 的所有 channel
rabbitmqcrl list_consumers [-p
列出某個 vhost 的所有 consumer。
1.Server(broker): 接受客戶端連接,實現(xiàn)AMQP消息隊列和路由功能的進程。
2.Virtual Host:其實是一個虛擬概念,類似于權(quán)限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,但是權(quán)限控制的最小粒度是Virtual Host
3.Exchange:接受生產(chǎn)者發(fā)送的消息,并根據(jù)Binding規(guī)則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不同類型的Exchange路由的行為是不一樣的。
4.Message Queue:消息隊列,用于存儲還未被消費者消費的消息。
5.Message: 由Header和Body組成,Header是由生產(chǎn)者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優(yōu)先級是多少等。而Body是真正需要傳輸?shù)腁PP數(shù)據(jù)。
6.Binding:Binding聯(lián)系了Exchange與Message Queue。Exchange在與多個Message Queue發(fā)生Binding后會生成一張路由表,路由表中存儲著Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據(jù)Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發(fā)送Message時指定,兩者的匹配方式由Exchange Type決定。
7.Connection:連接,對于RabbitMQ而言,其實就是一個位于客戶端和Broker之間的TCP連接。
8.Channel:信道,僅僅創(chuàng)建了客戶端到Broker之間的連接后,客戶端還是不能發(fā)送消息的。需要為每一個Connection創(chuàng)建Channel,AMQP協(xié)議規(guī)定只有通過Channel才能執(zhí)行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的,如果一個客戶端每一個線程都需要與Broker交互,如果每一個線程都建立一個TCP連接,暫且不考慮TCP連接是否浪費,就算操作系統(tǒng)也無法承受每秒建立如此多的TCP連接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發(fā)送消息必須是串行的,但是建議盡量共用Connection。
9.Command:AMQP的命令,客戶端通過Command完成與AMQP服務器的交互來實現(xiàn)自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令發(fā)送消息,txSelect開啟一個事務,txCommit提交一個事務。
客戶端管理php端 rabbitmq 客戶端可以使用 composer 下的庫
{ "require": { "php-amqplib/php-amqplib": "2.5.*" } }
使用時,用到了這兩個東西
use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage;
開始發(fā)送消息
public function handle() { //連接到 test_host 虛擬主機,每個虛擬主機有自己的隊列,交換機... $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host"); //創(chuàng)建一個 channel $channel = $connection->channel(); //聲明 hello 隊列 $channel->queue_declare("hello", false, false, false, false); //創(chuàng)建一個消息 $msg = new AMQPMessage(time()); //把消息推送到默認的交換機中,并且告訴交換機要把消息交給 hello 隊列 $channel->basic_publish($msg, "", "hello"); echo " [x] Sent ".time()." "; }
重要概念,消息是保存在交換機中的,當消息存放時指定的隊列存在,交換機會把消息推送到該隊列
消息隊列發(fā)送消息給消費者,一個消息發(fā)給一個消費者
public function handle() { //連接 $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host"); //創(chuàng)建一個 channel $channel = $connection->channel(); //可以運行這個命令很多次,但是只有一個隊列會被創(chuàng)建, 在程序中重復將隊列重復聲明一下是種值得推薦的做法,保證隊列存在 $channel->queue_declare("hello", false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", " "; $callback = function($msg) { echo " [x] Received ", $msg->body, " "; sleep($msg->body); $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); }; //默認情況下,隊列會把消息公平的分配給各個消費者 //如果某個消費者腳本處理完成分配給他的消息任務后,會一直空閑 //另外一個消費者腳本處理的消息都非常耗時,這就容易導致消費者腳本得不到合理利用, //加入此句話,是告訴隊列,取消把消息公平分配到各個腳本,而是那個腳本空閑,就交給它一個消息任務 //這樣,合理利用到每一個空閑的消費者腳本 $channel->basic_qos(null, 1, null); /** * basic_consume 方法 從隊列中讀取數(shù)據(jù) * @param string $queue 指定隊列 * @param string $consumer_tag * @param bool $no_local * @param bool $no_ack 消費者處理完消息后,是否不需要告訴隊列已經(jīng)處理完成,true 不需要 false 需要, * true 默認情況下,隊列會把消息公平分配到各個消費者中,然后一次性把消息交給消費者,如果消費者處理了一半掛了,那么消息就丟失了 * false 默認情況下,隊列會把消息公平的分配給各個消費者,然后一個一個的把消息分配到消費者腳本中,腳本處理完成后,告訴隊列,隊列會刪除這個消息,并且接著給下一個消息, 當腳本掛掉,不會丟失消息,隊列會把未完成的消息分配給其他消費者 在 callback 函數(shù)中需要加入這句話,處理完后通知隊列可以刪除消息了 $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); 未加入這句話,隊列不會刪除已處理完的消息,當腳本掛掉時,會把分配給當前隊列的所有消息再次重新分配給其他隊列,會導致消息會重復處理 */ $channel->basic_consume("hello", "", false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); }發(fā)布/訂閱
一個消息發(fā)送給多個消費者
扇形交換機 fanout
發(fā)布訂閱模式,科院實現(xiàn)一個消息發(fā)送到多個隊列中
在發(fā)布消息腳本中,創(chuàng)建一個扇形交換機,把消息推送到交換機,不需要推動到指定的隊列中,隊列在消費者腳本中創(chuàng)建
消費腳本定義個臨時隊列,并綁定這個臨時隊列到交換機中,扇形交換機會把接收到的消息推動到每一個綁定的隊列中
生產(chǎn)者腳本
public function handle() { //連接到 test_host 虛擬主機,每個虛擬主機有自己的隊列,交換機... $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host"); //創(chuàng)建一個 channel $channel = $connection->channel(); //聲明 log 隊列 //$channel->queue_declare("log", false, false, false, false); //創(chuàng)建一個fanout類型交換機 $channel->exchange_declare("logs","fanout",false,false,false); //創(chuàng)建一個消息 $msg = new AMQPMessage( time() ); $channel->basic_publish ( $msg , "logs" ); echo " [x] Sent ".time()." "; $channel->close(); $connection->close(); }
消費者腳本
public function handle() { //連接 $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host"); //創(chuàng)建一個 channel $channel = $connection->channel(); //可以運行這個命令很多次,但是只有一個隊列會被創(chuàng)建, 在程序中重復將隊列重復聲明一下是種值得推薦的做法,保證隊列存在 //$channel->queue_declare("hello", false, false, false, false); //創(chuàng)建一個fanout類型交換機 $channel->exchange_declare("logs", "fanout", false, false, false); //系統(tǒng)創(chuàng)建一個臨時隊列 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); //綁定臨時隊列到交換機上 $channel->queue_bind($queue_name, "logs"); $callback = function($msg){ echo " [x] ", $msg->body, " "; }; $channel->basic_consume($queue_name, "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); }
直連交換機 direct
交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而確定消息該分發(fā)到哪個隊列
生產(chǎn)者,創(chuàng)建基本上和扇形交換機一樣,不同的是
$channel->exchange_declare("direct_logs","direct",false,false,false); //創(chuàng)建一個消息 $msg = new AMQPMessage( time() ); //把消息推動到direct_logs交換機,并給消息加上路由 key,讓消費者隊列來根據(jù) key 接收消息 $channel->basic_publish ( $msg , "direct_logs", "warning" );
消費者
$channel->exchange_declare("direct_logs","direct",false,false,false); //系統(tǒng)創(chuàng)建一個臨時隊列 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); //綁定臨時隊列到交換機上,并指定消息的路由 key $channel->queue_bind($queue_name, "direct_logs", "error"); $channel->queue_bind($queue_name, "direct_logs", "warning");
主題交換機 topic
直連交換機中路由 key 匹配模式
(星號*) 用來表示一個單詞.
(井號#) 用來表示任意數(shù)量(零個或多個)單詞。
Q1會接收到 a.orange.b 等key 值中間為 orange 的消息
Q2會接收到 a.b.rabbit, lazy.a, lazy.a.b.c 等消息
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現(xiàn)的時候,此時主題交換機就擁有的直連交換機的行為。
生產(chǎn)者,指定路由 key
$channel->exchange_declare("topic_logs","topic",false,false,false); //創(chuàng)建一個消息 $msg = new AMQPMessage( time() ); //把消息推動到direct_logs交換機,并給消息加上路由 key,讓消費者隊列來根據(jù) key 接收消息 $channel->basic_publish ( $msg , "topic_logs", "baidu.warning" );
消費者1
$channel->exchange_declare("topic_logs","topic",false,false,false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, "topic_logs", "baidu.#");
消費者2
$channel->exchange_declare("topic_logs","topic",false,false,false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, "topic_logs", "ali.#");參考
https://www.rabbitmq.com/tutorials/tutorial-one-php.html
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/21527.html
摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:參考文檔依賴包安裝環(huán)境配置環(huán)境變量增加內(nèi)容保存退出,并刷新變量測試是否安裝成功安裝完成以后,執(zhí)行看是否能打開,用退出,注意后面的點號,那是的結(jié)束符。 參考文檔:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依賴包安裝 yum install ncurses-d...
摘要:在中間的框是一個隊列的消息緩沖區(qū),保持代表的消費。本教程介紹,這是一個開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...
閱讀 1455·2019-08-30 12:54
閱讀 1937·2019-08-30 11:16
閱讀 1670·2019-08-30 10:50
閱讀 2549·2019-08-29 16:17
閱讀 1344·2019-08-26 12:17
閱讀 1436·2019-08-26 10:15
閱讀 2451·2019-08-23 18:38
閱讀 842·2019-08-23 17:50