Add support for frequent loops when a ProvisioningRequest is encountered in the last iteration #7271

Open · wants to merge 1 commit into base: master
64 changes: 47 additions & 17 deletions cluster-autoscaler/loop/trigger.go
@@ -18,11 +18,13 @@ package loop

import (
"context"
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@@ -43,29 +45,50 @@ type scalingTimesGetter interface {
LastScaleDownDeleteTime() time.Time
}

// provisioningRequestProcessingTimesGetter exposes recent ProvisioningRequest processing activity regardless of whether the
// ProvisioningRequest was marked as accepted or failed. A ProvisioningRequest being processed indicates that there are other
// ProvisioningRequests that require processing regardless of the outcome of the current one, so the next iteration
// should be started immediately.
type provisioningRequestProcessingTimesGetter interface {
LastProvisioningRequestProcessTime() time.Time
}

// LoopTrigger object implements criteria used to start new autoscaling iteration
type LoopTrigger struct {
podObserver *UnschedulablePodObserver
scanInterval time.Duration
scalingTimesGetter scalingTimesGetter
initialized bool
podObserver *UnschedulablePodObserver
scanInterval time.Duration
scalingTimesGetter scalingTimesGetter
provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter
}

// NewLoopTrigger creates a LoopTrigger object
func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
func NewLoopTrigger(scalingTimesGetter scalingTimesGetter, provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter, scanInterval time.Duration) *LoopTrigger {
return &LoopTrigger{
podObserver: podObserver,
scanInterval: scanInterval,
scalingTimesGetter: scalingTimesGetter,
podObserver: nil,
scanInterval: scanInterval,
scalingTimesGetter: scalingTimesGetter,
provisioningRequestProcessTimeGetter: provisioningRequestProcessTimeGetter,
}
}

// Initialize initializes the LoopTrigger object by providing a pointer to the UnschedulablePodObserver
func (t *LoopTrigger) Initialize(podObserver *UnschedulablePodObserver) {
t.podObserver = podObserver
t.initialized = true
}

// Wait waits for the next autoscaling iteration
func (t *LoopTrigger) Wait(lastRun time.Time) {
func (t *LoopTrigger) Wait(lastRun time.Time) errors.AutoscalerError {
sleepStart := time.Now()
defer metrics.UpdateDurationFromStart(metrics.LoopWait, sleepStart)

// To improve scale-up throughput, Cluster Autoscaler starts new iteration
// immediately if the previous one was productive.
if !t.initialized {
return errors.ToAutoscalerError(errors.InternalError, fmt.Errorf("LoopTrigger not initialized"))
}

if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
select {
@@ -74,16 +97,23 @@ func (t *LoopTrigger) Wait(lastRun time.Time) {
default:
klog.Infof("Autoscaler loop triggered immediately after a productive iteration")
}
return
}

// Unschedulable pod triggers autoscaling immediately.
select {
case <-time.After(t.scanInterval):
klog.Infof("Autoscaler loop triggered by a %v timer", t.scanInterval)
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
} else if t.provisioningRequestProcessTimeGetter != nil && !t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessTime().Before(lastRun) {
select {
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
default:
klog.Infof("Autoscaler loop triggered immediately after a provisioning request was processed")
}
} else {
// Unschedulable pod triggers autoscaling immediately.
select {
case <-time.After(t.scanInterval):
klog.Infof("Autoscaler loop triggered by a %v timer", t.scanInterval)
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
}
}
return nil
}

// UnschedulablePodObserver triggers a new loop if there are new unschedulable pods
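For context, the new Wait flow falls through three cases: an immediate re-run after a productive scale-up or scale-down, an immediate re-run after a ProvisioningRequest was processed (accepted or failed), and otherwise the usual scan-interval timer or unschedulable-pod signal. Below is a minimal, self-contained sketch of that decision policy; the fakeTimes type and helper names are illustrative stand-ins for the real scalingTimesGetter and provisioningRequestProcessingTimesGetter, not code from this PR.

package main

import (
	"fmt"
	"time"
)

// fakeTimes is an illustrative stand-in for the scalingTimesGetter and
// provisioningRequestProcessingTimesGetter interfaces consumed by LoopTrigger.
type fakeTimes struct {
	lastScaleUp time.Time
	lastProvReq time.Time
}

// wait mirrors the three branches of LoopTrigger.Wait in simplified form:
// re-run immediately after a productive iteration, re-run immediately after a
// ProvisioningRequest was processed, otherwise wait out the scan interval.
func wait(t fakeTimes, lastRun time.Time, scanInterval time.Duration) string {
	switch {
	case !t.lastScaleUp.Before(lastRun):
		return "triggered immediately after a productive iteration"
	case !t.lastProvReq.Before(lastRun):
		return "triggered immediately after a provisioning request was processed"
	default:
		time.Sleep(scanInterval)
		return fmt.Sprintf("triggered by a %v timer", scanInterval)
	}
}

func main() {
	lastRun := time.Now()
	// A ProvisioningRequest was processed after lastRun, so the next
	// iteration starts without waiting for the scan interval.
	times := fakeTimes{lastProvReq: lastRun.Add(time.Second)}
	fmt.Println(wait(times, lastRun, 10*time.Second))
}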
30 changes: 19 additions & 11 deletions cluster-autoscaler/main.go
@@ -465,7 +465,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
}()
}

func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) {
// Create basic config from flags.
autoscalingOptions := createAutoscalingOptions()

@@ -484,7 +484,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter

predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(informerFactory, autoscalingOptions.SchedulerConfig)
if err != nil {
return nil, err
return nil, nil, err
}
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)
@@ -505,13 +505,14 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker, scheduling.ScheduleAnywhere)

var ProvisioningRequestInjector *provreq.ProvisioningRequestPodsInjector
if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
client, err := provreqclient.NewProvisioningRequestClient(restConfig)
if err != nil {
return nil, err
return nil, nil, err
}
provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
checkcapacity.New(client),
@@ -522,11 +523,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig)
if err != nil {
return nil, err
return nil, nil, err
}
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(ProvisioningRequestInjector)
podListProcessor.AddProcessor(provreqProcesor)
}

@@ -591,21 +592,26 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
// Create autoscaler.
autoscaler, err := core.NewAutoscaler(opts, informerFactory)
if err != nil {
return nil, err
return nil, nil, err
}

// Start informers. This must come after fully constructing the autoscaler because
// additional informers might have been registered in the factory during NewAutoscaler.
stop := make(chan struct{})
informerFactory.Start(stop)

return autoscaler, nil
// A ProvisioningRequestPodsInjector is used as the provisioningRequestProcessingTimesGetter here to obtain the last time a
// ProvisioningRequest was processed, because the ProvisioningRequestPodsInjector, in addition to injecting pods,
// also marks the ProvisioningRequest as accepted or failed.
trigger := loop.NewLoopTrigger(autoscaler, ProvisioningRequestInjector, *scanInterval)

return autoscaler, trigger, nil
}

func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
metrics.RegisterAll(*emitPerNodeGroupMetrics)

autoscaler, err := buildAutoscaler(debuggingSnapshotter)
autoscaler, trigger, err := buildAutoscaler(debuggingSnapshotter)
if err != nil {
klog.Fatalf("Failed to create autoscaler: %v", err)
}
@@ -626,10 +632,12 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
defer cancel()
if *frequentLoopsEnabled {
podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
trigger.Initialize(podObserver)
lastRun := time.Now()
for {
trigger.Wait(lastRun)
if err := trigger.Wait(lastRun); err != nil {
klog.Fatalf("Failed to wait for next loop: %v", err)
}
lastRun = time.Now()
loop.RunAutoscalerOnce(autoscaler, healthCheck, lastRun)
}
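Read together, the two fragments above wire things up in this order: buildAutoscaler constructs the LoopTrigger, passing the autoscaler as the scalingTimesGetter and the ProvisioningRequestPodsInjector as the provisioningRequestProcessingTimesGetter, and run later completes the trigger with the pod observer before entering the loop. The condensed sketch below is assembled from the diff for readability and is not compilable on its own.

// In buildAutoscaler (sketch): the injector doubles as the
// provisioningRequestProcessingTimesGetter for the trigger.
trigger := loop.NewLoopTrigger(autoscaler, ProvisioningRequestInjector, *scanInterval)

// In run (sketch), when *frequentLoopsEnabled is true:
podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
trigger.Initialize(podObserver) // Wait returns an InternalError if this step is skipped
lastRun := time.Now()
for {
	if err := trigger.Wait(lastRun); err != nil {
		klog.Fatalf("Failed to wait for next loop: %v", err)
	}
	lastRun = time.Now()
	loop.RunAutoscalerOnce(autoscaler, healthCheck, lastRun)
}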
16 changes: 12 additions & 4 deletions cluster-autoscaler/processors/provreq/injector.go
@@ -24,7 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -41,8 +40,9 @@ const (

// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
client *provreqclient.ProvisioningRequestClient
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
clock clock.PassiveClock
lastProvisioningRequestProcessTime time.Time
}

// IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -68,6 +68,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.Prov
klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
return err
}
p.lastProvisioningRequestProcessTime = p.clock.Now()
return nil
}

@@ -77,6 +78,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
}
p.lastProvisioningRequestProcessTime = p.clock.Now()
}

// GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it.
@@ -109,6 +111,7 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
if err := p.MarkAsAccepted(pr); err != nil {
continue
}

return podsFromProvReq, nil
}
return nil, nil
@@ -136,10 +139,15 @@ func (p *ProvisioningRequestPodsInjector) Process(
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (*ProvisioningRequestPodsInjector, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
}

// LastProvisioningRequestProcessTime returns the time when the last provisioning request was processed.
func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessTime() time.Time {
return p.lastProvisioningRequestProcessTime
}
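The injector's side of the contract is small: both MarkAsAccepted and MarkAsFailed stamp lastProvisioningRequestProcessTime, and the getter simply exposes it. Below is a stripped-down, runnable sketch of that record-and-expose pattern; the processTimeRecorder type and the markAccepted/markFailed names are illustrative stand-ins, not the PR's API.

package main

import (
	"fmt"
	"time"
)

// processTimeRecorder mimics how ProvisioningRequestPodsInjector tracks the
// last time a ProvisioningRequest was processed, whether accepted or failed.
type processTimeRecorder struct {
	now             func() time.Time // stand-in for clock.PassiveClock
	lastProcessTime time.Time
}

func (r *processTimeRecorder) markAccepted() { r.lastProcessTime = r.now() }
func (r *processTimeRecorder) markFailed()   { r.lastProcessTime = r.now() }

// LastProvisioningRequestProcessTime mirrors the getter the loop trigger consumes.
func (r *processTimeRecorder) LastProvisioningRequestProcessTime() time.Time {
	return r.lastProcessTime
}

func main() {
	r := &processTimeRecorder{now: time.Now}
	lastRun := time.Now()
	r.markFailed() // even a failed ProvisioningRequest counts as processing activity
	fmt.Println("trigger next loop immediately:",
		!r.LastProvisioningRequestProcessTime().Before(lastRun))
}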
2 changes: 1 addition & 1 deletion cluster-autoscaler/processors/provreq/injector_test.go
@@ -124,7 +124,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
}
for _, tc := range testCases {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now), now}
getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)