Implemented batch processing for check capacity provisioning class #7283

Open · wants to merge 4 commits into master
6 changes: 6 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -295,6 +295,12 @@ type AutoscalingOptions struct {
ProvisioningRequestEnabled bool
// AsyncNodeGroupsEnabled tells if CA creates/deletes node groups asynchronously.
AsyncNodeGroupsEnabled bool
// CheckCapacityBatchProcessing is used to enable/disable batch processing of the check capacity provisioning class.
CheckCapacityBatchProcessing bool
// MaxBatchSize is the maximum number of provisioning requests to process in a single batch.
MaxBatchSize int
// BatchTimebox is the maximum time to spend processing a batch of provisioning requests.
BatchTimebox time.Duration
}

// KubeClientOptions specify options for kube client
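For orientation, here is a minimal sketch (not part of the diff) of how the three new fields slot into AutoscalingOptions. The values are illustrative and mirror the flag defaults added in main.go below; the function name is hypothetical.

// Sketch only; assumes imports "time" and
// "k8s.io/autoscaler/cluster-autoscaler/config".
func exampleOptions() config.AutoscalingOptions {
	return config.AutoscalingOptions{
		// ...existing fields elided...
		CheckCapacityBatchProcessing: true,            // opt in to batching
		MaxBatchSize:                 10,              // mirrors --max-batch-size default
		BatchTimebox:                 5 * time.Minute, // mirrors --batch-timebox default
	}
}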
38 changes: 26 additions & 12 deletions cluster-autoscaler/main.go
@@ -270,11 +270,14 @@ var (
"--max-graceful-termination-sec flag should not be set when this flag is set. Not setting this flag will use unordered evictor by default."+
"Priority evictor reuses the concepts of drain logic in kubelet(https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature)."+
"Eg. flag usage: '10000:20,1000:100,0:60'")
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
checkCapacityBatchProcessing = flag.Bool("check-capacity-batch-processing", false, "Whether to enable batch processing for check capacity requests.")
maxBatchSize = flag.Int("max-batch-size", 10, "Maximum number of provisioning requests to process in a single batch.")
batchTimebox = flag.Duration("batch-timebox", 5*time.Minute, "Maximum time to process a batch of provisioning requests.")
)
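Taken together, the batch path would be switched on with something like this flag usage: '--check-capacity-batch-processing=true --max-batch-size=20 --batch-timebox=2m' (illustrative values; the defaults defined above are false, 10, and 5m).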

func isFlagPassed(name string) bool {
@@ -447,6 +450,9 @@ func createAutoscalingOptions() config.AutoscalingOptions {
BypassedSchedulers: scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers),
ProvisioningRequestEnabled: *provisioningRequestsEnabled,
AsyncNodeGroupsEnabled: *asyncNodeGroupsEnabled,
CheckCapacityBatchProcessing: *checkCapacityBatchProcessing,
MaxBatchSize: *maxBatchSize,
BatchTimebox: *batchTimebox,
}
}

@@ -513,20 +519,28 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
if err != nil {
return nil, err
}

injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
if err != nil {
return nil, err
}
podListProcessor.AddProcessor(injector)
var provisioningRequestPodsInjector *provreq.ProvisioningRequestPodsInjector
if autoscalingOptions.CheckCapacityBatchProcessing {
klog.Infof("Batch processing for check capacity requests is enabled. Passing provisioning request injector to check capacity processor.")
provisioningRequestPodsInjector = injector
}

provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
checkcapacity.New(client, provisioningRequestPodsInjector),
besteffortatomic.New(client),
})

scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})

podListProcessor.AddProcessor(provreqProcesor)
}

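With this wiring, the check-capacity provisioning class receives the pods injector only when batch processing is enabled (otherwise it gets nil and keeps the previous one-request-per-loop behavior), which lets it pull the pods for the next pending request itself via the injector's GetPodsFromNextRequest, as used in the Provision loop below.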
3 changes: 1 addition & 2 deletions cluster-autoscaler/processors/provreq/injector.go
@@ -24,7 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -136,7 +135,7 @@ func (p *ProvisioningRequestPodsInjector) Process(
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (*ProvisioningRequestPodsInjector, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
27 changes: 27 additions & 0 deletions cluster-autoscaler/processors/provreq/testutils.go
@@ -0,0 +1,27 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/utils/clock/testing"
)

// NewFakePodsInjector creates a new instance of ProvisioningRequestPodsInjector with the given client and clock for testing.
func NewFakePodsInjector(client *provreqclient.ProvisioningRequestClient, clock *testing.FakePassiveClock) *ProvisioningRequestPodsInjector {
return &ProvisioningRequestPodsInjector{client: client, clock: clock}
}
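A hypothetical test using this helper could look like the following sketch (not part of the diff; the fake client constructor and its signature are assumptions based on the repo's existing provreqclient test utilities).

// Hypothetical test sketch; assumes imports "context", "testing", "time",
// clocktesting "k8s.io/utils/clock/testing", and the provreqclient fake.
func TestNewFakePodsInjector(t *testing.T) {
	client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t)
	clock := clocktesting.NewFakePassiveClock(time.Now())
	injector := NewFakePodsInjector(client, clock)
	if injector == nil {
		t.Fatal("expected a non-nil injector")
	}
}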
@@ -18,37 +18,47 @@ package checkcapacity

import (
"fmt"
"sort"
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"

ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type checkCapacityProvClass struct {
context *context.AutoscalingContext
client *provreqclient.ProvisioningRequestClient
injector *scheduling.HintingSimulator
batchProcessing bool
maxBatchSize int
batchTimebox time.Duration
provisioningRequestPodsInjector *provreq.ProvisioningRequestPodsInjector
}

// New creates a check-capacity scale-up mode.
func New(
client *provreqclient.ProvisioningRequestClient,
provisioningRequestPodsInjector *provreq.ProvisioningRequestPodsInjector,
) *checkCapacityProvClass {
return &checkCapacityProvClass{client: client, provisioningRequestPodsInjector: provisioningRequestPodsInjector}
}

func (o *checkCapacityProvClass) Initialize(
@@ -61,6 +71,9 @@ func (o *checkCapacityProvClass) Initialize(
) {
o.context = autoscalingContext
o.injector = injector
o.batchProcessing = autoscalingContext.CheckCapacityBatchProcessing
o.batchTimebox = autoscalingContext.BatchTimebox
o.maxBatchSize = autoscalingContext.MaxBatchSize
}

// Provision returns whether there is capacity in the cluster for pods from a ProvisioningRequest.
@@ -70,28 +83,74 @@ func (o *checkCapacityProvClass) Provision(
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if len(unschedulablePods) == 0 {
return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
}
combinedStatus := NewCombinedStatusSet()
provisioningRequestsProcessed := 0
startTime := time.Now()

o.context.ClusterSnapshot.Fork()
defer o.context.ClusterSnapshot.Revert()
for len(unschedulablePods) > 0 {
prs := provreqclient.ProvisioningRequestsForPods(o.client, unschedulablePods)
prs = provreqclient.FilterOutProvisioningClass(prs, v1.ProvisioningClassCheckCapacity)
if len(prs) == 0 {
break
}

// Pick 1 ProvisioningRequest.
pr := prs[0]

scaleUpIsSuccessful, err := o.checkcapacity(unschedulablePods, pr)
if err != nil {
st, err := status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error()))

if o.batchProcessing {
combinedStatus.Add(st)
} else {
return st, err
}
} else if scaleUpIsSuccessful {
combinedStatus.Add(&status.ScaleUpStatus{Result: status.ScaleUpSuccessful})
} else {
combinedStatus.Add(&status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable})
}

if !o.batchProcessing {
break
}

if o.provisioningRequestPodsInjector == nil {
klog.Errorf("ProvisioningRequestPodsInjector is not set, falling back to non-batch processing")
break
}

if o.maxBatchSize <= 1 {
klog.Errorf("MaxBatchSize is set to %d, falling back to non-batch processing", o.maxBatchSize)
break
}

provisioningRequestsProcessed++
if provisioningRequestsProcessed >= o.maxBatchSize {
break
}

if time.Since(startTime) > o.batchTimebox {
klog.Infof("Batch timebox exceeded, processed %d check capacity provisioning requests this iteration", provisioningRequestsProcessed)
break
}

unschedulablePods, err = o.provisioningRequestPodsInjector.GetPodsFromNextRequest(func(pr *provreqwrapper.ProvisioningRequest) bool {
return pr.Spec.ProvisioningClassName == v1.ProvisioningClassCheckCapacity
})
if err != nil {
st, _ := status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error()))

combinedStatus.Add(st)
}
}

return combinedStatus.Export(), nil
}
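In other words, each autoscaler loop evaluates at most maxBatchSize check-capacity ProvisioningRequests and stops early once batchTimebox elapses: with the defaults (maxBatchSize=10, batchTimebox=5m) and, say, 25 pending requests, at most 10 are processed per iteration and the remainder wait for subsequent loops.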

// Assumes that all unschedulable pods come from one ProvisioningRequest.
@@ -110,3 +169,109 @@ func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, p
}
return capacityAvailable, err
}

// combinedStatusSet is a helper struct to combine multiple ScaleUpStatuses into one. It keeps track of the best result and all errors that occurred during the ScaleUp process.
type combinedStatusSet struct {
Result status.ScaleUpResult
ScaleupErrors map[*errors.AutoscalerError]bool
}

// Add adds a ScaleUpStatus to the combinedStatusSet.
func (c *combinedStatusSet) Add(status *status.ScaleUpStatus) {
// This relies on the fact that the ScaleUpResult enum is ordered in a way that the higher the value, the worse the result. This way we can just take the minimum of the results. If new results are added, either the enum should be updated keeping the order, or a different approach should be used to combine the results.
if c.Result > status.Result {
c.Result = status.Result
}
if status.ScaleUpError != nil {
if _, found := c.ScaleupErrors[status.ScaleUpError]; !found {
c.ScaleupErrors[status.ScaleUpError] = true
}
}
}
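A short sketch (not part of the diff; the function name is hypothetical) showing the min-rule in action: one satisfiable request in the batch is enough for the combined result to report success.

func exampleCombine() status.ScaleUpResult {
	set := NewCombinedStatusSet()
	set.Add(&status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable})
	set.Add(&status.ScaleUpStatus{Result: status.ScaleUpSuccessful})
	return set.Result // ScaleUpSuccessful: the lower (better) enum value wins
}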

// formatMessageFromBatchErrors formats a message from a list of errors.
func (c *combinedStatusSet) formatMessageFromBatchErrors(errs []errors.AutoscalerError, printErrorTypes bool) string {
firstErr := errs[0]
var builder strings.Builder
builder.WriteString(firstErr.Error())
builder.WriteString(" ...and other concurrent errors: [")
formattedErrs := map[errors.AutoscalerError]bool{
firstErr: true,
}
for _, err := range errs {
if _, has := formattedErrs[err]; has {
continue
}
formattedErrs[err] = true
var message string
if printErrorTypes {
message = fmt.Sprintf("[%s] %s", err.Type(), err.Error())
} else {
message = err.Error()
}
if len(formattedErrs) > 2 {
builder.WriteString(", ")
}
builder.WriteString(fmt.Sprintf("%q", message))
}
builder.WriteString("]")
return builder.String()
}
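For two distinct errors A and B, the resulting message has the shape 'A ...and other concurrent errors: ["B"]'; the [type] prefix is emitted only when printErrorTypes is true, i.e. when more than one error type is present in the batch.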

// combineBatchScaleUpErrors combines multiple errors into one. If there is only one error, it returns that error. If there are multiple errors, it combines them into one error with a message that contains all the errors.
func (c *combinedStatusSet) combineBatchScaleUpErrors() *errors.AutoscalerError {
if len(c.ScaleupErrors) == 0 {
return nil
}
if len(c.ScaleupErrors) == 1 {
for err := range c.ScaleupErrors {
return err
}
}
uniqueMessages := make(map[string]bool)
uniqueTypes := make(map[errors.AutoscalerErrorType]bool)
for err := range c.ScaleupErrors {
uniqueTypes[(*err).Type()] = true
uniqueMessages[(*err).Error()] = true
}
if len(uniqueTypes) == 1 && len(uniqueMessages) == 1 {
for err := range c.ScaleupErrors {
return err
}
}
// Sort to stabilize the results and make log aggregation easier.
errs := make([]errors.AutoscalerError, 0, len(c.ScaleupErrors))
for err := range c.ScaleupErrors {
errs = append(errs, *err)
}
sort.Slice(errs, func(i, j int) bool {
errA := errs[i]
errB := errs[j]
if errA.Type() == errB.Type() {
return errs[i].Error() < errs[j].Error()
}
return errA.Type() < errB.Type()
})
firstErr := errs[0]
printErrorTypes := len(uniqueTypes) > 1
message := c.formatMessageFromBatchErrors(errs, printErrorTypes)
combinedErr := errors.NewAutoscalerError(firstErr.Type(), message)
return &combinedErr
}

// Export converts the combinedStatusSet into a ScaleUpStatus.
func (c *combinedStatusSet) Export() *status.ScaleUpStatus {
result := &status.ScaleUpStatus{Result: c.Result}
if len(c.ScaleupErrors) > 0 {
result.ScaleUpError = c.combineBatchScaleUpErrors()
}
return result
}

// NewCombinedStatusSet creates a new combinedStatusSet.
func NewCombinedStatusSet() combinedStatusSet {
return combinedStatusSet{
Result: status.ScaleUpNotTried,
ScaleupErrors: make(map[*errors.AutoscalerError]bool),
}
}
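And a companion sketch for the error path (function name hypothetical): errors collected across the batch survive into the exported status even when another request in the same batch succeeded.

func exampleExport() *status.ScaleUpStatus {
	set := NewCombinedStatusSet()
	st, _ := status.UpdateScaleUpError(&status.ScaleUpStatus{},
		errors.NewAutoscalerError(errors.InternalError, "example failure"))
	set.Add(st)
	set.Add(&status.ScaleUpStatus{Result: status.ScaleUpSuccessful})
	return set.Export() // Result: ScaleUpSuccessful, with ScaleUpError populated
}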