美文网首页区块链研习社区块链研究
btcd 源码分析系列:4 - p2p网络的peer

btcd 源码分析系列:4 - p2p网络的peer

作者: tpkeeper | 来源:发表于2019-09-28 08:45 被阅读0次

参考:btcd

  • btc在p2p网络中与每一个节点的连接都视为一个peer对象,与该节点的消息交换都是通过该peer进行。本文主要分析peer对象的创建以及wire协议消息的收发机制。对应btcd中package peer,该package主要提供了与其他节点连接建立之后peer的创建,协商,消息处理,以及close功能。

一、创建peer对象

peer具体分为两种:

  • inbound 当connmanager监听到其他节点的请求时创建
  • outbound 当connmanager主动连接其他节点时创建
  • 两者在创建时都使用了newPeerBase()函数,但是outbound 增加了对addr和na属性的初始化
// NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin
// processing incoming and outgoing messages.
func NewInboundPeer(cfg *Config) *Peer {
    return newPeerBase(cfg, true)
}

// NewOutboundPeer returns a new outbound bitcoin peer.
func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
    p := newPeerBase(cfg, false)
    p.addr = addr

    host, portStr, err := net.SplitHostPort(addr)
    if err != nil {
        return nil, err
    }

    port, err := strconv.ParseUint(portStr, 10, 16)
    if err != nil {
        return nil, err
    }

    if cfg.HostToNetAddress != nil {
        na, err := cfg.HostToNetAddress(host, uint16(port), 0)
        if err != nil {
            return nil, err
        }
        p.na = na
    } else {
        p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
    }

    return p, nil
}


// newPeerBase returns a new base bitcoin peer based on the inbound flag.  This
// is used by the NewInboundPeer and NewOutboundPeer functions to perform base
// setup needed by both types of peers.
func newPeerBase(origCfg *Config, inbound bool) *Peer {
    // Default to the max supported protocol version if not specified by the
    // caller.
    cfg := *origCfg // Copy to avoid mutating caller.
    if cfg.ProtocolVersion == 0 {
        cfg.ProtocolVersion = MaxProtocolVersion
    }

    // Set the chain parameters to testnet if the caller did not specify any.
    if cfg.ChainParams == nil {
        cfg.ChainParams = &chaincfg.TestNet3Params
    }

    // Set the trickle interval if a non-positive value is specified.
    if cfg.TrickleInterval <= 0 {
        cfg.TrickleInterval = DefaultTrickleInterval
    }

    p := Peer{
        inbound:         inbound,
        wireEncoding:    wire.BaseEncoding,
        knownInventory:  newMruInventoryMap(maxKnownInventory),
        stallControl:    make(chan stallControlMsg, 1), // nonblocking sync
        outputQueue:     make(chan outMsg, outputBufferSize),
        sendQueue:       make(chan outMsg, 1),   // nonblocking sync
        sendDoneQueue:   make(chan struct{}, 1), // nonblocking sync
        outputInvChan:   make(chan *wire.InvVect, outputBufferSize),
        inQuit:          make(chan struct{}),
        queueQuit:       make(chan struct{}),
        outQuit:         make(chan struct{}),
        quit:            make(chan struct{}),
        cfg:             cfg, // Copy so caller can't mutate.
        services:        cfg.Services,
        protocolVersion: cfg.ProtocolVersion,
    }
    return &p
}

二、启动peer

启动顺序

  1. 通过versionMsg交换协议版本,进行protocol version判断(如果是outbound先发送自己的version再读取对方的,如果是inbound则反之)
  2. 如果version握手成功,启动其他handler goroutine
    • p.stallHandler() 处理消息超时
    • p.inHandler() 处理接收的消息
    • p.queueHandler() 处理消息发送队列
    • p.outHandler() 处理发送的消息
    • p.pingHandler() 发送周期心跳
  3. 最后发送verAck确认,完成握手
// start begins processing input and output messages.
func (p *Peer) start() error {
    log.Tracef("Starting peer %s", p)

    negotiateErr := make(chan error, 1)
    go func() {
        if p.inbound {
            negotiateErr <- p.negotiateInboundProtocol()
        } else {
            negotiateErr <- p.negotiateOutboundProtocol()
        }
    }()

    // Negotiate the protocol within the specified negotiateTimeout.
    select {
    case err := <-negotiateErr:
        if err != nil {
            p.Disconnect()
            return err
        }
    case <-time.After(negotiateTimeout):
        p.Disconnect()
        return errors.New("protocol negotiation timeout")
    }
    log.Debugf("Connected to %s", p.Addr())

    // The protocol has been negotiated successfully so start processing input
    // and output messages.
    go p.stallHandler()
    go p.inHandler()
    go p.queueHandler()
    go p.outHandler()
    go p.pingHandler()

    // Send our verack message now that the IO processing machinery has started.
    p.QueueMessage(wire.NewMsgVerAck(), nil)
    return nil
}

三、交换协议version(inbound端)

主要流程:

  • 读取接收的协议
  • 发送自己的协议

检测协议过程:

  • 检测nonce(防止跟自己建立连接)
  • 填充peer的基础信息(block高度,id,代理、services,address等)
  • 回调onVersion()方法
  • 判断版本是否符合要求
// negotiateInboundProtocol waits to receive a version message from the peer
// then sends our version message. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateInboundProtocol() error {
    if err := p.readRemoteVersionMsg(); err != nil {
        return err
    }

    return p.writeLocalVersionMsg()
}


// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer.  If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
    // Read their version message.
    remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
    if err != nil {
        return err
    }

    // Notify and disconnect clients if the first message is not a version
    // message.
    msg, ok := remoteMsg.(*wire.MsgVersion)
    if !ok {
        reason := "a version message must precede all others"
        rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
            reason)
        _ = p.writeMessage(rejectMsg, wire.LatestEncoding)
        return errors.New(reason)
    }

    // Detect self connections.
    if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
        return errors.New("disconnecting peer connected to self")
    }

    // Negotiate the protocol version and set the services to what the remote
    // peer advertised.
    p.flagsMtx.Lock()
    p.advertisedProtoVer = uint32(msg.ProtocolVersion)
    p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
    p.versionKnown = true
    p.services = msg.Services
    p.flagsMtx.Unlock()
    log.Debugf("Negotiated protocol version %d for peer %s",
        p.protocolVersion, p)

    // Updating a bunch of stats including block based stats, and the
    // peer's time offset.
    p.statsMtx.Lock()
    p.lastBlock = msg.LastBlock
    p.startingHeight = msg.LastBlock
    p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
    p.statsMtx.Unlock()

    // Set the peer's ID, user agent, and potentially the flag which
    // specifies the witness support is enabled.
    p.flagsMtx.Lock()
    p.id = atomic.AddInt32(&nodeCount, 1)
    p.userAgent = msg.UserAgent

    // Determine if the peer would like to receive witness data with
    // transactions, or not.
    if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
        p.witnessEnabled = true
    }
    p.flagsMtx.Unlock()

    // Once the version message has been exchanged, we're able to determine
    // if this peer knows how to encode witness data over the wire
    // protocol. If so, then we'll switch to a decoding mode which is
    // prepared for the new transaction format introduced as part of
    // BIP0144.
    if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
        p.wireEncoding = wire.WitnessEncoding
    }

    // Invoke the callback if specified.
    if p.cfg.Listeners.OnVersion != nil {
        rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
        if rejectMsg != nil {
            _ = p.writeMessage(rejectMsg, wire.LatestEncoding)
            return errors.New(rejectMsg.Reason)
        }
    }

    // Notify and disconnect clients that have a protocol version that is
    // too old.
    //
    // NOTE: If minAcceptableProtocolVersion is raised to be higher than
    // wire.RejectVersion, this should send a reject packet before
    // disconnecting.
    if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
        // Send a reject message indicating the protocol version is
        // obsolete and wait for the message to be sent before
        // disconnecting.
        reason := fmt.Sprintf("protocol version must be %d or greater",
            MinAcceptableProtocolVersion)
        rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete,
            reason)
        _ = p.writeMessage(rejectMsg, wire.LatestEncoding)
        return errors.New(reason)
    }

    return nil
}

发送消息

  • 发送消息的直接入口是QueueMessage()方法
  • outMesage 被发送到了outputQueue队列(带缓冲的channel)
  • 然后再由queueHandler发给sendQueue
  • outHandler 对sendQueue处理
    • 发给 stallContol
    • 发给peer
  • stallhandler 对stallContol 处理
// QueueMessage adds the passed bitcoin message to the peer send queue.
//
// This function is safe for concurrent access.
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
    p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
}

// This function is safe for concurrent access.
func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{},
    encoding wire.MessageEncoding) {

    // Avoid risk of deadlock if goroutine already exited.  The goroutine
    // we will be sending to hangs around until it knows for a fact that
    // it is marked as disconnected and *then* it drains the channels.
    if !p.Connected() {
        if doneChan != nil {
            go func() {
                doneChan <- struct{}{}
            }()
        }
        return
    }
    p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan}
}
p := Peer{
        inbound:         inbound,
        wireEncoding:    wire.BaseEncoding,
        knownInventory:  newMruInventoryMap(maxKnownInventory),
        stallControl:    make(chan stallControlMsg, 1), // nonblocking sync
        outputQueue:     make(chan outMsg, outputBufferSize),
        sendQueue:       make(chan outMsg, 1),   // nonblocking sync
        sendDoneQueue:   make(chan struct{}, 1), // nonblocking sync
        outputInvChan:   make(chan *wire.InvVect, outputBufferSize),
        inQuit:          make(chan struct{}),
        queueQuit:       make(chan struct{}),
        outQuit:         make(chan struct{}),
        quit:            make(chan struct{}),
        cfg:             cfg, // Copy so caller can't mutate.
        services:        cfg.Services,
        protocolVersion: cfg.ProtocolVersion,
    }

接收消息

  • 通过inHandler,监听peer消息
  • 向stallControl发送 sccReceiveMessage
  • 向stallControl发送 sccHandlerStart
  • 调用onXXX相应消息,并通过queueMessage发送结果
  • 向stallContol 发送 sccHandlerDone
// inHandler handles all incoming messages for the peer.  It must be run as a
// goroutine.
func (p *Peer) inHandler() {
    // The timer is stopped when a new message is received and reset after it
    // is processed.
    idleTimer := time.AfterFunc(idleTimeout, func() {
        log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout)
        p.Disconnect()
    })

out:
    for atomic.LoadInt32(&p.disconnect) == 0 {
        // Read a message and stop the idle timer as soon as the read
        // is done.  The timer is reset below for the next iteration if
        // needed.
        rmsg, buf, err := p.readMessage(p.wireEncoding)
        idleTimer.Stop()
        if err != nil {
            // In order to allow regression tests with malformed messages, don't
            // disconnect the peer when we're in regression test mode and the
            // error is one of the allowed errors.
            if p.isAllowedReadError(err) {
                log.Errorf("Allowed test error from %s: %v", p, err)
                idleTimer.Reset(idleTimeout)
                continue
            }

            // Only log the error and send reject message if the
            // local peer is not forcibly disconnecting and the
            // remote peer has not disconnected.
            if p.shouldHandleReadError(err) {
                errMsg := fmt.Sprintf("Can't read message from %s: %v", p, err)
                if err != io.ErrUnexpectedEOF {
                    log.Errorf(errMsg)
                }

                // Push a reject message for the malformed message and wait for
                // the message to be sent before disconnecting.
                //
                // NOTE: Ideally this would include the command in the header if
                // at least that much of the message was valid, but that is not
                // currently exposed by wire, so just used malformed for the
                // command.
                p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil,
                    true)
            }
            break out
        }
        atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
        p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

        // Handle each supported message type.
        p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
        switch msg := rmsg.(type) {
        case *wire.MsgVersion:
            // Limit to one version message per peer.
            p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
                "duplicate version message", nil, true)
            break out

        case *wire.MsgVerAck:

            // No read lock is necessary because verAckReceived is not written
            // to in any other goroutine.
            if p.verAckReceived {
                log.Infof("Already received 'verack' from peer %v -- "+
                    "disconnecting", p)
                break out
            }
            p.flagsMtx.Lock()
            p.verAckReceived = true
            p.flagsMtx.Unlock()
            if p.cfg.Listeners.OnVerAck != nil {
                p.cfg.Listeners.OnVerAck(p, msg)
            }
            
            
            ....
}

相关文章

网友评论

    本文标题:btcd 源码分析系列:4 - p2p网络的peer

    本文链接:https://www.haomeiwen.com/subject/rcuhyctx.html