美文网首页Golang
nsq源码(5) nsqd 消息投递

nsq源码(5) nsqd 消息投递

作者: Linrundong | 来源:发表于2019-01-21 17:56 被阅读25次

保证成功投递

  • nsq保证了"至少一次"成功投递,而不是仅一次
  • 通过client.subChannel.StartInFlightTimeout()协程

protocolV2处理对象

protocolV2.messagePump()协程
  • 在向订阅的client发送消息前,会在(开始投递超时处理)subChannel.StartInFlightTimeout()记录超时信息
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    ...
        select {
        ...
        case msg := <-memoryMsgChan:
            ...
            // 发送前,在subChannel.StartInFlightTimeout()标记消息
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            // 向订阅的此client发送消息
            client.SendingMessage()
            err = p.SendMessage(client, msg)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }
}
标记超时时间-subchannel.StartInFlightTimeout()
  • 在当前client的subchannel使用messageID标记msg
  • msg.pri记录超时时间
  • 向subchannel的==超时队列inFlightPQ==消息数组添加时间和消息(压入栈顶)
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    now := time.Now()
    msg.clientID = clientID
    msg.deliveryTS = now
    msg.pri = now.Add(timeout).UnixNano()

    // 给subchannel使用messageID标记msg
    err := c.pushInFlightMessage(msg)
    if err != nil {
        return err
    }
    // 向subchannel的inFlightPQ消息数组添加此消息
    c.addToInFlightPQ(msg)
    return nil
}

定时检查超时-queueScanLoop

如果一条消息一直没有被消费,nsqd如何处理?

参考《Redis设计与实现》9.6 Redis的过期键删除策略,结合了两种策略:

惰性删除。每次客户端对某个key读写时,会检查它是否过期,如果过期,就把它删掉。

定期删除。定期删除并不会遍历整个DB,它会在规定时间内,分多次遍历服务器中各个DB,从数据库的expires字典中随机检查一部分键的过期时间,如果过期,则删除。

  • ==生产者/消费者模式==
  • 定期检查:==queueScanLoop方法中,每隔QueueScanInterval的时间,会从方法cache的channels list中随机选择QueueScanSelectionCount个channel,然后去执行resizePool。==
  • nsqd 启动的时候就 起了一个 queueScanLoop 线程
func (n *NSQD) Main() {
    ...
    // 超时消息检索和处理任务
    n.waitGroup.Wrap(n.queueScanLoop)
}
  • ==支持任务派发,任务响应,任务关闭的线程池模式==
  • 间隔性的派发 scan 任务, 并适时调整 queueScanWorker 的数量
  • responseCh获取任务结果,dirty代表处理过
func (n *NSQD) queueScanLoop() {
    // 任务派发队列
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    // 任务结果队列
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    closeCh := make(chan int)

    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    channels := n.channels()
    // 创建worker并控制数量min(0.25 * chans, configMax)
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        // 定时刷新
        case <-refreshTicker.C:
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }
        
        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

    loop:
        // 随机获取几个chan,发送到workCh任务队列
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }

        // 接收worker结果, 统计有多少channel是"脏"的
        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        // 如果dirty的数量超过配置直接进行下一轮
        if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }

exit:
    n.logf(LOG_INFO, "QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

NSQD.resizePool() 动态调整worker协程数量

  • 常见worker协程
  • 调整worker数量min(0.25 * chans, configMax)
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            closeCh <- 1
            n.poolSize--
        } else {
            // idealPoolSize > n.poolSize,还需增加worker
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

NSQD.queueScanWorker() 消费者worker

  • 负责具体业务工作
  • 利用了go 队列的select随机接收特性
  • processInFlightQueue()处理超时后传入response通知此chan进行过
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

检索和处理超时消息-channel.processInFlightQueue()

  • 读取==inFlightPQ消息队列==
  • 判断是否超时,超时了则通知并重新发送,再次进行超时预处理StartInFlightTimeout()
// queueScanWorker任务会传入当前时间到t
func (c *Channel) processInFlightQueue(t int64) bool {
    // 同步状态,防止正在这个channel正在退出
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    // 循环处理inFlightPQ消息队列栈顶消息
    for {
        c.inFlightMutex.Lock()
        // 没有超时,则返回nil, 然后goto exit->return dirty
        // 超时了,inFlightPQ弹出并返回msg
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()

        if msg == nil {
            goto exit
        }
        // 只要发送过消息,则标记此subchannel为dirty
        dirty = true

        // 删除超时消息对应channel的inFlightMessages消息map
        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            // 向超时消息对应client发送超时通知
            client.TimedOutMessage()
        }
        // 重新发送消息
        c.put(msg)
    }

exit:
    return dirty
}

相关文章

网友评论

    本文标题:nsq源码(5) nsqd 消息投递

    本文链接:https://www.haomeiwen.com/subject/kvqhjqtx.html