
A Brief Look at the Kubernetes HPA Source Code

connygpt · 2024-08-20 13:54

1. Introduction

I recently needed to work on some HPA-related functionality, and since a craftsman must sharpen his tools before doing good work, it was time to dig into the source code.

2. MetricsClient

All of the HPA's scaling decisions are based on collected metrics, and MetricsClient is the interface through which those metrics are fetched.

pkg/controller/podautoscaler/metrics/interfaces.go

// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
type MetricsClient interface {
    // GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
    // for the specified named container in all pods matching the specified selector in the given namespace and when
    // the container is an empty string it returns the sum of all the container metrics.
    // GetResourceMetric fetches a resource metric by resource name, namespace and container name;
    // if the container name is empty, it returns the sum of all container metrics per pod.
    GetResourceMetric(ctx context.Context, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (PodMetricsInfo, time.Time, error)

    // GetRawMetric gets the given metric (and an associated oldest timestamp)
    // for all pods matching the specified selector in the given namespace
    // Unlike the previous method, the metric is addressed by metricName.
    GetRawMetric(metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (PodMetricsInfo, time.Time, error)

    // GetObjectMetric gets the given metric (and an associated timestamp) for the given
    // object in the given namespace
    // Fetches the metric value of a Kubernetes object.
    GetObjectMetric(metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, metricSelector labels.Selector) (int64, time.Time, error)

    // GetExternalMetric gets all the values of a given external metric
    // that match the specified selector.
    // Fetches an external metric.
    GetExternalMetric(metricName string, namespace string, selector labels.Selector) ([]int64, time.Time, error)
}

Given the interface definition above, the metrics clients fall roughly into three categories:

  • the resource metrics API group metrics.k8s.io
  • the custom metrics client
  • the external metrics client

// restMetricsClient is a client which supports fetching
// metrics from the resource metrics API, the
// custom metrics API and the external metrics API.
type restMetricsClient struct {
    *resourceMetricsClient
    *customMetricsClient
    *externalMetricsClient
}

cmd/kube-controller-manager/app/autoscaling.go#startHPAControllerWithRESTClient

func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
    hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
    apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
    // ...some code omitted
    // build the metrics client
    metricsClient := metrics.NewRESTMetricsClient(
        // ① built-in resource metrics client (metrics.k8s.io)
        resourceclient.NewForConfigOrDie(clientConfig),
        // ② custom metrics client
        custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter),
        // ③ external metrics client
        external_metrics.NewForConfigOrDie(clientConfig),
    )
    // ...
}

3. Registering the HPA Controller

The HPA controller is registered in the controller manager just like the other controllers.

controllers["horizontalpodautoscaling"] = startHPAController

3.1 startHPAController

startHPAController simply calls startHPAControllerWithRESTClient; the metricsClient described above is instantiated inside startHPAControllerWithRESTClient and then handed to startHPAControllerWithMetricsClient.

func startHPAController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
        return nil, false, nil
    }

    return startHPAControllerWithRESTClient(ctx, controllerContext)
}

func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
    hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")

    apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
    // invalidate the discovery information roughly once per resync interval our API
    // information is *at most* two resync intervals old.
    go custom_metrics.PeriodicallyInvalidate(
        apiVersionsGetter,
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
        ctx.Done())
    // build the metrics client
    metricsClient := metrics.NewRESTMetricsClient(
        resourceclient.NewForConfigOrDie(clientConfig),
        custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter),
        external_metrics.NewForConfigOrDie(clientConfig),
    )
    return startHPAControllerWithMetricsClient(ctx, controllerContext, metricsClient)
}

cmd/kube-controller-manager/app/autoscaling.go#startHPAControllerWithMetricsClient

func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) {
    hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
    hpaClientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")

    // we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
    // so we want to re-fetch every time when we actually ask for it
    scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
    scaleClient, err := scale.NewForConfig(hpaClientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
    if err != nil {
        return nil, false, err
    }
    go podautoscaler.NewHorizontalController(
        // v1.CoreV1Interface
        // see the "k8s.io/client-go/kubernetes/typed/core/v1" package
        hpaClient.CoreV1(),
        // ① scale client (implements functionality similar to kubectl scale)
        scaleClient,
        // ② HorizontalPodAutoscalersGetter
        hpaClient.AutoscalingV2(),
        // RESTMapper handles the mapping between GVKs and GVRs
        controllerContext.RESTMapper,
        // ③ metrics client
        metricsClient,
        // ④ HPA informer
        controllerContext.InformerFactory.Autoscaling().V2().HorizontalPodAutoscalers(),
        // ⑤ Pod informer
        controllerContext.InformerFactory.Core().V1().Pods(),
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
        // note that Run is then called on the controller constructed with the parameters above
    ).Run(ctx, int(controllerContext.ComponentConfig.HPAController.ConcurrentHorizontalPodAutoscalerSyncs))
    return nil, true, nil
}

4. Starting the HPA Controller

The replica-count calculation is the heart of the HPA. From the components we have just seen being initialized, the overall flow is already clear: fetch the metrics through metricsClient, then compute a desired replica count against the targets declared in the HPA.
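Before reading the code, it helps to keep in mind the basic proportional formula from the official HPA documentation: desiredReplicas = ceil(currentReplicas × currentMetricValue / targetMetricValue). A minimal standalone sketch of that formula (the function name and the sample values are made up for illustration):

package main

import (
    "fmt"
    "math"
)

// desiredReplicas implements the basic proportional HPA formula:
// desiredReplicas = ceil(currentReplicas * currentMetricValue / targetMetricValue)
func desiredReplicas(currentReplicas int32, currentValue, targetValue float64) int32 {
    return int32(math.Ceil(float64(currentReplicas) * currentValue / targetValue))
}

func main() {
    // e.g. 3 replicas averaging 80% CPU against a 50% target -> ceil(3 * 1.6) = 5
    fmt.Println(desiredReplicas(3, 80, 50))
}

The real controller layers tolerance checks, missing-metric handling and stabilization on top of this, as we will see below.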

// NewHorizontalController creates a new HorizontalController.
func NewHorizontalController(
    evtNamespacer v1core.EventsGetter,
    scaleNamespacer scaleclient.ScalesGetter,
    hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
    mapper apimeta.RESTMapper,
    metricsClient metricsclient.MetricsClient,
    hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
    podInformer coreinformers.PodInformer,
    resyncPeriod time.Duration,
    downscaleStabilisationWindow time.Duration,
    tolerance float64,
    cpuInitializationPeriod,
    delayOfInitialReadinessStatus time.Duration,
) *HorizontalController {
    ......
    // only the core part is kept: wiring up the replica calculator
    replicaCalc := NewReplicaCalculator(
        metricsClient,
        hpaController.podLister,
        tolerance,
        cpuInitializationPeriod,
        delayOfInitialReadinessStatus,
    )
    hpaController.replicaCalc = replicaCalc

    return hpaController
}

4.1 Run

// Run begins watching and syncing.
func (a *HorizontalController) Run(ctx context.Context, workers int) {
    defer utilruntime.HandleCrash()
    defer a.queue.ShutDown()

    klog.Infof("Starting HPA controller")
    defer klog.Infof("Shutting down HPA controller")

    if !cache.WaitForNamedCacheSync("HPA", ctx.Done(), a.hpaListerSynced, a.podListerSynced) {
        return
    }
    // the actual work is done by worker goroutines
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, a.worker, time.Second)
    }

    <-ctx.Done()
}

The number of workers above can be set via a flag (e.g. --concurrent-horizontal-pod-autoscaler-syncs=5). Raising it makes HPA scaling more responsive, but it also means the HPA controller consumes more CPU, memory and network. Here is the official description of the flag:

fs.Int32Var(&o.ConcurrentHorizontalPodAutoscalerSyncs, "concurrent-horizontal-pod-autoscaler-syncs", o.ConcurrentHorizontalPodAutoscalerSyncs, "The number of horizontal pod autoscaler objects that are allowed to sync concurrently. Larger number = more responsive horizontal pod autoscaler objects processing, but more CPU (and network) load.")

What follows is the usual controller workqueue pattern, so it hardly needs a detailed walkthrough...

// worker processes items from the work queue
func (a *HorizontalController) worker(ctx context.Context) {
    for a.processNextWorkItem(ctx) {
    }
    klog.Infof("horizontal pod autoscaler controller worker shutting down")
}

// processNextWorkItem pops items off the queue one at a time and handles them
func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
    key, quit := a.queue.Get()
    if quit {
        return false
    }
    defer a.queue.Done(key)

    deleted, err := a.reconcileKey(ctx, key.(string))
    if err != nil {
        utilruntime.HandleError(err)
    }
    // Add request processing HPA to queue with resyncPeriod delay.
    // Requests are always added to queue with resyncPeriod delay. If there's already request
    // for the HPA in the queue then a new request is always dropped. Requests spend resyncPeriod
    // in queue so HPAs are processed every resyncPeriod.
    // Request is added here just in case last resync didn't insert request into the queue. This
    // happens quite often because there is race condition between adding request after resyncPeriod
    // and removing them from queue. Request can be added by resync before previous request is
    // removed from queue. If we didn't add request here then in this case one request would be dropped
    // and HPA would processed after 2 x resyncPeriod.
    if !deleted {
        a.queue.AddRateLimited(key)
    }

    return true
}

// reconcileKey is still routine handling: split the key, look up the concrete HPA object, and so on
func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (deleted bool, err error) {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)

    .....

    return false, a.reconcileAutoscaler(ctx, hpa, key)
}

4.2 reconcileAutoscaler

The reconcileAutoscaler call at the end is the core of the scaling logic.

func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) error {
    // make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
    hpa := hpaShared.DeepCopy()
    hpaStatusOriginal := hpa.Status.DeepCopy()

    .....
    // if the current replica count is 0 but minReplicas is not 0, skip the autoscaling calculation entirely
    if scale.Spec.Replicas == 0 && minReplicas != 0 {
        // Autoscaling is disabled for this resource
        desiredReplicas = 0
        rescale = false
        setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
    // if the current replica count is above the HPA's maximum, clamp the desired replicas to maxReplicas
    } else if currentReplicas > hpa.Spec.MaxReplicas {
        rescaleReason = "Current number of replicas above Spec.MaxReplicas"
        desiredReplicas = hpa.Spec.MaxReplicas
    // if the current replica count is below the HPA's minimum, clamp the desired replicas to minReplicas
    } else if currentReplicas < minReplicas {
        rescaleReason = "Current number of replicas below Spec.MinReplicas"
        desiredReplicas = minReplicas
    } else {
        var metricTimestamp time.Time
        // in all other cases, compute the desired replicas from the metric values
        metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
        if err != nil {
            a.setCurrentReplicasInStatus(hpa, currentReplicas)
            if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
                utilruntime.HandleError(err)
            }
            a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
            return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
        }

        klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

        rescaleMetric := ""
        if metricDesiredReplicas > desiredReplicas {
            desiredReplicas = metricDesiredReplicas
            rescaleMetric = metricName
        }
        if desiredReplicas > currentReplicas {
            rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
        }
        if desiredReplicas < currentReplicas {
            rescaleReason = "All metrics below target"
        }
        // Kubernetes 1.18 introduced the behavior field, which keeps the HPA from scaling pods too frequently:
        // when behavior is declared, the replica count is normalized within a stabilization window
        if hpa.Spec.Behavior == nil {
            desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        } else {
            desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        }
        rescale = desiredReplicas != currentReplicas
    }

    ....

    a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
    return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
}
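A quick aside on the behavior branch above: spec.behavior lets you bound the scaling rate and set a per-direction stabilization window. Below is a rough sketch of such a configuration expressed with the k8s.io/api/autoscaling/v2 Go types; the concrete numbers are purely illustrative, not defaults taken from the source.

package main

import (
    "fmt"

    autoscalingv2 "k8s.io/api/autoscaling/v2"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
    // Illustrative only: remove at most 1 pod per minute and wait 5 minutes before
    // acting on a lower recommendation; scale up without a stabilization window.
    behavior := &autoscalingv2.HorizontalPodAutoscalerBehavior{
        ScaleDown: &autoscalingv2.HPAScalingRules{
            StabilizationWindowSeconds: int32Ptr(300),
            Policies: []autoscalingv2.HPAScalingPolicy{
                {Type: autoscalingv2.PodsScalingPolicy, Value: 1, PeriodSeconds: 60},
            },
        },
        ScaleUp: &autoscalingv2.HPAScalingRules{
            StabilizationWindowSeconds: int32Ptr(0),
        },
    }
    fmt.Printf("%+v\n", behavior)
}

When behavior is set, normalizeDesiredReplicasWithBehaviors applies these rules instead of the plain normalizeDesiredReplicas path.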

5. The HPA Replica-Count Calculation

5.1 computeReplicasForMetrics

Once the edge cases above are dealt with, the real calculation happens in computeReplicasForMetrics. The plural "metrics" in the name is deliberate; consider a simple HPA example:

apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: php-apache
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: php-apache
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 50
  - type: Pods
    pods:
      metric:
        name: packets-per-second
      target:
        type: AverageValue
        averageValue: 1k
  - type: Object
    object:
      metric:
        name: requests-per-second
      describedObject:
        apiVersion: networking.k8s.io/v1beta1
        kind: Ingress
        name: main-route
      target:
        type: Value
        value: 10k

As the example YAML shows, metrics is actually a list. The HPA handles this list by computing a proposed replica count for every entry and taking the maximum as the scaling target. The reasoning is simple: a replica count that satisfies the most demanding metric necessarily satisfies the targets of all the other metrics as well. For example, if the CPU metric proposes 4 replicas and the packets-per-second metric proposes 7, the HPA scales to 7.

// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
// all metrics computed.
func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
    metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {

    ....

    // iterate over the declared metrics and keep the largest proposed replica count
    for i, metricSpec := range metricSpecs {
        // compute the proposed replica count according to the metric type
        replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])

        if err != nil {
            if invalidMetricsCount <= 0 {
                invalidMetricCondition = condition
                invalidMetricError = err
            }
            invalidMetricsCount++
        }
        // remember the largest proposal seen so far
        if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
            timestamp = timestampProposal
            replicas = replicaCountProposal
            metric = metricNameProposal
        }
    }

    // If all metrics are invalid or some are invalid and we would scale down,
    // return an error and set the condition of the hpa based on the first invalid metric.
    // Otherwise set the condition as scaling active as we're going to scale
    if invalidMetricsCount >= len(metricSpecs) || (invalidMetricsCount > 0 && replicas < specReplicas) {
        setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
        return 0, "", statuses, time.Time{}, fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
    }
    setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
    return replicas, metric, statuses, timestamp, nil
}

5.2 computeReplicasForMetric

computeReplicasForMetric computes the replica count differently depending on the metric type. The type constants are declared as follows:

const (
    // ObjectMetricSourceType is a metric describing a kubernetes object
    // (for example, hits-per-second on an Ingress object).
    // describes a built-in Kubernetes object
    ObjectMetricSourceType MetricSourceType = "Object"
    // PodsMetricSourceType is a metric describing each pod in the current scale
    // target (for example, transactions-processed-per-second). The values
    // will be averaged together before being compared to the target value.
    // per-pod metric, averaged across pods before comparison with the target
    PodsMetricSourceType MetricSourceType = "Pods"
    // ResourceMetricSourceType is a resource metric known to Kubernetes, as
    // specified in requests and limits, describing each pod in the current
    // scale target (e.g. CPU or memory). Such metrics are built in to
    // Kubernetes, and have special scaling options on top of those available
    // to normal per-pod metrics (the "pods" source).
    // per-pod resource usage, such as CPU or memory
    ResourceMetricSourceType MetricSourceType = "Resource"
    // ContainerResourceMetricSourceType is a resource metric known to Kubernetes, as
    // specified in requests and limits, describing a single container in each pod in the current
    // scale target (e.g. CPU or memory). Such metrics are built in to
    // Kubernetes, and have special scaling options on top of those available
    // to normal per-pod metrics (the "pods" source).
    // per-container resource usage, such as CPU or memory
    ContainerResourceMetricSourceType MetricSourceType = "ContainerResource"
    // ExternalMetricSourceType is a global metric that is not associated
    // with any Kubernetes object. It allows autoscaling based on information
    // coming from components running outside of cluster
    // (for example length of queue in cloud messaging service, or
    // QPS from loadbalancer running outside of cluster).
    // External is a global metric unrelated to any Kubernetes object, supplied by a component outside the cluster
    ExternalMetricSourceType MetricSourceType = "External"
)
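To tie these constants back to the example HPA above, here is roughly what the packets-per-second Pods entry looks like once decoded into the k8s.io/api/autoscaling/v2 Go types that the switch below inspects. This is a sketch for illustration; the quantity is simply my parsed form of "1k".

package main

import (
    "fmt"

    autoscalingv2 "k8s.io/api/autoscaling/v2"
    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    // Approximate Go form of the "packets-per-second" Pods metric from the YAML example.
    spec := autoscalingv2.MetricSpec{
        Type: autoscalingv2.PodsMetricSourceType,
        Pods: &autoscalingv2.PodsMetricSource{
            Metric: autoscalingv2.MetricIdentifier{Name: "packets-per-second"},
            Target: autoscalingv2.MetricTarget{
                Type:         autoscalingv2.AverageValueMetricType,
                AverageValue: resource.NewQuantity(1000, resource.DecimalSI), // "1k"
            },
        },
    }
    // These are the fields computeStatusForPodsMetric reads further down.
    fmt.Println(spec.Pods.Metric.Name, spec.Pods.Target.AverageValue.MilliValue())
}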
// Computes the desired number of replicas for a specific hpa and metric specification,
// returning the metric status and a proposed condition to be set on the HPA object.
func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
    specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
    timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // a switch dispatches to a different calculation for each metric type
    switch spec.Type {
    // a Kubernetes object, e.g. an Ingress
    case autoscalingv2.ObjectMetricSourceType:
        metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
        if err != nil {
            condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
        }
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
        }
    // per-pod metrics
    case autoscalingv2.PodsMetricSourceType:
        metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
        if err != nil {
            condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
    // resource metrics (CPU, memory)
    case autoscalingv2.ResourceMetricSourceType:
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err)
        }
    // per-container resource metrics
    case autoscalingv2.ContainerResourceMetricSourceType:
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err)
        }
    // external metrics
    case autoscalingv2.ExternalMetricSourceType:
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
        }
    default:
        errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
        err = fmt.Errorf(errMsg)
        condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
        return 0, "", time.Time{}, condition, err
    }
    return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

The core calculation for each type is much the same, so let's walk through the most common one, PodsMetricSourceType:

// computeStatusForPodsMetric computes the desired number of replicas for the specified metric of type PodsMetricSourceType.
func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // compute the proposed replica count
    replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)
    if err != nil {
        condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
        return 0, timestampProposal, "", condition, err
    }
    *status = autoscalingv2.MetricStatus{
        Type: autoscalingv2.PodsMetricSourceType,
        Pods: &autoscalingv2.PodsMetricStatus{
            Metric: autoscalingv2.MetricIdentifier{
                Name: metricSpec.Pods.Metric.Name,
                Selector: metricSpec.Pods.Metric.Selector,
            },
            Current: autoscalingv2.MetricValueStatus{
                AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
            },
        },
    }

    return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

// GetMetricReplicas calculates the desired replica count based on a target metric usage
// (as a milli-value) for pods matching the given selector in the given namespace, and the
// current replica count
func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUsage int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, usage int64, timestamp time.Time, err error) {
    metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)
    if err != nil {
        return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
    }
    // combine the metric samples to work out the desired replica count
    replicaCount, usage, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUsage, namespace, selector, v1.ResourceName(""))
    return replicaCount, usage, timestamp, err
}

5.3 calcPlainMetricReplicas

// calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics.
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUsage int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, usage int64, err error) {

    podList, err := c.podLister.Pods(namespace).List(selector)
    if err != nil {
        return 0, 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
    }

    if len(podList) == 0 {
        return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count")
    }

    .....

    // re-run the usage calculation with our new numbers
    // compute a new usageRatio; the metrics passed in here have been re-filled with the
    // samples that were missing for the pods filtered out above
    newUsageRatio, _ := metricsclient.GetMetricUsageRatio(metrics, targetUsage)

    if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
        // return the current replicas if the change would be too small,
        // or if the new usage ratio would cause a change in scale direction
        return currentReplicas, usage, nil
    }

    newReplicas := int32(math.Ceil(newUsageRatio * float64(len(metrics))))
    // sanity-check the proposed scaling direction:
    // 1. if the ratio is below 1.0 the load is low, so proposing more replicas than we currently have makes no sense
    // 2. if the ratio is above 1.0 the load is high, so proposing fewer replicas than we currently have makes no sense either
    if (newUsageRatio < 1.0 && newReplicas > currentReplicas) || (newUsageRatio > 1.0 && newReplicas < currentReplicas) {
        // return the current replicas if the change of metrics length would cause a change in scale direction
        return currentReplicas, usage, nil
    }

    // return the result, where the number of replicas considered is
    // however many replicas factored into our calculation
    return newReplicas, usage, nil
}

GetMetricUsageRatio adds up all the metric values and divides by the number of samples to get the per-pod average, then divides that average by the target usage. The result tells us, as a ratio, how far the current usage is from the target.

// GetMetricUsageRatio takes in a set of metrics and a target usage value,
// and calculates the ratio of desired to actual usage
// (returning that and the actual usage)
func GetMetricUsageRatio(metrics PodMetricsInfo, targetUsage int64) (usageRatio float64, currentUsage int64) {
    metricsTotal := int64(0)
    for _, metric := range metrics {
        metricsTotal += metric.Value
    }

    currentUsage = metricsTotal / int64(len(metrics))

    return float64(currentUsage) / float64(targetUsage), currentUsage
}
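Putting the pieces together with concrete numbers, here is a standalone sketch that mirrors the arithmetic of GetMetricUsageRatio and calcPlainMetricReplicas (not the real package; the sample values are invented):

package main

import (
    "fmt"
    "math"
)

func main() {
    // Three pods report 600m, 800m and 1000m for a pods metric; the target average is 500m.
    values := []int64{600, 800, 1000}
    targetUsage := int64(500)
    tolerance := 0.1 // default of --horizontal-pod-autoscaler-tolerance

    var total int64
    for _, v := range values {
        total += v
    }
    currentUsage := total / int64(len(values))                 // 800
    usageRatio := float64(currentUsage) / float64(targetUsage) // 1.6

    if math.Abs(1.0-usageRatio) <= tolerance {
        fmt.Println("within tolerance, keep the current replica count")
        return
    }
    newReplicas := int32(math.Ceil(usageRatio * float64(len(values)))) // ceil(1.6 * 3) = 5
    fmt.Println("scale to", newReplicas, "replicas")
}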

6. Summary

The trickiest part of the HPA source is the replica-count calculation. Once newReplicas has been computed, the remaining logic is essentially the same as running kubectl scale: the controller updates the scale subresource of the target.
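For completeness, here is a rough sketch of that final step using the client-go scale client that was wired into the controller earlier. This is a simplified illustration based on my reading of the k8s.io/client-go/scale API, not the exact code path inside reconcileAutoscaler:

package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    scaleclient "k8s.io/client-go/scale"
)

// applyScale updates the /scale subresource of a Deployment-like target to desiredReplicas,
// which is roughly what the HPA controller does once it decides to rescale.
func applyScale(ctx context.Context, scales scaleclient.ScalesGetter, namespace, name string, desiredReplicas int32) error {
    gr := schema.GroupResource{Group: "apps", Resource: "deployments"}

    // fetch the current scale, bump the replica count, and write it back
    scale, err := scales.Scales(namespace).Get(ctx, gr, name, metav1.GetOptions{})
    if err != nil {
        return err
    }
    scale.Spec.Replicas = desiredReplicas
    _, err = scales.Scales(namespace).Update(ctx, gr, scale, metav1.UpdateOptions{})
    return err
}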

