摘要:判斷客戶端是否配置了檢測或者長度檢測,如果配置了就調(diào)用接受完整的數(shù)據(jù)包,這兩天會調(diào)用,進(jìn)而調(diào)用函數(shù)。異步客戶端接受數(shù)據(jù)異步的客戶端接受數(shù)據(jù)調(diào)用的和同步的客戶端相同,都是調(diào)用函數(shù)。
recv 接受數(shù)據(jù)
客戶端接受數(shù)據(jù)需要指定緩存區(qū)最大長度,就是下面的 buf_len,flags 用于指定是否設(shè)置 waitall 標(biāo)志,如果設(shè)定了 waitall 就必須設(shè)定準(zhǔn)確的 size,否則會一直等待,直到接收的數(shù)據(jù)長度達(dá)到 size。
客戶端啟用了 EOF/Length 檢測后,無需設(shè)置 size 和 waitall 參數(shù)。擴(kuò)展層會返回完整的數(shù)據(jù)包或者返回false。
開啟了 open_eof_check/open_length_check 選項(xiàng)之后,會自動進(jìn)行包長檢測,過程和服務(wù)端類似,此處不需要多說。
static PHP_METHOD(swoole_client, recv) { zend_long buf_len = SW_PHP_CLIENT_BUFFER_SIZE; zend_long flags = 0; int ret; char *buf = NULL; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|ll", &buf_len, &flags) == FAILURE) { return; } //waitall if (flags == 1) { flags = MSG_WAITALL; } swClient *cli = client_get_ptr(getThis() TSRMLS_CC); swProtocol *protocol = &cli->protocol; if (cli->open_eof_check) { if (cli->buffer == NULL) { cli->buffer = swString_new(SW_BUFFER_SIZE_BIG); } swString *buffer = cli->buffer; int eof = -1; if (buffer->length > 0) { goto find_eof; } while (1) { buf = buffer->str + buffer->length; buf_len = buffer->size - buffer->length; if (buf_len > SW_BUFFER_SIZE_BIG) { buf_len = SW_BUFFER_SIZE_BIG; } ret = cli->recv(cli, buf, buf_len, 0); if (ret < 0) { swoole_php_error(E_WARNING, "recv() failed. Error: %s [%d]", strerror(errno), errno); buffer->length = 0; RETURN_FALSE; } else if (ret == 0) { buffer->length = 0; RETURN_EMPTY_STRING(); } buffer->length += ret; if (buffer->length < protocol->package_eof_len) { continue; } find_eof: eof = swoole_strnpos(buffer->str, buffer->length, protocol->package_eof, protocol->package_eof_len); if (eof >= 0) { eof += protocol->package_eof_len; SW_RETVAL_STRINGL(buffer->str, eof, 1); if (buffer->length > eof) { buffer->length -= eof; memmove(buffer->str, buffer->str + eof, buffer->length); } else { buffer->length = 0; } return; } else { if (buffer->length == protocol->package_max_length) { swoole_php_error(E_WARNING, "no package eof"); buffer->length = 0; RETURN_FALSE; } else if (buffer->length == buffer->size) { if (buffer->size < protocol->package_max_length) { int new_size = buffer->size * 2; if (new_size > protocol->package_max_length) { new_size = protocol->package_max_length; } if (swString_extend(buffer, new_size) < 0) { buffer->length = 0; RETURN_FALSE; } } } } } buffer->length = 0; RETURN_FALSE; } else if (cli->open_length_check) { if (cli->buffer == NULL) { cli->buffer = swString_new(SW_BUFFER_SIZE_STD); } uint32_t header_len = protocol->package_length_offset + protocol->package_length_size; ret = cli->recv(cli, cli->buffer->str, header_len, MSG_WAITALL); if (ret <= 0) { goto check_return; } else if (ret != header_len) { ret = 0; goto check_return; } buf_len = protocol->get_package_length(protocol, cli->socket, cli->buffer->str, ret); //error package if (buf_len < 0) { RETURN_EMPTY_STRING(); } //empty package else if (buf_len == header_len) { SW_RETURN_STRINGL(cli->buffer->str, header_len, 1); } else if (buf_len > protocol->package_max_length) { swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "Package is too big. package_length=%d", (int )buf_len); RETURN_EMPTY_STRING(); } buf = (char *) emalloc(buf_len + 1); memcpy(buf, cli->buffer->str, header_len); SwooleG.error = 0; ret = cli->recv(cli, buf + header_len, buf_len - header_len, MSG_WAITALL); if (ret > 0) { ret += header_len; if (ret != buf_len) { ret = 0; } } } else { if (!(flags & MSG_WAITALL) && buf_len > SW_PHP_CLIENT_BUFFER_SIZE) { buf_len = SW_PHP_CLIENT_BUFFER_SIZE; } buf = (char *) emalloc(buf_len + 1); SwooleG.error = 0; ret = cli->recv(cli, buf, buf_len, flags); } check_return: if (ret < 0) { SwooleG.error = errno; swoole_php_error(E_WARNING, "recv() failed. Error: %s [%d]", strerror(SwooleG.error), SwooleG.error); zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL("errCode")-1, SwooleG.error TSRMLS_CC); swoole_efree(buf); RETURN_FALSE; } else { if (ret == 0) { swoole_efree(buf); RETURN_EMPTY_STRING(); } else { buf[ret] = 0; SW_RETVAL_STRINGL(buf, ret, 0); } } }swClient_tcp_recv_no_buffer 同步 TCP 客戶端接受函數(shù)
上一小節(jié)中的 cli->recv 實(shí)際調(diào)用的是 swClient_tcp_recv_no_buffer 函數(shù),無論是同步客戶端還是異步客戶端都是這個(gè)函數(shù),該函數(shù)會調(diào)用 swConnection_recv 接受數(shù)據(jù),直到達(dá)到超時(shí)時(shí)間。
值得注意的是這個(gè)函數(shù)中的 flag 如果是 MSG_WAITALL 標(biāo)志,recv 會等待所有的數(shù)據(jù)(長度為 len 的數(shù)據(jù))全部到達(dá)之后才會返回。
static int swClient_tcp_recv_no_buffer(swClient *cli, char *data, int len, int flag) { int ret; while (1) { ret = swConnection_recv(cli->socket, data, len, flag); if (ret >= 0) { break; } if (errno == EINTR) { if (cli->interrupt_time <= 0) { cli->interrupt_time = swoole_microtime(); } else if (swoole_microtime() > cli->interrupt_time + cli->timeout) { break; } else { continue; } } #ifdef SW_USE_OPENSSL if (errno == EAGAIN && cli->socket->ssl) { int timeout_ms = (int) (cli->timeout * 1000); if (cli->socket->ssl_want_read && swSocket_wait(cli->socket->fd, timeout_ms, SW_EVENT_READ) == SW_OK) { continue; } else if (cli->socket->ssl_want_write && swSocket_wait(cli->socket->fd, timeout_ms, SW_EVENT_WRITE) == SW_OK) { continue; } } #endif break; } return ret; }swClient_udp_recv 同步 UDP 客戶端
對于 UDP 來說,cli->recv 就是函數(shù) swClient_udp_recv, 本函數(shù)會嘗試調(diào)用兩次 recvfrom:
static int swClient_udp_recv(swClient *cli, char *data, int length, int flags) { cli->remote_addr.len = sizeof(cli->remote_addr.addr); int ret = recvfrom(cli->socket->fd, data, length, flags, (struct sockaddr *) &cli->remote_addr.addr, &cli->remote_addr.len); if (ret < 0) { if (errno == EINTR) { ret = recvfrom(cli->socket->fd, data, length, flags, (struct sockaddr *) &cli->remote_addr, &cli->remote_addr.len); } else { return SW_ERR; } } return ret; }swClient_onStreamRead 異步 TCP 客戶端讀就緒
對于異步 TCP 數(shù)據(jù)的接受,首先與異步客戶端的寫就緒類似,首先要判斷當(dāng)前的 SSL 的狀態(tài)是否是 SW_SSL_STATE_WAIT_STREAM,再次進(jìn)行 SSL 握手(具體原因不太清楚)。
判斷客戶端是否配置了 EOF 檢測或者長度檢測,如果配置了就調(diào)用 swProtocol_recv_check_eof/swProtocol_recv_check_length 接受完整的數(shù)據(jù)包,這兩天會調(diào)用 swClient_onPackage,進(jìn)而調(diào)用 onReceive 函數(shù)。
如果沒有配置,那么就簡單的調(diào)用 swConnection_recv 接受數(shù)據(jù),接受到數(shù)據(jù)之后會調(diào)用 onReceive。
static int swClient_onPackage(swConnection *conn, char *data, uint32_t length) { swClient *cli = (swClient *) conn->object; cli->onReceive(conn->object, data, length); return conn->close_wait ? SW_ERR : SW_OK; } static int swClient_onStreamRead(swReactor *reactor, swEvent *event) { int n; swClient *cli = event->socket->object; char *buf = cli->buffer->str + cli->buffer->length; long buf_size = cli->buffer->size - cli->buffer->length; #ifdef SW_USE_OPENSSL if (cli->open_ssl && cli->socket->ssl_state == SW_SSL_STATE_WAIT_STREAM) { if (swClient_ssl_handshake(cli) < 0) { goto connect_fail; } if (cli->socket->ssl_state != SW_SSL_STATE_READY) { return SW_OK; } //ssl handshake sucess else if (cli->onConnect) { execute_onConnect(cli); } } #endif if (cli->open_eof_check || cli->open_length_check) { swConnection *conn = cli->socket; swProtocol *protocol = &cli->protocol; if (cli->open_eof_check) { n = swProtocol_recv_check_eof(protocol, conn, cli->buffer); } else { n = swProtocol_recv_check_length(protocol, conn, cli->buffer); } if (n < 0) { return cli->close(cli); } else { if (conn->removed == 0 && cli->remove_delay) { swClient_sleep(cli); cli->remove_delay = 0; } return SW_OK; } } #ifdef SW_CLIENT_RECV_AGAIN recv_again: #endif n = swConnection_recv(event->socket, buf, buf_size, 0); if (n < 0) { __error: switch (swConnection_error(errno)) { case SW_ERROR: swSysError("Read from socket[%d] failed.", event->fd); return SW_OK; case SW_CLOSE: goto __close; case SW_WAIT: return SW_OK; default: return SW_OK; } } else if (n == 0) { __close: return cli->close(cli); } else { cli->onReceive(cli, buf, n); #ifdef SW_CLIENT_RECV_AGAIN if (n == buf_size) { goto recv_again; } #endif return SW_OK; } return SW_OK; }swClient_onDgramRead 異步 UDP 客戶端接受數(shù)據(jù)
異步的 UDP 客戶端接受數(shù)據(jù)調(diào)用的和同步的客戶端相同,都是調(diào)用 swClient_udp_recv 函數(shù)。
static int swClient_onDgramRead(swReactor *reactor, swEvent *event) { swClient *cli = event->socket->object; char buffer[SW_BUFFER_SIZE_UDP]; int n = swClient_udp_recv(cli, buffer, sizeof(buffer), 0); if (n < 0) { return SW_ERR; } else { cli->onReceive(cli, buffer, n); } return SW_OK; }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/29497.html
摘要:新建可以看到,自動采用包長檢測的方法該函數(shù)主要功能是設(shè)置各種回調(diào)函數(shù)值得注意的是第三個(gè)參數(shù)代表是否異步。發(fā)送數(shù)據(jù)函數(shù)并不是直接發(fā)送數(shù)據(jù),而是將數(shù)據(jù)存儲在,等著寫事件就緒之后調(diào)用發(fā)送數(shù)據(jù)。 swReactorThread_dispatch 發(fā)送數(shù)據(jù) reactor 線程會通過 swReactorThread_dispatch 發(fā)送數(shù)據(jù),當(dāng)采用 stream 發(fā)送數(shù)據(jù)的時(shí)候,會調(diào)用 sw...
摘要:有研究過框架的同學(xué)就會發(fā)現(xiàn),其實(shí)最核心的,就是用了拓展加上拓展來實(shí)現(xiàn)其底層的網(wǎng)絡(luò)服務(wù)和多進(jìn)程調(diào)度。我們在模式下,測試起五個(gè)進(jìn)程主進(jìn)程要等待回收我們,這樣就很簡單的實(shí)現(xiàn)了一個(gè)多進(jìn)程的協(xié)程服務(wù)。 有研究過Workman框架的同學(xué)就會發(fā)現(xiàn),其實(shí)workman最核心的,就是用了php socket拓展加上pcntl拓展來實(shí)現(xiàn)其底層的網(wǎng)絡(luò)服務(wù)和多進(jìn)程調(diào)度。那我們今天就來探討如何使用Swoole的...
摘要:之后如果仍然有剩余未發(fā)送的數(shù)據(jù),那么就如果已經(jīng)沒有剩余數(shù)據(jù)了,繼續(xù)去取下一個(gè)數(shù)據(jù)包。拿到后,要用函數(shù)轉(zhuǎn)化為相應(yīng)的類型即可得到包長值。 swPort_onRead_check_eof EOF 自動分包 我們前面說過,swPort_onRead_raw 是最簡單的向 worker 進(jìn)程發(fā)送數(shù)據(jù)包的方法,swoole 會將從客戶端接受到的數(shù)據(jù)包,立刻發(fā)送給 worker 進(jìn)程,用戶自己把...
摘要:兩個(gè)函數(shù)是可選回調(diào)函數(shù)。附帶了一組可信任證書。應(yīng)該注意的是,驗(yàn)證失敗并不意味著連接不能使用。在對證書進(jìn)行驗(yàn)證時(shí),有一些安全性檢查并沒有執(zhí)行,包括證書的失效檢查和對證書中通用名的有效性驗(yàn)證。 前言 swoole_client 提供了 tcp/udp socket 的客戶端的封裝代碼,使用時(shí)僅需 new swoole_client 即可。 swoole 的 socket client 對比...
摘要:對于服務(wù)端來說,緩存默認(rèn)是不能使用的,可以通過調(diào)用函數(shù)來進(jìn)行設(shè)置生效。在回調(diào)函數(shù)中,首先申請一個(gè)大數(shù)數(shù)據(jù)結(jié)構(gòu),然后將其設(shè)定為,該值表示公鑰指數(shù),然后利用函數(shù)生成秘鑰。此時(shí)需要調(diào)用函數(shù)將新的連接與綁定。 前言 上一篇文章我們講了 OpenSSL 的原理,接下來,我們來說說如何利用 openssl 第三方庫進(jìn)行開發(fā),來為 tcp 層進(jìn)行 SSL 隧道加密 OpenSSL 初始化 在 sw...
閱讀 988·2021-11-22 13:54
閱讀 2938·2021-09-28 09:36
閱讀 3038·2019-08-30 15:55
閱讀 2005·2019-08-30 15:44
閱讀 597·2019-08-29 12:31
閱讀 2616·2019-08-28 18:18
閱讀 1266·2019-08-26 13:58
閱讀 1498·2019-08-26 13:44