Add support for frequent loops when provisioningrequest is encountered in last iteration
Duke0404 committed Sep 12, 2024
1 parent 2800c70 commit 2f629ac
Showing 4 changed files with 38 additions and 20 deletions.
11 changes: 9 additions & 2 deletions cluster-autoscaler/loop/trigger.go
@@ -43,19 +43,25 @@ type scalingTimesGetter interface {
 	LastScaleDownDeleteTime() time.Time
 }
 
+type provisioningRequestProcessTimeGetter interface {
+	LastProvisioningRequestProcessedTime() time.Time
+}
+
 // LoopTrigger object implements criteria used to start new autoscaling iteration
 type LoopTrigger struct {
 	podObserver *UnschedulablePodObserver
 	scanInterval time.Duration
 	scalingTimesGetter scalingTimesGetter
+	provisioningRequestProcessTimeGetter provisioningRequestProcessTimeGetter
 }
 
 // NewLoopTrigger creates a LoopTrigger object
-func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
+func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration, provisioningRequestProcessTimeGetter provisioningRequestProcessTimeGetter) *LoopTrigger {
 	return &LoopTrigger{
 		podObserver: podObserver,
 		scanInterval: scanInterval,
 		scalingTimesGetter: scalingTimesGetter,
+		provisioningRequestProcessTimeGetter: provisioningRequestProcessTimeGetter,
 	}
 }

@@ -67,7 +73,8 @@ func (t *LoopTrigger) Wait(lastRun time.Time) {
 	// To improve scale-up throughput, Cluster Autoscaler starts new iteration
 	// immediately if the previous one was productive.
 	if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
-		!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
+		!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) ||
+		!t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessedTime().Before(lastRun) {
 		select {
 		case <-t.podObserver.unschedulablePodChan:
 			klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
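Note on the new guard: it reuses the file's existing !Before(lastRun) idiom, so a timestamp equal to or later than lastRun counts as a productive previous iteration, while the zero time.Time of a getter that has never fired always sorts before lastRun and leaves behavior unchanged. A standalone sketch of just that predicate (hypothetical productive helper, not autoscaler code):

package main

import (
	"fmt"
	"time"
)

// productive mirrors the trigger's short-circuit: any activity timestamp
// that is not strictly before lastRun marks the last iteration productive.
func productive(lastRun time.Time, activity ...time.Time) bool {
	for _, ts := range activity {
		if !ts.Before(lastRun) {
			return true
		}
	}
	return false
}

func main() {
	lastRun := time.Now()
	staleScaleUp := lastRun.Add(-time.Minute) // before lastRun: ignored
	freshProvReq := lastRun                   // equal counts as productive
	fmt.Println(productive(lastRun, staleScaleUp))               // false
	fmt.Println(productive(lastRun, staleScaleUp, freshProvReq)) // true
}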
29 changes: 16 additions & 13 deletions cluster-autoscaler/main.go
@@ -464,12 +464,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
 	}()
 }
 
-func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
-	// Create basic config from flags.
-	autoscalingOptions := createAutoscalingOptions()
-
-	autoscalingOptions.KubeClientOpts.KubeClientBurst = int(*kubeClientBurst)
-	autoscalingOptions.KubeClientOpts.KubeClientQPS = float32(*kubeClientQPS)
+func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter, autoscalingOptions config.AutoscalingOptions, restConfig *rest.Config, injector pods.PodListProcessor) (core.Autoscaler, error) {
 	kubeClient := kube_util.CreateKubeClient(autoscalingOptions.KubeClientOpts)
 
 	// Informer transform to trim ManagedFields for memory efficiency.
@@ -507,7 +502,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
 	if autoscalingOptions.ProvisioningRequestEnabled {
 		podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
 
-		restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
 		client, err := provreqclient.NewProvisioningRequestClient(restConfig)
 		if err != nil {
 			return nil, err
@@ -524,10 +518,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
 			return nil, err
 		}
 		opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
-		injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
-		if err != nil {
-			return nil, err
-		}
 		podListProcessor.AddProcessor(injector)
 		podListProcessor.AddProcessor(provreqProcesor)
 	}
@@ -608,7 +598,20 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
 func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
 	metrics.RegisterAll(*emitPerNodeGroupMetrics)
 
-	autoscaler, err := buildAutoscaler(debuggingSnapshotter)
+	// Create basic config from flags.
+	autoscalingOptions := createAutoscalingOptions()
+
+	autoscalingOptions.KubeClientOpts.KubeClientBurst = int(*kubeClientBurst)
+	autoscalingOptions.KubeClientOpts.KubeClientQPS = float32(*kubeClientQPS)
+
+	restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
+
+	injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
+	if err != nil {
+		klog.Fatalf("Failed to create provisioning request pods injector: %v", err)
+	}
+
+	autoscaler, err := buildAutoscaler(debuggingSnapshotter, autoscalingOptions, restConfig, injector)
 	if err != nil {
 		klog.Fatalf("Failed to create autoscaler: %v", err)
 	}
@@ -629,7 +632,7 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
 	defer cancel()
 	if *frequentLoopsEnabled {
 		podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
-		trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
+		trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval, injector)
 		lastRun := time.Now()
 		for {
 			trigger.Wait(lastRun)
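Wiring note: hoisting the injector out of buildAutoscaler lets run hand one concrete value to two consumers: buildAutoscaler, which only needs it as a pods.PodListProcessor, and loop.NewLoopTrigger, which needs its LastProvisioningRequestProcessedTime method. A toy sketch of that double duty (stand-in interfaces with assumed method sets; the real ones are pods.PodListProcessor and the unexported getter in package loop):

package main

import (
	"fmt"
	"time"
)

// podProcessor stands in for the processor role handed to buildAutoscaler;
// only one representative method is mirrored here.
type podProcessor interface {
	CleanUp()
}

// processTimeGetter stands in for the trigger's unexported interface.
type processTimeGetter interface {
	LastProvisioningRequestProcessedTime() time.Time
}

// mockInjector stands in for *provreq.ProvisioningRequestPodsInjector.
type mockInjector struct{ last time.Time }

func (m *mockInjector) CleanUp() {}
func (m *mockInjector) LastProvisioningRequestProcessedTime() time.Time {
	return m.last
}

func main() {
	inj := &mockInjector{last: time.Now()}
	var _ podProcessor = inj      // role 1: handed to buildAutoscaler
	var _ processTimeGetter = inj // role 2: handed to loop.NewLoopTrigger
	fmt.Println("one concrete value satisfies both interfaces")
}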
16 changes: 12 additions & 4 deletions cluster-autoscaler/processors/provreq/injector.go
@@ -24,7 +24,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
-	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
+	// "k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
 	provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
 	provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -41,8 +41,9 @@ const (
 
 // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
 type ProvisioningRequestPodsInjector struct {
-	client *provreqclient.ProvisioningRequestClient
-	clock  clock.PassiveClock
+	client                               *provreqclient.ProvisioningRequestClient
+	clock                                clock.PassiveClock
+	lastProvisioningRequestProcessedTime time.Time
 }
 
 // IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -68,6 +69,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.ProvisioningRequest) error {
 		klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
 		return err
 	}
+	p.lastProvisioningRequestProcessedTime = p.clock.Now()
 	return nil
 }

@@ -109,6 +111,7 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
 		if err := p.MarkAsAccepted(pr); err != nil {
 			continue
 		}
+
 		return podsFromProvReq, nil
 	}
 	return nil, nil
@@ -136,10 +139,15 @@ func (p *ProvisioningRequestPodsInjector) Process(
 func (p *ProvisioningRequestPodsInjector) CleanUp() {}
 
 // NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
-func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
+func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (*ProvisioningRequestPodsInjector, error) {
 	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
 	if err != nil {
 		return nil, err
 	}
 	return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
 }
+
+// LastProvisioningRequestProcessedTime returns the time when the last provisioning request was processed.
+func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessedTime() time.Time {
+	return p.lastProvisioningRequestProcessedTime
+}
2 changes: 1 addition & 1 deletion cluster-autoscaler/processors/provreq/injector_test.go
@@ -124,7 +124,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
-		injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
+		injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now), now}
 		getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
 		if err != nil {
 			t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
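The test change is purely mechanical: the positional composite literal must grow a third element because the struct gained a field. A toy illustration (stand-in type, not the autoscaler's) of why keyed literals avoid that churn:

package main

import (
	"fmt"
	"time"
)

type injector struct {
	name string
	last time.Time // appended later, like lastProvisioningRequestProcessedTime
}

func main() {
	// Positional literal: must be edited whenever a field is appended,
	// which is exactly why the test line above changed.
	a := injector{"prov-req", time.Time{}}
	// Keyed literal: appended fields simply default to their zero values.
	b := injector{name: "prov-req"}
	fmt.Println(a == b) // true: both leave `last` at its zero value
}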
