Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor logging statements in recommender:checkpoint,input #7290

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func getVpasToCheckpoint(clusterVpas map[model.VpaID]*model.Vpa) []*model.Vpa {
vpas := make([]*model.Vpa, 0, len(clusterVpas))
for _, vpa := range clusterVpas {
if isFetchingHistory(vpa) {
klog.V(3).Infof("VPA %s is loading history, skipping checkpoints", klog.KRef(vpa.ID.Namespace, vpa.ID.VpaName))
klog.V(3).InfoS("VPA is loading history, skipping checkpoints", "vpa", klog.KRef(vpa.ID.Namespace, vpa.ID.VpaName))
continue
}
vpas = append(vpas, vpa)
Expand Down Expand Up @@ -94,7 +94,7 @@ func (writer *checkpointWriter) StoreCheckpoints(ctx context.Context, now time.T
for container, aggregatedContainerState := range aggregateContainerStateMap {
containerCheckpoint, err := aggregatedContainerState.SaveToCheckpoint()
if err != nil {
klog.Errorf("Cannot serialize checkpoint for vpa %s/%s container %v. Reason: %+v", vpa.ID.Namespace, vpa.ID.VpaName, container, err)
klog.ErrorS(err, "Cannot serialize checkpoint", "vpa", klog.KRef(vpa.ID.Namespace, vpa.ID.VpaName), "container", container)
continue
}
checkpointName := fmt.Sprintf("%s-%s", vpa.ID.VpaName, container)
Expand All @@ -108,11 +108,9 @@ func (writer *checkpointWriter) StoreCheckpoints(ctx context.Context, now time.T
}
err = api_util.CreateOrUpdateVpaCheckpoint(writer.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(vpa.ID.Namespace), &vpaCheckpoint)
if err != nil {
klog.Errorf("Cannot save VPA %s/%s checkpoint for %s. Reason: %+v",
vpa.ID.Namespace, vpaCheckpoint.Spec.VPAObjectName, vpaCheckpoint.Spec.ContainerName, err)
klog.ErrorS(err, "Cannot save VPA checkpoint", "namespace", vpa.ID.Namespace, "vpa", vpaCheckpoint.Spec.VPAObjectName, "container", vpaCheckpoint.Spec.ContainerName)
} else {
klog.V(3).Infof("Saved VPA %s/%s checkpoint for %s",
vpa.ID.Namespace, vpaCheckpoint.Spec.VPAObjectName, vpaCheckpoint.Spec.ContainerName)
klog.V(3).InfoS("Saved VPA checkpoint", "namespace", vpa.ID.Namespace, "vpa", vpaCheckpoint.Spec.VPAObjectName, "container", vpaCheckpoint.Spec.ContainerName)
vpa.CheckpointWritten = now
}
minCheckpoints--
Expand Down
66 changes: 33 additions & 33 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func WatchEvictionEventsWithRetries(kubeClient kube_client.Interface, observer o
watchEvictionEventsOnce := func() {
watchInterface, err := kubeClient.CoreV1().Events(namespace).Watch(context.TODO(), options)
if err != nil {
klog.Errorf("Cannot initialize watching events. Reason %v", err)
klog.ErrorS(err, "Cannot initialize watching events")
return
}
watchEvictionEvents(watchInterface.ResultChan(), observer)
Expand All @@ -128,7 +128,7 @@ func WatchEvictionEventsWithRetries(kubeClient kube_client.Interface, observer o
watchEvictionEventsOnce()
// Wait between attempts, retrying too often breaks API server.
waitTime := wait.Jitter(evictionWatchRetryWait, evictionWatchJitterFactor)
klog.V(1).Infof("An attempt to watch eviction events finished. Waiting %v before the next one.", waitTime)
klog.V(1).InfoS("An attempt to watch eviction events finished", "waiting", waitTime)
time.Sleep(waitTime)
}
}()
Expand All @@ -138,7 +138,7 @@ func watchEvictionEvents(evictedEventChan <-chan watch.Event, observer oom.Obser
for {
evictedEvent, ok := <-evictedEventChan
if !ok {
klog.V(3).Infof("Eviction event chan closed")
klog.V(3).InfoS("Eviction event chan closed")
return
}
if evictedEvent.Type == watch.Added {
Expand Down Expand Up @@ -199,13 +199,13 @@ type clusterStateFeeder struct {
}

func (feeder *clusterStateFeeder) InitFromHistoryProvider(historyProvider history.HistoryProvider) {
klog.V(3).Info("Initializing VPA from history provider")
klog.V(3).InfoS("Initializing VPA from history provider")
clusterHistory, err := historyProvider.GetClusterHistory()
if err != nil {
klog.Errorf("Cannot get cluster history: %v", err)
klog.ErrorS(err, "Cannot get cluster history")
}
for podID, podHistory := range clusterHistory {
klog.V(4).Infof("Adding pod %v with labels %v", podID, podHistory.LastLabels)
klog.V(4).InfoS("Adding pod with labels", "pod", podID, "labels", podHistory.LastLabels)
feeder.clusterState.AddOrUpdatePod(podID, podHistory.LastLabels, apiv1.PodUnknown)
for containerName, sampleList := range podHistory.Samples {
containerID := model.ContainerID{
Expand All @@ -215,7 +215,7 @@ func (feeder *clusterStateFeeder) InitFromHistoryProvider(historyProvider histor
if err = feeder.clusterState.AddOrUpdateContainer(containerID, nil); err != nil {
klog.Warningf("Failed to add container %+v. Reason: %+v", containerID, err)
}
klog.V(4).Infof("Adding %d samples for container %v", len(sampleList), containerID)
klog.V(4).InfoS("Adding samples for container", "samples", len(sampleList), "container", containerID)
for _, sample := range sampleList {
if err := feeder.clusterState.AddSample(
&model.ContainerUsageSampleWithKey{
Expand Down Expand Up @@ -246,7 +246,7 @@ func (feeder *clusterStateFeeder) setVpaCheckpoint(checkpoint *vpa_types.Vertica
}

func (feeder *clusterStateFeeder) InitFromCheckpoints() {
klog.V(3).Info("Initializing VPA from checkpoints")
klog.V(3).InfoS("Initializing VPA from checkpoints")
feeder.LoadVPAs(context.TODO())

namespaces := make(map[string]bool)
Expand All @@ -255,48 +255,48 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints() {
}

for namespace := range namespaces {
klog.V(3).Infof("Fetching checkpoints from namespace %s", namespace)
klog.V(3).InfoS("Fetching checkpoints", "namespace", namespace)
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Cannot list VPA checkpoints from namespace %s. Reason: %+v", namespace, err)
klog.ErrorS(err, "Cannot list VPA checkpoints", "namespace", namespace)
}
for _, checkpoint := range checkpointList.Items {

klog.V(3).Infof("Loading VPA %s/%s checkpoint for %s", checkpoint.ObjectMeta.Namespace, checkpoint.Spec.VPAObjectName, checkpoint.Spec.ContainerName)
klog.V(3).InfoS("Loading VPA checkpoint", "namespace", checkpoint.ObjectMeta.Namespace, "VPA", checkpoint.Spec.VPAObjectName, "container", checkpoint.Spec.ContainerName)
err = feeder.setVpaCheckpoint(&checkpoint)
if err != nil {
klog.Errorf("Error while loading checkpoint. Reason: %+v", err)
klog.ErrorS(err, "Error while loading checkpoint")
}

}
}
}

func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() {
klog.V(3).Info("Starting garbage collection of checkpoints")
klog.V(3).InfoS("Starting garbage collection of checkpoints")
feeder.LoadVPAs(context.TODO())

namespaceList, err := feeder.coreClient.Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Cannot list namespaces. Reason: %+v", err)
klog.ErrorS(err, "Cannot list namespaces")
return
}

for _, namespaceItem := range namespaceList.Items {
namespace := namespaceItem.Name
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Cannot list VPA checkpoints from namespace %v. Reason: %+v", namespace, err)
klog.ErrorS(err, "Cannot list VPA checkpoints", "namespace", namespace)
}
for _, checkpoint := range checkpointList.Items {
vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName}
_, exists := feeder.clusterState.Vpas[vpaID]
if !exists {
err = feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{})
if err == nil {
klog.V(3).Infof("Orphaned VPA checkpoint cleanup - deleting %v/%v.", namespace, checkpoint.Name)
klog.V(3).InfoS("Orphaned VPA checkpoint cleanup - deleting", "namespace", namespace, "checkpoint", checkpoint.Name)
} else {
klog.Errorf("Cannot delete VPA checkpoint %v/%v. Reason: %+v", namespace, checkpoint.Name, err)
klog.ErrorS(err, "Orphaned VPA checkpoint cleanup - error deleting", "namespace", namespace, "checkpoint", checkpoint.Name)
}
}
}
Expand All @@ -318,27 +318,27 @@ func selectsRecommender(selectors []*vpa_types.VerticalPodAutoscalerRecommenderS

// Filter VPA objects whose specified recommender names are not default
func filterVPAs(feeder *clusterStateFeeder, allVpaCRDs []*vpa_types.VerticalPodAutoscaler) []*vpa_types.VerticalPodAutoscaler {
klog.V(3).Infof("Start selecting the vpaCRDs.")
klog.V(3).InfoS("Start selecting the vpaCRDs.")
var vpaCRDs []*vpa_types.VerticalPodAutoscaler
for _, vpaCRD := range allVpaCRDs {
if feeder.recommenderName == DefaultRecommenderName {
if !implicitDefaultRecommender(vpaCRD.Spec.Recommenders) && !selectsRecommender(vpaCRD.Spec.Recommenders, &feeder.recommenderName) {
klog.V(6).Infof("Ignoring vpaCRD %s as current recommender's name %v doesn't appear among its recommenders", klog.KObj(vpaCRD), feeder.recommenderName)
klog.V(6).InfoS("Ignoring vpaCRD as current current recommender's name doesn't appear among its recommenders", "vpaCRD", klog.KObj(vpaCRD), "recommenderName", feeder.recommenderName)
omerap12 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
} else {
if implicitDefaultRecommender(vpaCRD.Spec.Recommenders) {
klog.V(6).Infof("Ignoring vpaCRD %s as %v recommender doesn't process CRDs implicitly destined to %v recommender", klog.KObj(vpaCRD), feeder.recommenderName, DefaultRecommenderName)
klog.V(6).InfoS("Ignoring vpaCRD as recommender doesn't process CRDs implicitly destined to default recommender", "vpaCRD", klog.KObj(vpaCRD), "recommenderName", feeder.recommenderName, "defaultRecommenderName", DefaultRecommenderName)
continue
}
if !selectsRecommender(vpaCRD.Spec.Recommenders, &feeder.recommenderName) {
klog.V(6).Infof("Ignoring vpaCRD %s as current recommender's name %v doesn't appear among its recommenders", klog.KObj(vpaCRD), feeder.recommenderName)
klog.V(6).InfoS("Ignoring vpaCRD as current recommender's name doesn't appear among its recommenders", "vpaCRD", klog.KObj(vpaCRD), "recommenderName", feeder.recommenderName)
continue
}
}

if slices.Contains(feeder.ignoredNamespaces, vpaCRD.ObjectMeta.Namespace) {
klog.V(6).Infof("Ignoring vpaCRD %s in namespace %s as namespace is ignored", klog.KObj(vpaCRD), vpaCRD.Namespace)
klog.V(6).InfoS("Ignoring vpaCRD in namespace as namespace is ignored", "vpaCRD", klog.KObj(vpaCRD), "namespace", vpaCRD.Namespace)
continue
}

Expand All @@ -352,14 +352,14 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
// List VPA API objects.
allVpaCRDs, err := feeder.vpaLister.List(labels.Everything())
if err != nil {
klog.Errorf("Cannot list VPAs. Reason: %+v", err)
klog.ErrorS(err, "Cannot list VPAs")
return
}

// Filter out VPAs that specified recommenders with names not equal to "default"
vpaCRDs := filterVPAs(feeder, allVpaCRDs)

klog.V(3).Infof("Fetched %d VPAs.", len(vpaCRDs))
klog.V(3).InfoS("Fetching VPAs", "count", len(vpaCRDs))
// Add or update existing VPAs in the model.
vpaKeys := make(map[model.VpaID]bool)
for _, vpaCRD := range vpaCRDs {
Expand All @@ -369,7 +369,7 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
}

selector, conditions := feeder.getSelector(ctx, vpaCRD)
klog.V(4).Infof("Using selector %s for VPA %s", selector.String(), klog.KObj(vpaCRD))
klog.V(4).InfoS("Using selector", "selector", selector.String(), "vpa", klog.KObj(vpaCRD))

if feeder.clusterState.AddOrUpdateVpa(vpaCRD, selector) == nil {
// Successfully added VPA to the model.
Expand All @@ -387,9 +387,9 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
// Delete non-existent VPAs from the model.
for vpaID := range feeder.clusterState.Vpas {
if _, exists := vpaKeys[vpaID]; !exists {
klog.V(3).Infof("Deleting VPA %s", klog.KRef(vpaID.Namespace, vpaID.VpaName))
klog.V(3).InfoS("Deleting VPA", "vpa", klog.KRef(vpaID.Namespace, vpaID.VpaName))
if err := feeder.clusterState.DeleteVpa(vpaID); err != nil {
klog.Errorf("Deleting VPA %s failed: %v", klog.KRef(vpaID.Namespace, vpaID.VpaName), err)
klog.ErrorS(err, "Deleting VPA failed", "vpa", klog.KRef(vpaID.Namespace, vpaID.VpaName))
}
}
}
Expand All @@ -400,15 +400,15 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
func (feeder *clusterStateFeeder) LoadPods() {
podSpecs, err := feeder.specClient.GetPodSpecs()
if err != nil {
klog.Errorf("Cannot get SimplePodSpecs. Reason: %+v", err)
klog.ErrorS(err, "Cannot get SimplePodSpecs")
}
pods := make(map[model.PodID]*spec.BasicPodSpec)
for _, spec := range podSpecs {
pods[spec.ID] = spec
}
for key := range feeder.clusterState.Pods {
if _, exists := pods[key]; !exists {
klog.V(3).Infof("Deleting Pod %s", klog.KRef(key.Namespace, key.PodName))
klog.V(3).InfoS("Deleting Pod", "pod", klog.KRef(key.Namespace, key.PodName))
feeder.clusterState.DeletePod(key)
}
}
Expand All @@ -428,7 +428,7 @@ func (feeder *clusterStateFeeder) LoadPods() {
func (feeder *clusterStateFeeder) LoadRealTimeMetrics() {
containersMetrics, err := feeder.metricsClient.GetContainersMetrics()
if err != nil {
klog.Errorf("Cannot get ContainerMetricsSnapshot from MetricsClient. Reason: %+v", err)
klog.ErrorS(err, "Cannot get ContainerMetricsSnapshot from MetricsClient")
}

sampleCount := 0
Expand All @@ -447,12 +447,12 @@ func (feeder *clusterStateFeeder) LoadRealTimeMetrics() {
}
}
}
klog.V(3).Infof("ClusterSpec fed with #%v ContainerUsageSamples for #%v containers. Dropped #%v samples.", sampleCount, len(containersMetrics), droppedSampleCount)
klog.V(3).InfoS("ClusterSpec fed with ContainerUsageSamples", "sampleCount", sampleCount, "containerCount", len(containersMetrics), "droppedSampleCount", droppedSampleCount)
Loop:
for {
select {
case oomInfo := <-feeder.oomChan:
klog.V(3).Infof("OOM detected %+v", oomInfo)
klog.V(3).InfoS("OOM detected", "oomInfo", oomInfo)
if err = feeder.clusterState.RecordOOM(oomInfo.ContainerID, oomInfo.Timestamp, oomInfo.Memory); err != nil {
klog.Warningf("Failed to record OOM %+v. Reason: %+v", oomInfo, err)
}
Expand Down Expand Up @@ -539,7 +539,7 @@ func (feeder *clusterStateFeeder) getSelector(ctx context.Context, vpa *vpa_type
}
msg := "Cannot read targetRef"
if fetchErr != nil {
klog.Errorf("Cannot get target selector from VPA's targetRef. Reason: %+v", fetchErr)
klog.ErrorS(fetchErr, "Cannot get target selector from VPA's targetRef")
msg = fmt.Sprintf("Cannot read targetRef. Reason: %s", fetchErr.Error())
}
return labels.Nothing(), []condition{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,14 @@ func (p *prometheusHistoryProvider) GetClusterHistory() (map[model.PodID]*PodHis
podSelector = fmt.Sprintf("%s, %s=\"%s\"", podSelector, p.config.CtrNamespaceLabel, p.config.Namespace)
}
historicalCpuQuery := fmt.Sprintf("rate(container_cpu_usage_seconds_total{%s}[%s])", podSelector, p.config.HistoryResolution)
klog.V(4).Infof("Historical CPU usage query used: %s", historicalCpuQuery)
klog.V(4).InfoS("Historical CPU usage query", "query", historicalCpuQuery)
err := p.readResourceHistory(res, historicalCpuQuery, model.ResourceCPU)
if err != nil {
return nil, fmt.Errorf("cannot get usage history: %v", err)
}

historicalMemoryQuery := fmt.Sprintf("container_memory_working_set_bytes{%s}", podSelector)
klog.V(4).Infof("Historical memory usage query used: %s", historicalMemoryQuery)
klog.V(4).InfoS("Historical memory usage query", "query", historicalMemoryQuery)
err = p.readResourceHistory(res, historicalMemoryQuery, model.ResourceMemory)
if err != nil {
return nil, fmt.Errorf("cannot get usage history: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ func (s *externalMetricsClient) List(ctx context.Context, namespace string, opts
return nil, err
}
if m == nil || len(m.Items) == 0 {
klog.V(4).Infof("External Metrics Query for VPA %s: resource %+v, metric %+v, No items,", klog.KRef(vpa.ID.Namespace, vpa.ID.VpaName), resourceName, metricName)
klog.V(4).InfoS("External Metrics Query for VPA: No items","vpa", klog.KRef(vpa.ID.Namespace, vpa.ID.VpaName),"resource", resourceName,"metric", metricName)
omerap12 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
klog.V(4).Infof("External Metrics Query for VPA %s: resource %+v, metric %+v, %d items, item[0]: %+v", klog.KRef(vpa.ID.Namespace, vpa.ID.VpaName), resourceName, metricName, len(m.Items), m.Items[0])
klog.V(4).InfoS("External Metrics Query for VPA","vpa", klog.KRef(vpa.ID.Namespace, vpa.ID.VpaName),"resource", resourceName,"metric", metricName,"itemCount", len(m.Items),"firstItem", m.Items[0])
omerap12 marked this conversation as resolved.
Show resolved Hide resolved
podMets.Timestamp = m.Items[0].Timestamp
if m.Items[0].WindowSeconds != nil {
podMets.Window = v1.Duration{Duration: time.Duration(*m.Items[0].WindowSeconds) * time.Second}
Expand Down
8 changes: 4 additions & 4 deletions vertical-pod-autoscaler/pkg/recommender/input/oom/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func parseEvictionEvent(event *apiv1.Event) []OomInfo {
}
memory, err := resource.ParseQuantity(offendingContainersUsage[i])
if err != nil {
klog.Errorf("Cannot parse resource quantity in eviction event %v. Error: %v", offendingContainersUsage[i], err)
klog.ErrorS(err, "Cannot parse resource quantity in eviction", "event", offendingContainersUsage[i])
continue
}
oomInfo := OomInfo{
Expand All @@ -107,7 +107,7 @@ func parseEvictionEvent(event *apiv1.Event) []OomInfo {

// OnEvent inspects k8s eviction events and translates them to OomInfo.
func (o *observer) OnEvent(event *apiv1.Event) {
klog.V(1).Infof("OOM Observer processing event: %+v", event)
klog.V(1).InfoS("OOM Observer processing event", "event", event)
for _, oomInfo := range parseEvictionEvent(event) {
o.observedOomsChannel <- oomInfo
}
Expand Down Expand Up @@ -139,11 +139,11 @@ func (o *observer) OnAdd(obj interface{}, isInInitialList bool) {}
func (o *observer) OnUpdate(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*apiv1.Pod)
if !ok {
klog.Errorf("OOM observer received invalid oldObj: %v", oldObj)
klog.ErrorS(nil, "OOM observer received invalid oldObj", "oldObj", oldObj)
}
newPod, ok := newObj.(*apiv1.Pod)
if !ok {
klog.Errorf("OOM observer received invalid newObj: %v", newObj)
klog.ErrorS(nil, "OOM observer received invalid newObj", "newObj", newObj)
}

for _, containerStatus := range newPod.Status.ContainerStatuses {
Expand Down
Loading