成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

Swoole 源碼分析——基礎(chǔ)模塊之Queue隊列

jollywing / 924人閱讀

摘要:消息隊列的接受消息隊列的接受是利用函數(shù),其中是消息的類型,該參數(shù)會取出指定類型的消息,如果設定的是爭搶模式,該值會統(tǒng)一為,否則該值就是消息發(fā)送目的的。環(huán)形隊列的消息入隊發(fā)送消息首先要確定環(huán)形隊列的隊尾。取模操作可以優(yōu)化

前言

swoole 的底層隊列有兩種:進程間通信 IPC 的消息隊列 swMsgQueue,與環(huán)形隊列 swRingQueue。IPC 的消息隊列用于 task_worker 進程接受投遞消息,環(huán)形隊列用于 SW_MODE_THREAD 線程模式下 task_worker 接受投遞消息的方法。

swMsgQueue 消息隊列數(shù)據(jù)結(jié)構(gòu)

swoole 使用的消息隊列并不是 POSIX 下的 mq_xx 系統(tǒng)函數(shù),而是 SystemV 下的 msgxxx 系列函數(shù),原因猜測是 systemv 系統(tǒng)函數(shù)可以指定 mtype,也就是消息的類型,這樣就可以實現(xiàn)對指定的 task_worker 的投放。

swMsgQueue 的數(shù)據(jù)結(jié)構(gòu)比較簡單,blocking 指定消息隊列是否是阻塞式,msg_id 是創(chuàng)建的消息隊列的 id,flags 也是指定阻塞式還是非阻塞式,perms 指定消息隊列的權(quán)限。

typedef struct _swMsgQueue
{
    int blocking;
    int msg_id;
    int flags;
    int perms;
} swMsgQueue;
swMsgQueue 消息隊列 swMsgQueue 消息隊列的創(chuàng)建

創(chuàng)建消息隊列就是調(diào)用 msgget 函數(shù),這個函數(shù)的 msg_key 就是 server 端配置的 message_queue_key,task 隊列在 server 結(jié)束后不會銷毀,重新啟動程序后,task 進程仍然會接著處理隊列中的任務。如果不設置該值,那么程序會自動生成: ftok($php_script_file, 1)

void swMsgQueue_set_blocking(swMsgQueue *q, uint8_t blocking)
{
    if (blocking == 0)
    {
        q->flags = q->flags | IPC_NOWAIT;
    }
    else
    {
        q->flags = q->flags & (~IPC_NOWAIT);
    }
}

int swMsgQueue_create(swMsgQueue *q, int blocking, key_t msg_key, int perms)
{
    if (perms <= 0 || perms >= 01000)
    {
        perms = 0666;
    }
    int msg_id;
    msg_id = msgget(msg_key, IPC_CREAT | perms);
    if (msg_id < 0)
    {
        swSysError("msgget() failed.");
        return SW_ERR;
    }
    else
    {
        bzero(q, sizeof(swMsgQueue));
        q->msg_id = msg_id;
        q->perms = perms;
        q->blocking = blocking;
        swMsgQueue_set_blocking(q, blocking);
    }
    return 0;
}
swMsgQueue 消息隊列的發(fā)送

消息隊列的發(fā)送主要利用 msgsnd 函數(shù),flags 指定發(fā)送是阻塞式還是非阻塞式,在 task_worker 進程中都是采用阻塞式發(fā)送的方法。

int swMsgQueue_push(swMsgQueue *q, swQueue_data *in, int length)
{
    int ret;

    while (1)
    {
        ret = msgsnd(q->msg_id, in, length, q->flags);
        if (ret < 0)
        {
            SwooleG.error = errno;
            if (errno == EINTR)
            {
                continue;
            }
            else if (errno == EAGAIN)
            {
                return -1;
            }
            else
            {
                swSysError("msgsnd(%d, %d, %ld) failed.", q->msg_id, length, in->mtype);
                return -1;
            }
        }
        else
        {
            return ret;
        }
    }
    return 0;
}

swMsgQueue 消息隊列的接受

消息隊列的接受是利用 msgrcv 函數(shù),其中 mtype 是消息的類型,該參數(shù)會取出指定類型的消息,如果 task_ipc_mode 設定的是爭搶模式,該值會統(tǒng)一為 0,否則該值就是消息發(fā)送目的 task_workerid。

task_worker 進程的主循環(huán)會阻塞在本函數(shù)中,直到有消息到達。

int swMsgQueue_pop(swMsgQueue *q, swQueue_data *data, int length)
{
    int ret = msgrcv(q->msg_id, data, length, data->mtype, q->flags);
    if (ret < 0)
    {
        SwooleG.error = errno;
        if (errno != ENOMSG && errno != EINTR)
        {
            swSysError("msgrcv(%d, %d, %ld) failed.", q->msg_id, length, data->mtype);
        }
    }
    return ret;
}
swRingQueue 環(huán)形隊列的數(shù)據(jù)結(jié)構(gòu)

環(huán)形隊列在之前的文章中從來沒有出現(xiàn),因為該隊列是用于 SW_MODE_THREAD 模式下的 worker 線程中。由于并不是進程間的通訊,而是線程間的通訊,因此效率會更高。

swoole 的環(huán)形隊列有兩種,一種是普通的環(huán)形隊列,另一種是線程安全的環(huán)形隊列,本文只會講線程安全的環(huán)形隊列,

swoole 為了環(huán)形隊列更加高效,并沒有使用線程鎖,而是使用了無鎖結(jié)構(gòu),只會利用 atomic 原子鎖。

值得注意的是數(shù)據(jù)結(jié)構(gòu)中的 flags,該值只會是 0-4 中的一個,該值都是利用原子鎖來改動,以此來實現(xiàn)互斥的作用。

typedef struct _swRingQueue
{
    void **data; /* 隊列空間 */
    char *flags; 
    // 0:push ready 1: push now
    // 2:pop ready; 3: pop now
    uint size; /* 隊列總尺寸 */
    uint num; /* 隊列當前入隊數(shù)量 */
    uint head; /* 頭部,出隊列方向*/
    uint tail; /* 尾部,入隊列方向*/

} swRingQueue;
swRingQueue 環(huán)形隊列 swRingQueue 環(huán)形隊列的創(chuàng)建

環(huán)形隊列的創(chuàng)建很簡單,就是初始化隊列數(shù)據(jù)結(jié)構(gòu)中的各種屬性。

int swRingQueue_init(swRingQueue *queue, int buffer_size)
{
    queue->size = buffer_size;
    queue->flags = (char *)sw_malloc(queue->size);
    if (queue->flags == NULL)
    {
        return -1;
    }
    queue->data = (void **)sw_calloc(queue->size, sizeof(void*));
    if (queue->data == NULL)
    {
        sw_free(queue->flags);
        return -1;
    }
    queue->head = 0;
    queue->tail = 0;
    memset(queue->flags, 0, queue->size);
    memset(queue->data, 0, queue->size * sizeof(void*));
    return 0;
}
swRingQueue 環(huán)形隊列的消息入隊

發(fā)送消息首先要確定環(huán)形隊列的隊尾。queue->flags 是一個數(shù)組,里面存儲著所有的隊列元素當前的狀態(tài)。如果當前隊尾元素的狀態(tài)不是 0,說明已經(jīng)有其他線程對該隊列元素進行操作,我們當前線程暫時不能對當前隊尾進行操作,要等其他線程將隊尾元素向后移動一位,我們才能進行更新。

當線程將當前隊尾的狀態(tài)從 0 改變?yōu)?1 之后,我們就要立刻更新隊尾的 offset,讓其他線程繼續(xù)入隊數(shù)據(jù)。接著將數(shù)據(jù)放入 queue->data,僅僅將數(shù)據(jù)的地址保存即可。

最后,將 cur_tail_flag_index 原子加 1,將隊列元素狀態(tài)改為待讀;將 queue->num 原子加 1

int swRingQueue_push(swRingQueue *queue, void * ele)
{
    if (!(queue->num < queue->size))
    {
        return -1;
    }
    int cur_tail_index = queue->tail;
    char * cur_tail_flag_index = queue->flags + cur_tail_index;
    //TODO Scheld
    while (!sw_atomic_cmp_set(cur_tail_flag_index, 0, 1))
    {
        cur_tail_index = queue->tail;
        cur_tail_flag_index = queue->flags + cur_tail_index;
    }

    // 兩個入隊線程之間的同步
    //TODO 取模操作可以優(yōu)化
    int update_tail_index = (cur_tail_index + 1) % queue->size;

    // 如果已經(jīng)被其他的線程更新過,則不需要更新;
    // 否則,更新為 (cur_tail_index+1) % size;
    sw_atomic_cmp_set(&queue->tail, cur_tail_index, update_tail_index);

    // 申請到可用的存儲空間
    *(queue->data + cur_tail_index) = ele;

    sw_atomic_fetch_add(cur_tail_flag_index, 1);
    sw_atomic_fetch_add(&queue->num, 1);
    return 0;
}
swRingQueue 環(huán)形隊列的消息出隊

與入隊相反,出隊需要確定當前隊列的隊首位置,如果隊首的狀態(tài)不是 2,那么說明有其他線程已經(jīng)進行了出隊操作,等待其他線程更新隊首位置即可。

獲取到隊首元素之后,要立刻更新隊首的新位置,然后將數(shù)據(jù)的首地址傳遞給 ele,然后將隊首元素狀態(tài)復原,減少隊列的 num

int swRingQueue_pop(swRingQueue *queue, void **ele)
{
    if (!(queue->num > 0))
        return -1;
    int cur_head_index = queue->head;
    char * cur_head_flag_index = queue->flags + cur_head_index;

    while (!sw_atomic_cmp_set(cur_head_flag_index, 2, 3))
    {
        cur_head_index = queue->head;
        cur_head_flag_index = queue->flags + cur_head_index;
    }
    //TODO 取模操作可以優(yōu)化
    int update_head_index = (cur_head_index + 1) % queue->size;
    sw_atomic_cmp_set(&queue->head, cur_head_index, update_head_index);
    *ele = *(queue->data + cur_head_index);

    sw_atomic_fetch_sub(cur_head_flag_index, 3);
    sw_atomic_fetch_sub(&queue->num, 1);
    return 0;
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/30844.html

相關(guān)文章

  • Swoole 源碼分析——基礎(chǔ)模塊 Channel 隊列

    摘要:前言內(nèi)存數(shù)據(jù)結(jié)構(gòu),類似于的通道,底層基于共享內(nèi)存互斥鎖實現(xiàn),可實現(xiàn)用戶態(tài)的高性能內(nèi)存隊列。是當前隊列占用的內(nèi)存大小,用來指定是否使用共享內(nèi)存是否使用鎖是否使用通知。 前言 內(nèi)存數(shù)據(jù)結(jié)構(gòu) Channel,類似于 Go 的 chan 通道,底層基于 共享內(nèi)存 + Mutex 互斥鎖實現(xiàn),可實現(xiàn)用戶態(tài)的高性能內(nèi)存隊列。Channel 可用于多進程環(huán)境下,底層在讀取寫入時會自動加鎖,應用層不需...

    txgcwm 評論0 收藏0
  • Swoole 源碼分析——Server模塊TaskWorker事件循環(huán)

    摘要:函數(shù)事件循環(huán)在事件循環(huán)時,如果使用的是消息隊列,那么就不斷的調(diào)用從消息隊列中取出數(shù)據(jù)。獲取后的數(shù)據(jù)調(diào)用回調(diào)函數(shù)消費消息之后,向中發(fā)送空數(shù)據(jù),告知進程已消費,并且關(guān)閉新連接。 swManager_start 創(chuàng)建進程流程 task_worker 進程的創(chuàng)建可以分為三個步驟:swServer_create_task_worker 申請所需的內(nèi)存、swTaskWorker_init 初始化...

    用戶83 評論0 收藏0
  • Swoft 源碼解讀

    摘要:官網(wǎng)源碼解讀號外號外歡迎大家我們開發(fā)組定了一個就線下聚一次的小目標里面的框架算是非常重的了這里的重先不具體到性能層面主要是框架的設計思想和框架集成的服務讓框架可以既可以快速解決很多問題又可以輕松擴展中的框架有在應該無出其右了這次解讀的源碼 官網(wǎng): https://www.swoft.org/源碼解讀: http://naotu.baidu.com/file/8... 號外號外, 歡迎大...

    weij 評論0 收藏0
  • Swoole 源碼分析——進程管理 Swoole_Process

    摘要:清空主進程殘留的定時器與信號。設定為執(zhí)行回調(diào)函數(shù)如果在回調(diào)函數(shù)中調(diào)用了異步系統(tǒng),啟動函數(shù)進行事件循環(huán)。因此為了區(qū)分兩者,規(guī)定并不允許兩者同時存在。 前言 swoole-1.7.2 增加了一個進程管理模塊,用來替代 PHP 的 pcntl 擴展。 PHP自帶的pcntl,存在很多不足,如 pcntl 沒有提供進程間通信的功能 pcntl 不支持重定向標準輸入和輸出 pcntl 只...

    pepperwang 評論0 收藏0
  • Swoft 源碼剖析 - 連接池

    摘要:基于擴展實現(xiàn)真正的數(shù)據(jù)庫連接池這種方案中,項目占用的連接數(shù)僅僅為。一種是連接暫時不再使用,其占用狀態(tài)解除,可以從使用者手中交回到空閑隊列中這種我們稱為連接的歸隊。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對原文進行了重新的排版。Swoft Github: https:...

    rozbo 評論0 收藏0

發(fā)表評論

0條評論

jollywing

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<