美文网首页Docker容器k8s那点事儿
[k8s源码分析][kube-scheduler]schedul

[k8s源码分析][kube-scheduler]schedul

作者: nicktming | 来源:发表于2019-10-11 07:28 被阅读0次

1. 前言

转载请说明原文出处, 尊重他人劳动成果!

本文将分析kubernetes/pkg/scheduler/internal/queue中的文件, 其中包括scheduling_queue.go
源码位置: https://github.com/nicktming/kubernetes/blob/tming-v1.13/pkg/scheduler/internal/queue/scheduling_queue.go
分支: tming-v1.13 (基于v1.13版本)

2. SchedulingQueue (以PriorityQueue为例)

SchedulingQueue 是一个存着等待被调度的所有pods的数据结构, 也就是说所有没有nodeNamepod都会被SchedulingQueue管理.

SchedulingQueue 主要涉及三个重要的模块:

activeQ: 存着即将要被调度的pods
unschedulableQ: 存着已经试着调度但是没有成功的
nominatedPods: 存着抢占资源而成功的pods (因为抢占资源, 意味着正常调度失败, 然后如果杀死某些优先级低的pods可以使得该pod可以调度在某个节点上, 因此该pod还没有真正调度到那个被选中的节点上, 因此杀死那些优先级低的pods需要时间)

注意: 一个pod是不能同时出现在activeQunschedulableQ中的.

2.1 activeQ

首先介绍一下activeQ, 它是一个heap组成的数据结构, heap中的数据按照优先级从高到低排序, 也就是说第一个出队列的是优先级最高的pod.

// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap struct {
    // data stores objects and has a queue that keeps their ordering according
    // to the heap invariant.
    data *heapData
}

Heap是一个用堆实现的类似于生产者/消费者的队列.

2.1.1 heapData

heapData是一个实现heap接口的数据结构, 就是一个真正的heap.

// LessFunc is a function type to compare two objects.
type LessFunc func(interface{}, interface{}) bool

// KeyFunc is a function type to get the key from an object.
type KeyFunc func(obj interface{}) (string, error)

type heapItem struct {
    obj   interface{} // The object which is stored in the heap.
    index int         // The index of the object's key in the Heap.queue.
}

type itemKeyValue struct {
    key string
    obj interface{}
}

// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type heapData struct {
    // items is a map from key of the objects to the objects and their index.
    // We depend on the property that items in the map are in the queue and vice versa.
    items map[string]*heapItem
    // queue implements a heap data structure and keeps the order of elements
    // according to the heap invariant. The queue keeps the keys of objects stored
    // in "items".
    queue []string

    // keyFunc is used to make the key used for queued item insertion and retrieval, and
    // should be deterministic.
    keyFunc KeyFunc
    // lessFunc is used to compare two objects in the heap.
    lessFunc LessFunc
}

queue数组是一个真正实现heap的数组, 里面的数据是根据keyFunc生成的key.

// Less compares two objects and returns true if the first one should go
// in front of the second one in the heap.
func (h *heapData) Less(i, j int) bool {
    if i > len(h.queue) || j > len(h.queue) {
        return false
    }
    itemi, ok := h.items[h.queue[i]]
    if !ok {
        return false
    }
    itemj, ok := h.items[h.queue[j]]
    if !ok {
        return false
    }
    return h.lessFunc(itemi.obj, itemj.obj)
}

// Len returns the number of items in the Heap.
func (h *heapData) Len() int { return len(h.queue) }

// Swap implements swapping of two elements in the heap. This is a part of standard
// heap interface and should never be called directly.
func (h *heapData) Swap(i, j int) {
    h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
    item := h.items[h.queue[i]]
    item.index = i
    item = h.items[h.queue[j]]
    item.index = j
}

// Push is supposed to be called by heap.Push only.
func (h *heapData) Push(kv interface{}) {
    keyValue := kv.(*itemKeyValue)
    n := len(h.queue)
    h.items[keyValue.key] = &heapItem{keyValue.obj, n}
    h.queue = append(h.queue, keyValue.key)
}

// Pop is supposed to be called by heap.Pop only.
func (h *heapData) Pop() interface{} {
    key := h.queue[len(h.queue)-1]
    h.queue = h.queue[0 : len(h.queue)-1]
    item, ok := h.items[key]
    if !ok {
        // This is an error
        return nil
    }
    delete(h.items, key)
    return item.obj
}

都是一些基本的操作, 就不多说了.

2.1.2 Heap

上面已经说了Heap是一个用堆实现的类似于生产者/消费者的队列. 所以就看一下它的Addpop方法就行.

// 1. 计算key
// 2. 根据item的map结构检查该obj是否存在
// 3. 如果存在 则更新该obj 并重新调整堆结构
// 4. 如果不存在 则添加该obj
func (h *Heap) Add(obj interface{}) error {
    key, err := h.data.keyFunc(obj)
    if err != nil {
        return cache.KeyError{Obj: obj, Err: err}
    }
    if _, exists := h.data.items[key]; exists {
        h.data.items[key].obj = obj
        heap.Fix(h.data, h.data.items[key].index)
    } else {
        heap.Push(h.data, &itemKeyValue{key, obj})
    }
    return nil
}
  1. 计算key
  2. 根据item的map结构检查该obj是否存在
  3. 如果存在 则更新该obj 并重新调整堆结构
  4. 如果不存在 则添加该obj
// Pop returns the head of the heap.
func (h *Heap) Pop() (interface{}, error) {
    obj := heap.Pop(h.data)
    if obj != nil {
        return obj, nil
    }
    return nil, fmt.Errorf("object was removed from heap data")
}

Pop则直接返回堆的头

2.1.3 总结

所以activeQ是这样的一个数据结构.

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue {
    pq := &PriorityQueue{
        clock:            util.RealClock{},
        stop:             stop,
        activeQ:          newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
        unschedulableQ:   newUnschedulablePodsMap(),
        nominatedPods:    newNominatedPodMap(),
        moveRequestCycle: -1,
    }
    pq.cond.L = &pq.lock

    pq.run()
    return pq
}

// newHeap returns a Heap which can be used to queue up items to process.
func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
    return &Heap{
        data: &heapData{
            items:    map[string]*heapItem{},
            queue:    []string{},
            keyFunc:  keyFn,
            lessFunc: lessFn,
        },
    }
}

MetaNamespaceKeyFunc基本上就是pod_namespace/pod_name, 而activeQComp就是heap的比较方法. 按照优先级大小进行比较, 如果优先级一样, 就按最后一次被调度的时间.

func podTimestamp(pod *v1.Pod) *metav1.Time {
    _, condition := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
    if condition == nil {
        return &pod.CreationTimestamp
    }
    if condition.LastProbeTime.IsZero() {
        return &condition.LastTransitionTime
    }
    return &condition.LastProbeTime
}

// activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// podTimestamp.
func activeQComp(pod1, pod2 interface{}) bool {
    p1 := pod1.(*v1.Pod)
    p2 := pod2.(*v1.Pod)
    prio1 := util.GetPodPriority(p1)
    prio2 := util.GetPodPriority(p2)
    return (prio1 > prio2) || (prio1 == prio2 && podTimestamp(p1).Before(podTimestamp(p2)))
}

3. unschedulableQ

接下来看一下unschedulableQ的数据结构以及它的一些方法.

// UnschedulablePodsMap 就是一个Map结构
// key为keyFunc计算出来的key 
// value就是对应的pod
type UnschedulablePodsMap struct {
    // pods is a map key by a pod's full-name and the value is a pointer to the pod.
    pods    map[string]*v1.Pod
    keyFunc func(*v1.Pod) string
}

可以看到UnschedulablePodsMap是一个标准的Map结构, key是由keyFunc方法计算出来的key, value为对应的pod.

3.1 方法

由于此结构比较简单, 所以它的方法包括addOrUpdate, delete 等等都是对map进行直接操作.

// 添加或者更新
func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
    u.pods[u.keyFunc(pod)] = pod
}

// Delete deletes a pod from the unschedulable pods.
// 删除
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
    delete(u.pods, u.keyFunc(pod))
}

4. nominatedPodMap

nominatedPodMap用于存储那些抢占成功的pods. 下面这段话的大致意思就是该pod现在的nominatedNodeName是有可能与最后真正运行到的节点不一样的, 因此需要有结构存储并且可以进行操作.

// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.

nominatedPodMap的结构如下:

type nominatedPodMap struct {
    // 这些pods可能在activeQ或者unschedulableQ中
    // nodeName -> pods
    nominatedPods map[string][]*v1.Pod
    // pod_UID -> nodeName
    nominatedPodToNode map[ktypes.UID]string
}

func newNominatedPodMap() *nominatedPodMap {
    return &nominatedPodMap{
        nominatedPods:      make(map[string][]*v1.Pod),
        nominatedPodToNode: make(map[ktypes.UID]string),
    }
}

4.1 对应的方法

func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
    // 无论是否存在 先删除
    npm.delete(p)

    nnn := nodeName
    if len(nnn) == 0 {
        nnn = NominatedNodeName(p)
        if len(nnn) == 0 {
            return
        }
    }
    // 1. 如果nodeName和pod.Status.NominatedNodeName都为空 直接返回
    // 2. 如果nodeName不为空 nnn = nodeName 否则 nnn = pod.Status.NominatedNodeName
    
    npm.nominatedPodToNode[p.UID] = nnn
    for _, np := range npm.nominatedPods[nnn] {
        if np.UID == p.UID {
            klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
            return
        }
    }
    npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}

// 如果该pod存在nominatedPodMap中 就删除
// 不存在 就直接返回
func (npm *nominatedPodMap) delete(p *v1.Pod) {
    nnn, ok := npm.nominatedPodToNode[p.UID]
    if !ok {
        return
    }
    for i, np := range npm.nominatedPods[nnn] {
        if np.UID == p.UID {
            npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
            if len(npm.nominatedPods[nnn]) == 0 {
                delete(npm.nominatedPods, nnn)
            }
            break
        }
    }
    delete(npm.nominatedPodToNode, p.UID)
}
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
    // We update irrespective of the nominatedNodeName changed or not, to ensure
    // that pod pointer is updated.
    npm.delete(oldPod)
    npm.add(newPod, "")
}
// 取该节点下所有nominated Pods
func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
    if list, ok := npm.nominatedPods[nodeName]; ok {
        return list
    }
    return nil
}

相关文章

网友评论

    本文标题:[k8s源码分析][kube-scheduler]schedul

    本文链接:https://www.haomeiwen.com/subject/eqgapctx.html