成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

PHP RabbitMQ

anRui / 3427人閱讀

摘要:消息隊列,用于存儲還未被消費者消費的消息。由在與時指定,而由發(fā)送時指定,兩者的匹配方式由決定。需要為每一個創(chuàng)建,協(xié)議規(guī)定只有通過才能執(zhí)行的命令。建議客戶端線程之間不要共用,至少要保證共用的線程發(fā)送消息必須是串行的,但是建議盡量共用。

安裝

rabbitmq 在 mac 下可以直接用 brew 安裝
默認安裝在 /usr/local/Cellar/下
命令被軟連接加入到了/usr/local/sbin 下,因此可以把此目錄放到環(huán)境變量中,建議加入到~/.bash_profile 中
rabbitmq-server start 開啟服務
端口5672 默認
端口15672 web 端登錄管理端口127.0.0.1:15672
rabbitmq 默認提供的用戶 guest 密碼 guest

管理

停止服務
rabbitmqctl stop
開啟應用 [服務依舊運行]
rabbitmqctl start_app
停止應用 [服務依舊運行]
rabbitmqctl stop_app

用戶管理

添加用戶
sudo rabbitmqctl add_user username password
刪除用戶
sudo rabbitmqctl delete_user username
修改密碼
sudo rabbitmqctl change_password username newpassword
清除用戶密碼,禁止用戶登錄
sudo rabbitmqctl clear_password
列出所有用戶
sudo rabbitmqctl list_users
設置用戶角色
rabbitmqctl set_user_tags username tag

vhost虛擬主機管理

virtual host只是起到一個命名空間的作用,所以可以多個user共同使用一個virtual host,"/"這個是系統(tǒng)默認的vhost,就是說當我們創(chuàng)建一個到rabbitmq的connection時候,它的命名空間是"/",需要注意的是不同的命名空間之間的資源是不能訪問的,比如 exchang,queue ,bingding等
創(chuàng)建虛擬主機
sudo rabbitmqctl add_vhost vhostpath
刪除虛擬主機
sudo rabbitmqctl delete_vhost vhostpath
列出所有虛擬主機
sudo rabbitmqctl list_vhosts
列出某個 vhost 的所有用戶和權(quán)限
list_permissions [-p vhostpath]
列出某個用戶的所有權(quán)限。
list_user_permissions {username}
清除用戶對某個 vhost 的權(quán)限。
clear_permissions [-p vhostpath] {username}
設置用戶對某個 virtual host 的權(quán)限,如果不指定 vhost,則默認為“/” vhost。
set_permissions [-p vhostpath] {user}
rabbitmqctl set_permissions -p test_host kang “." "." ".*"

添加一個管理員代替 guest
rabbitmqctl add_user admin 123456
指定用戶的角色
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin "." "." ".*”
分配給用戶指定虛擬主機的權(quán)限,雖然是administrator角色,但不對所有虛擬主機都有權(quán)限,一樣需要對每個虛擬主機都授權(quán)

顯示信息
rabbitmqctl list_queues [-p ] [ ...]
列出某個 vhost 的所有 queue。
rabbitmqctl list_exchanges [-p ] [ ...]
列出某個 vhost 的所有 exchange。
rabbitmqctl list_bindings [-p ] [ ...]
列出某個 vhost 的所有 binding。
rabbitmqctl list_connections [ ...]
列出 RabbitMQ broker 的所有 connection。
rabbitmqctl list_channels [ ...]
列出 RabbitMQ broker 的所有 channel
rabbitmqcrl list_consumers [-p ]
列出某個 vhost 的所有 consumer。

基本概念點

1.Server(broker): 接受客戶端連接,實現(xiàn)AMQP消息隊列和路由功能的進程。

2.Virtual Host:其實是一個虛擬概念,類似于權(quán)限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,但是權(quán)限控制的最小粒度是Virtual Host

3.Exchange:接受生產(chǎn)者發(fā)送的消息,并根據(jù)Binding規(guī)則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行為,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不同類型的Exchange路由的行為是不一樣的。

4.Message Queue:消息隊列,用于存儲還未被消費者消費的消息。

5.Message: 由Header和Body組成,Header是由生產(chǎn)者添加的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優(yōu)先級是多少等。而Body是真正需要傳輸?shù)腁PP數(shù)據(jù)。

6.Binding:Binding聯(lián)系了Exchange與Message Queue。Exchange在與多個Message Queue發(fā)生Binding后會生成一張路由表,路由表中存儲著Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header得到Routing Key,Exchange根據(jù)Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發(fā)送Message時指定,兩者的匹配方式由Exchange Type決定。

7.Connection:連接,對于RabbitMQ而言,其實就是一個位于客戶端和Broker之間的TCP連接。

8.Channel:信道,僅僅創(chuàng)建了客戶端到Broker之間的連接后,客戶端還是不能發(fā)送消息的。需要為每一個Connection創(chuàng)建Channel,AMQP協(xié)議規(guī)定只有通過Channel才能執(zhí)行AMQP的命令。一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連接的建立和釋放都是十分昂貴的,如果一個客戶端每一個線程都需要與Broker交互,如果每一個線程都建立一個TCP連接,暫且不考慮TCP連接是否浪費,就算操作系統(tǒng)也無法承受每秒建立如此多的TCP連接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發(fā)送消息必須是串行的,但是建議盡量共用Connection。

9.Command:AMQP的命令,客戶端通過Command完成與AMQP服務器的交互來實現(xiàn)自身的邏輯。例如在RabbitMQ中,客戶端可以通過publish命令發(fā)送消息,txSelect開啟一個事務,txCommit提交一個事務。

客戶端管理

php端 rabbitmq 客戶端可以使用 composer 下的庫

{
    "require": {
        "php-amqplib/php-amqplib": "2.5.*"
    }
}

使用時,用到了這兩個東西

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

開始發(fā)送消息

public function handle()
{
    //連接到 test_host 虛擬主機,每個虛擬主機有自己的隊列,交換機...
    $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host");
    //創(chuàng)建一個 channel
    $channel = $connection->channel();
    //聲明 hello 隊列
    $channel->queue_declare("hello", false, false, false, false);
    //創(chuàng)建一個消息
    $msg = new AMQPMessage(time());
    //把消息推送到默認的交換機中,并且告訴交換機要把消息交給 hello 隊列
    $channel->basic_publish($msg, "", "hello");
    echo " [x] Sent ".time()."
";
}

重要概念,消息是保存在交換機中的,當消息存放時指定的隊列存在,交換機會把消息推送到該隊列

消息隊列發(fā)送消息給消費者,一個消息發(fā)給一個消費者

public function handle()
{
    //連接
    $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host");
    //創(chuàng)建一個 channel
    $channel = $connection->channel();
    //可以運行這個命令很多次,但是只有一個隊列會被創(chuàng)建, 在程序中重復將隊列重復聲明一下是種值得推薦的做法,保證隊列存在
    $channel->queue_declare("hello", false, false, false, false);

    echo " [*] Waiting for messages. To exit press CTRL+C", "
";

    $callback = function($msg) {
        echo " [x] Received ", $msg->body, "
";
        sleep($msg->body);
        $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
    };

    //默認情況下,隊列會把消息公平的分配給各個消費者
    //如果某個消費者腳本處理完成分配給他的消息任務后,會一直空閑
    //另外一個消費者腳本處理的消息都非常耗時,這就容易導致消費者腳本得不到合理利用,
    //加入此句話,是告訴隊列,取消把消息公平分配到各個腳本,而是那個腳本空閑,就交給它一個消息任務
    //這樣,合理利用到每一個空閑的消費者腳本
    $channel->basic_qos(null, 1, null);

    /**
     * basic_consume 方法    從隊列中讀取數(shù)據(jù)
     * @param string $queue 指定隊列
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack    消費者處理完消息后,是否不需要告訴隊列已經(jīng)處理完成,true 不需要 false 需要,
     * true
        默認情況下,隊列會把消息公平分配到各個消費者中,然后一次性把消息交給消費者,如果消費者處理了一半掛了,那么消息就丟失了
     * false
        默認情況下,隊列會把消息公平的分配給各個消費者,然后一個一個的把消息分配到消費者腳本中,腳本處理完成后,告訴隊列,隊列會刪除這個消息,并且接著給下一個消息,
        當腳本掛掉,不會丟失消息,隊列會把未完成的消息分配給其他消費者
        在 callback 函數(shù)中需要加入這句話,處理完后通知隊列可以刪除消息了
        $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
        未加入這句話,隊列不會刪除已處理完的消息,當腳本掛掉時,會把分配給當前隊列的所有消息再次重新分配給其他隊列,會導致消息會重復處理
     */
    $channel->basic_consume("hello", "", false, false, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
發(fā)布/訂閱

一個消息發(fā)送給多個消費者

扇形交換機 fanout
發(fā)布訂閱模式,科院實現(xiàn)一個消息發(fā)送到多個隊列中
在發(fā)布消息腳本中,創(chuàng)建一個扇形交換機,把消息推送到交換機,不需要推動到指定的隊列中,隊列在消費者腳本中創(chuàng)建
消費腳本定義個臨時隊列,并綁定這個臨時隊列到交換機中,扇形交換機會把接收到的消息推動到每一個綁定的隊列中

生產(chǎn)者腳本

public function handle()
{
    //連接到 test_host 虛擬主機,每個虛擬主機有自己的隊列,交換機...
    $connection = new AMQPStreamConnection("127.0.0.1", 5672, "kang", "a943434603", "test_host");
    //創(chuàng)建一個 channel
    $channel = $connection->channel();
    //聲明 log 隊列
    //$channel->queue_declare("log", false, false, false, false);
    //創(chuàng)建一個fanout類型交換機
    $channel->exchange_declare("logs","fanout",false,false,false);

    //創(chuàng)建一個消息
    $msg = new AMQPMessage( time() );
    $channel->basic_publish ( $msg , "logs" );

    echo " [x] Sent ".time()."
";

    $channel->close();
    $connection->close();
}

消費者腳本

public function handle()
{
    //連接
    $connection = new AMQPStreamConnection("localhost", 5672, "kang", "a943434603", "test_host");
    //創(chuàng)建一個 channel
    $channel = $connection->channel();
    //可以運行這個命令很多次,但是只有一個隊列會被創(chuàng)建, 在程序中重復將隊列重復聲明一下是種值得推薦的做法,保證隊列存在
    //$channel->queue_declare("hello", false, false, false, false);
    //創(chuàng)建一個fanout類型交換機
    $channel->exchange_declare("logs", "fanout", false, false, false);

    //系統(tǒng)創(chuàng)建一個臨時隊列
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    //綁定臨時隊列到交換機上
    $channel->queue_bind($queue_name, "logs");

    $callback = function($msg){
        echo " [x] ", $msg->body, "
";
    };
    $channel->basic_consume($queue_name, "", false, true, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}

直連交換機 direct
交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而確定消息該分發(fā)到哪個隊列

生產(chǎn)者,創(chuàng)建基本上和扇形交換機一樣,不同的是

$channel->exchange_declare("direct_logs","direct",false,false,false);
//創(chuàng)建一個消息
$msg = new AMQPMessage( time() );
//把消息推動到direct_logs交換機,并給消息加上路由 key,讓消費者隊列來根據(jù) key 接收消息
$channel->basic_publish ( $msg , "direct_logs", "warning" );

消費者

$channel->exchange_declare("direct_logs","direct",false,false,false);
//系統(tǒng)創(chuàng)建一個臨時隊列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
//綁定臨時隊列到交換機上,并指定消息的路由 key
$channel->queue_bind($queue_name, "direct_logs", "error");
$channel->queue_bind($queue_name, "direct_logs", "warning");

主題交換機 topic

直連交換機中路由 key 匹配模式
(星號*) 用來表示一個單詞.
(井號#) 用來表示任意數(shù)量(零個或多個)單詞。

Q1會接收到 a.orange.b 等key 值中間為 orange 的消息
Q2會接收到 a.b.rabbit, lazy.a, lazy.a.b.c 等消息

當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現(xiàn)的時候,此時主題交換機就擁有的直連交換機的行為。

生產(chǎn)者,指定路由 key

$channel->exchange_declare("topic_logs","topic",false,false,false);
//創(chuàng)建一個消息
$msg = new AMQPMessage( time() );
//把消息推動到direct_logs交換機,并給消息加上路由 key,讓消費者隊列來根據(jù) key 接收消息
$channel->basic_publish ( $msg , "topic_logs", "baidu.warning" );

消費者1

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, "topic_logs", "baidu.#");

消費者2

$channel->exchange_declare("topic_logs","topic",false,false,false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, "topic_logs", "ali.#");
參考

https://www.rabbitmq.com/tutorials/tutorial-one-php.html
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/21527.html

相關文章

  • RabbitMQ】——centos7安裝rabbitmq教程 以及 PHP開啟rabbitmq擴展

    摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...

    lscho 評論0 收藏0
  • RabbitMQ】——centos7安裝rabbitmq教程 以及 PHP開啟rabbitmq擴展

    摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...

    printempw 評論0 收藏0
  • RabbitMQ+PHP 消息隊列環(huán)境配置

    摘要:參考文檔依賴包安裝環(huán)境配置環(huán)境變量增加內(nèi)容保存退出,并刷新變量測試是否安裝成功安裝完成以后,執(zhí)行看是否能打開,用退出,注意后面的點號,那是的結(jié)束符。 參考文檔:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依賴包安裝 yum install ncurses-d...

    geekidentity 評論0 收藏0
  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個隊列的消息緩沖區(qū),保持代表的消費。本教程介紹,這是一個開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當作一個郵局:當你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...

    silencezwm 評論0 收藏0

發(fā)表評論

0條評論

anRui

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<