摘要:后面每開啟一個子進程,會將子進程的存儲到中,用來后面主進程監(jiān)控子進程,如果子進程意外終止,主進程可以重新佛。將當前子進程設(shè)置為會話組再次創(chuàng)建子進程,為了防止在的系統(tǒng)下重新打開控制終端。
wokerman 啟動分析
@(學(xué)習(xí))[workerman, php]
前期想說的也是最近才看的代碼,遇到不懂得地方就去google,所以這篇文章里面穿插了很多參考資料,可以直接點擊閱覽。
需要了解一些知識pcntl、posix、libevent,然后我們從服務(wù)的啟動開始來看。
啟動runAll顧名思義,運行所有的,注釋中也寫了,Run all worker instances,運行所有的實例,也就是說腳本中可以同時new多個worker服務(wù),這也是后面一個重要的$workers包含了每一個$worker都是一個服務(wù)實例。然后會根據(jù)每一個實例初始化count個子進程。
/** * Run all worker instances. * * @return void */ public static function runAll() { // 判斷是否命令行模式 self::checkSapiEnv(); // 創(chuàng)建目錄、設(shè)置權(quán)限、綁定時鐘信號腳本 self::init(); // 解析cli的命令,完成start、reload、restart、kill、stop等 self::parseCommand(); // 是否開啟守護進程 self::daemonize(); // 對socket進行了一系列的配置 self::initWorkers(); // 注冊信號處理器 self::installSignal(); // 保存主進程的pid self::saveMasterPid(); // 每個$worker服務(wù)fork出count個子進程,然后給每個子進程綁定loop循環(huán)監(jiān)聽事件tcp self::forkWorkers(); self::displayUI(); self::resetStd(); // 主進程來監(jiān)聽子進程狀態(tài) self::monitorWorkers(); }生成實例的id
根據(jù)對象生成hashid,并且同一個對象生成的id是一樣的。
$this->workerId = spl_object_hash($this);
并且將當前對象$this存儲到靜態(tài)數(shù)組中self::$_workers
創(chuàng)建資源環(huán)境$this->_context = stream_context_create($context_option);校驗是否命令行模式
只有命令行模式可以執(zhí)行
if(php_sapi_name() != "cli") {}獲取初始化運行的腳本文件的絕對路徑
debug_backtrace返回的數(shù)組的最后一個元素就是初始化的文件,第一個元素就是當前執(zhí)行該命令的文件。
$backtrace = debug_backtrace(); $self::startFile = $backtrace[count($backtrace) -1 ]["file"]生成pid的存儲文件
將debug_backtrace獲取到的絕對路徑通過_進行分割作為pid的文件名。
創(chuàng)建日志文件如果路徑?jīng)]定義直接指定../workerman.log
touch(self::$logFile); chmod(self::$logFile,0622); // 可讀寫、寫、寫初始化當前主進程狀態(tài)
self:$_status = self::STATUS_STARTING;
腳本使用self::$_status來存儲當前運行的狀態(tài)(current status),一共有四種狀態(tài)
// 開始 const STATUS_STARTING = 1; // 運行 const STATUS_RUNNING = 2; // 關(guān)閉 const STATUS_SHUTDOWN=4; // 重新加載 const STATUS_RELOADING=8;臨時文件
slef::$_statisticsFile = sys_get_temp_dir()."/workerman.status";設(shè)置進程標題
// 設(shè)置當前進程的標題 cli_set_process_title($title); // 如果不存在上面的方法,那么使用proc_title擴展 if(extension_loaded("proctitle") && function_exists("setproctitle")) { setproctitle($title); }填充idMap
每個$worker_id都是當前腳本中要初始化的實例,每個服務(wù)要開啟$worker->count個子進程,先用0來填充數(shù)組。后面每開啟一個子進程,會將子進程的pid存儲到idMap中,用來后面主進程監(jiān)控子進程,如果子進程意外終止,主進程可以重新佛。
self::$_idMap[$worker_id] = array_fill(0, $worker->count, 0);進程信號處理器
pcntl_signal安裝一個信號處理器,用來監(jiān)聽信號SIGALRM。如果不安裝SIGALRM信號,當進程接收到SIGALRM信號時會默認終止進程。
pcntl_signal(SIGALRM, ["WorkermanLibTimer","signalHandle"],false);解析cli參數(shù)
允許進程腳本接受參數(shù),用來完成相應(yīng)的指令。
xxx.php start -d 會開啟守護進程 xxx.php start 會進入debug模式判斷進程是否已經(jīng)存在
從pid文件中讀取存在的進程號,如果進程號存在,并且進程也存活。如果傳入的命令是start并且pid文件中的進程id和當前腳本執(zhí)行的不一樣,那么說明腳本重復(fù)執(zhí)行了,報錯。
posix_kill($master_pid,0)用來判斷進程是否存在,posix_kill本意是向該進程發(fā)送信號。
// Get master process PID. $master_pid = @file_get_contents(self::$pidFile); $master_is_alive = $master_pid && @posix_kill($master_pid, 0); // Master is still alive? if ($master_is_alive) { // 如果已經(jīng)有進程存在,并且當前執(zhí)行的進程和已存在的進程不一樣,報錯。 if ($command === "start" && posix_getpid() != $master_pid) { self::log("Workerman[$start_file] already running"); exit; } }命令處理
kill使用pcntl_signal注冊了信號,然后使用posix_kill發(fā)送信號,使用pcntl_signal_dispatch來處理信號。
先發(fā)送kill -SIGINT ,千分之一毫秒之后發(fā)送kill -SIGKILL
status刪掉臨時文件
向進程發(fā)送SIGUSR2信號
從臨時文件中讀取內(nèi)容
結(jié)束腳本
restart、stop向進程發(fā)送終止信號
$master_pid && posix_kill($master_pid, SIGINT);
while(1)循環(huán)判斷是否進程已經(jīng)被終止,如果超過時間5s,那么寫入日志終止失敗,并且結(jié)束當前腳本。(沒有結(jié)束進程,只是結(jié)束了當前的命令腳本)
reload發(fā)送特殊信號SIGUSR1,但是這個信號不會被立即執(zhí)行,而是需要等待pcntl_signal_dispatch來進行信號分發(fā)。
posix_kill($master_pid,SIGUSR1);
例如用戶現(xiàn)在輸入了php worker.php reload,那么會使所有的進程重新進行加載配置。
當前腳本解析參數(shù)
判斷進程是否存活,如果進程pid存在,但是進程沒有存活,那么報錯,說not run
進程存活,注冊信號SIGUSR1
退出當前腳本
運行中的父進程開始觸發(fā)所有的信號,由于之前已經(jīng)安裝了信號處理方法,所以會觸發(fā)self::reload()方法。
啟動守護進程fork兩次并不是為了避免僵尸進程,而是為了避免svr4系統(tǒng)重新打開終端
守護進程詳解
守護進程為什么要fork兩次?
方法通用,可以稍作修改,用到別的地方。用這個方法做過一個隊列監(jiān)聽,redis的list啟動一個rpop。
umask(0)將默認權(quán)限的掩碼修改為0,即將要創(chuàng)建的所有的文佳你的權(quán)限都是777
$pid = pcntl_fork()啟動子進程,判斷$pid是否存在,只有在父進程中pcntl_fork()才會返回id,我們要將父進程kill掉。
posix_setsid()將當前子進程設(shè)置為會話組leader
再次創(chuàng)建子進程,為了防止在SVR4的系統(tǒng)下重新打開控制終端。
protected static function daemonize() { if (!self::$daemonize) { return; } // 將默認權(quán)限掩碼修改為0,意味著即將要創(chuàng)建的文件的權(quán)限都是777 umask(0); // 子進程 $pid = pcntl_fork(); // 子進程創(chuàng)建失敗 if (-1 === $pid) { throw new Exception("fork fail"); } elseif ($pid > 0) { //說明當前進程是父進程,只有在父進程中fork才會返回pid // 關(guān)閉父進程,讓子進程成為孤兒進程被init進程收養(yǎng) exit(0); } // 將子進程作為進程組的leader,開啟一個新的會話,脫離之前的會話和進程組。即使用戶logout也不會終止 if (-1 === posix_setsid()) { throw new Exception("setsid fail"); } // Fork again avoid SVR4 system regain the control of terminal. // 避免svr4系統(tǒng)重新獲取控制終端 $pid = pcntl_fork(); if (-1 === $pid) { throw new Exception("fork fail"); } elseif (0 !== $pid) { // 如果不是孫子進程,直接干掉。讓孫子進程成為孤兒進程被init進程(1號進程收養(yǎng)) exit(0); } }初始化所有的woker實例 獲取進程的當前用戶信息
$user_info = posix_getpwuid(posix_getuid());開啟監(jiān)聽
UNIX域套接字相關(guān)知識
創(chuàng)建socket服務(wù),啟動一個本地的socket服務(wù)。如果是unix域socket的話,$local_socket需要是本地的文件。個人理解就是通過某個端口來監(jiān)聽某個協(xié)議。
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);將包含socket的流導(dǎo)入到socket的擴展資源中
Imports a stream that encapsulates a socket into a socket extension resource.
關(guān)于為什么要使用socket_import_stream?
設(shè)置socketphp提供兩種socket:php提供了兩種類型的socket,stream_socket 和 sockets,二者api不兼容。stream_socket是php內(nèi)置的,可以直接使用,并且api和stream 的api通用(可以調(diào)用fread fwrite...)。sockets需要php安裝sockets擴展才能使用。
設(shè)置心跳檢測,減少傳輸延遲,設(shè)置為非阻塞模式。
SO_KEEPALIVE
TCP/IP Socket心跳機制so_keepalive的三個參數(shù)詳解
TCP Keepalive HOWTO
設(shè)置心跳
SO_KEEPALIVE 保持連接檢測對方主機是否崩潰,避免(服務(wù)器)永遠阻塞于TCP連接的輸入
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
設(shè)置最小化傳輸延遲
提高linux上socket性能
// 最小化傳輸延遲,而不是追求最小化報文數(shù)量 @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
設(shè)置為非阻塞模式
在非阻塞模式下,調(diào)用 fgets() 總是會立即返回;而在阻塞模式下,將會一直等到從資源流里面獲取到數(shù)據(jù)才能返回。
stream_set_blocking($this->_mainSocket,0);注冊信號處理器
// stop pcntl_signal(SIGINT, array("WorkermanWorker", "signalHandler"), false); // reload 重新加載的時候發(fā)送R1信號 pcntl_signal(SIGUSR1, array("WorkermanWorker", "signalHandler"), false); // status 查看狀態(tài)的時候發(fā)送R2信號 pcntl_signal(SIGUSR2, array("WorkermanWorker", "signalHandler"), false); // ignore pcntl_signal(SIGPIPE, SIG_IGN, false);保存當前進程的pid
如果已經(jīng)開啟了守護進程,那么獲取的是當前孫子進程(守護進程)的pid,將其寫入到pid文件中。
將每個woker實例(服務(wù))創(chuàng)建count個子進程遍歷整個$workers,將進程按照里面的每一個實例的參數(shù)fork count個數(shù)的子進程。并且將子進程的號碼,記錄到父進程中
// 創(chuàng)建子進程 $pid = pcntl_fork(); // Get available worker id. $id = self::getId($worker->workerId, 0); // For master process. // 如果當前進程是父進程 if ($pid > 0) { // 將子進程的號碼,記錄到父進程中(重要) self::$_pidMap[$worker->workerId][$pid] = $pid; // 父進程將自己的pid寫入到idMap的第一位(非常重要),在此之后,同一個實例下每個進程的id都不一樣。 self::$_idMap[$worker->workerId][$id] = $pid; } // For child processes.讓每個子進程下都開始啟動對應(yīng)worker的服務(wù)
$worker->run();設(shè)置當前狀態(tài)為運行中
self::$_status = self::STATUS_RUNNING;設(shè)置globalEvent
self::getEventLoopName從三個事件擴展中選擇一個libevent,event,ev
事件監(jiān)聽向socket添加事件監(jiān)聽回調(diào)函數(shù)
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, "acceptUdpConnection"));displayUI
終端下打印UI界面
再次重置標準化輸入輸出self::resetStd();監(jiān)控所有子進程
PHP多進程編程初步
PHP通過PCNTL擴展實現(xiàn)進程控制
pcntl_signal()函數(shù)僅僅是注冊信號和它的處理方法,真正接收到信號并調(diào)用其處理方法的是pcntl_signal_dispatch()函數(shù),而posix_kill會發(fā)送信號。
只有父進程才會監(jiān)控所有的子進程,因為子進程運行的是run方法
觸發(fā)所有的信號
等待子進程退出
將退出的子進程從實例維護的pidMap中移除
將退出的子進程從對應(yīng)的idMap中進行重置為0,取消占取的位置
如果腳本仍然在執(zhí)行,但是子進程退出了,那么重啟子進程
獲取所有的子進程號,如果子進程都退出了,那么結(jié)束父進程
關(guān)于pcntl_waitpcntl_signal(SIGUSR1,"signalHandle1"); 用來注冊信號;posix_kill($pid,SIGUSR1)向指定進程發(fā)送信號(返回結(jié)果是即時的,但是并不會觸發(fā)信號所綁定的行為);pcntl_signal_dispatch()用來觸發(fā)收到的信號的回調(diào)函數(shù)。
pcntl_wait($status,WUNTRACED)開啟阻塞模式來監(jiān)控子進程是否退出,之后子進程退出之后,才會執(zhí)行后面的操作。
0) { $status = 0; // 等待子進程終止 pcntl_wait($status,WUNTRACED); echo "wait已執(zhí)行 {$pid}? "; // 執(zhí)行信號的回調(diào)函數(shù) pcntl_signal_dispatch(); }else { // 發(fā)送信號2 posix_kill($pid,SIGUSR2); // 處理信號2的回調(diào)函數(shù) pcntl_signal_dispatch(); // 父進程處理信號2 sleep(3); // 子進程發(fā)送信號,看是否pcntl_wait會執(zhí)行(結(jié)果,wait沒有執(zhí)行) posix_kill($pid,SIGUSR1); // 繼續(xù)等待,觀察是否pcntl_wait等子進程結(jié)束后執(zhí)行 sleep(3); echo "子進程終止 "; // 這個時候wait往下執(zhí)行了,看來wait等待子進程讓父進程進行了掛起操作 }
輸出結(jié)果
信號2回調(diào) 子進程終止 wait已執(zhí)行 18425? 信號2回調(diào) 信號1回調(diào)
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/22028.html
摘要:即時通訊中,最重要的是響應(yīng)速度,我們需要展示消息列表那么這時會有未讀消息,未讀數(shù)量,最后一條消息內(nèi)容,時間等等。目前設(shè)計是單表單庫。這里只是對即時通訊設(shè)計上做了一些簡要的闡述,如有疑問和建議,請在評論區(qū)回復(fù)。 詳解即時通訊設(shè)計實現(xiàn)(PHP+GatewayWorker+Redis) 需要實現(xiàn)的功能 一對一聊天(私聊) 一對多聊天(群聊) 類似QQ,微信等聊天列表 實時消息 顯示 工具...
摘要:即時通訊中,最重要的是響應(yīng)速度,我們需要展示消息列表那么這時會有未讀消息,未讀數(shù)量,最后一條消息內(nèi)容,時間等等。目前設(shè)計是單表單庫。這里只是對即時通訊設(shè)計上做了一些簡要的闡述,如有疑問和建議,請在評論區(qū)回復(fù)。 詳解即時通訊設(shè)計實現(xiàn)(PHP+GatewayWorker+Redis) 需要實現(xiàn)的功能 一對一聊天(私聊) 一對多聊天(群聊) 類似QQ,微信等聊天列表 實時消息 顯示 工具...
摘要:即時通訊中,最重要的是響應(yīng)速度,我們需要展示消息列表那么這時會有未讀消息,未讀數(shù)量,最后一條消息內(nèi)容,時間等等。目前設(shè)計是單表單庫。這里只是對即時通訊設(shè)計上做了一些簡要的闡述,如有疑問和建議,請在評論區(qū)回復(fù)。 詳解即時通訊設(shè)計實現(xiàn)(PHP+GatewayWorker+Redis) 需要實現(xiàn)的功能 一對一聊天(私聊) 一對多聊天(群聊) 類似QQ,微信等聊天列表 實時消息 顯示 工具...
閱讀 2463·2021-11-22 15:29
閱讀 4247·2021-11-04 16:13
閱讀 1061·2019-08-29 16:58
閱讀 390·2019-08-29 16:08
閱讀 1551·2019-08-23 17:56
閱讀 2504·2019-08-23 17:06
閱讀 3234·2019-08-23 16:55
閱讀 2136·2019-08-23 16:22