摘要:到這里,我們總算能夠完整的理解清楚,當(dāng)我們向一個比原節(jié)點請求區(qū)塊數(shù)據(jù),我們這邊需要怎么做,對方節(jié)點又需要怎么做了。
作者:freewind
比原項目倉庫:
Github地址:https://github.com/Bytom/bytom
Gitee地址:https://gitee.com/BytomBlockc...
在上一篇,我們知道了比原是如何把“請求區(qū)塊數(shù)據(jù)”的信息BlockRequestMessage發(fā)送給peer節(jié)點的,那么本文研究的重點就是,當(dāng)peer節(jié)點收到了這個信息,它將如何應(yīng)答?
那么這個問題如果細(xì)分的話,也可以分為三個小問題:
比原節(jié)點是如何收到對方發(fā)過來的信息的?
收到BlockRequestMessage后,將會給對方發(fā)送什么樣的信息?
這個信息是如何發(fā)送出去的?
我們先從第一個小問題開始。
比原節(jié)點是如何接收對方發(fā)過來的信息的?如果我們在代碼中搜索BlockRequestMessage,會發(fā)現(xiàn)只有在ProtocolReactor.Receive方法中針對該信息進(jìn)行了應(yīng)答。那么問題的關(guān)鍵就是,比原是如何接收對方發(fā)過來的信息,并且把它轉(zhuǎn)交給ProtocolReactor.Receive的。
如果我們對前一篇《比原是如何把請求區(qū)塊數(shù)據(jù)的信息發(fā)出去的》有印象的話,會記得比原在發(fā)送信息時,最后會把信息寫入到MConnection.bufWriter中;與之相應(yīng)的,MConnection還有一個bufReader,用于讀取數(shù)據(jù),它也是與net.Conn綁定在一起的:
p2p/connection.go#L114-L118
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
(其中minReadBufferSize的值為常量1024)
所以,要讀取對方發(fā)來的信息,一定會讀取bufReader。經(jīng)過簡單的搜索,我們發(fā)現(xiàn),它也是在MConnection.Start中啟動的:
p2p/connection.go#L152-L159
func (c *MConnection) OnStart() error { // ... go c.sendRoutine() go c.recvRoutine() // ... }
其中的c.recvRoutine()就是我們本次所關(guān)注的。它上面的c.sendRoutine是用來發(fā)送的,是前一篇文章中我們關(guān)注的重點。
繼續(xù)c.recvRoutine():
p2p/connection.go#L403-L502
func (c *MConnection) recvRoutine() { // ... for { c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true) // ... pktType := wire.ReadByte(c.bufReader, &n, &err) c.recvMonitor.Update(int(n)) // ... switch pktType { // ... case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err) c.recvMonitor.Update(int(n)) // ... channel, ok := c.channelsIdx[pkt.ChannelID] // ... msgBytes, err := channel.recvMsgPacket(pkt) // ... if msgBytes != nil { // ... c.onReceive(pkt.ChannelID, msgBytes) } // ... } } // ... }
經(jīng)過簡化以后,這個方法分成了三塊內(nèi)容:
第一塊就限制接收速率,以防止惡意結(jié)點突然發(fā)送大量數(shù)據(jù)把節(jié)點撐死。跟發(fā)送一樣,它的限制是500K/s
第二塊是從c.bufReader中讀取出下一個數(shù)據(jù)包的類型。它的值目前有三個,兩個跟心跳有關(guān):packetTypePing和packetTypePong,另一個表示是正常的信息數(shù)據(jù)類型packetTypeMsg,也是我們需要關(guān)注的
第三塊就是繼續(xù)從c.bufReader中讀取出完整的數(shù)據(jù)包,然后根據(jù)它的ChannelID找到相應(yīng)的channel去處理它。ChannelID有兩個值,分別是BlockchainChannel和PexChannel,我們目前只需要關(guān)注前者即可,它對應(yīng)的reactor是ProtocolReactor。當(dāng)最后調(diào)用c.onReceive(pkt.ChannelID, msgBytes)時,讀取的二進(jìn)制數(shù)據(jù)msgBytes就會被ProtocolReactor.Receive處理
我們的重點是看第三塊內(nèi)容。首先是channel.recvMsgPacket(pkt),即通道是怎么從packet包里讀取到相應(yīng)的二進(jìn)制數(shù)據(jù)的呢?
p2p/connection.go#L667-L682
func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { // ... ch.recving = append(ch.recving, packet.Bytes...) if packet.EOF == byte(0x01) { msgBytes := ch.recving // ... ch.recving = ch.recving[:0] return msgBytes, nil } return nil, nil }
這個方法我去掉了一些錯誤檢查和關(guān)于性能方面的注釋,有興趣的同學(xué)可以點接上方的源代碼查看,這里就忽略了。
這段代碼主要是利用了一個叫recving的通道,把packet中持有的字節(jié)數(shù)組加到它后面,然后再判斷該packet是否代表整個信息結(jié)束了,如果是的話,則把ch.recving的內(nèi)容完整返回,供調(diào)用者處理;否則的話,返回一個nil,表示還沒拿完,暫時處理不了。在前一篇文章中關(guān)于發(fā)送數(shù)據(jù)的地方可以與這里對應(yīng),只不過發(fā)送方要麻煩的多,需要三個通道sendQueue、sending和send才能實現(xiàn),這邊接收方就簡單了。
然后回到前面的方法MConnection.recvRoutine,我們繼續(xù)看最后的c.onReceive調(diào)用。這個onReceive實際上是一個由別人賦值給該channel的一個函數(shù),它位于MConnection創(chuàng)建的地方:
p2p/peer.go#L292-L310
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { if chID == PexChannel { return } else { cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) } } reactor.Receive(chID, p, msgBytes) } onError := func(r interface{}) { onPeerError(p, r) } return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) }
邏輯也比較簡單,就是當(dāng)前面的c.onReceive(pkt.ChannelID, msgBytes)調(diào)用時,它會根據(jù)傳入的chID找到相應(yīng)的Reactor,然后執(zhí)行其Receive方法。對于本文來說,就會進(jìn)入到ProtocolReactor.Receive。
那我們繼續(xù)看ProtocolReactor.Receive:
netsync/protocol_reactor.go#L179-L247
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) // ... switch msg := msg.(type) { case *BlockRequestMessage: // ... }
其中的DecodeMessage(...)就是把傳入的二進(jìn)制數(shù)據(jù)反序列化成一個BlockchainMessage對象,該對象是一個沒有任何內(nèi)容的interface,它有多種實現(xiàn)類型。我們在后面繼續(xù)對該對象進(jìn)行判斷,如果它是BlockRequestMessage類型的信息,我們就會繼續(xù)做相應(yīng)的處理。處理的代碼我在這里暫時省略了,因為它是屬于下一個小問題的,我們先不考慮。
好像不知不覺我們就把第一個小問題的后半部分差不多搞清楚了。那么前半部分是什么?我們在前面說,讀取bufReader的代碼的起點是在MConnection.Start中,那么前半部分就是:比原從啟動開始中,是在什么情況下怎樣一步步走到MConnection.Start的呢?
好在前半部分的問題我們在前一篇文章《比原是如何把請求區(qū)塊數(shù)據(jù)的信息發(fā)出去的》中進(jìn)行了專門的討論,這里就不講了,有需要的話可以再過去看一下(可以先看最后“總結(jié)”那一小節(jié))。
下面我們進(jìn)入第二個小問題:
收到BlockRequestMessage后,將會給對方發(fā)送什么樣的信息?這里就是接著前面的ProtocolReactor.Receive繼續(xù)向下講了。首先我們再貼一下它的較完整的代碼:
netsync/protocol_reactor.go#L179-L247
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) // ... switch msg := msg.(type) { case *BlockRequestMessage: var block *types.Block var err error if msg.Height != 0 { block, err = pr.chain.GetBlockByHeight(msg.Height) } else { block, err = pr.chain.GetBlockByHash(msg.GetHash()) } // ... response, err := NewBlockResponseMessage(block) // ... src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response}) // ... }
可以看到,邏輯還是比較簡單的,即根據(jù)對方發(fā)過來的BlockRequestMessage中指定的height或者hash信息,在本地的區(qū)塊鏈數(shù)據(jù)中找到相應(yīng)的block,組成BlockResponseMessage發(fā)過去就行了。
其中chain.GetBlockByHeight(...)和chain.GetBlockByHash(...)如果詳細(xì)說明的話,需要深刻理解區(qū)塊鏈數(shù)據(jù)在比原節(jié)點中是如何保存的,我們在本文先不講,等到后面專門研究。
在這里,我覺得我們只需要知道我們會查詢區(qū)塊數(shù)據(jù)并且構(gòu)造出一個BlockResponseMessage,再通過BlockchainChannel這個通道發(fā)送出去就可以了。
最后一句代碼中調(diào)用了src.TrySend方法,它是把信息向?qū)Ψ絧eer發(fā)送過去。(其中的src就是指的對方peer)
那么,它到底是怎么發(fā)送出去的呢?下面我們進(jìn)入最后一個小問題:
這個BlockResponseMessage信息是如何發(fā)送出去的?我們先看看peer.TrySend代碼:
p2p/peer.go#L242-L247
func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false } return p.mconn.TrySend(chID, msg) }
它在內(nèi)部將會調(diào)用MConnection.TrySend方法,其中chID是BlockchainChannel,也就是它對應(yīng)的Reactor是ProtocolReactor。
再接著就是我們熟悉的MConnection.TrySend,由于它在前一篇文章中進(jìn)行了全面的講解,在本文就不提了,如果需要可以過去翻看一下。
那么今天的問題就算是解決啦。
到這里,我們總算能夠完整的理解清楚,當(dāng)我們向一個比原節(jié)點請求“區(qū)塊數(shù)據(jù)”,我們這邊需要怎么做,對方節(jié)點又需要怎么做了。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/24155.html
摘要:作者比原項目倉庫地址地址在前一篇中,我們說到,當(dāng)比原向其它節(jié)點請求區(qū)塊數(shù)據(jù)時,會發(fā)送一個把需要的區(qū)塊告訴對方,并把該信息對應(yīng)的二進(jìn)制數(shù)據(jù)放入對應(yīng)的通道中,等待發(fā)送。這個就是真正與連接對象綁定的一個緩存區(qū),寫入到它里面的數(shù)據(jù),會被發(fā)送出去。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https:...
摘要:作者比原項目倉庫地址地址在前一篇中,我們已經(jīng)知道如何連上一個比原節(jié)點的端口,并與對方完成身份驗證。代碼如下可以看到,首先是從眾多的中,找到最合適的那個。到這里,我們其實已經(jīng)知道比原是如何向其它節(jié)點請求區(qū)塊數(shù)據(jù),以及何時把信息發(fā)送出去。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://...
摘要:如果傳的是,就會在內(nèi)部使用默認(rèn)的隨機(jī)數(shù)生成器生成隨機(jī)數(shù)并生成密鑰。使用的是,生成的是一個形如這樣的全球唯一的隨機(jī)數(shù)把密鑰以文件形式保存在硬盤上。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlockc... 在前一篇,我們探討了從瀏覽器的dashb...
摘要:所以本文本來是想去研究一下,當(dāng)別的節(jié)點把區(qū)塊數(shù)據(jù)發(fā)給我們之后,我們應(yīng)該怎么處理,現(xiàn)在換成研究比原的是怎么做出來的。進(jìn)去后會看到大量的與相關(guān)的配置。它的功能主要是為了在訪問與的函數(shù)之間增加了一層轉(zhuǎn)換。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlo...
摘要:所以這個文章系列叫作剝開比原看代碼。所以我的問題是比原初始化時,產(chǎn)生了什么樣的配置文件,放在了哪個目錄下下面我將結(jié)合源代碼,來回答這個問題。將用來確認(rèn)數(shù)據(jù)目錄是有效的,并且將根據(jù)傳入的不同,來生成不同的內(nèi)容寫入到配置文件中。 作者:freewind 比原項目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee...
閱讀 1489·2021-11-24 09:38
閱讀 3409·2021-11-22 12:03
閱讀 4474·2021-11-11 10:59
閱讀 2626·2021-09-28 09:36
閱讀 1182·2021-09-09 09:32
閱讀 3658·2021-08-05 10:00
閱讀 2706·2021-07-23 15:30
閱讀 3133·2019-08-30 13:12