diff --git a/cluster-autoscaler/loop/trigger.go b/cluster-autoscaler/loop/trigger.go
index 52ef962ba1b..5cfaa43fbcb 100644
--- a/cluster-autoscaler/loop/trigger.go
+++ b/cluster-autoscaler/loop/trigger.go
@@ -18,11 +18,13 @@ package loop
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
 	kube_client "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
@@ -43,29 +45,50 @@ type scalingTimesGetter interface {
 	LastScaleDownDeleteTime() time.Time
 }
 
+// provisioningRequestProcessingTimesGetter exposes recent provisioning request processing activity regardless of whether the
+// ProvisioningRequest was marked as accepted or failed. This is because a ProvisioningRequest being processed indicates that
+// there are other ProvisioningRequests that require processing regardless of the outcome of the current one. Thus, the next iteration
+// should be started immediately.
+type provisioningRequestProcessingTimesGetter interface {
+	LastProvisioningRequestProcessTime() time.Time
+}
+
 // LoopTrigger object implements criteria used to start new autoscaling iteration
 type LoopTrigger struct {
-	podObserver        *UnschedulablePodObserver
-	scanInterval       time.Duration
-	scalingTimesGetter scalingTimesGetter
+	initialized                          bool
+	podObserver                          *UnschedulablePodObserver
+	scanInterval                         time.Duration
+	scalingTimesGetter                   scalingTimesGetter
+	provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter
 }
 
 // NewLoopTrigger creates a LoopTrigger object
-func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
+func NewLoopTrigger(scalingTimesGetter scalingTimesGetter, provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter, scanInterval time.Duration) *LoopTrigger {
 	return &LoopTrigger{
-		podObserver:        podObserver,
-		scanInterval:       scanInterval,
-		scalingTimesGetter: scalingTimesGetter,
+		podObserver:                          nil,
+		scanInterval:                         scanInterval,
+		scalingTimesGetter:                   scalingTimesGetter,
+		provisioningRequestProcessTimeGetter: provisioningRequestProcessTimeGetter,
 	}
 }
 
+// Initialize initializes the LoopTrigger object by providing a pointer to the UnschedulablePodObserver
+func (t *LoopTrigger) Initialize(podObserver *UnschedulablePodObserver) {
+	t.podObserver = podObserver
+	t.initialized = true
+}
+
 // Wait waits for the next autoscaling iteration
-func (t *LoopTrigger) Wait(lastRun time.Time) {
+func (t *LoopTrigger) Wait(lastRun time.Time) errors.AutoscalerError {
 	sleepStart := time.Now()
 	defer metrics.UpdateDurationFromStart(metrics.LoopWait, sleepStart)
 
 	// To improve scale-up throughput, Cluster Autoscaler starts new iteration
 	// immediately if the previous one was productive.
+	if !t.initialized {
+		return errors.ToAutoscalerError(errors.InternalError, fmt.Errorf("LoopTrigger not initialized"))
+	}
+
 	if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
 		!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
 		select {
@@ -74,16 +97,23 @@ func (t *LoopTrigger) Wait(lastRun time.Time) {
 		default:
 			klog.Infof("Autoscaler loop triggered immediately after a productive iteration")
 		}
-		return
-	}
-
-	// Unschedulable pod triggers autoscaling immediately.
-	select {
-	case <-time.After(t.scanInterval):
-		klog.Infof("Autoscaler loop triggered by a %v timer", t.scanInterval)
-	case <-t.podObserver.unschedulablePodChan:
-		klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
+	} else if t.provisioningRequestProcessTimeGetter != nil && !t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessTime().Before(lastRun) {
+		select {
+		case <-t.podObserver.unschedulablePodChan:
+			klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
+		default:
+			klog.Infof("Autoscaler loop triggered immediately after a provisioning request was processed")
+		}
+	} else {
+		// Unschedulable pod triggers autoscaling immediately.
+		select {
+		case <-time.After(t.scanInterval):
+			klog.Infof("Autoscaler loop triggered by a %v timer", t.scanInterval)
+		case <-t.podObserver.unschedulablePodChan:
+			klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
+		}
 	}
+	return nil
 }
 
 // UnschedulablePodObserver triggers a new loop if there are new unschedulable pods
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index dfcd58fa8d7..62cda670b0a 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -465,7 +465,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
 	}()
 }
 
-func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
+func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) {
 	// Create basic config from flags.
 	autoscalingOptions := createAutoscalingOptions()
 
@@ -484,7 +484,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 
 	predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(informerFactory, autoscalingOptions.SchedulerConfig)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
 	drainabilityRules := rules.Default(deleteOptions)
@@ -505,13 +505,14 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 	opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
 	podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker, scheduling.ScheduleAnywhere)
 
+	var ProvisioningRequestInjector *provreq.ProvisioningRequestPodsInjector
 	if autoscalingOptions.ProvisioningRequestEnabled {
 		podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
 		restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
 		client, err := provreqclient.NewProvisioningRequestClient(restConfig)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
 			checkcapacity.New(client),
@@ -522,11 +523,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 		opts.ScaleUpOrchestrator = scaleUpOrchestrator
 		provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
 		opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
-		injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
+		ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
-		podListProcessor.AddProcessor(injector)
+		podListProcessor.AddProcessor(ProvisioningRequestInjector)
 		podListProcessor.AddProcessor(provreqProcesor)
 	}
 
@@ -591,7 +592,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 	// Create autoscaler.
 	autoscaler, err := core.NewAutoscaler(opts, informerFactory)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	// Start informers. This must come after fully constructing the autoscaler because
@@ -599,13 +600,18 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 	stop := make(chan struct{})
 	informerFactory.Start(stop)
 
-	return autoscaler, nil
+	// A ProvisioningRequestPodsInjector is used as provisioningRequestProcessingTimesGetter here to obtain the last time a
+	// ProvisioningRequest was processed. This is because the ProvisioningRequestPodsInjector, in addition to injecting pods,
+	// also marks the ProvisioningRequest as accepted or failed.
+	trigger := loop.NewLoopTrigger(autoscaler, ProvisioningRequestInjector, *scanInterval)
+
+	return autoscaler, trigger, nil
 }
 
 func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
 	metrics.RegisterAll(*emitPerNodeGroupMetrics)
 
-	autoscaler, err := buildAutoscaler(debuggingSnapshotter)
+	autoscaler, trigger, err := buildAutoscaler(debuggingSnapshotter)
 	if err != nil {
 		klog.Fatalf("Failed to create autoscaler: %v", err)
 	}
@@ -626,10 +632,12 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
 	defer cancel()
 	if *frequentLoopsEnabled {
 		podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
-		trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
+		trigger.Initialize(podObserver)
 		lastRun := time.Now()
 		for {
-			trigger.Wait(lastRun)
+			if err := trigger.Wait(lastRun); err != nil {
+				klog.Fatalf("Failed to wait for next loop: %v", err)
+			}
 			lastRun = time.Now()
 			loop.RunAutoscalerOnce(autoscaler, healthCheck, lastRun)
 		}
diff --git a/cluster-autoscaler/processors/provreq/injector.go b/cluster-autoscaler/processors/provreq/injector.go
index 538563d24cb..35977247dc4 100644
--- a/cluster-autoscaler/processors/provreq/injector.go
+++ b/cluster-autoscaler/processors/provreq/injector.go
@@ -24,7 +24,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
-	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
 	provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
 	provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -41,8 +40,9 @@ const (
 
 // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
 type ProvisioningRequestPodsInjector struct {
-	client *provreqclient.ProvisioningRequestClient
-	clock  clock.PassiveClock
+	client                             *provreqclient.ProvisioningRequestClient
+	clock                              clock.PassiveClock
+	lastProvisioningRequestProcessTime time.Time
 }
 
 // IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -68,6 +68,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.Prov
 		klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
 		return err
 	}
+	p.lastProvisioningRequestProcessTime = p.clock.Now()
 	return nil
 }
 
@@ -77,6 +78,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
 	if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
 		klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
 	}
+	p.lastProvisioningRequestProcessTime = p.clock.Now()
 }
 
 // GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it.
@@ -109,6 +111,7 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
 		if err := p.MarkAsAccepted(pr); err != nil {
 			continue
 		}
+
 		return podsFromProvReq, nil
 	}
 	return nil, nil
@@ -136,10 +139,15 @@ func (p *ProvisioningRequestPodsInjector) Process(
 
 func (p *ProvisioningRequestPodsInjector) CleanUp() {}
 
 // NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
-func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
+func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (*ProvisioningRequestPodsInjector, error) {
 	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
 	if err != nil {
 		return nil, err
 	}
 	return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
 }
+
+// LastProvisioningRequestProcessTime returns the time when the last provisioning request was processed.
+func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessTime() time.Time {
+	return p.lastProvisioningRequestProcessTime
+}
diff --git a/cluster-autoscaler/processors/provreq/injector_test.go b/cluster-autoscaler/processors/provreq/injector_test.go
index 533c2b979bd..4a9a0d0e096 100644
--- a/cluster-autoscaler/processors/provreq/injector_test.go
+++ b/cluster-autoscaler/processors/provreq/injector_test.go
@@ -124,7 +124,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
-		injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
+		injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now), now}
 		getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
 		if err != nil {
 			t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
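For readers skimming the trigger.go changes above, the retrigger policy that the reworked Wait implements can be restated outside the patch. The sketch below is an illustrative, self-contained summary of the branch order only; the names lastTimes and nextTrigger are hypothetical and are not part of cluster-autoscaler.

package main

import (
	"fmt"
	"time"
)

// lastTimes is a hypothetical stand-in for the values the real Wait reads through
// scalingTimesGetter and provisioningRequestProcessingTimesGetter.
type lastTimes struct {
	scaleUp          time.Time
	scaleDownDelete  time.Time
	provReqProcessed time.Time
}

// nextTrigger mirrors the branch order introduced by the patch: a scale-up or
// scale-down delete at or after lastRun retriggers the loop immediately, then a
// processed ProvisioningRequest does, and otherwise the loop falls back to the
// scan-interval timer or an unschedulable pod appearing.
func nextTrigger(t lastTimes, lastRun time.Time) string {
	switch {
	case !t.scaleUp.Before(lastRun) || !t.scaleDownDelete.Before(lastRun):
		return "immediate: productive iteration"
	case !t.provReqProcessed.Before(lastRun):
		return "immediate: provisioning request processed"
	default:
		return "wait: scan interval or unschedulable pod"
	}
}

func main() {
	lastRun := time.Now()
	processed := lastTimes{provReqProcessed: lastRun.Add(time.Second)}
	fmt.Println(nextTrigger(processed, lastRun))   // immediate: provisioning request processed
	fmt.Println(nextTrigger(lastTimes{}, lastRun)) // wait: scan interval or unschedulable pod
}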
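On the injector side, the contract is just a timestamp refreshed on every accept/fail decision and read back through LastProvisioningRequestProcessTime. A minimal sketch of that pattern follows, assuming only the k8s.io/utils/clock package; fakeProcessor and markProcessed are hypothetical names, not part of the patch.

package main

import (
	"fmt"
	"time"

	"k8s.io/utils/clock"
)

// fakeProcessor is a hypothetical component following the same pattern as
// ProvisioningRequestPodsInjector: remember when a ProvisioningRequest was last
// handled, whatever the outcome.
type fakeProcessor struct {
	clock                              clock.PassiveClock
	lastProvisioningRequestProcessTime time.Time
}

// markProcessed stamps the process time, as MarkAsAccepted and MarkAsFailed now do.
func (f *fakeProcessor) markProcessed() {
	f.lastProvisioningRequestProcessTime = f.clock.Now()
}

// LastProvisioningRequestProcessTime satisfies the getter interface that
// LoopTrigger.Wait consults before deciding whether to start the next loop immediately.
func (f *fakeProcessor) LastProvisioningRequestProcessTime() time.Time {
	return f.lastProvisioningRequestProcessTime
}

func main() {
	f := &fakeProcessor{clock: clock.RealClock{}}
	lastRun := time.Now()
	f.markProcessed()
	// A process time at or after lastRun is what makes Wait skip the scan-interval timer.
	fmt.Println(!f.LastProvisioningRequestProcessTime().Before(lastRun))
}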