保证成功投递
- nsq保证了"至少一次"成功投递,而不是仅一次
- 通过client.subChannel.StartInFlightTimeout()协程
protocolV2处理对象
protocolV2.messagePump()协程
- 在向订阅的client发送消息前,会在(开始投递超时处理)subChannel.StartInFlightTimeout()记录超时信息
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
...
select {
...
case msg := <-memoryMsgChan:
...
// 发送前,在subChannel.StartInFlightTimeout()标记消息
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
// 向订阅的此client发送消息
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
}
标记超时时间-subchannel.StartInFlightTimeout()
- 在当前client的subchannel使用messageID标记msg
- msg.pri记录超时时间
- 向subchannel的==超时队列inFlightPQ==消息数组添加时间和消息(压入栈顶)
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
now := time.Now()
msg.clientID = clientID
msg.deliveryTS = now
msg.pri = now.Add(timeout).UnixNano()
// 给subchannel使用messageID标记msg
err := c.pushInFlightMessage(msg)
if err != nil {
return err
}
// 向subchannel的inFlightPQ消息数组添加此消息
c.addToInFlightPQ(msg)
return nil
}
定时检查超时-queueScanLoop
如果一条消息一直没有被消费,nsqd如何处理?
参考《Redis设计与实现》9.6 Redis的过期键删除策略,结合了两种策略:
惰性删除。每次客户端对某个key读写时,会检查它是否过期,如果过期,就把它删掉。
定期删除。定期删除并不会遍历整个DB,它会在规定时间内,分多次遍历服务器中各个DB,从数据库的expires字典中随机检查一部分键的过期时间,如果过期,则删除。
- ==生产者/消费者模式==
- 定期检查:==queueScanLoop方法中,每隔QueueScanInterval的时间,会从方法cache的channels list中随机选择QueueScanSelectionCount个channel,然后去执行resizePool。==
- nsqd 启动的时候就 起了一个 queueScanLoop 线程
func (n *NSQD) Main() {
...
// 超时消息检索和处理任务
n.waitGroup.Wrap(n.queueScanLoop)
}
- ==支持任务派发,任务响应,任务关闭的线程池模式==
- 间隔性的派发 scan 任务, 并适时调整 queueScanWorker 的数量
- responseCh获取任务结果,dirty代表处理过
func (n *NSQD) queueScanLoop() {
// 任务派发队列
workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
// 任务结果队列
responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
closeCh := make(chan int)
workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
channels := n.channels()
// 创建worker并控制数量min(0.25 * chans, configMax)
n.resizePool(len(channels), workCh, responseCh, closeCh)
for {
select {
case <-workTicker.C:
if len(channels) == 0 {
continue
}
// 定时刷新
case <-refreshTicker.C:
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:
goto exit
}
num := n.getOpts().QueueScanSelectionCount
if num > len(channels) {
num = len(channels)
}
loop:
// 随机获取几个chan,发送到workCh任务队列
for _, i := range util.UniqRands(num, len(channels)) {
workCh <- channels[i]
}
// 接收worker结果, 统计有多少channel是"脏"的
numDirty := 0
for i := 0; i < num; i++ {
if <-responseCh {
numDirty++
}
}
// 如果dirty的数量超过配置直接进行下一轮
if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
goto loop
}
}
exit:
n.logf(LOG_INFO, "QUEUESCAN: closing")
close(closeCh)
workTicker.Stop()
refreshTicker.Stop()
}
NSQD.resizePool() 动态调整worker协程数量
- 常见worker协程
- 调整worker数量min(0.25 * chans, configMax)
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
idealPoolSize := int(float64(num) * 0.25)
if idealPoolSize < 1 {
idealPoolSize = 1
} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
}
for {
if idealPoolSize == n.poolSize {
break
} else if idealPoolSize < n.poolSize {
closeCh <- 1
n.poolSize--
} else {
// idealPoolSize > n.poolSize,还需增加worker
n.waitGroup.Wrap(func() {
n.queueScanWorker(workCh, responseCh, closeCh)
})
n.poolSize++
}
}
}
NSQD.queueScanWorker() 消费者worker
- 负责具体业务工作
- 利用了go 队列的select随机接收特性
- processInFlightQueue()处理超时后传入response通知此chan进行过
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
for {
select {
case c := <-workCh:
now := time.Now().UnixNano()
dirty := false
if c.processInFlightQueue(now) {
dirty = true
}
if c.processDeferredQueue(now) {
dirty = true
}
responseCh <- dirty
case <-closeCh:
return
}
}
}
检索和处理超时消息-channel.processInFlightQueue()
- 读取==inFlightPQ消息队列==
- 判断是否超时,超时了则通知并重新发送,再次进行超时预处理StartInFlightTimeout()
// queueScanWorker任务会传入当前时间到t
func (c *Channel) processInFlightQueue(t int64) bool {
// 同步状态,防止正在这个channel正在退出
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()
if c.Exiting() {
return false
}
dirty := false
// 循环处理inFlightPQ消息队列栈顶消息
for {
c.inFlightMutex.Lock()
// 没有超时,则返回nil, 然后goto exit->return dirty
// 超时了,inFlightPQ弹出并返回msg
msg, _ := c.inFlightPQ.PeekAndShift(t)
c.inFlightMutex.Unlock()
if msg == nil {
goto exit
}
// 只要发送过消息,则标记此subchannel为dirty
dirty = true
// 删除超时消息对应channel的inFlightMessages消息map
_, err := c.popInFlightMessage(msg.clientID, msg.ID)
if err != nil {
goto exit
}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
// 向超时消息对应client发送超时通知
client.TimedOutMessage()
}
// 重新发送消息
c.put(msg)
}
exit:
return dirty
}
网友评论