美文网首页
p2p网络,

p2p网络,

作者: nit小星星 | 来源:发表于2019-02-26 11:07 被阅读3次

这里使用ipfs的LIBP2P。实现p2p。网络,基本上就是调用Libp2p2封装好的接口

星云链除书画Nebulas的时候网络部分从neb_service开始。

进入neb_service初始化两个结构。一个是dispatch一个是node

在dispatch里面进行消息注册和分发

在node里面实现一个node的特性。

node里面首先会初始化localhost。开始自己的host主机。然后初始化streammanager

streammanager是管理stream的。一个node里面有多个stream。stream就是与其他主机的连接。stream提供write和read接口。

也就是。从stream里面读到的数据先在dispatch里面进行注册和分发。把数据收入到相应map结构里面。

从代码上看:

type Dispatcher struct{

subscribersMap    *sync.Map

quitCh chan bool

receivedMessageCh chan Message

dispatchedMessages    *lru.Cache

filters map[string]bool

}

这里是消息分发

subscribersMap 是消息类型。用一个sync.map来存。线程安全的

funcNewDispatcher() *Dispatcher{

dp:=&Dispatcher{

subscribersMap:new(sync.Map),

quitCh:make(chanbool,10),

receivedMessageCh:make(chanMessage,65536),

filters:make(map[string]bool),

}

dp.dispatchedMessages, _ = lru.New(51200)

returndp

}

在初始化dispatch过程中可以看到。quitch 是10。这个后面再说。最多接受65536个信息

func(dp*Dispatcher)Register(subscribers...*Subscriber) {

for_,v:=rangesubscribers {

mt:=v.MessageType()

m,_:=dp.subscribersMap.LoadOrStore(mt,new(sync.Map))

m.(*sync.Map).Store(v,true)

dp.filters[mt] = v.DoFilter()

}

}

这个是把消息注册到map上。也就是收到一个或者多个消息。首先进行消息判断。根据 消息类型将相应的Map给拿出来、

注意这里有两层Map结构。根据Mt取出来的是一个Map结构,然后取出的这个map结构存着消息。第一层map结构只是消息类型。然后将相应的filtermap置为true

和注册相反

func(dp*Dispatcher)Deregister(subscribers...*Subscriber) {

for_,v:=rangesubscribers {

mt:=v.MessageType()

m,_:=dp.subscribersMap.Load(mt)

ifm ==nil{

continue

}

m.(*sync.Map).Delete(v)

delete(dp.filters, mt)

}

}

当然还忘了,那都是处理Recmessage的,首先我们从stream的数据拿出来之后还得putmessage到receivemessage管道上

func(dp*Dispatcher)PutMessage(msgMessage) {

//it's a optimize strategy for message dispatch, according to https://github.com/nebulasio/go-nebulas/issues/50

hash:=msg.Hash()

ifdp.filters[msg.MessageType()] {

ifexist,_:=dp.dispatchedMessages.ContainsOrAdd(hash, hash); exist ==true{

//duplicated message, ignore.

metricsDuplicatedMessage(msg.MessageType())

return

}

}

dp.receivedMessageCh<-msg

}

接下来的消息就都在

subscribersMap里面了,然后一个循环将消息根据类型分发到不同的channel里面

func(dp*Dispatcher)loop() {

logging.CLog().Info("Started NewService Dispatcher.")

timerChan:=time.NewTicker(time.Second).C

for{

select{

case<-timerChan:

metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))

case<-dp.quitCh:

logging.CLog().Info("Stoped NebService Dispatcher.")

return

casemsg:=<-dp.receivedMessageCh:

msgType:=msg.MessageType()

v,_:=dp.subscribersMap.Load(msgType)

ifv ==nil{

continue

}

m,_:=v.(*sync.Map)

m.Range(func(key, valueinterface{})bool{

select{

casekey.(*Subscriber).msgChan<-msg:

default:

logging.VLog().WithFields(logrus.Fields{

"msgType": msgType,

}).Warn("timeout to dispatch message.")

}

returntrue

})

}

}

}

这样dispatch就结束了

我们在看Node结构

type Node struct{

synchronizing bool

quitCh chan bool

netService    *NebService

config        *Config

context      context.Context

id            peer.ID

networkKey    crypto.PrivKey

network      *swarm.Network

host          *basichost.BasicHost

streamManager *StreamManager

routeTable    *RouteTable

那个context是什么呢,小白的我去看了一下。哦原来是你为了结束Gorutine的,方便杀死一个gorutine

id是一串字符,代表唯一确定的node,我去看了一下pee.id是一串字符

相关文章

网友评论

      本文标题:p2p网络,

      本文链接:https://www.haomeiwen.com/subject/hvboyqtx.html