1. Preface
I recently needed to build some HPA-related functionality. As the saying goes, to do a good job you must first sharpen your tools, so let's go straight to the source code.
2. MetricsClient
HPA scaling decisions are driven by collected metrics, and MetricsClient is the interface used to fetch them:
pkg/controller/podautoscaler/metrics/interfaces.go
// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
type MetricsClient interface {
// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
// for the specified named container in all pods matching the specified selector in the given namespace and when
// the container is an empty string it returns the sum of all the container metrics.
    // GetResourceMetric fetches a resource metric by resource name, namespace, selector and container name; if the container name is empty, it returns the per-pod sum of all container metrics
GetResourceMetric(ctx context.Context, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (PodMetricsInfo, time.Time, error)
// GetRawMetric gets the given metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
    // unlike the previous method, the metric is addressed by metricName
GetRawMetric(metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (PodMetricsInfo, time.Time, error)
// GetObjectMetric gets the given metric (and an associated timestamp) for the given
// object in the given namespace
    // fetches the metric value of a single Kubernetes object
GetObjectMetric(metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, metricSelector labels.Selector) (int64, time.Time, error)
// GetExternalMetric gets all the values of a given external metric
// that match the specified selector.
    // fetches the values of an external metric
GetExternalMetric(metricName string, namespace string, selector labels.Selector) ([]int64, time.Time, error)
}
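For reference, the return type PodMetricsInfo is declared in the same interfaces.go. From memory it looks roughly like the following (verify against your Kubernetes version); the values are milli-values keyed by pod name:
// PodMetric is a single sample: a value (as a milli-value) plus its timestamp and window
type PodMetric struct {
    Timestamp time.Time
    Window    time.Duration
    Value     int64
}
// PodMetricsInfo maps pod name to that pod's metric sample
type PodMetricsInfo map[string]PodMetric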
From the interface definition above, MetricsClient implementations roughly fall into three categories: ① the resource metrics client for the metrics.k8s.io API group, ② the custom metrics client, and ③ the external metrics client. The following struct bundles them together:
// restMetricsClient is a client which supports fetching
// metrics from the resource metrics API, the
// custom metrics API and the external metrics API.
type restMetricsClient struct {
*resourceMetricsClient
*customMetricsClient
*externalMetricsClient
}
cmd/kube-controller-manager/app/autoscaling.go#startHPAControllerWithRESTClient
func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
    // ... some code omitted
    // create the metrics client
    metricsClient := metrics.NewRESTMetricsClient(
        // ① built-in resource metrics client
        resourceclient.NewForConfigOrDie(clientConfig),
        // ② custom metrics client
        custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter),
        // ③ external metrics client
        external_metrics.NewForConfigOrDie(clientConfig),
    )
// ...
}
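Of the three clients, the first one talks to the metrics.k8s.io group (typically served by metrics-server). As a side note, here is a minimal, hypothetical sketch -- not part of the HPA source -- of reading that data directly with the same resourceclient package; method names such as PodMetricses come from the k8s.io/metrics generated clientset and should be checked against your version:
package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/clientcmd"
    resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    mc := resourceclient.NewForConfigOrDie(cfg)
    // list the CPU/memory usage of all pods in the default namespace,
    // i.e. the raw data that the resource metrics client aggregates for the HPA
    podMetrics, err := mc.PodMetricses("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    for _, pm := range podMetrics.Items {
        for _, c := range pm.Containers {
            fmt.Printf("%s/%s cpu=%s memory=%s\n", pm.Name, c.Name, c.Usage.Cpu().String(), c.Usage.Memory().String())
        }
    }
}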
3. HPA Controller Registration
Like the other controllers, the HPA controller is registered in the ControllerManager:
controllers["horizontalpodautoscaling"] = startHPAController
3.1 startHPAController
startHPAController actually calls startHPAControllerWithRESTClient; the metricsClient mentioned above is instantiated inside startHPAControllerWithRESTClient and then passed on to startHPAControllerWithMetricsClient.
func startHPAController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
return nil, false, nil
}
return startHPAControllerWithRESTClient(ctx, controllerContext)
}
func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
// invalidate the discovery information roughly once per resync interval our API
// information is *at most* two resync intervals old.
go custom_metrics.PeriodicallyInvalidate(
apiVersionsGetter,
controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
ctx.Done())
    // instantiate the metrics client
metricsClient := metrics.NewRESTMetricsClient(
resourceclient.NewForConfigOrDie(clientConfig),
custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter),
external_metrics.NewForConfigOrDie(clientConfig),
)
return startHPAControllerWithMetricsClient(ctx, controllerContext, metricsClient)
}
cmd/kube-controller-manager/app/autoscaling.go#startHPAControllerWithMetricsClient
func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) {
hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
hpaClientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
// we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
// so we want to re-fetch every time when we actually ask for it
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
scaleClient, err := scale.NewForConfig(hpaClientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
return nil, false, err
}
    go podautoscaler.NewHorizontalController(
        // v1.CoreV1Interface
        // see the "k8s.io/client-go/kubernetes/typed/core/v1" package
        hpaClient.CoreV1(),
        // ① scale client (used to implement kubectl scale-like behaviour)
        scaleClient,
        // ② HorizontalPodAutoscalersGetter
        hpaClient.AutoscalingV2(),
        // RESTMapper handles the mapping between GVK (GroupVersionKind) and GVR (GroupVersionResource)
        controllerContext.RESTMapper,
        // ③ metrics client
        metricsClient,
        // ④ HPA informer
        controllerContext.InformerFactory.Autoscaling().V2().HorizontalPodAutoscalers(),
        // ⑤ Pod informer
        controllerContext.InformerFactory.Core().V1().Pods(),
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
        controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
        // note that Run is called immediately on the controller built from the arguments above
    ).Run(ctx, int(controllerContext.ComponentConfig.HPAController.ConcurrentHorizontalPodAutoscalerSyncs))
return nil, true, nil
}
4. HPA Controller Startup
The replica-count calculation is the core of the HPA. From the components initialized above, we can already guess the overall flow: fetch metrics through the metricsClient, then compute the desired replica count against the targets declared in the HPA.
// NewHorizontalController creates a new HorizontalController.
func NewHorizontalController(
evtNamespacer v1core.EventsGetter,
scaleNamespacer scaleclient.ScalesGetter,
hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
mapper apimeta.RESTMapper,
metricsClient metricsclient.MetricsClient,
hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
podInformer coreinformers.PodInformer,
resyncPeriod time.Duration,
downscaleStabilisationWindow time.Duration,
tolerance float64,
cpuInitializationPeriod,
delayOfInitialReadinessStatus time.Duration,
) *HorizontalController {
......
    // only the core is kept here: constructing the replica calculator
replicaCalc := NewReplicaCalculator(
metricsClient,
hpaController.podLister,
tolerance,
cpuInitializationPeriod,
delayOfInitialReadinessStatus,
)
hpaController.replicaCalc = replicaCalc
return hpaController
}
4.1 Run
// Run begins watching and syncing.
func (a *HorizontalController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer a.queue.ShutDown()
klog.Infof("Starting HPA controller")
defer klog.Infof("Shutting down HPA controller")
if !cache.WaitForNamedCacheSync("HPA", ctx.Done(), a.hpaListerSynced, a.podListerSynced) {
return
}
    // spawn worker goroutines to do the actual work
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, a.worker, time.Second)
}
<-ctx.Done()
}
The number of workers above can be set with a flag. A larger value makes HPA scaling more responsive, but also makes the HPA controller consume more CPU, memory and other resources; here is the official description of the flag:
fs.Int32Var(&o.ConcurrentHorizontalPodAutoscalerSyncs, "concurrent-horizontal-pod-autoscaler-syncs", o.ConcurrentHorizontalPodAutoscalerSyncs, "The number of horizontal pod autoscaler objects that are allowed to sync concurrently. Larger number = more responsive horizontal pod autoscaler objects processing, but more CPU (and network) load.")
What follows is the usual controller workqueue pattern, so it hardly needs a detailed explanation...
// worker loops, processing items until the queue shuts down
func (a *HorizontalController) worker(ctx context.Context) {
for a.processNextWorkItem(ctx) {
}
klog.Infof("horizontal pod autoscaler controller worker shutting down")
}
// processNextWorkItem takes one key off the queue and reconciles it
func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
key, quit := a.queue.Get()
if quit {
return false
}
defer a.queue.Done(key)
deleted, err := a.reconcileKey(ctx, key.(string))
if err != nil {
utilruntime.HandleError(err)
}
// Add request processing HPA to queue with resyncPeriod delay.
// Requests are always added to queue with resyncPeriod delay. If there's already request
// for the HPA in the queue then a new request is always dropped. Requests spend resyncPeriod
// in queue so HPAs are processed every resyncPeriod.
// Request is added here just in case last resync didn't insert request into the queue. This
// happens quite often because there is race condition between adding request after resyncPeriod
// and removing them from queue. Request can be added by resync before previous request is
// removed from queue. If we didn't add request here then in this case one request would be dropped
// and HPA would processed after 2 x resyncPeriod.
if !deleted {
a.queue.AddRateLimited(key)
}
return true
}
// again some routine handling: split the key, fetch the concrete resource, and so on
func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (deleted bool, err error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
.....
return false, a.reconcileAutoscaler(ctx, hpa, key)
}
4.2 reconcileAutoscaler
The reconcileAutoscaler invoked at the end is the core of the scaling algorithm.
func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) error {
// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
hpa := hpaShared.DeepCopy()
hpaStatusOriginal := hpa.Status.DeepCopy()
.....
    // if the current replica count is 0 but minReplicas is not 0, autoscaling is considered disabled and no calculation is performed
if scale.Spec.Replicas == 0 && minReplicas != 0 {
// Autoscaling is disabled for this resource
desiredReplicas = 0
rescale = false
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
    // if the current replica count is above the HPA's maxReplicas, simply set the desired count to maxReplicas
} else if currentReplicas > hpa.Spec.MaxReplicas {
rescaleReason = "Current number of replicas above Spec.MaxReplicas"
desiredReplicas = hpa.Spec.MaxReplicas
    // if the current replica count is below minReplicas, simply set the desired count to minReplicas
} else if currentReplicas < minReplicas {
rescaleReason = "Current number of replicas below Spec.MinReplicas"
desiredReplicas = minReplicas
} else {
var metricTimestamp time.Time
        // in all remaining cases the desired count must be computed from the metric values
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
if err != nil {
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
}
klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)
rescaleMetric := ""
if metricDesiredReplicas > desiredReplicas {
desiredReplicas = metricDesiredReplicas
rescaleMetric = metricName
}
if desiredReplicas > currentReplicas {
rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
}
if desiredReplicas < currentReplicas {
rescaleReason = "All metrics below target"
}
        // the behavior field is supported since Kubernetes 1.18 to keep the HPA from rescaling too frequently;
        // when behavior is declared, the replica count is smoothed over a stabilization window
if hpa.Spec.Behavior == nil {
desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
} else {
desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
}
rescale = desiredReplicas != currentReplicas
}
....
a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
}
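normalizeDesiredReplicas itself is not shown above. Roughly, besides recording recommendations for the downscale stabilization window, it clamps the proposal into [minReplicas, maxReplicas] and additionally limits how far a single scale-up step may go. The sketch below is a simplified reconstruction of just that clamping, with the scale-up limit constants as I recall the upstream defaults (verify against your version):
// clampDesiredReplicas is a simplified stand-in for the clamping part of
// normalizeDesiredReplicas; conditions and the stabilization window are ignored.
func clampDesiredReplicas(currentReplicas, desiredReplicas, minReplicas, maxReplicas int32) int32 {
    // assumed default: one scale-up step is limited to max(2*current, 4) replicas
    scaleUpLimit := 2 * currentReplicas
    if scaleUpLimit < 4 {
        scaleUpLimit = 4
    }
    maximumAllowed := maxReplicas
    if scaleUpLimit < maximumAllowed {
        maximumAllowed = scaleUpLimit
    }
    if desiredReplicas < minReplicas {
        return minReplicas
    }
    if desiredReplicas > maximumAllowed {
        return maximumAllowed
    }
    return desiredReplicas
}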
5. HPA Replica Count Calculation
5.1 computeReplicasForMetrics
Once the edge cases above are out of the way, the real calculation is a call to computeReplicasForMetrics. The plural "Metrics" in the name is no accident; let's look at a simple HPA example first:
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: php-apache
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: php-apache
minReplicas: 1
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 50
- type: Pods
pods:
metric:
name: packets-per-second
target:
type: AverageValue
averageValue: 1k
- type: Object
object:
metric:
name: requests-per-second
describedObject:
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
name: main-route
target:
type: Value
value: 10k
As the example YAML shows, metrics is an array. The HPA handles this array by computing a proposed replica count for every metric and taking the largest one as the scaling target. The idea is simple: once the largest proposal is satisfied, the targets of all the other metrics are necessarily satisfied as well. For example, if the CPU metric proposes 6 replicas and packets-per-second proposes 8, the HPA scales to 8.
// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
// all metrics computed.
func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
....
    // iterate over the defined metrics and keep the largest proposed replica count
for i, metricSpec := range metricSpecs {
        // compute the proposed replica count according to the metric type
replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])
if err != nil {
if invalidMetricsCount <= 0 {
invalidMetricCondition = condition
invalidMetricError = err
}
invalidMetricsCount++
}
        // remember the largest proposal seen so far
if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
timestamp = timestampProposal
replicas = replicaCountProposal
metric = metricNameProposal
}
}
// If all metrics are invalid or some are invalid and we would scale down,
// return an error and set the condition of the hpa based on the first invalid metric.
// Otherwise set the condition as scaling active as we're going to scale
if invalidMetricsCount >= len(metricSpecs) || (invalidMetricsCount > 0 && replicas < specReplicas) {
setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
return 0, "", statuses, time.Time{}, fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
}
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
return replicas, metric, statuses, timestamp, nil
}
5.2 computeReplicasForMetric
computeReplicasForMetric computes the replica count differently depending on the metric type; the type constants are declared as follows:
const (
// ObjectMetricSourceType is a metric describing a kubernetes object
// (for example, hits-per-second on an Ingress object).
    // describes a built-in Kubernetes object
ObjectMetricSourceType MetricSourceType = "Object"
// PodsMetricSourceType is a metric describing each pod in the current scale
// target (for example, transactions-processed-per-second). The values
// will be averaged together before being compared to the target value.
    // per-pod values that are averaged before being compared with the target
PodsMetricSourceType MetricSourceType = "Pods"
// ResourceMetricSourceType is a resource metric known to Kubernetes, as
// specified in requests and limits, describing each pod in the current
// scale target (e.g. CPU or memory). Such metrics are built in to
// Kubernetes, and have special scaling options on top of those available
// to normal per-pod metrics (the "pods" source).
    // a per-pod resource such as CPU or memory
ResourceMetricSourceType MetricSourceType = "Resource"
// ContainerResourceMetricSourceType is a resource metric known to Kubernetes, as
// specified in requests and limits, describing a single container in each pod in the current
// scale target (e.g. CPU or memory). Such metrics are built in to
// Kubernetes, and have special scaling options on top of those available
// to normal per-pod metrics (the "pods" source).
    // a per-container resource such as CPU or memory
ContainerResourceMetricSourceType MetricSourceType = "ContainerResource"
// ExternalMetricSourceType is a global metric that is not associated
// with any Kubernetes object. It allows autoscaling based on information
// coming from components running outside of cluster
// (for example length of queue in cloud messaging service, or
// QPS from loadbalancer running outside of cluster).
    // External is a global metric unrelated to any Kubernetes object, backed by information from outside the cluster
ExternalMetricSourceType MetricSourceType = "External"
)
// Computes the desired number of replicas for a specific hpa and metric specification,
// returning the metric status and a proposed condition to be set on the HPA object.
func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // a switch selects the calculation for each metric source type
switch spec.Type {
    // a Kubernetes object, e.g. an Ingress
case autoscalingv2.ObjectMetricSourceType:
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
if err != nil {
condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
}
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
}
    // a per-pod metric
case autoscalingv2.PodsMetricSourceType:
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
if err != nil {
condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
}
// Pod
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
}
    // a resource metric
case autoscalingv2.ResourceMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err)
}
    // a per-container resource metric
case autoscalingv2.ContainerResourceMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err)
}
    // an external metric
case autoscalingv2.ExternalMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
}
default:
errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
err = fmt.Errorf(errMsg)
condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
return 0, "", time.Time{}, condition, err
}
return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
The core logic is much the same for every type, so let's walk through the most common one, PodsMetricSourceType.
// computeStatusForPodsMetric computes the desired number of replicas for the specified metric of type PodsMetricSourceType.
func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // compute the proposed replica count
replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)
if err != nil {
condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
return 0, timestampProposal, "", condition, err
}
*status = autoscalingv2.MetricStatus{
Type: autoscalingv2.PodsMetricSourceType,
Pods: &autoscalingv2.PodsMetricStatus{
Metric: autoscalingv2.MetricIdentifier{
Name: metricSpec.Pods.Metric.Name,
Selector: metricSpec.Pods.Metric.Selector,
},
Current: autoscalingv2.MetricValueStatus{
AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
},
},
}
return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
// GetMetricReplicas calculates the desired replica count based on a target metric usage
// (as a milli-value) for pods matching the given selector in the given namespace, and the
// current replica count
func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUsage int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, usage int64, timestamp time.Time, err error) {
metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)
if err != nil {
return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
}
    // combine the metric data to work out the desired replica count
replicaCount, usage, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUsage, namespace, selector, v1.ResourceName(""))
return replicaCount, usage, timestamp, err
}
5.3 calcPlainMetricReplicas
// calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics.
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUsage int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, usage int64, err error) {
podList, err := c.podLister.Pods(namespace).List(selector)
if err != nil {
return 0, 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
}
if len(podList) == 0 {
return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count")
}
.....
// re-run the usage calculation with our new numbers
    // recompute the usage ratio; by this point metrics has been back-filled with values for the missing/ignored pods mentioned above
newUsageRatio, _ := metricsclient.GetMetricUsageRatio(metrics, targetUsage)
if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
// return the current replicas if the change would be too small,
// or if the new usage ratio would cause a change in scale direction
return currentReplicas, usage, nil
}
newReplicas := int32(math.Ceil(newUsageRatio * float64(len(metrics))))
    // sanity-check the scaling direction:
    // 1. if the ratio is below 1.0 (load is low) yet the new replica count is higher than the current one, do not change the replica count
    // 2. if the ratio is above 1.0 (load is high) yet the new replica count is lower than the current one, likewise do not change it
if (newUsageRatio < 1.0 && newReplicas > currentReplicas) || (newUsageRatio > 1.0 && newReplicas < currentReplicas) {
// return the current replicas if the change of metrics length would cause a change in scale direction
return currentReplicas, usage, nil
}
// return the result, where the number of replicas considered is
// however many replicas factored into our calculation
return newReplicas, usage, nil
}
GetMetricUsageRatio adds up all the metric values and divides by the number of metrics to get the per-pod average, then divides that average by the target usage, which yields the ratio between actual and target usage.
// GetMetricUsageRatio takes in a set of metrics and a target usage value,
// and calculates the ratio of desired to actual usage
// (returning that and the actual usage)
func GetMetricUsageRatio(metrics PodMetricsInfo, targetUsage int64) (usageRatio float64, currentUsage int64) {
metricsTotal := int64(0)
for _, metric := range metrics {
metricsTotal += metric.Value
}
currentUsage = metricsTotal / int64(len(metrics))
return float64(currentUsage) / float64(targetUsage), currentUsage
}
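To make the arithmetic concrete, here is a standalone toy calculation -- the numbers are hypothetical, not taken from the source -- mirroring GetMetricUsageRatio plus the ceil() step in calcPlainMetricReplicas:
package main

import (
    "fmt"
    "math"
)

func main() {
    // three pods reporting a custom metric as milli-values, target average 500m per pod
    values := []int64{600, 800, 1000}
    targetUsage := int64(500)

    var total int64
    for _, v := range values {
        total += v
    }
    currentUsage := total / int64(len(values))                 // 800
    usageRatio := float64(currentUsage) / float64(targetUsage) // 1.6

    // tolerance and missing-pod handling are ignored for simplicity
    newReplicas := int32(math.Ceil(usageRatio * float64(len(values)))) // ceil(1.6*3) = 5
    fmt.Println(currentUsage, usageRatio, newReplicas)
}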
6. Summary
The hardest part of the HPA source is the replica-count calculation. Once newReplicas has been obtained, the remaining logic is essentially the same as running kubectl scale, as sketched below.
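Here is a rough sketch of that final step using client-go's scale client -- the same kind of scaleClient that was wired into the controller earlier. The target name and GroupResource are purely illustrative, and the ScaleInterface signatures should be checked against your client-go version:
package example

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    scaleclient "k8s.io/client-go/scale"
)

// rescale updates the scale subresource of a Deployment to the desired replica
// count -- conceptually what the HPA controller (and kubectl scale) ends up doing.
func rescale(ctx context.Context, scales scaleclient.ScalesGetter, namespace, name string, desiredReplicas int32) error {
    gr := schema.GroupResource{Group: "apps", Resource: "deployments"}
    sc, err := scales.Scales(namespace).Get(ctx, gr, name, metav1.GetOptions{})
    if err != nil {
        return err
    }
    sc.Spec.Replicas = desiredReplicas
    _, err = scales.Scales(namespace).Update(ctx, gr, sc, metav1.UpdateOptions{})
    return err
}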