kube-scheduler 源码分析系列重点在于梳理代码流程;关于 kube-scheduler 本身的概念介绍,请参见官方文档 [kube-scheduler](https://kubernetes.io/zh/docs/concepts/scheduling-eviction/kube-scheduler/)。
下面的代码分析会跳过细节处理,只关注主干流程。
代码入口分析
// main is the kube-scheduler entry point: it seeds the global random source,
// builds the scheduler command, initializes logging, and executes the command.
// The process exits with status 1 when the command returns an error.
func main() {
	rand.Seed(time.Now().UnixNano())

	// @1
	command := app.NewSchedulerCommand()

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	// @2
	if err := command.Execute(); err != nil {
		// os.Exit terminates the process without running deferred calls, so
		// the deferred FlushLogs above would be skipped; flush explicitly.
		logs.FlushLogs()
		os.Exit(1)
	}
}
- @1 app.NewSchedulerCommand() 主要借助 cobra 库为命令行注册对应的 options,并完成 Command 的解析(parse)和初始化工作。
- @2 command.Execute() 实际上是服务启动入口,下面继续跟进(step in)cobra 中的实现:
// ExecuteContext is the same as Execute(), but sets the ctx on the command.
// Retrieve ctx by calling cmd.Context() inside your *Run lifecycle functions.
func (c *Command) ExecuteContext(ctx context.Context) error {
// Store the context on the command before delegating to Execute.
c.ctx = ctx
return c.Execute()
}
// Execute uses the args (os.Args[1:] by default)
// and runs through the command tree finding appropriate matches
// for commands and then corresponding flags.
// It delegates to ExecuteC and returns only the error, discarding the
// *Command that ExecuteC also returns.
func (c *Command) Execute() error {
_, err := c.ExecuteC()
return err
}
// ExecuteC is what Execute delegates to. It returns the command on which
// execute was invoked together with the resulting error.
// Lines consisting of "....." mark code omitted from this excerpt.
func (c *Command) ExecuteC() (cmd *Command, err error) {
.....
// initialize help as the last point possible to allow for user
// overriding
c.InitDefaultHelpCmd()
.....
// initialize the hidden command to be used for bash completion
c.initCompleteCmd(args)
// @3
// Hand off to the command's execute method and keep its error as the result.
err = cmd.execute(flags)
.....
return cmd, err
}
此时我们通过函数名发现 @3 部分是真正执行的部分,继续深入看看:
// execute drives a single command through the steps visible in this excerpt:
// default help/version flag setup, pre-run hooks, required-flag validation,
// the run function, and post-run hooks. The first error encountered is
// returned. Lines consisting of "....." mark code omitted from this excerpt.
func (c *Command) execute(a []string) (err error) {
.....
// initialize help and version flag at the last point possible to allow for user
// overriding
c.InitDefaultHelpFlag()
c.InitDefaultVersionFlag()
.....
c.preRun()
.....
// Pre-run hook: the error-returning PreRunE takes precedence over PreRun.
if c.PreRunE != nil {
if err := c.PreRunE(c, argWoFlags); err != nil {
return err
}
} else if c.PreRun != nil {
c.PreRun(c, argWoFlags)
}
// Required flags are validated after the pre-run hook and before the run function.
if err := c.validateRequiredFlags(); err != nil {
return err
}
// Run function: RunE is preferred when set. Otherwise Run is invoked
// directly; this excerpt shows no nil check before that call.
if c.RunE != nil {
if err := c.RunE(c, argWoFlags); err != nil {
return err
}
} else {
c.Run(c, argWoFlags)
}
// Post-run hook: the error-returning PostRunE takes precedence over PostRun.
if c.PostRunE != nil {
if err := c.PostRunE(c, argWoFlags); err != nil {
return err
}
} else if c.PostRun != nil {
c.PostRun(c, argWoFlags)
}
.....
}
在这段 cobra 库代码中,我们可以看到一组与 Run 相关的钩子函数(PreRunE/PreRun、RunE/Run、PostRunE/PostRun)。这部分库代码我没有做详细研究,但从名称和调用顺序可以看出:PreRun 系列负责运行前的准备工作,RunE/Run 才是真正的执行体,PostRun 系列负责运行后的收尾工作。由于它们与 kube-scheduler 的核心机制关系不大,先大致了解即可,不要偏离此次分析的主线。
既然我们知道了最终执行的是 Command 上注册的 Run/RunE 函数,那么 kube-scheduler 自身的逻辑是如何与 Command 关联起来的呢?这里我们回头再看一下 app.NewSchedulerCommand():
// NewSchedulerCommand returns the *cobra.Command for the scheduler. Only the
// Run callback is shown in this excerpt; lines consisting of "....." mark
// omitted code.
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
.....
// Run hands control to runCommand. On error it prints the error to stderr
// and exits the process with status 1.
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
.....
}
// runCommand runs the scheduler.
// The preparation steps are omitted from this excerpt ("....."); the function
// ends by returning whatever Run returns.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
.....
return Run(ctx, cc, sched)
}
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
// Author's note: this is where the actual kube-scheduler execution logic lives.
// The body is omitted from this excerpt (".....").
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
.....
}
入口启动流程梳理
以上围绕 Command 的执行入口梳理了主要的启动流程。下面我们重点关注 kube-scheduler 在初始化及真正运行时具体做了哪些事情。
kube-scheduler 创建及运行
- 入口
入口在 runCommand 这个函数中:
// runCommand runs the scheduler.
// Setup yields the completed config and the scheduler, which are then handed
// to Run. Lines consisting of "....." mark code omitted from this excerpt.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
.....
// Creation phase: build the completed config and the scheduler.
cc, sched, err := Setup(ctx, opts, registryOptions...)
.....
// Run phase: start the scheduler with what Setup produced.
return Run(ctx, cc, sched)
}
- Setup创建工作
// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
c, err := opts.Config()
if err != nil {
return nil, nil, err
}
// 获取 config 对象
// Get the completed config
cc := c.Complete()
// 初始化 outOfTreeRegistry, 后续基于 Scheduler Framwork 自定义的 插件注册关联的地方,后续再介绍 Scheduler Framework
outOfTreeRegistry := make(runtime.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}
// 创建事件记录器
recorderFactory := getRecorderFactory(&cc)
// 初始化 scheduler
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
if err != nil {
return nil, nil, err
}
return &cc, sched, nil
至此,scheduler 初始化完成。
- Run 服务运行
// Run starts the scheduler described by cc and sched. Every return path
// yields a non-nil error: either a startup step failed, or the scheduler /
// leader elector returned.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	// Configz registration under the name "componentconfig".
	cz, err := configz.New("componentconfig")
	if err != nil {
		return fmt.Errorf("unable to register configz: %s", err)
	}
	cz.Set(cc.ComponentConfig)

	// Prepare the event broadcaster.
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

	// Set up healthz checks. The leader-election watchdog is added only when
	// leader election is switched on in the component config.
	var checks []healthz.HealthChecker
	if cc.ComponentConfig.LeaderElection.LeaderElect {
		checks = append(checks, cc.LeaderElection.WatchDog)
	}

	// Start up the healthz server.
	if cc.InsecureServing != nil {
		separateMetrics := cc.InsecureMetricsServing != nil
		healthzHandler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
		if err := cc.InsecureServing.Serve(healthzHandler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start healthz server: %v", err)
		}
	}

	// Start the dedicated insecure metrics server, if configured.
	if cc.InsecureMetricsServing != nil {
		metricsHandler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
		if err := cc.InsecureMetricsServing.Serve(metricsHandler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start metrics server: %v", err)
		}
	}

	// Start the secure server, if configured, with authentication and authorization.
	if cc.SecureServing != nil {
		secureHandler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
		// TODO: handle stoppedCh returned by c.SecureServing.Serve
		if _, err := cc.SecureServing.Serve(secureHandler, 0, ctx.Done()); err != nil {
			// fail early for secure handlers, removing the old error loop from above
			return fmt.Errorf("failed to start secure server: %v", err)
		}
	}

	// Start all informers.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())

	// Wait for all caches to sync before scheduling.
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// Leader election is disabled, so run the scheduler inline until done.
	if cc.LeaderElection == nil {
		sched.Run(ctx)
		return fmt.Errorf("finished without leader elect")
	}

	// Leader election is enabled: run via LeaderElector until done and exit.
	cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
		OnStartedLeading: sched.Run,
		OnStoppedLeading: func() {
			klog.Fatalf("leaderelection lost")
		},
	}
	elector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
	if err != nil {
		return fmt.Errorf("couldn't create leader elector: %v", err)
	}
	elector.Run(ctx)
	return fmt.Errorf("lost lease")
}
至此 kube-scheduler 算是真正意义上的运行了。
scheduler初始化及运行流程梳理
至此,kube-scheduler 入口启动过程的分析就结束了。以上均为个人理解,如有不对的地方,还望多多指教。
欢迎访问个人站点 https://kiragoo.github.io/