kube-scheduler, one of the three core control-plane components, is responsible for placing Pods onto Nodes. How does the Kubernetes scheduler actually work? Which scheduling policies does it support? And how is a leader elected among Masters in a highly available cluster? Read on.
Scheduler overview
kube-scheduler is the Kubernetes scheduler. Its main job is to watch for Pods whose PodSpec.NodeName is empty and, using predicates and priorities, decide which node each Pod should run on. Custom scheduling policies can be configured to override the defaults.
Scheduling has to balance several goals
- Efficient resource use: make the most of the cluster's resources
- Flexibility: let users configure scheduling policies to fit their own needs
- Efficiency: schedule applications quickly so they do not sit in a pending state
- Fairness: make sure every node gets its share of workloads
- Fault tolerance: be robust to errors and remain continuously available
A Scheduler model example
Using the full lifecycle of a Pod as an example, here is where scheduling happens
- A user creates a Pod, and its desired state is persisted to etcd
- kube-scheduler, watching the API Server, observes a Pod that is not yet bound to any node (spec.nodeName="")
- Based on the scheduling policy it finds the most suitable Node for that Pod (a Binding is created for every pod, recording the node the pod should be placed on)
- It tells the API Server which Node to bind the Pod to, and the desired state is saved to etcd (spec.nodeName="XXNode"); a minimal sketch of this bind call follows the list
- kubelet observes the bound Pod through the API Server and starts the containers on that Node via the CRI
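To make the bind step concrete, here is a minimal sketch of what it amounts to with a recent client-go (the function name and wrapping are illustrative, not the scheduler's actual code): the scheduler posts a Binding object, and as a result the pod's spec.nodeName gets set.

```go
import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// bindPodToNode posts a Binding for an unscheduled pod, which is what the
// scheduler's bind step boils down to; afterwards the pod's spec.nodeName
// is set to nodeName.
func bindPodToNode(ctx context.Context, client kubernetes.Interface, pod *v1.Pod, nodeName string) error {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	return client.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
}
```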
Scheduling flow
When kube-apiserver starts, it establishes a connection to etcd and watches it. When a client such as kube-scheduler calls the watch API, the WatchServer that kube-apiserver builds internally fetches the resource's watch events from etcd, processes and filters them, and sends them on to the client.
A resource object picked up through the watch mechanism is then scheduled onto a node in two phases: filtering and scoring.
Filtering: nodes that do not satisfy the Predicates Policies are filtered out.
Scoring: the remaining nodes are scored and ranked according to the Priorities Policies; the node with the highest score is the best binding target for the Pod (if several nodes tie for the highest score, kube-scheduler picks one of them at random).
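The sketch below illustrates this two-phase filter-then-score loop in plain Go. The types and helpers here are made up for illustration and are not the scheduler's real API (the actual logic lives in the generic scheduling code under pkg/scheduler):

```go
import "math/rand"

// Illustrative types: a predicate is a hard yes/no check, a priority returns a score.
type Node struct{ Name string }
type Predicate func(pod string, node Node) bool
type Priority func(pod string, node Node) int

// genericSchedule filters nodes with the predicates, scores the survivors with
// the priorities, and returns one of the top-scoring nodes (random tie-break).
func genericSchedule(pod string, nodes []Node, preds []Predicate, prios []Priority) (Node, bool) {
	// Phase 1: drop every node that fails any predicate.
	var feasible []Node
	for _, n := range nodes {
		ok := true
		for _, p := range preds {
			if !p(pod, n) {
				ok = false
				break
			}
		}
		if ok {
			feasible = append(feasible, n)
		}
	}
	if len(feasible) == 0 {
		return Node{}, false // nothing fits: the pod stays Pending
	}

	// Phase 2: sum the priority scores and keep the best nodes.
	var best []Node
	bestScore := -1
	for _, n := range feasible {
		score := 0
		for _, pr := range prios {
			score += pr(pod, n)
		}
		switch {
		case score > bestScore:
			best, bestScore = []Node{n}, score
		case score == bestScore:
			best = append(best, n)
		}
	}
	// Ties for the highest score are broken at random.
	return best[rand.Intn(len(best))], true
}
```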
Predicates are hard rules for scheduling a new pod onto the cluster. If no machine satisfies the predicates, the pod stays in the Pending state until one does.
| Predicate | Node requirement |
| --- | --- |
| PodFitsPorts | Can host the pod without any port conflicts |
| PodFitsResources | Has enough free resources to host the pod (see the sketch after this table) |
| NoDiskConflict | The volumes the pod requests do not conflict with volumes already mounted on the node |
| MatchNodeSelector | Matches the nodeSelector query specified in the pod |
| HostName | Has the name given by the pod's host field |
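As a rough illustration of the idea behind PodFitsResources (not the scheduler's actual implementation): a node fits only if the pod's requests, added to what is already requested on the node, stay within the node's allocatable capacity.

```go
// nodeInfo is a simplified view of a node's capacity and current requests
// (millicores for CPU, bytes for memory).
type nodeInfo struct {
	AllocatableMilliCPU, AllocatableMemory int64
	RequestedMilliCPU, RequestedMemory     int64
}

// fitsResources reports whether a pod with the given requests still fits on the node.
func fitsResources(podMilliCPU, podMemory int64, n nodeInfo) bool {
	if n.RequestedMilliCPU+podMilliCPU > n.AllocatableMilliCPU {
		return false
	}
	if n.RequestedMemory+podMemory > n.AllocatableMemory {
		return false
	}
	return true
}
```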
Priorities: if the scheduler finds several nodes that pass the predicates, it uses the priority policies to pick the machine best suited to run the pod. A priority policy is expressed as a key/value pair: the policy name and its weight (importance).
| Priority | Node preference |
| --- | --- |
| LeastRequestedPriority | Computes the percentage of memory and CPU already requested by the pods on each node; the node with the lowest percentage scores best (see the sketch after this table) |
| BalancedResourceAllocation | Prefers nodes whose memory and CPU utilization are similar (balanced) |
| ServiceSpreadingPriority | Prefers nodes that are not yet running pods of the same Service, spreading a Service's pods across nodes |
| EqualPriority | Gives every node in the cluster the same priority (used only for testing) |
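For LeastRequestedPriority, the commonly documented formula gives each resource a score of (capacity - requested) * 10 / capacity and averages the CPU and memory scores, so emptier nodes score higher. A small sketch of that calculation (illustrative only):

```go
// leastRequestedScore returns a 0-10 score: the less of the node's capacity is
// already requested, the higher the score.
func leastRequestedScore(requestedCPU, capacityCPU, requestedMem, capacityMem int64) int64 {
	score := func(requested, capacity int64) int64 {
		if capacity == 0 || requested > capacity {
			return 0
		}
		return (capacity - requested) * 10 / capacity
	}
	return (score(requestedCPU, capacityCPU) + score(requestedMem, capacityMem)) / 2
}
```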
Sample policy configuration file (policy-config-file.json)

```json
{
  "kind": "Policy",
  "apiVersion": "v1",
  "predicates": [
    { "name": "PodFitsPorts" },
    { "name": "PodFitsResources" },
    { "name": "NoDiskConflict" },
    { "name": "MatchNodeSelector" },
    { "name": "HostName" }
  ],
  "priorities": [
    { "name": "LeastRequestedPriority", "weight": 1 },
    { "name": "BalancedResourceAllocation", "weight": 1 },
    { "name": "ServiceSpreadingPriority", "weight": 1 },
    { "name": "EqualPriority", "weight": 1 }
  ]
}
```
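In scheduler versions that still support the Policy API, a file like this is typically handed to kube-scheduler with the `--policy-config-file` flag (or stored in a ConfigMap and referenced with `--policy-configmap`); newer releases have deprecated the Policy API in favor of scheduler profiles and the scheduling framework.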
A walk through the scheduling source code
The scheduling code lives in github.com/kubernetes/kubernetes
cmd/kube-scheduler and pkg/scheduler respectively define the structs and parameter wiring used by kube-scheduler, and its internal implementation
The configuration struct: cmd/kube-scheduler/app/config/config.go
```go
// Config has all the context to run a Scheduler
type Config struct {
	// config is the scheduler server's configuration object.
	ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration

	// LoopbackClientConfig is a config for a privileged loopback connection
	LoopbackClientConfig *restclient.Config

	InsecureServing        *apiserver.DeprecatedInsecureServingInfo // nil will disable serving on an insecure port
	InsecureMetricsServing *apiserver.DeprecatedInsecureServingInfo // non-nil if metrics should be served independently
	Authentication         apiserver.AuthenticationInfo
	Authorization          apiserver.AuthorizationInfo
	SecureServing          *apiserver.SecureServingInfo

	Client          clientset.Interface
	InformerFactory informers.SharedInformerFactory
	PodInformer     coreinformers.PodInformer
	EventClient     v1core.EventsGetter
	Recorder        record.EventRecorder
	Broadcaster     record.EventBroadcaster

	// LeaderElection is optional.
	LeaderElection *leaderelection.LeaderElectionConfig
}
```
The kube-scheduler entry point
```go
func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewSchedulerCommand()

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
```
The Scheduler struct: pkg/scheduler/scheduler.go
```go
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
	config *factory.Config
}
```
The Config it wraps is defined in pkg/scheduler/factory/factory.go
```go
// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
	// It is expected that changes made via SchedulerCache will be observed
	// by NodeLister and Algorithm.
	SchedulerCache internalcache.Cache

	NodeLister algorithm.NodeLister
	Algorithm  core.ScheduleAlgorithm
	GetBinder  func(pod *v1.Pod) Binder

	// PodConditionUpdater is used only in case of scheduling errors. If we succeed
	// with scheduling, PodScheduled condition will be updated in apiserver in /bind
	// handler so that binding and setting PodCondition it is atomic.
	PodConditionUpdater PodConditionUpdater

	// PodPreemptor is used to evict pods and update 'NominatedNode' field of
	// the preemptor pod.
	PodPreemptor PodPreemptor

	// PluginSet has a set of plugins and data used to run them.
	PluginSet pluginsv1alpha1.PluginSet

	// NextPod should be a function that blocks until the next pod
	// is available. We don't use a channel for this, because scheduling
	// a pod may take some amount of time and we don't want pods to get
	// stale while they sit in a channel.
	NextPod func() *v1.Pod

	// WaitForCacheSync waits for scheduler cache to populate.
	// It returns true if it was successful, false if the controller should shutdown.
	WaitForCacheSync func() bool

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*v1.Pod, error)

	// Recorder is the EventRecorder to use
	Recorder record.EventRecorder

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

	// VolumeBinder handles PVC/PV binding for the pod.
	VolumeBinder *volumebinder.VolumeBinder

	// Disable pod preemption or not.
	DisablePreemption bool

	// SchedulingQueue holds pods to be scheduled
	SchedulingQueue internalqueue.SchedulingQueue
}
```
Constructing a Scheduler: New
```go
// New returns a Scheduler
func New(client clientset.Interface,
	nodeInformer coreinformers.NodeInformer,
	podInformer coreinformers.PodInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	replicationControllerInformer coreinformers.ReplicationControllerInformer,
	replicaSetInformer appsinformers.ReplicaSetInformer,
	statefulSetInformer appsinformers.StatefulSetInformer,
	serviceInformer coreinformers.ServiceInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	storageClassInformer storageinformers.StorageClassInformer,
	recorder record.EventRecorder,
	schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
	stopCh <-chan struct{},
	opts ...func(o *schedulerOptions)) (*Scheduler, error) {

	options := defaultSchedulerOptions
	for _, opt := range opts {
		opt(&options)
	}

	// Set up the configurator which can create schedulers from configs.
	configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
		SchedulerName:                  options.schedulerName,
		Client:                         client,
		NodeInformer:                   nodeInformer,
		PodInformer:                    podInformer,
		PvInformer:                     pvInformer,
		PvcInformer:                    pvcInformer,
		ReplicationControllerInformer:  replicationControllerInformer,
		ReplicaSetInformer:             replicaSetInformer,
		StatefulSetInformer:            statefulSetInformer,
		ServiceInformer:                serviceInformer,
		PdbInformer:                    pdbInformer,
		StorageClassInformer:           storageClassInformer,
		HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
		DisablePreemption:              options.disablePreemption,
		PercentageOfNodesToScore:       options.percentageOfNodesToScore,
		BindTimeoutSeconds:             options.bindTimeoutSeconds,
	})

	var config *factory.Config
	source := schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// Create the config from a named algorithm provider.
		sc, err := configurator.CreateFromProvider(*source.Provider)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
		}
		config = sc
	case source.Policy != nil:
		// Create the config from a user specified policy source.
		policy := &schedulerapi.Policy{}
		switch {
		case source.Policy.File != nil:
			if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
				return nil, err
			}
		case source.Policy.ConfigMap != nil:
			if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
				return nil, err
			}
		}
		sc, err := configurator.CreateFromConfig(*policy)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
		}
		config = sc
	default:
		return nil, fmt.Errorf("unsupported algorithm source: %v", source)
	}

	// Additional tweaks to the config produced by the configurator.
	config.Recorder = recorder
	config.DisablePreemption = options.disablePreemption
	config.StopEverything = stopCh

	// Create the scheduler.
	sched := NewFromConfig(config)

	AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer)
	return sched, nil
}
```
Running it: Run
```go
// Run begins watching and scheduling. It waits for cache to be synced,
// then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}

	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
```
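Roughly speaking, each call to scheduleOne pops the next pod from the scheduling queue via NextPod(), runs the scheduling algorithm (predicates followed by priorities) to pick a node, assumes the pod on that node in the scheduler cache, and then issues the bind to the API server in a separate goroutine; if no node fits, it falls back to preemption (unless preemption is disabled) and the Error callback requeues the pod.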
The default scheduling policies are defined in the algorithmprovider/defaults/defaults.go module
- Default predicates
- NoVolumeZoneConflictPred
- MaxEBSVolumeCountPred
- MaxGCEPDVolumeCountPred
- MaxAzureDiskVolumeCountPred
- MaxCSIVolumeCountPred
- MatchInterPodAffinityPred
- NoDiskConflictPred
- GeneralPred
- CheckNodeMemoryPressurePred
- CheckNodeDiskPressurePred
- CheckNodePIDPressurePred
- CheckNodeConditionPred
- PodToleratesNodeTaintsPred
- CheckVolumeBindingPred
- Default priorities
- SelectorSpreadPriority
- InterPodAffinityPriority
- LeastRequestedPriority
- BalancedResourceAllocation
- NodePreferAvoidPodsPriority
- NodeAffinityPriority
- TaintTolerationPriority
- ImageLocalityPriority
Currently supported scheduling policies
- predicates policy
- MatchInterPodAffinityPred
- CheckVolumeBindingPred
- CheckNodeConditionPred
- GeneralPred
- HostNamePred
- PodFitsHostPortsPred
- MatchNodeSelectorPred
- PodFitsResourcesPred
- NoDiskConflictPred
- PodToleratesNodeTaintsPred
- CheckNodeUnschedulablePred
- PodToleratesNodeNoExecuteTaintsPred
- CheckNodeLabelPresencePred
- CheckServiceAffinityPred
- MaxEBSVolumeCountPred
- MaxGCEPDVolumeCountPred
- MaxAzureDiskVolumeCountPred
- MaxCinderVolumeCountPred
- MaxCSIVolumeCountPred
- NoVolumeZoneConflictPred
- CheckNodeMemoryPressurePred
- CheckNodeDiskPressurePred
- CheckNodePIDPressurePred
- priorities policy
- EqualPriority
- MostRequestedPriority
- RequestedToCapacityRatioPriority
- SelectorSpreadPriority
- ServiceSpreadingPriority
- InterPodAffinityPriority
- LeastRequestedPriority
- BalancedResourceAllocation
- NodePreferAvoidPodsPriority
- NodeAffinityPriority
- TaintTolerationPriority
- ImageLocalityPriority
- ResourceLimitsPriority
Kubernetes Master Node leader election
Kubernetes HA topologies fall into two categories, depending on where the etcd members sit relative to the Master nodes
- Stacked etcd topology: the control plane and etcd members are co-located on the same nodes
- External etcd topology: the control plane and etcd members are decoupled onto separate nodes
Master HA aims at eliminating single points of failure, which requires a high-availability scheme for each of etcd, kube-apiserver, kube-controller-manager and kube-scheduler
etcd is usually run as a cluster and has its own leader-election mechanism
kube-apiserver is stateless, so making it highly available mainly comes down to load balancing; typical implementations are an external load balancer, network-layer load balancing, or a reverse proxy on every Node in front of the multiple Masters
This article does not cover etcd HA, kube-apiserver HA, or the trade-offs between the two topologies in detail; the focus is on the components that actually elect a leader, kube-controller-manager and kube-scheduler, and how they implement it
kube-controller-manager and kube-scheduler are part of the Master. Started with `--leader-elect=true`, they run in high-availability mode: the instance that acquires the lock (an Endpoints object in the apiserver) becomes the leader; the other candidates compare the lock's renew time and holder to decide whether they can become the new leader; the leader keeps updating RenewTime to hold on to the lock and retain leadership
The distributed resource lock is implemented by creating resources (ConfigMap or Endpoints objects) that carry the lock state
The client-go tools package is mainly there for writing controllers; leader election is implemented by its leaderelection package
Source walkthrough: when a kube-controller-manager instance is started with `--leader-elect=true`, it runs the code below, deriving a unique id for itself and creating the resource lock
```go
id, err := os.Hostname()
if err != nil {
	return err
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())

rl, err := resourcelock.New(c.Generic.ComponentConfig.LeaderElection.ResourceLock,
	"kube-system",             // namespace of the lock object
	"kube-controller-manager", // name of the lock object
	c.Generic.LeaderElectionClient.CoreV1(),
	resourcelock.ResourceLockConfig{
		Identity:      id,
		EventRecorder: c.Generic.EventRecorder,
	})
if err != nil {
	glog.Fatalf("error creating lock: %v", err)
}

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
	Lock:          rl,
	LeaseDuration: c.Generic.ComponentConfig.LeaderElection.LeaseDuration.Duration,
	RenewDeadline: c.Generic.ComponentConfig.LeaderElection.RenewDeadline.Duration,
	RetryPeriod:   c.Generic.ComponentConfig.LeaderElection.RetryPeriod.Duration,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: run,
		OnStoppedLeading: func() {
			glog.Fatalf("leaderelection lost")
		},
	},
	WatchDog: electionChecker,
	Name:     "kube-controller-manager",
})
```
Once the lock is acquired (the instance becomes leader, OnStartedLeading), LeaderCallbacks runs the run function; if the lock is lost, the process logs a fatal error and exits. kube-scheduler follows the same pattern with its own lock object.
The LeaderElectionConfig structure looks like this
```go
type LeaderElectionConfig struct {
	// Lock is the resource that will be used for locking
	Lock rl.Interface

	// LeaseDuration is the duration that non-leader candidates will
	// wait to force acquire leadership. This is measured against time of
	// last observed ack.
	LeaseDuration time.Duration
	// RenewDeadline is the duration that the acting master will retry
	// refreshing leadership before giving up.
	RenewDeadline time.Duration
	// RetryPeriod is the duration the LeaderElector clients should wait
	// between tries of actions.
	RetryPeriod time.Duration

	// Callbacks are callbacks that are triggered during certain lifecycle
	// events of the LeaderElector
	Callbacks LeaderCallbacks

	// WatchDog is the associated health checker
	// WatchDog may be null if its not needed/configured.
	WatchDog *HealthzAdaptor

	// ReleaseOnCancel should be set true if the lock should be released
	// when the run context is cancelled. If you set this to true, you must
	// ensure all code guarded by this lease has successfully completed
	// prior to cancelling the context, or you may have two processes
	// simultaneously acting on the critical path.
	ReleaseOnCancel bool

	// Name is the name of the resource lock for debugging
	Name string
}
```
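By default kube-controller-manager and kube-scheduler run with LeaseDuration=15s, RenewDeadline=10s and RetryPeriod=2s; these can be tuned through the --leader-elect-lease-duration, --leader-elect-renew-deadline and --leader-elect-retry-period flags.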
Another important structure is LeaderElectionRecord, which holds the lock information and is stored as an annotation (key: control-plane.alpha.kubernetes.io/leader) on the ConfigMap or Endpoints object
```go
// LeaderElectionRecord is the record that is stored in the leader election annotation.
// This information should be used for observational purposes only and could be replaced
// with a random string (e.g. UUID) with only slight modification of this code.
// TODO(mikedanese): this should potentially be versioned
type LeaderElectionRecord struct {
	// HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and
	// all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not
	// attempt to acquire leases with empty identities and will wait for the full lease
	// interval to expire before attempting to reacquire. This value is set to empty when
	// a client voluntarily steps down.
	HolderIdentity       string      `json:"holderIdentity"`
	LeaseDurationSeconds int         `json:"leaseDurationSeconds"`
	AcquireTime          metav1.Time `json:"acquireTime"`
	RenewTime            metav1.Time `json:"renewTime"`
	LeaderTransitions    int         `json:"leaderTransitions"`
}
```
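Since the record is just a JSON-encoded annotation, the current leader can be inspected directly from the lock object. A minimal sketch with a recent client-go, assuming an Endpoints-based lock named kube-controller-manager in kube-system as in the example above:

```go
import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// currentLeaderRecord returns the raw LeaderElectionRecord JSON, e.g.
// {"holderIdentity":"node1_<uuid>","leaseDurationSeconds":15,"acquireTime":...,"renewTime":...,"leaderTransitions":...}
func currentLeaderRecord(ctx context.Context, client kubernetes.Interface) (string, error) {
	ep, err := client.CoreV1().Endpoints("kube-system").Get(ctx, "kube-controller-manager", metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	return ep.Annotations["control-plane.alpha.kubernetes.io/leader"], nil
}
```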
The resource lock interface: client-go/tools/leaderelection/resourcelock/interface.go
There are currently three implementations: configmaplock, endpointslock and leaselock
```go
// Interface offers a common interface for locking on arbitrary
// resources used in leader election. The Interface is used
// to hide the details on specific implementations in order to allow
// them to change over time. This interface is strictly for use
// by the leaderelection code.
type Interface interface {
	// Get returns the LeaderElectionRecord
	Get() (*LeaderElectionRecord, error)

	// Create attempts to create a LeaderElectionRecord
	Create(ler LeaderElectionRecord) error

	// Update will update and existing LeaderElectionRecord
	Update(ler LeaderElectionRecord) error

	// RecordEvent is used to record events
	RecordEvent(string)

	// Identity will return the locks Identity
	Identity() string

	// Describe is used to convert details on current resource lock
	// into a string
	Describe() string
}
```
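The same machinery can be reused in your own controller. Below is a minimal sketch using the leaselock implementation with a recent client-go; the lock name, namespace, timings and the runController function are illustrative assumptions, and the context-aware RunOrDie signature shown here is the one in newer client-go releases:

```go
import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

// runWithLeaderElection wraps a controller loop so that only one replica runs it at a time.
func runWithLeaderElection(ctx context.Context, client kubernetes.Interface, id string, runController func(context.Context)) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "my-controller", Namespace: "default"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: runController, // do real work only while holding the lock
			OnStoppedLeading: func() { klog.Fatalf("leader election lost") },
			OnNewLeader:      func(identity string) { klog.Infof("current leader: %s", identity) },
		},
	})
}
```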
Acquiring and using the lock
```go
// Run is the entry point: it starts the leader election loop.
func (le *LeaderElector) Run(ctx context.Context) {
	defer func() {
		runtime.HandleCrash()
		le.config.Callbacks.OnStoppedLeading()
	}()
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}
```
Two methods do the work: acquire and renew
```go
// acquire calls tryAcquireOrRenew periodically via wait.JitterUntil. It returns
// true once the lock is acquired and false if ctx signals done; otherwise it
// keeps retrying at RetryPeriod intervals until it succeeds.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew()
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}
```
renew is called once the lock has been acquired; it keeps refreshing the lock record so that the instance continues to hold the lock and retains leadership
```go
// renew loops calling tryAcquireOrRenew and returns immediately when
// tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wait.Until(func() {
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			done := make(chan bool, 1)
			go func() {
				defer close(done)
				done <- le.tryAcquireOrRenew()
			}()

			select {
			case <-timeoutCtx.Done():
				return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
			case result := <-done:
				return result, nil
			}
		}, timeoutCtx.Done())

		// report a leader change if one happened
		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())

	// if we hold the lease, give it up
	if le.config.ReleaseOnCancel {
		le.release()
	}
}
```
When a leader change is observed, the Callbacks' OnNewLeader method is invoked
```go
func (le *LeaderElector) maybeReportTransition() {
	if le.observedRecord.HolderIdentity == le.reportedLeader {
		return
	}
	le.reportedLeader = le.observedRecord.HolderIdentity
	if le.config.Callbacks.OnNewLeader != nil {
		go le.config.Callbacks.OnNewLeader(le.reportedLeader)
	}
}
```
tryAcquireOrRenew does two things: it tries to acquire the lock, and if the lock is already held by this instance it renews it so that it cannot be taken away
```go
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew() bool {
	now := metav1.Now()
	// This is the value stored in the lock object's annotation (e.g. on the Endpoints);
	// every candidate sets HolderIdentity to its own identity. RenewTime and AcquireTime
	// are adjusted below before being written back to the API server.
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. Get or create the ElectionRecord.
	oldLeaderElectionRecord, err := le.config.Lock.Get()
	if err != nil {
		// No record stored yet, or a real error occurred.
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		// The record does not exist: create a new one.
		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		// The record was created and the lock acquired: return true.
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}

	// 2. A lock record was fetched: check the holder and the renew time.
	if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
		// Remember the current holder.
		le.observedRecord = *oldLeaderElectionRecord
		le.observedTime = le.clock.Now()
	}
	// If someone else holds the lock and less than LeaseDuration has passed since we
	// last observed it, the lock is considered held and unexpired: return false.
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. Update the record stored in the resource annotation.
	if le.IsLeader() {
		// If we already hold the lock, keep the previous acquire time and transition count.
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		// Leadership is changing hands: increment the transition count.
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// Write the record back to the lock resource object.
	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
}
```
Summary
This was a brief introduction to Kubernetes scheduling and Master leader election. The source code rewards repeated reading; some of its simple yet unconventional design ideas are well worth borrowing. Against the sheer size of the Kubernetes code base I have only glimpsed the tip of the iceberg, so in the face of all there is to know: stay humble, keep learning, keep improving.