成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

MySql實現(xiàn)事務型消息隊列以及php多進程消費設計

Cristic / 1531人閱讀

摘要:因公司業(yè)務需要,最近在設計一個通用隊列功能模塊,主體要求兩大點用實現(xiàn)事務型消息隊列當然,主流的隊列服務可使用或者等,此處討論的是實現(xiàn)多進程消費隊列消息用實現(xiàn)事務型消息隊列消息隊列的作用有異步化解耦和消除峰值等。

因公司業(yè)務需要,最近在設計一個通用隊列功能模塊,主體要求兩大點:

用MySql實現(xiàn)事務型消息隊列(當然,主流的隊列服務可使用redis或者rabbitmq等,此處討論的是mysql實現(xiàn))

php多進程消費隊列消息

用MySql實現(xiàn)事務型消息隊列

消息隊列的作用有:異步化、解耦和消除峰值等。目前異步化對于我來說使用最頻繁,在很多業(yè)務場景下,我們可以將實時性要求較低的請求轉為異步處理,減小系統(tǒng)負載壓力,提高系統(tǒng)穩(wěn)定性。在離線數(shù)據異步處理過程中,消息隊列要滿足以下要求:

消息不能丟失,即使在系統(tǒng)失敗的情況下。消息一旦被插入就一定會被至少處理一次(只被處理一次是最好的,但是實現(xiàn)起來有難度,所以只要求at-least-once semantic);

FIFO順序。(mysql id自增可滿足此特性。當然,可以設計特殊參數(shù)做特殊處理)

支持多生產者(mysql支持并發(fā)操作,支持此特點)

支持多消費者。每個消息只能被其中一個消費者處理(業(yè)務的處理需要考慮冪等性)。

以上是隊列實現(xiàn)的說明,具體用MySql實現(xiàn)事務型消息隊列可以參考文章
https://spockwangs.github.io/...

此次設計的表結構如下:

CREATE TABLE `comom_queue` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT "自增id",
  `type` tinyint(4) NOT NULL DEFAULT "0" COMMENT "隊列類型,代碼業(yè)務備注",
  `conn_id` int(11) NOT NULL DEFAULT "0" COMMENT "消費者標識",
  `param_content` text COMMENT "隊列入參",
  `callback` varchar(255) NOT NULL DEFAULT "" COMMENT "隊列消費回調函數(shù)",
  `status` tinyint(2) NOT NULL DEFAULT "0" COMMENT "0新建 1消費中 2成功 3失敗 4需重試",
  `create_time` int(11) NOT NULL DEFAULT "0" COMMENT "創(chuàng)建時間",
  `update_time` int(11) NOT NULL DEFAULT "0" COMMENT "狀態(tài)變更時間",
  `preexec_time` int(11) NOT NULL DEFAULT "0" COMMENT "預消費時間",
  `p_key` varchar(100) NOT NULL DEFAULT "" COMMENT "業(yè)務唯一標識key,查詢用",
  `mark` varchar(255) NOT NULL DEFAULT "" COMMENT "備注",
  PRIMARY KEY (`id`),
  KEY `indx_s` (`p_key`,`type`) USING BTREE,
  KEY `indx_exec` (`conn_id`,`status`) USING BTREE,
  KEY `indx_ty` (`type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

說明下幾個字段的設計:

callback 隊列中不同的業(yè)務消息有不同的業(yè)務處理,利用callback值回調對應的業(yè)務方法

type 隊列業(yè)務類型,區(qū)分不同的業(yè)務,可用不同的消費者分開消費。在FIFO的特點外,可多帶帶開消費者對有特殊要求(消息優(yōu)先級高)的業(yè)務消息進行消費

preexec_time 預消費時間,有的業(yè)務消息有消費時間要求,可設置出隊列時間

php多進程消費設計

此次php多進程的實現(xiàn)依賴pcntl,posix擴展,讀者可自行檢查是否安裝了此拓展。queue隊列服務設計和實現(xiàn)包括以下功能點:

主進程和子進程的運行時間可配

主進程(master進程)創(chuàng)建和監(jiān)聽子進程行為

創(chuàng)建定時器信號,主進程(master進程)定時監(jiān)聽隊列信息,可用于消息堆積通知等

子進程(worker進程)消費消息

針對不同的業(yè)務消息可配置不同數(shù)量的子進程

各個業(yè)務子進程數(shù)可配置正常拉起數(shù)和最大進程數(shù),根據隊列積壓情況,子進程動態(tài)啟動進程數(shù)(暫未實現(xiàn),后續(xù)添加)

不多說了,直接看代碼,抽離出來的queue服務類代碼如下:

 "process_num"]
    protected $child   = []; // 子進程pid數(shù)組
    protected $result  = []; // 計算的結果
    protected $overTime = 0; //主進程超時時間
    protected $startTime; //主進程運行時間
    protected $childOverTime = 3600; //子進程超時時間
    protected $alarm_time = 2;
    public function __construct($process = [], $overTime = 0, $childOverTime = 3600)
    {
        if (!function_exists("pcntl_fork")) {
            die("pcntl_fork not existing");
        }
        $this->process  = $process;
        $this->overTime = $overTime;
        $this->childOverTime = $childOverTime;
        $this->startTime = time();
    }
    /**
     * 設置子進程
     */
    public function setProcess($process)
    {
        $this->process = $process;
    }

    /**
     * 設置檢測時間間隔 單位s
     */
    public function setAlarmTime($time){
        $this->alarm_time = $time;
    }

    /**
     * fork 子進程
     */
    protected function forkProcess()
    {
        //循環(huán)創(chuàng)建每個type 的消費子進程
        $process  = $this->process;
        foreach($process as $key => $num) {
            for ($i = 0; $i < $num; $i++){
                $this->forkOneProcess($key);
            }
        }
        return $this;
    }

    /**
     * 創(chuàng)建子進程操作
     * @param $key
     * @return $this
     */
    private function forkOneProcess($key)
    {
        $pid = pcntl_fork();
        if ($pid == 0) {
            $id = getmypid();
            $this->processDo($id, $key);
            exit(0);
        } else if ($pid > 0) {
            //記錄子進程信息
            $childProcess = array(
                "pid" => $pid,
                "type" => $key,
                "create_time" => time()
            );
            $this->child[$pid] = $childProcess;
        }
        return $this;
    }

    /**
     * 子進程做的事情,消費者
     */
    abstract protected function processDo($id, $key);

    /**
     * 隊列數(shù)量檢測
     */
    abstract protected function checkQueueNum();

    /**
     * 等待子進程結束
     */
    protected function waiteProcess()
    {
        while(count($this->child)) {
            foreach($this->child as $pid => $item){
                $res = pcntl_waitpid($pid,$status,WNOHANG);
                pcntl_signal_dispatch();
                if ( -1 == $res || $res > 0 ) {
                    unset($this->child[$pid]);
                    echo "pid $pid 退出", PHP_EOL;
                    //判斷主進程是否超時 未超時拉起新的子進程
                    $leftTime = time() - $this->startTime;
                    if ($this->overTime > $leftTime){
                        $this->forkOneProcess($item["type"]);
                        echo "創(chuàng)建新進程", PHP_EOL;
                    }
                }//判斷子進程是否存在且超時,超過時限20分鐘則強制退出
                elseif (posix_kill($pid, 0) && (time() - $item["create_time"] - 20*60) > $this->childOverTime){
                    posix_kill($pid, SIGUSR1);
                    echo "pid $pid 退出2", PHP_EOL;
                }
            }
        }

        return $this;
    }

    /**
     * 隊列檢測
     */
    protected function timeHandler(){
        $this->checkQueueNum();
        pcntl_alarm($this->alarm_time);
    }

    /**
     * 啟動
     */
    public function runProcess() {
        //注冊信號
        pcntl_signal(SIGALRM, array($this, "timeHandler"));
        pcntl_alarm($this->alarm_time);
        $leftTime = time() - $this->startTime;
        while(($this->overTime ==0 || $this->overTime > $leftTime)){
            echo "新進程processlist", PHP_EOL;
            $this->forkProcess()->waiteProcess();
            $leftTime = time() - $this->startTime;
        }
    }
}

最后一個功能點:各個業(yè)務子進程數(shù)可配置正常拉起數(shù)和最大進程數(shù),根據隊列積壓情況,子進程動態(tài)啟動進程數(shù) 暫未實現(xiàn)。目前的queue服務設計如上,請各位看官多多指教!

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://m.hztianpu.com/yun/29768.html

相關文章

發(fā)表評論

0條評論

閱讀需要支付1元查看
<