这里使用ipfs的LIBP2P。实现p2p。网络,基本上就是调用Libp2p2封装好的接口
星云链除书画Nebulas的时候网络部分从neb_service开始。
进入neb_service初始化两个结构。一个是dispatch一个是node
在dispatch里面进行消息注册和分发
在node里面实现一个node的特性。
node里面首先会初始化localhost。开始自己的host主机。然后初始化streammanager
streammanager是管理stream的。一个node里面有多个stream。stream就是与其他主机的连接。stream提供write和read接口。
也就是。从stream里面读到的数据先在dispatch里面进行注册和分发。把数据收入到相应map结构里面。
从代码上看:
type Dispatcher struct{
subscribersMap *sync.Map
quitCh chan bool
receivedMessageCh chan Message
dispatchedMessages *lru.Cache
filters map[string]bool
}
这里是消息分发
subscribersMap 是消息类型。用一个sync.map来存。线程安全的
funcNewDispatcher() *Dispatcher{
dp:=&Dispatcher{
subscribersMap:new(sync.Map),
quitCh:make(chanbool,10),
receivedMessageCh:make(chanMessage,65536),
filters:make(map[string]bool),
}
dp.dispatchedMessages, _ = lru.New(51200)
returndp
}
在初始化dispatch过程中可以看到。quitch 是10。这个后面再说。最多接受65536个信息
func(dp*Dispatcher)Register(subscribers...*Subscriber) {
for_,v:=rangesubscribers {
mt:=v.MessageType()
m,_:=dp.subscribersMap.LoadOrStore(mt,new(sync.Map))
m.(*sync.Map).Store(v,true)
dp.filters[mt] = v.DoFilter()
}
}
这个是把消息注册到map上。也就是收到一个或者多个消息。首先进行消息判断。根据 消息类型将相应的Map给拿出来、
注意这里有两层Map结构。根据Mt取出来的是一个Map结构,然后取出的这个map结构存着消息。第一层map结构只是消息类型。然后将相应的filtermap置为true
和注册相反
func(dp*Dispatcher)Deregister(subscribers...*Subscriber) {
for_,v:=rangesubscribers {
mt:=v.MessageType()
m,_:=dp.subscribersMap.Load(mt)
ifm ==nil{
continue
}
m.(*sync.Map).Delete(v)
delete(dp.filters, mt)
}
}
当然还忘了,那都是处理Recmessage的,首先我们从stream的数据拿出来之后还得putmessage到receivemessage管道上
func(dp*Dispatcher)PutMessage(msgMessage) {
//it's a optimize strategy for message dispatch, according to https://github.com/nebulasio/go-nebulas/issues/50
hash:=msg.Hash()
ifdp.filters[msg.MessageType()] {
ifexist,_:=dp.dispatchedMessages.ContainsOrAdd(hash, hash); exist ==true{
//duplicated message, ignore.
metricsDuplicatedMessage(msg.MessageType())
return
}
}
dp.receivedMessageCh<-msg
}
接下来的消息就都在
subscribersMap里面了,然后一个循环将消息根据类型分发到不同的channel里面
func(dp*Dispatcher)loop() {
logging.CLog().Info("Started NewService Dispatcher.")
timerChan:=time.NewTicker(time.Second).C
for{
select{
case<-timerChan:
metricsDispatcherCached.Update(int64(len(dp.receivedMessageCh)))
case<-dp.quitCh:
logging.CLog().Info("Stoped NebService Dispatcher.")
return
casemsg:=<-dp.receivedMessageCh:
msgType:=msg.MessageType()
v,_:=dp.subscribersMap.Load(msgType)
ifv ==nil{
continue
}
m,_:=v.(*sync.Map)
m.Range(func(key, valueinterface{})bool{
select{
casekey.(*Subscriber).msgChan<-msg:
default:
logging.VLog().WithFields(logrus.Fields{
"msgType": msgType,
}).Warn("timeout to dispatch message.")
}
returntrue
})
}
}
}
这样dispatch就结束了
我们在看Node结构
type Node struct{
synchronizing bool
quitCh chan bool
netService *NebService
config *Config
context context.Context
id peer.ID
networkKey crypto.PrivKey
network *swarm.Network
host *basichost.BasicHost
streamManager *StreamManager
routeTable *RouteTable
}
那个context是什么呢,小白的我去看了一下。哦原来是你为了结束Gorutine的,方便杀死一个gorutine
id是一串字符,代表唯一确定的node,我去看了一下pee.id是一串字符
网友评论