美文网首页区块链技术探索
以太坊块同步源码分析

以太坊块同步源码分析

作者: hasika | 来源:发表于2018-10-13 10:45 被阅读16次

主要代码在ProtocolManager,同步方法以pm的Start方法开始

  • Start
    • pm.minedBroadcastLoop 订阅区块广播
      • pm.BroadcastBlock(ev.Block, true) 将区块信息同步给部分连接的节点
        • peer.AsyncSendNewBlock(block, td)
          • p.queuedProps <- &propEvent{block: block, td: td}: 向管道中插入数据
          • prop := <-p.queuedProps: 管道的另一头在peer.broadcast方法中
          • p.SendNewBlock(prop.block, prop.td),从上面的管道中拿到数据后,会执行这个方法
            • p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) 调用p2p的方法发送数据,传递参数是NewBlockMsg
        • handleMsg p2p的另一端,pm的handleMsg接受消息,消息类型是NewBlockMsg
          • pm.fetcher.Enqueue(p.id, request.Block) 下载区块
            • f.inject <- op: 向管道中插入数据
          • loop方法中的f.inject <- op:管道接受数据 在Fetcher类中
            • f.enqueue(op.origin, op.block) 调用插入队列方法
            • f.queue.Push(op, -float32(block.NumberU64())) 将区块插入队列中,这里其实区块并没有插入到区块链中
          • 在loop方法循环代码块中,还有一段代码,是将f.queue队列中的区块插入到区块链中。
          • f.insert(op.origin, op.block) 将区块插入到区块链中
            • f.done <- hash 方法执行完后,会向done管道中插入数据, 通知区块插入成功
            • f.verifyHeader(block.Header() 验证区块头信息
              • engine.VerifyHeader(blockchain, header, true),调用共识模块的验证方法
            • go f.broadcastBlock(block, true) 区块同步给链接的节点
            • go f.broadcastBlock(block, false) 区块部分信息同步给链接的节点
          • hash := <-f.done f.loop方法中的done管道取到数据,说明该区块已经插入数据库,删除队列缓存
            • f.forgetHash(hash)
            • f.forgetBlock(hash)
      • pm.BroadcastBlock(ev.Block, false) 将区块部分信息同步给所有连接的节点
        • peer.AsyncSendNewBlockHash(block)
          • p.queuedAnns <- block
          • block := <-p.queuedAnns peer.broadcast方法中的管道接收到数据
          • p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()})只是将区块的hash和高度同步给连接的节点
          • p.SendNewBlockHashes 通过p2p网络发送区块部分信息,消息为:NewBlockHashesMsg
        • handleMsg p2p的另一端,pm的handleMsg接受消息,消息类型是NewBlockHashesMsg
        • pm.fetcher.Notify 通知fetcher有新的区块需要同步
        • f.notify <- block 向管道中插入数据
        • notification := <-f.notify ,f.loop方法中的对应管道收到信息
          • f.fetching[notification.hash] 说明该区块正在下载区块头,退出
          • f.completing[notification.hash] 说明该区块正在下载区块体,退出
          • f.announced[notification.hash] = append(f.announced[notification.hash], notification) 将区块插入到announced队列中,准备下载区块头
          • 将其他p2p节点同步过来的数据,插入队列后,什么时候开始同步呢?其实这里定义了定时器
        • <-fetchTimer.C f.loop方法中的定时器触发,用来同步区块头
          • fetchHeader(hash) 下载区块头
          • p2p.Send(p.rw, GetBlockHeadersMsg..)调用p2p方法下载区块头,消息类型:GetBlockHeadersMsg
          • p.SendBlockHeaders(headers),调用p2p方法传递区块头信息,这里会传递最新的几个区块头,p2p节点请求一个区块头,实际上这里会返回多个区块头回去。
          • p2p.Send(p.rw, BlockHeadersMsg, headers)
        • BlockHeadersMsg p2p的另一端
          • headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) 当返回的区块头只有一个时
            • f.headerFilter <- filter 向管道插入数据
              • f.fetched[hash] = append(f.fetched[hash], announce) 放入已经同步区块头的列表中,准备同步区块体
              • f.enqueue(announce.origin, block) 如果该区块只有区块头,就不用在同步区块体了,直接插入区块链中
        • completeTimer.C f.loop方法中的定时器,用来同步区块体
          • go f.completing[hashes[0]].fetchBodies(hashes) 调用peer的fetchBodies下载区块
          • p2p.Send(p.rw, GetBlockBodiesMsg, hashes)调用p2p方法真正下载区块,消息类型:GetBlockBodiesMsg
          • p.SendBlockBodiesRLP(bodies) 通过p2p网络将区块数据发送到请求节点
          • p2p.Send(p.rw, BlockBodiesMsg, bodies) 发送的消息类型:BlockBodiesMsg
          • BlockBodiesMsg handleMsg接收到区块数据后
          • pm.fetcher.FilterBodies 调用fetcher的方法校验区块数据
          • f.bodyFilter <- filter: 将数据插入到管道中
            • block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]) 根据p2p传过来的数据,生成区块
            • f.enqueue(announce.origin, block) 将数据信息插入到区块链中
func (pm *ProtocolManager) Start(maxPeers int) {
   pm.maxPeers = maxPeers

   // broadcast transactions
   pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
   pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
   //交易广播
   go pm.txBroadcastLoop()

   // broadcast mined blocks
   //挖矿广播
   pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
   go pm.minedBroadcastLoop()

   // start sync handlers
   go pm.syncer()
   go pm.txsyncLoop()
}
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
   // automatically stops if unsubscribe
   for obj := range pm.minedBlockSub.Chan() {
      if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
         pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
         pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
      }
   }
}
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
   hash := block.Hash()
   peers := pm.peers.PeersWithoutBlock(hash)

   // If propagation is requested, send to a subset of the peer
   //参数propagate为true时
   if propagate {
         td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))

      // Send the block to a subset of our peers
      //将区块同步给一部分连接节点
      transfer := peers[:int(math.Sqrt(float64(len(peers))))]
      for _, peer := range transfer {
         peer.AsyncSendNewBlock(block, td)
      }
     
      return
   }
   // Otherwise if the block is indeed in out own chain, announce it
   // 参数propagate为false时
   if pm.blockchain.HasBlock(hash, block.NumberU64()) {
      //将区块一部分信息同步给所有的连接节点
      for _, peer := range peers {
         peer.AsyncSendNewBlockHash(block)
      }
   }
}
// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
// the peer's broadcast queue is full, the event is silently dropped.
func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
   select {
   //向管道中插入数据
   case p.queuedProps <- &propEvent{block: block, td: td}:
      p.knownBlocks.Add(block.Hash())
   default:
      p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
   }
}
// broadcast is a write loop that multiplexes block propagations, announcements
// and transaction broadcasts into the remote peer. The goal is to have an async
// writer that does not lock up node internals.
// 发现节点后,会注册节点,注册节点的时候,会调用这个方法,这个方法有一些管道
func (p *peer) broadcast() {
   for {
      select {
       //处理广播交易的管道
      case txs := <-p.queuedTxs:
         if err := p.SendTransactions(txs); err != nil {
            return
         }
      //处理广播区块的管道
      case prop := <-p.queuedProps:
         if err := p.SendNewBlock(prop.block, prop.td); err != nil {
            return
         }

      case block := <-p.queuedAnns:
         if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
            return
         }
   }
}
func (pm *ProtocolManager) handleMsg(p *peer) error {
    case msg.Code == NewBlockMsg:
        ...
        // 下载区块
        pm.fetcher.Enqueue(p.id, request.Block)
        //如果同步过来的难度值大于本地区块链中的难度值,同步区块链
            if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
                go pm.synchronise(p)
            }
        }
    //同步区块(hash和高度)
    case msg.Code == NewBlockHashesMsg:
        var announces newBlockHashesData
        if err := msg.Decode(&announces); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }

        // Schedule all the unknown hashes for retrieval
        unknown := make(newBlockHashesData, 0, len(announces))
        for _, block := range announces {
            if !pm.blockchain.HasBlock(block.Hash, block.Number) {
                unknown = append(unknown, block)
            }
        }
        for _, block := range unknown {
            //通知fetcher有新的区块需要同步
            pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
        }

    // Block header query, collect the requested headers and reply
    // 同步区块头,并且同步多个
    case msg.Code == GetBlockHeadersMsg:

        // Gather headers until the fetch or network limits is reached
        var (
            bytes   common.StorageSize
            headers []*types.Header
            unknown bool
        )
        //以p2p网络传过来的头信息,查找区块链上的最新的区块信息,一次同步过去
        // p2p传递过来的高度是10000,那么如果当前节点有10001,10002,这些节点也会传送过去
        for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
            // Retrieve the next header satisfying the query
            var origin *types.Header

            origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)

            headers = append(headers, origin)
            bytes += estHeaderRlpSize

            // Advance to the next header of the query
            switch {
        
            case hashMode && !query.Reverse:
                // Hash based traversal towards the leaf block
                var (
                    current = origin.Number.Uint64()
                    next    = current + query.Skip + 1
                )
                if next <= current {
                    infos, _ := json.MarshalIndent(p.Peer.Info(), "", "  ")
                    p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
                    unknown = true
                } else {
                    if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
                        nextHash := header.Hash()
                        expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
                        if expOldHash == query.Origin.Hash {
                            query.Origin.Hash, query.Origin.Number = nextHash, next
                        } else {
                            unknown = true
                        }
                    } else {
                        unknown = true
                    }
                }
        }
        return p.SendBlockHeaders(headers)
            
      //拿到从其他节点返回的区块头信息
      case msg.Code == BlockHeadersMsg:
        // A batch of headers arrived to one of our previous requests
        var headers []*types.Header
        if err := msg.Decode(&headers); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        
        // Filter out any explicitly requested headers, deliver the rest to the downloader
        filter := len(headers) == 1
        if filter {
            //如果只有一个区块头信息返回
            headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
        }
        if len(headers) > 0 || !filter {
            //如果有多个区块头信息返回
            err := pm.downloader.DeliverHeaders(p.id, headers)
        }
            
      //获取区块体请求      
      case msg.Code == GetBlockBodiesMsg:
        // Decode the retrieval message
        msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
        if _, err := msgStream.List(); err != nil {
            return err
        }
        // Gather blocks until the fetch or network limits is reached
        var (
            hash   common.Hash
            bytes  int
            bodies []rlp.RawValue
        )
        for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
            // Retrieve the hash of the next block
            if err := msgStream.Decode(&hash); err == rlp.EOL {
                break
            } else if err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // Retrieve the requested block body, stopping if enough was found
            if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
                //将bodies信息放在数组中
                bodies = append(bodies, data)
                bytes += len(data)
            }
        }
        // 通过p2p网络发送区块数据
        return p.SendBlockBodiesRLP(bodies)
            
     //节点接收到区块数据   
     case msg.Code == BlockBodiesMsg:
        //调用方法校验区块体,
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, 
        
}
// Enqueue tries to fill gaps the fetcher's future import queue.
//p2p网络接收到NewBlockMsg消息时,调用这个方法
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
   op := &inject{
      origin: peer,
      block:  block,
   }
   select {
       //向管道中插入数据
   case f.inject <- op:
      return nil
   case <-f.quit:
      return errTerminated
   }
}
func (f *Fetcher) loop() {
    for{
        //区块链目前的高度
        height := f.chainHeight()
        for !f.queue.Empty() {
            op := f.queue.PopItem().(*inject)
            // If too high up the chain or phase, continue later
            number := op.block.NumberU64()
            // 如果排队的区块高度,大于区块链高度+1,说明暂时还不需要插入到区块中,等待合适的区块插入后,再把这个区块插入
            if number > height+1 {
                f.queue.Push(op, -float32(number))
                if f.queueChangeHook != nil {
                    f.queueChangeHook(hash, true)
                }
                break
            }
            f.insert(op.origin, op.block)
        
            // Wait for an outside event to occur
            select {
                //从管道中取到p2p发送过来的区块
                case op := <-f.inject:
                // A direct block insertion was requested, try and fill any pending gaps
                propBroadcastInMeter.Mark(1)
                f.enqueue(op.origin, op.block)
                
                case hash := <-f.done:
                // A pending import finished, remove all traces of the notification
                f.forgetHash(hash)
                f.forgetBlock(hash)
                
                // 得到新块消息时
        case notification := <-f.notify:
            // All is well, schedule the announce if block's not yet downloading
            //正在下载头
            if _, ok := f.fetching[notification.hash]; ok {
                break
            }

            //区块头已经下载,正在下载区块体
            if _, ok := f.completing[notification.hash]; ok {
                break
            }

            //每个节点请求的数量
            f.announces[notification.origin] = count

            //准备下载头
            f.announced[notification.hash] = append(f.announced[notification.hash], notification)
                
        case <-fetchTimer.C:
            // At least one block's timer ran out, check for needing retrieval
            request := make(map[string][]common.Hash)

            for hash, announces := range f.announced {
                    if f.getBlock(hash) == nil {
                        //将没有本地没有的区块,插入到request队列中
                        request[announce.origin] = append(request[announce.origin], hash)                   
                        //在正在下载的队列中保存该announce
                        f.fetching[hash] = announce
                    }
            
            }
            // Send out all block header requests
            for peer, hashes := range request {

                // Create a closure of the fetch and schedule in on a new thread
                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
                go func() {
                    for _, hash := range hashes {
                        //下载区块头
                        fetchHeader(hash) // Suboptimal, but protocol doesn't allow 
                    }
        }
    }
                
     // 从p2p节点拿到区块头信息            
     case filter := <-f.headerFilter:

            unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
            for _, header := range task.headers {
                hash := header.Hash()

                        // 该头信息对应的区块是空块
                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {

                            block := types.NewBlockWithHeader(header)
                            block.ReceivedAt = task.time
                            //标记已经完成同步
                            complete = append(complete, block)
                            f.completing[hash] = announce
                            continue
                        }
                        // Otherwise add to the list of blocks needing completion
                        // 还没有完成同步,需要同步区块体
                        incomplete = append(incomplete, announce)
                    } 
            }
            // Schedule the retrieved headers for body completion
            for _, announce := range incomplete {
                hash := announce.header.Hash()
                // 已经在同步区块体了,返回
                if _, ok := f.completing[hash]; ok {
                    continue
                }
                //已经同步了区块头,准备同步区块体
                f.fetched[hash] = append(f.fetched[hash], announce)
                if len(f.fetched) == 1 {
                    f.rescheduleComplete(completeTimer)
                }
            }
            // Schedule the header-only blocks for import
            // 将只有区块头的区块,插入到区块链中
            for _, block := range complete {
                if announce := f.completing[block.Hash()]; announce != nil {
                    f.enqueue(announce.origin, block)
                }
            }
            //同步区块体的定时器
            case <-completeTimer.C:
            // At least one header's timer ran out, retrieve everything
            request := make(map[string][]common.Hash)

            for hash, announces := range f.fetched {
                // Pick a random peer to retrieve from, reset all others
                announce := announces[rand.Intn(len(announces))]
                f.forgetHash(hash)

                // If the block still didn't arrive, queue for completion
                if f.getBlock(hash) == nil {
                    //将需要下载的区块信息存入数组中
                    request[announce.origin] = append(request[announce.origin], hash)
                    f.completing[hash] = announce
                }
            }
            // Send out all block body requests
            for peer, hashes := range request {
                log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

                //调用peer的下载方法,下载区块
                go f.completing[hashes[0]].fetchBodies(hashes)
            }
            
     // filter从管道中拿到数据
     case filter := <-f.bodyFilter:
            var task *bodyFilterTask
            
            blocks := []*types.Block{}
            for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
                // Match up a body to any possible completion request
                matched := false
            // 根据p2p网络传过来的数据,生成区块
            block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])

            }
            // 将区块插入区块链中
            for _, block := range blocks {
                if announce := f.completing[block.Hash()]; announce != nil {
                    f.enqueue(announce.origin, block)
                }
            }
        }

}
func (f *Fetcher) enqueue(peer string, block *types.Block) {
   hash := block.Hash()

   // Ensure the peer isn't DOSing us
   count := f.queues[peer] + 1

   if _, ok := f.queued[hash]; !ok {
      op := &inject{
         origin: peer,
         block:  block,
      }
      f.queues[peer] = count
      f.queued[hash] = op
       //将区块加入到队列中
      f.queue.Push(op, -float32(block.NumberU64()))
   }
}
//将f.queue队列中的区块插入到区块链中
func (f *Fetcher) insert(peer string, block *types.Block) {
   hash := block.Hash()

   go func() {
       //方法执行完后,会向f.done管道插入数据
      defer func() { f.done <- hash }()

      // Quickly validate the header and propagate the block if it passes
       //校验区块头是否合法
      switch err := f.verifyHeader(block.Header()); err {
      //如果没有错误,说明是区块头是合法的
      case nil: 
         // 向周围节点广播区块
         go f.broadcastBlock(block, true)
      }
      // Run the actual import and log any issues
      // 真正将区块插入到区块链中
      if _, err := f.insertChain(types.Blocks{block}); err != nil {
         return
      }
        // 广播区块部分信心到其他节点
      go f.broadcastBlock(block, false)
   }()
}
//同步区块部分信息
func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
   select {
   //向管道中插入数据
   case p.queuedAnns <- block:
      p.knownBlocks.Add(block.Hash())
   default:
      p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
   }
}
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
   for _, hash := range hashes {
      p.knownBlocks.Add(hash)
   }
   request := make(newBlockHashesData, len(hashes))
   for i := 0; i < len(hashes); i++ {
      request[i].Hash = hashes[i]
      request[i].Number = numbers[i]
   }
    //通过p2p网络发送区块部分信息,消息为:NewBlockHashesMsg
   return p2p.Send(p.rw, NewBlockHashesMsg, request)
}
// Notify announces the fetcher of the potential availability of a new block in
// the network.
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
   headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
   block := &announce{
      hash:        hash,
      number:      number,
      time:        time,
      origin:      peer,
      fetchHeader: headerFetcher,
      fetchBodies: bodyFetcher,
   }
   select {
       //向管道中插入数据
   case f.notify <- block:
      return nil
   case <-f.quit:
      return errTerminated
   }
}
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
   log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

   // Send the filter channel to the fetcher
   filter := make(chan *headerFilterTask)

   select {
       //向f.headerFilter管道插入数据
   case f.headerFilter <- filter:
   case <-f.quit:
      return nil
   }
   // Request the filtering of the header list
   select {
   case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:

}
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
   select {
       //向管道中写入数据
   case destCh <- packet:
      return nil
   case <-cancel:
      return errNoSyncActive
   }
}
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {


   // Send the filter channel to the fetcher
   filter := make(chan *bodyFilterTask)

   select {
    //将数据插入到f.bodyFilter管道中
   case f.bodyFilter <- filter:
   case <-f.quit:
      return nil, nil
   }
   // Request the filtering of the body list
   select {
   case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
   case <-f.quit:
      return nil, nil
  
   }
}

https://t.zsxq.com/iiMvfea

我的星球.jpg

相关文章

网友评论

    本文标题:以太坊块同步源码分析

    本文链接:https://www.haomeiwen.com/subject/sdqoaftx.html