参考:btcd
- btc在p2p网络中与每一个节点的连接都视为一个peer对象,与该节点的消息交换都是通过该peer进行。本文主要分析peer对象的创建以及wire协议消息的收发机制。对应btcd中package peer,该package主要提供了与其他节点连接建立之后peer的创建,协商,消息处理,以及close功能。
一、创建peer对象
peer具体分为两种:
- inbound 当connmanager监听到其他节点的请求时创建
- outbound 当connmanager主动连接其他节点时创建
- 两者在创建时都使用了newPeerBase()函数,但是outbound 增加了对addr和na属性的初始化
// NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin
// processing incoming and outgoing messages.
func NewInboundPeer(cfg *Config) *Peer {
return newPeerBase(cfg, true)
}
// NewOutboundPeer returns a new outbound bitcoin peer.
func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
p := newPeerBase(cfg, false)
p.addr = addr
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return nil, err
}
if cfg.HostToNetAddress != nil {
na, err := cfg.HostToNetAddress(host, uint16(port), 0)
if err != nil {
return nil, err
}
p.na = na
} else {
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
}
return p, nil
}
// newPeerBase returns a new base bitcoin peer based on the inbound flag. This
// is used by the NewInboundPeer and NewOutboundPeer functions to perform base
// setup needed by both types of peers.
func newPeerBase(origCfg *Config, inbound bool) *Peer {
// Default to the max supported protocol version if not specified by the
// caller.
cfg := *origCfg // Copy to avoid mutating caller.
if cfg.ProtocolVersion == 0 {
cfg.ProtocolVersion = MaxProtocolVersion
}
// Set the chain parameters to testnet if the caller did not specify any.
if cfg.ChainParams == nil {
cfg.ChainParams = &chaincfg.TestNet3Params
}
// Set the trickle interval if a non-positive value is specified.
if cfg.TrickleInterval <= 0 {
cfg.TrickleInterval = DefaultTrickleInterval
}
p := Peer{
inbound: inbound,
wireEncoding: wire.BaseEncoding,
knownInventory: newMruInventoryMap(maxKnownInventory),
stallControl: make(chan stallControlMsg, 1), // nonblocking sync
outputQueue: make(chan outMsg, outputBufferSize),
sendQueue: make(chan outMsg, 1), // nonblocking sync
sendDoneQueue: make(chan struct{}, 1), // nonblocking sync
outputInvChan: make(chan *wire.InvVect, outputBufferSize),
inQuit: make(chan struct{}),
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
quit: make(chan struct{}),
cfg: cfg, // Copy so caller can't mutate.
services: cfg.Services,
protocolVersion: cfg.ProtocolVersion,
}
return &p
}
二、启动peer
启动顺序
- 通过versionMsg交换协议版本,进行protocol version判断(如果是outbound先发送自己的version再读取对方的,如果是inbound则反之)
- 如果version握手成功,启动其他handler goroutine
- p.stallHandler() 处理消息超时
- p.inHandler() 处理接收的消息
- p.queueHandler() 处理消息发送队列
- p.outHandler() 处理发送的消息
- p.pingHandler() 发送周期心跳
- 最后发送verAck确认,完成握手
// start begins processing input and output messages.
func (p *Peer) start() error {
log.Tracef("Starting peer %s", p)
negotiateErr := make(chan error, 1)
go func() {
if p.inbound {
negotiateErr <- p.negotiateInboundProtocol()
} else {
negotiateErr <- p.negotiateOutboundProtocol()
}
}()
// Negotiate the protocol within the specified negotiateTimeout.
select {
case err := <-negotiateErr:
if err != nil {
p.Disconnect()
return err
}
case <-time.After(negotiateTimeout):
p.Disconnect()
return errors.New("protocol negotiation timeout")
}
log.Debugf("Connected to %s", p.Addr())
// The protocol has been negotiated successfully so start processing input
// and output messages.
go p.stallHandler()
go p.inHandler()
go p.queueHandler()
go p.outHandler()
go p.pingHandler()
// Send our verack message now that the IO processing machinery has started.
p.QueueMessage(wire.NewMsgVerAck(), nil)
return nil
}
三、交换协议version(inbound端)
主要流程:
- 读取接收的协议
- 发送自己的协议
检测协议过程:
- 检测nonce(防止跟自己建立连接)
- 填充peer的基础信息(block高度,id,代理、services,address等)
- 回调onVersion()方法
- 判断版本是否符合要求
// negotiateInboundProtocol waits to receive a version message from the peer
// then sends our version message. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateInboundProtocol() error {
if err := p.readRemoteVersionMsg(); err != nil {
return err
}
return p.writeLocalVersionMsg()
}
// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}
// Notify and disconnect clients if the first message is not a version
// message.
msg, ok := remoteMsg.(*wire.MsgVersion)
if !ok {
reason := "a version message must precede all others"
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
reason)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}
// Detect self connections.
if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
return errors.New("disconnecting peer connected to self")
}
// Negotiate the protocol version and set the services to what the remote
// peer advertised.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
p.services = msg.Services
p.flagsMtx.Unlock()
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Updating a bunch of stats including block based stats, and the
// peer's time offset.
p.statsMtx.Lock()
p.lastBlock = msg.LastBlock
p.startingHeight = msg.LastBlock
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()
// Set the peer's ID, user agent, and potentially the flag which
// specifies the witness support is enabled.
p.flagsMtx.Lock()
p.id = atomic.AddInt32(&nodeCount, 1)
p.userAgent = msg.UserAgent
// Determine if the peer would like to receive witness data with
// transactions, or not.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.witnessEnabled = true
}
p.flagsMtx.Unlock()
// Once the version message has been exchanged, we're able to determine
// if this peer knows how to encode witness data over the wire
// protocol. If so, then we'll switch to a decoding mode which is
// prepared for the new transaction format introduced as part of
// BIP0144.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.wireEncoding = wire.WitnessEncoding
}
// Invoke the callback if specified.
if p.cfg.Listeners.OnVersion != nil {
rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
if rejectMsg != nil {
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(rejectMsg.Reason)
}
}
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
// Send a reject message indicating the protocol version is
// obsolete and wait for the message to be sent before
// disconnecting.
reason := fmt.Sprintf("protocol version must be %d or greater",
MinAcceptableProtocolVersion)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete,
reason)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}
return nil
}
发送消息
- 发送消息的直接入口是QueueMessage()方法
- outMesage 被发送到了outputQueue队列(带缓冲的channel)
- 然后再由queueHandler发给sendQueue
- outHandler 对sendQueue处理
- 发给 stallContol
- 发给peer
- stallhandler 对stallContol 处理
// QueueMessage adds the passed bitcoin message to the peer send queue.
//
// This function is safe for concurrent access.
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
}
// This function is safe for concurrent access.
func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{},
encoding wire.MessageEncoding) {
// Avoid risk of deadlock if goroutine already exited. The goroutine
// we will be sending to hangs around until it knows for a fact that
// it is marked as disconnected and *then* it drains the channels.
if !p.Connected() {
if doneChan != nil {
go func() {
doneChan <- struct{}{}
}()
}
return
}
p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan}
}
p := Peer{
inbound: inbound,
wireEncoding: wire.BaseEncoding,
knownInventory: newMruInventoryMap(maxKnownInventory),
stallControl: make(chan stallControlMsg, 1), // nonblocking sync
outputQueue: make(chan outMsg, outputBufferSize),
sendQueue: make(chan outMsg, 1), // nonblocking sync
sendDoneQueue: make(chan struct{}, 1), // nonblocking sync
outputInvChan: make(chan *wire.InvVect, outputBufferSize),
inQuit: make(chan struct{}),
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
quit: make(chan struct{}),
cfg: cfg, // Copy so caller can't mutate.
services: cfg.Services,
protocolVersion: cfg.ProtocolVersion,
}
接收消息
- 通过inHandler,监听peer消息
- 向stallControl发送 sccReceiveMessage
- 向stallControl发送 sccHandlerStart
- 调用onXXX相应消息,并通过queueMessage发送结果
- 向stallContol 发送 sccHandlerDone
// inHandler handles all incoming messages for the peer. It must be run as a
// goroutine.
func (p *Peer) inHandler() {
// The timer is stopped when a new message is received and reset after it
// is processed.
idleTimer := time.AfterFunc(idleTimeout, func() {
log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout)
p.Disconnect()
})
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
// Read a message and stop the idle timer as soon as the read
// is done. The timer is reset below for the next iteration if
// needed.
rmsg, buf, err := p.readMessage(p.wireEncoding)
idleTimer.Stop()
if err != nil {
// In order to allow regression tests with malformed messages, don't
// disconnect the peer when we're in regression test mode and the
// error is one of the allowed errors.
if p.isAllowedReadError(err) {
log.Errorf("Allowed test error from %s: %v", p, err)
idleTimer.Reset(idleTimeout)
continue
}
// Only log the error and send reject message if the
// local peer is not forcibly disconnecting and the
// remote peer has not disconnected.
if p.shouldHandleReadError(err) {
errMsg := fmt.Sprintf("Can't read message from %s: %v", p, err)
if err != io.ErrUnexpectedEOF {
log.Errorf(errMsg)
}
// Push a reject message for the malformed message and wait for
// the message to be sent before disconnecting.
//
// NOTE: Ideally this would include the command in the header if
// at least that much of the message was valid, but that is not
// currently exposed by wire, so just used malformed for the
// command.
p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil,
true)
}
break out
}
atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}
// Handle each supported message type.
p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
switch msg := rmsg.(type) {
case *wire.MsgVersion:
// Limit to one version message per peer.
p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
"duplicate version message", nil, true)
break out
case *wire.MsgVerAck:
// No read lock is necessary because verAckReceived is not written
// to in any other goroutine.
if p.verAckReceived {
log.Infof("Already received 'verack' from peer %v -- "+
"disconnecting", p)
break out
}
p.flagsMtx.Lock()
p.verAckReceived = true
p.flagsMtx.Unlock()
if p.cfg.Listeners.OnVerAck != nil {
p.cfg.Listeners.OnVerAck(p, msg)
}
....
}
网友评论