The core of this flow lives in ProtocolManager; synchronisation starts from pm's Start method.
- Start
- pm.minedBroadcastLoop subscribes to mined-block events
- pm.BroadcastBlock(ev.Block, true) propagates the full block to a subset of connected peers
- peer.AsyncSendNewBlock(block, td)
- p.queuedProps <- &propEvent{block: block, td: td}: pushes the event into the channel
- prop := <-p.queuedProps: the other end of the channel is read in peer.broadcast
- p.SendNewBlock(prop.block, prop.td) is called once the event is taken off the channel
- p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) sends the data over p2p as a NewBlockMsg
- handleMsg: at the other end of the p2p link, pm.handleMsg receives the NewBlockMsg message
- pm.fetcher.Enqueue(p.id, request.Block) schedules the block for import
- f.inject <- op: pushes the block into the channel
- op := <-f.inject: the channel is drained in the Fetcher's loop method
- f.enqueue(op.origin, op.block) queues the block
- f.queue.Push(op, -float32(block.NumberU64())) pushes the block onto the import queue; at this point it has not yet been written into the blockchain
- another code path in the loop method pops blocks off f.queue and imports them into the blockchain
- f.insert(op.origin, op.block) imports the block into the blockchain
- f.done <- hash: when the import routine finishes, it pushes the hash into the done channel to signal success
- f.verifyHeader(block.Header()) validates the block header
- engine.VerifyHeader(blockchain, header, true) delegates to the consensus engine's verification
- go f.broadcastBlock(block, true) propagates the full block to connected peers
- go f.broadcastBlock(block, false) announces the block (hash only) to connected peers
- hash := <-f.done: when loop receives the hash on the done channel, the block has been written to the database, so its queued state is cleared
- f.forgetHash(hash)
- f.forgetBlock(hash)
- pm.BroadcastBlock(ev.Block, false) announces the block (hash and number only) to all remaining connected peers
- peer.AsyncSendNewBlockHash(block)
- p.queuedAnns <- block
- block := <-p.queuedAnns: received by the channel in peer.broadcast
- p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}) sends only the block hash and number to the peer
- p.SendNewBlockHashes transmits the announcement over p2p as a NewBlockHashesMsg
- handleMsg: at the other end of the p2p link, pm.handleMsg receives the NewBlockHashesMsg message
- pm.fetcher.Notify tells the fetcher that a new block is available
- f.notify <- block pushes the announcement into the channel
- notification := <-f.notify: the corresponding channel in f.loop receives the announcement
- f.fetching[notification.hash]: the header of this block is already being fetched, so skip it
- f.completing[notification.hash]: the body of this block is already being fetched, so skip it
- f.announced[notification.hash] = append(f.announced[notification.hash], notification) records the announcement in the announced map, ready for header retrieval
- Once announcements from other p2p nodes are queued, when does fetching actually start? Timers drive it (see the rescheduleFetch sketch right after this list)
- <-fetchTimer.C: the timer in f.loop fires, triggering header retrieval
- fetchHeader(hash) requests the block header
- p2p.Send(p.rw, GetBlockHeadersMsg, ...) sends the header request over p2p as a GetBlockHeadersMsg
- p.SendBlockHeaders(headers) replies over p2p with the requested headers; the responder gathers up to query.Amount headers, so a downloader request can get a whole batch back, while the fetcher's single-header request gets exactly one
- p2p.Send(p.rw, BlockHeadersMsg, headers)
- BlockHeadersMsg: the other end of the p2p link
- headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) is called when exactly one header is returned
- f.headerFilter <- filter pushes the filter task into the channel
- f.fetched[hash] = append(f.fetched[hash], announce) records that the header has arrived and the body still needs to be fetched
- f.enqueue(announce.origin, block): if the block is empty (header only, no transactions or uncles), there is no body to fetch, so it is queued for import straight away
- <-completeTimer.C: the other timer in f.loop fires, triggering body retrieval
- go f.completing[hashes[0]].fetchBodies(hashes) calls the peer's fetchBodies to request the block bodies
- p2p.Send(p.rw, GetBlockBodiesMsg, hashes) sends the body request over p2p as a GetBlockBodiesMsg
- p.SendBlockBodiesRLP(bodies) sends the block bodies back to the requesting node over p2p
- p2p.Send(p.rw, BlockBodiesMsg, bodies) with message type BlockBodiesMsg
- BlockBodiesMsg: handleMsg receives the block bodies
- pm.fetcher.FilterBodies hands the bodies to the fetcher for filtering and validation
- f.bodyFilter <- filter: pushes the filter task into the channel
- block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]) assembles the block from the data received over p2p
- f.enqueue(announce.origin, block) queues the assembled block for import into the blockchain
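Once announcements or blocks are queued, the fetch and complete timers in f.loop decide when the next request actually goes out; they are re-armed from the oldest pending announcement. For reference, here is a sketch of the fetch-timer rescheduling helper, abridged from eth/fetcher/fetcher.go of the same go-ethereum line (arriveTimeout is a constant defined there); treat it as illustrative rather than authoritative.

// rescheduleFetch resets the fetch timer to fire when the oldest announced block times out.
func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
	// Short circuit if no blocks are announced
	if len(f.announced) == 0 {
		return
	}
	// Otherwise find the earliest expiring announcement
	earliest := time.Now()
	for _, announces := range f.announced {
		if earliest.After(announces[0].time) {
			earliest = announces[0].time
		}
	}
	// Fire once that announcement reaches arriveTimeout of age
	fetch.Reset(arriveTimeout - time.Since(earliest))
}

The annotated code for the whole flow follows, starting with Start.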
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
// transaction broadcast loop
go pm.txBroadcastLoop()
// broadcast mined blocks
// mined-block broadcast loop
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
}
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
for obj := range pm.minedBlockSub.Chan() {
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
}
}
}
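For context: the NewMinedBlockEvent consumed above is posted by the mining worker through the same event mux once it has sealed a block and written it locally; roughly as below (abridged from miner/worker.go; the receiver name and exact call site vary between versions, so treat this as an assumption).

	// inside the worker's result loop, after the sealed block has been written locally
	self.mux.Post(core.NewMinedBlockEvent{Block: block})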
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
hash := block.Hash()
peers := pm.peers.PeersWithoutBlock(hash)
	// If propagation is requested (propagate == true), send the full block to a subset of the peers
	if propagate {
		// Calculate the block's TD (the parent-existence check from the original code is elided here)
		td := new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
		// Send the block to a subset (roughly the square root) of our connected peers
		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
		for _, peer := range transfer {
			peer.AsyncSendNewBlock(block, td)
		}
		return
	}
	// Otherwise (propagate == false), if the block is indeed in our own chain, announce it
	if pm.blockchain.HasBlock(hash, block.NumberU64()) {
		// announce the block (hash and number only) to all remaining connected peers
for _, peer := range peers {
peer.AsyncSendNewBlockHash(block)
}
}
}
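The square-root rule above keeps full-block propagation cheap: out of N eligible peers only about sqrt(N) receive the whole block, everyone else just gets the hash announcement from the second branch. A tiny self-contained illustration of the split (illustrative code, not part of go-ethereum):

package main

import (
	"fmt"
	"math"
)

func main() {
	// For a given peer count, how many peers get the full block vs. only the hash announcement.
	for _, n := range []int{4, 16, 25, 100} {
		full := int(math.Sqrt(float64(n))) // mirrors peers[:int(math.Sqrt(float64(len(peers))))]
		fmt.Printf("%3d peers -> full block to %2d, hash announcement to %3d\n", n, full, n-full)
	}
}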
// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
// the peer's broadcast queue is full, the event is silently dropped.
func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
select {
	// push the event into the peer's propagation channel
case p.queuedProps <- &propEvent{block: block, td: td}:
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
}
}
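For reference, the event type and queue channels used here look roughly like the following (abridged from eth/peer.go of the same go-ethereum line; the field comments are mine, so treat the details as a sketch):

// propEvent wraps a block plus its total difficulty for the propagation queue.
type propEvent struct {
	block *types.Block
	td    *big.Int
}

type peer struct {
	// ... other fields elided ...
	queuedTxs   chan []*types.Transaction // queue of transactions to broadcast to the peer
	queuedProps chan *propEvent           // queue of full blocks to propagate to the peer
	queuedAnns  chan *types.Block         // queue of blocks to announce (hash only) to the peer
}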
// broadcast is a write loop that multiplexes block propagations, announcements
// and transaction broadcasts into the remote peer. The goal is to have an async
// writer that does not lock up node internals.
// This method is started (as a goroutine) when a discovered peer is registered; it drains
// the peer's queued broadcast channels.
func (p *peer) broadcast() {
	for {
		select {
		// transaction broadcast channel
		case txs := <-p.queuedTxs:
			if err := p.SendTransactions(txs); err != nil {
				return
			}
		// full-block propagation channel
		case prop := <-p.queuedProps:
			if err := p.SendNewBlock(prop.block, prop.td); err != nil {
				return
			}
		// block announcement (hash only) channel
		case block := <-p.queuedAnns:
			if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
				return
			}
		}
	}
}
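The broadcast goroutine above is started when the peer is added to the peer set during registration. A sketch of that path, abridged from peerSet.Register in eth/peer.go of the same go-ethereum line (error values as in that file; treat the details as a sketch):

// Register injects a new peer into the working set and starts its broadcast loop.
func (ps *peerSet) Register(p *peer) error {
	ps.lock.Lock()
	defer ps.lock.Unlock()

	if ps.closed {
		return errClosed
	}
	if _, ok := ps.peers[p.id]; ok {
		return errAlreadyRegistered
	}
	ps.peers[p.id] = p
	// drain the peer's queued broadcast channels in the background
	go p.broadcast()

	return nil
}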
func (pm *ProtocolManager) handleMsg(p *peer) error {
	// ... (read the message, then dispatch on msg.Code; unrelated cases are elided)
	switch {
	// a peer propagated a full block to us
	case msg.Code == NewBlockMsg:
		...
		// schedule the block for import
		pm.fetcher.Enqueue(p.id, request.Block)
		...
		// if the peer's total difficulty is higher than our local chain's, start syncing from it
		if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
			go pm.synchronise(p)
		}

	// a peer announced new block hashes (hash and number only)
case msg.Code == NewBlockHashesMsg:
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
// Schedule all the unknown hashes for retrieval
unknown := make(newBlockHashesData, 0, len(announces))
for _, block := range announces {
if !pm.blockchain.HasBlock(block.Hash, block.Number) {
unknown = append(unknown, block)
}
}
for _, block := range unknown {
			// notify the fetcher that a new block is available to fetch
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
}
	// Block header query: collect the requested headers and reply (possibly a whole batch)
	case msg.Code == GetBlockHeadersMsg:
// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
headers []*types.Header
unknown bool
)
		// Starting from the origin the peer sent us, walk the local chain and collect up to query.Amount
		// headers (honouring Skip/Reverse); e.g. an origin of block 10000 may also pull in 10001, 10002, ...
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
// Retrieve the next header satisfying the query
var origin *types.Header
origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
headers = append(headers, origin)
bytes += estHeaderRlpSize
// Advance to the next header of the query
switch {
case hashMode && !query.Reverse:
// Hash based traversal towards the leaf block
var (
current = origin.Number.Uint64()
next = current + query.Skip + 1
)
if next <= current {
infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
unknown = true
} else {
if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
nextHash := header.Hash()
expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
if expOldHash == query.Origin.Hash {
query.Origin.Hash, query.Origin.Number = nextHash, next
} else {
unknown = true
}
} else {
unknown = true
}
}
}
return p.SendBlockHeaders(headers)
	// block headers returned by a remote node in response to our request
case msg.Code == BlockHeadersMsg:
// A batch of headers arrived to one of our previous requests
var headers []*types.Header
if err := msg.Decode(&headers); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Filter out any explicitly requested headers, deliver the rest to the downloader
filter := len(headers) == 1
		if filter {
			// exactly one header came back: it may be the one the fetcher requested
			headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
		}
		if len(headers) > 0 || !filter {
			// anything left over (or a multi-header batch) is delivered to the downloader
			if err := pm.downloader.DeliverHeaders(p.id, headers); err != nil {
				log.Debug("Failed to deliver headers", "err", err)
			}
		}
	// a peer asked us for block bodies
case msg.Code == GetBlockBodiesMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather blocks until the fetch or network limits is reached
var (
hash common.Hash
bytes int
bodies []rlp.RawValue
)
for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
// Retrieve the hash of the next block
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested block body, stopping if enough was found
if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
				// collect the RLP-encoded body
				bodies = append(bodies, data)
				bytes += len(data)
			}
		}
		// send the collected bodies back to the requesting node over p2p
return p.SendBlockBodiesRLP(bodies)
	// a batch of block bodies arrived in response to our GetBlockBodiesMsg
	case msg.Code == BlockBodiesMsg:
		// filter out the bodies the fetcher explicitly requested (see the fuller sketch below)
		transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
	}
	return nil
}
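In the case above, the transactions and uncles slices are built from the decoded response before FilterBodies runs. A fuller sketch of that case, abridged from eth/handler.go of the same go-ethereum line (exact logging and helper names are taken on trust from that source, treat them as assumptions):

	case msg.Code == BlockBodiesMsg:
		// A batch of block bodies arrived in response to one of our previous requests
		var request blockBodiesData
		if err := msg.Decode(&request); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// Rebuild the per-block transaction and uncle slices from the decoded bodies
		transactions := make([][]*types.Transaction, len(request))
		uncles := make([][]*types.Header, len(request))
		for i, body := range request {
			transactions[i] = body.Transactions
			uncles[i] = body.Uncles
		}
		// Filter out bodies the fetcher explicitly requested, deliver the rest to the downloader
		filter := len(transactions) > 0 || len(uncles) > 0
		if filter {
			transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
		}
		if len(transactions) > 0 || len(uncles) > 0 || !filter {
			if err := pm.downloader.DeliverBodies(p.id, transactions, uncles); err != nil {
				log.Debug("Failed to deliver bodies", "err", err)
			}
		}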
// Enqueue tries to fill gaps the fetcher's future import queue.
// called when handleMsg receives a NewBlockMsg over p2p (see above)
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
op := &inject{
origin: peer,
block: block,
}
select {
	// push the block into the fetcher's inject channel
case f.inject <- op:
return nil
case <-f.quit:
return errTerminated
}
}
func (f *Fetcher) loop() {
	for {
		// current height of the local chain
		height := f.chainHeight()
		for !f.queue.Empty() {
			op := f.queue.PopItem().(*inject)
			hash := op.block.Hash()
			// If too high up the chain or phase, continue later
			number := op.block.NumberU64()
			// a queued block more than one ahead of the chain head cannot be imported yet;
			// push it back and retry once the gap has been filled
			if number > height+1 {
				f.queue.Push(op, -float32(number))
				if f.queueChangeHook != nil {
					f.queueChangeHook(hash, true)
				}
				break
			}
			f.insert(op.origin, op.block)
		}
// Wait for an outside event to occur
select {
		// a block handed over by handleMsg via Enqueue arrives on the inject channel
case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
propBroadcastInMeter.Mark(1)
f.enqueue(op.origin, op.block)
case hash := <-f.done:
// A pending import finished, remove all traces of the notification
f.forgetHash(hash)
f.forgetBlock(hash)
		// a new block hash announcement arrived (via Notify)
case notification := <-f.notify:
			// All is well, schedule the announce if block's not yet downloading
			// the header of this block is already being fetched: nothing to do
			if _, ok := f.fetching[notification.hash]; ok {
				break
			}
			// the header is here and the body is already being fetched: nothing to do
			if _, ok := f.completing[notification.hash]; ok {
				break
			}
			// track how many announcements each peer has pending (the DoS limit check is elided)
			count := f.announces[notification.origin] + 1
			f.announces[notification.origin] = count
			// schedule the announcement for header retrieval
			f.announced[notification.hash] = append(f.announced[notification.hash], notification)
		case <-fetchTimer.C:
			// At least one block's timer ran out, check for needing retrieval
			request := make(map[string][]common.Hash)

			for hash, announces := range f.announced {
				// pick a random announcing peer to fetch from (the arrive-timeout check is elided)
				announce := announces[rand.Intn(len(announces))]
				if f.getBlock(hash) == nil {
					// the block is still unknown locally: request its header from that peer
					request[announce.origin] = append(request[announce.origin], hash)
					// remember that the header for this hash is now being fetched
					f.fetching[hash] = announce
				}
			}
			// Send out all block header requests
			for peer, hashes := range request {
				log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
				// Create a closure of the fetch and schedule in on a new thread
				fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
				go func() {
					for _, hash := range hashes {
						// request the block header from the peer
						fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
					}
				}()
			}
		// a batch of block headers came back from a peer (via FilterHeaders)
		case filter := <-f.headerFilter:
			// ... (the *headerFilterTask is received from the filter channel; elided)
			unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
			for _, header := range task.headers {
				hash := header.Hash()
				// ... (the matching announce is looked up in f.fetching; checks elided)
				// an empty block: the header alone fully describes it, no body needed
				if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
					block := types.NewBlockWithHeader(header)
					block.ReceivedAt = task.time
					// mark it as completed straight away
					complete = append(complete, block)
					f.completing[hash] = announce
					continue
				}
				// Otherwise add to the list of blocks needing completion (the body still has to be fetched)
				incomplete = append(incomplete, announce)
			}
			// ... (headers the fetcher did not ask for are handed back through the filter channel; elided)
// Schedule the retrieved headers for body completion
for _, announce := range incomplete {
hash := announce.header.Hash()
				// the body for this hash is already being fetched: skip
if _, ok := f.completing[hash]; ok {
continue
}
				// the header has arrived; record it so the body can be requested
f.fetched[hash] = append(f.fetched[hash], announce)
if len(f.fetched) == 1 {
f.rescheduleComplete(completeTimer)
}
}
			// Schedule the header-only (empty) blocks straight for import
for _, block := range complete {
if announce := f.completing[block.Hash()]; announce != nil {
f.enqueue(announce.origin, block)
}
}
		// the body-retrieval timer fired
		case <-completeTimer.C:
// At least one header's timer ran out, retrieve everything
request := make(map[string][]common.Hash)
for hash, announces := range f.fetched {
// Pick a random peer to retrieve from, reset all others
announce := announces[rand.Intn(len(announces))]
f.forgetHash(hash)
// If the block still didn't arrive, queue for completion
if f.getBlock(hash) == nil {
					// record the block whose body still has to be fetched
request[announce.origin] = append(request[announce.origin], hash)
f.completing[hash] = announce
}
}
// Send out all block body requests
for peer, hashes := range request {
log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
				// ask the announcing peer for the block bodies
go f.completing[hashes[0]].fetchBodies(hashes)
}
		// a batch of block bodies came back from a peer (via FilterBodies)
		case filter := <-f.bodyFilter:
			// ... (the *bodyFilterTask is received from the filter channel; elided)
			var task *bodyFilterTask
			blocks := []*types.Block{}
			for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
				// Match up a body to any possible completion request
				matched := false
				// ... (the body is matched against the pending announces in f.completing; elided)
				// assemble the block from the header we already hold plus the body from the wire
				block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
				blocks = append(blocks, block)
			}
			// queue the assembled blocks for import into the blockchain
for _, block := range blocks {
if announce := f.completing[block.Hash()]; announce != nil {
f.enqueue(announce.origin, block)
}
}
}
}
func (f *Fetcher) enqueue(peer string, block *types.Block) {
hash := block.Hash()
// Ensure the peer isn't DOSing us
count := f.queues[peer] + 1
if _, ok := f.queued[hash]; !ok {
op := &inject{
origin: peer,
block: block,
}
f.queues[peer] = count
f.queued[hash] = op
		// push the block onto the import queue, ordered by block number
f.queue.Push(op, -float32(block.NumberU64()))
}
}
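Why the negative priority? f.queue is a max-priority queue, so pushing with -blockNumber means the lowest block number pops first and queued blocks are imported in ascending order. A small self-contained illustration of the ordering (illustrative only, not the fetcher's actual queue implementation):

package main

import (
	"fmt"
	"sort"
)

func main() {
	type item struct {
		number   uint64
		priority float32
	}
	// Queue blocks out of order, using priority = -number as in f.queue.Push(op, -float32(number)).
	queue := []item{}
	for _, n := range []uint64{102, 100, 101} {
		queue = append(queue, item{number: n, priority: -float32(n)})
	}
	// A max-priority queue always pops the highest priority first; emulate that with a sort.
	sort.Slice(queue, func(i, j int) bool { return queue[i].priority > queue[j].priority })
	for _, it := range queue {
		fmt.Println("pop block", it.number) // pops 100, 101, 102: ready to extend the chain in order
	}
}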
// called from loop for each block popped off f.queue; runs the actual import into the blockchain
func (f *Fetcher) insert(peer string, block *types.Block) {
	hash := block.Hash()

	go func() {
		// whatever happens, signal completion on the done channel when this goroutine returns
		defer func() { f.done <- hash }()

		// Quickly validate the header and propagate the block if it passes
		switch err := f.verifyHeader(block.Header()); err {
		case nil:
			// the header is valid: propagate the full block to a subset of peers right away
			go f.broadcastBlock(block, true)
		// (the ErrFutureBlock and default error cases are elided here)
		}
		// Run the actual import and log any issues
		// this is where the block is really written into the blockchain
		if _, err := f.insertChain(types.Blocks{block}); err != nil {
			return
		}
		// after a successful import, announce the block (hash only) to the remaining peers
		go f.broadcastBlock(block, false)
	}()
}
// AsyncSendNewBlockHash queues a block-hash announcement for transmission to the remote peer
func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
select {
	// push the block into the peer's announcement channel
case p.queuedAnns <- block:
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
}
}
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
for _, hash := range hashes {
p.knownBlocks.Add(hash)
}
request := make(newBlockHashesData, len(hashes))
for i := 0; i < len(hashes); i++ {
request[i].Hash = hashes[i]
request[i].Number = numbers[i]
}
	// send the hash/number announcement over p2p as a NewBlockHashesMsg
return p2p.Send(p.rw, NewBlockHashesMsg, request)
}
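The request built above has type newBlockHashesData, the wire format of a block announcement; roughly as follows (abridged from eth/protocol.go of the same go-ethereum line):

// newBlockHashesData is the network packet for block announcements.
type newBlockHashesData []struct {
	Hash   common.Hash // hash of one particular block being announced
	Number uint64      // number of one particular block being announced
}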
// Notify announces the fetcher of the potential availability of a new block in
// the network.
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
block := &announce{
hash: hash,
number: number,
time: time,
origin: peer,
fetchHeader: headerFetcher,
fetchBodies: bodyFetcher,
}
select {
	// push the announcement into the fetcher's notify channel
case f.notify <- block:
return nil
case <-f.quit:
return errTerminated
}
}
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
// Send the filter channel to the fetcher
filter := make(chan *headerFilterTask)
select {
	// hand our filter channel to the fetcher's loop via headerFilter
case f.headerFilter <- filter:
case <-f.quit:
return nil
}
	// Request the filtering of the header list
	select {
	case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
	case <-f.quit:
		return nil
	}
	// Retrieve the headers remaining after filtering
	select {
	case task := <-filter:
		return task.headers
	case <-f.quit:
		return nil
	}
}
// pm.downloader.DeliverHeaders / DeliverBodies (used in handleMsg above) funnel into this generic
// helper, which hands the received packet to the downloader through a channel.
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
	// ... (metrics accounting and cancel-channel setup elided; cancel is the downloader's cancel channel)
	select {
	// hand the packet over to the downloader
	case destCh <- packet:
		return nil
	case <-cancel:
		return errNoSyncActive
	}
}
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
// Send the filter channel to the fetcher
filter := make(chan *bodyFilterTask)
select {
	// hand our filter channel to the fetcher's loop via bodyFilter
case f.bodyFilter <- filter:
case <-f.quit:
return nil, nil
}
// Request the filtering of the body list
select {
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
case <-f.quit:
return nil, nil
}
	// Retrieve the bodies remaining after filtering
	select {
	case task := <-filter:
		return task.transactions, task.uncles
	case <-f.quit:
		return nil, nil
	}
}
