Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipeline executions/orca) : Manual Judgement Navigation Enhancement Backend API Implementation #4132

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public interface StageExecution {

void setContext(@Nonnull Map<String, Object> context);

@Nonnull
Map<String, Object> getOthers();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still a very strangely named method, with no comments on what it's used for.


void setOthers(@Nonnull Map<String, Object> others);

/** TODO(rz): getOutputs(Class)? */
@Nonnull
Map<String, Object> getOutputs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@ public void setOutputs(@Nonnull Map<String, Object> outputs) {
this.outputs = outputs;
}

private Map<String, Object> others = new HashMap<>();

public @Nonnull Map<String, Object> getOthers() {
return others;
}

public void setOthers(@Nonnull Map<String, Object> others) {
this.others = others;
}

/**
* Returns the tasks that are associated with this stage. Tasks are the most granular unit of work
* in a stage. Because tasks can be dynamically composed, this list is open updated during a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ class DualExecutionRepository(
select(stage.execution).updateStageContext(stage)
}

override fun updateStageOthers(stage: StageExecution) {
select(stage.execution).updateStageOthers(stage)
}

override fun deleteStageOthers(stage: StageExecution) {
select(stage.execution).deleteStageOthers(stage)
}

override fun removeStage(execution: PipelineExecution, stageId: String) {
select(execution).removeStage(execution, stageId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public interface ExecutionRepository {

void updateStageContext(@Nonnull StageExecution stage);

void updateStageOthers(@Nonnull StageExecution stage);

void deleteStageOthers(@Nonnull StageExecution stage);

void removeStage(@Nonnull PipelineExecution execution, @Nonnull String stageId);

void addStage(@Nonnull StageExecution stage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ class InMemoryExecutionRepository : ExecutionRepository {
// Do nothing
}

override fun updateStageOthers(stage: StageExecution) {
// Do nothing
}

override fun deleteStageOthers(stage: StageExecution) {
// Do nothing
}

override fun retrievePipelineForCorrelationId(correlationId: String): PipelineExecution {
return retrieveByCorrelationId(PIPELINE, correlationId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import com.netflix.spinnaker.orca.api.pipeline.TaskResult
import com.netflix.spinnaker.orca.echo.util.ManualJudgmentAuthorization
import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import org.springframework.beans.factory.annotation.Value

import javax.annotation.Nonnull
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -73,14 +75,20 @@ class ManualJudgmentStage implements StageDefinitionBuilder, AuthenticatedStage
final long backoffPeriod = 15000
final long timeout = TimeUnit.DAYS.toMillis(3)

@Value('${spinnaker.manual-judgment-navigation:false}')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe there's a bit of prior art under the stages prefix.

Perhaps, stages.manual-judgment.navigation.

boolean manualJudgmentNavigation

private final EchoService echoService
private final ManualJudgmentAuthorization manualJudgmentAuthorization
private final ExecutionRepository executionRepository

@Autowired
WaitForManualJudgmentTask(Optional<EchoService> echoService,
ManualJudgmentAuthorization manualJudgmentAuthorization) {
ManualJudgmentAuthorization manualJudgmentAuthorization,
ExecutionRepository executionRepository) {
this.echoService = echoService.orElse(null)
this.manualJudgmentAuthorization = manualJudgmentAuthorization
this.executionRepository = executionRepository
}

@Override
Expand All @@ -89,14 +97,24 @@ class ManualJudgmentStage implements StageDefinitionBuilder, AuthenticatedStage
String notificationState
ExecutionStatus executionStatus

if (manualJudgmentNavigation) {
checkForAnyParentExecutions(stage)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming of this method implies no side effects, I'd suggest a change of name.

}

switch (stageData.state) {
case StageData.State.CONTINUE:
notificationState = "manualJudgmentContinue"
executionStatus = ExecutionStatus.SUCCEEDED
if (manualJudgmentNavigation) {
deleteLeafnodeAttributesFromTheParentExecutions(stage)
}
break
case StageData.State.STOP:
notificationState = "manualJudgmentStop"
executionStatus = ExecutionStatus.TERMINAL
if (manualJudgmentNavigation) {
deleteLeafnodeAttributesFromTheParentExecutions(stage)
}
break
default:
notificationState = "manualJudgment"
Expand All @@ -120,6 +138,69 @@ class ManualJudgmentStage implements StageDefinitionBuilder, AuthenticatedStage
return TaskResult.builder(executionStatus).context(outputs).build()
}

/**
* This method checks if this manual judgment stage is triggered by any other pipeline(parent execution).
* If yes, it fetches all the parent executions, which triggered this stage and sets the current
* running stage(manual judgment stage execution id and application name) to leafnode execution id and
* application name.
*
* p1 --> p2 --> p3 --> p4 (running manual judgment stage & waiting for judgment)
*
* p1 leafnodeExecutionId : p4 execution id
* p1 leafnodeApplicationName : p4 application name
*
* p2 leafnodeExecutionId : p4 execution id
* p2 leafnodeApplicationName : p4 application name
*
* p3 leafnodeExecutionId : p4 execution id
* p3 leafnodeApplicationName : p4 application name
*
* @param stage
*/
void checkForAnyParentExecutions(StageExecution stage) {

def status = stage?.execution?.status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under what situation is stage null?

def trigger = stage?.execution?.trigger
def appName = stage?.execution?.application
def executionId = stage?.execution?.id
def stageId = stage?.execution?.id
while (ExecutionStatus.RUNNING.equals(status) && trigger && trigger.hasProperty("parentExecution")) {
PipelineExecution parentExecution = trigger?.parentExecution
parentExecution = executionRepository.retrieve(ExecutionType.PIPELINE, parentExecution.id)
parentExecution.getStages().each {
if (("pipeline").equals(it.getType()) && (ExecutionStatus.RUNNING.equals(it.getStatus()))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's probably a constant for pipeline somewhere in the PipelineStage?

if (it.context && stageId.equals(it.context.executionId)) {
def others = [leafnodePipelineExecutionId: executionId, leafnodeApplicationName: appName]
it.setOthers(others)
stageId = it.execution.getId()
executionRepository.updateStageOthers(it)
}
}
}
trigger = parentExecution?.trigger
}
}

/**
* This method deletes the leafnode attributes from all the parent stage executions.
* @param stage
*/
void deleteLeafnodeAttributesFromTheParentExecutions(StageExecution stage) {

def status = stage?.execution?.status
def trigger = stage?.execution?.trigger
while (ExecutionStatus.RUNNING.equals(status) && trigger && trigger.hasProperty("parentExecution")) {
PipelineExecution parentExecution = trigger?.parentExecution
PipelineExecution execution = executionRepository.retrieve(ExecutionType.PIPELINE, parentExecution.id)
execution.getStages().each {
if (ExecutionStatus.RUNNING.equals(it.getStatus())) {
executionRepository.deleteStageOthers(it)
}
}
trigger = parentExecution?.trigger
}
}

Map processNotifications(StageExecution stage, StageData stageData, String notificationState) {
if (echoService) {
// sendNotifications will be true if using the new scheme for configuration notifications.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import com.netflix.spinnaker.orca.echo.EchoService
import com.netflix.spinnaker.orca.echo.util.ManualJudgmentAuthorization
import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl
import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import spock.lang.Specification
import spock.lang.Unroll
import static com.netflix.spinnaker.orca.echo.pipeline.ManualJudgmentStage.Notification
import static com.netflix.spinnaker.orca.echo.pipeline.ManualJudgmentStage.WaitForManualJudgmentTask

class ManualJudgmentStageSpec extends Specification {
EchoService echoService = Mock(EchoService)
ExecutionRepository executionRepository = Mock(ExecutionRepository)

FiatPermissionEvaluator fiatPermissionEvaluator = Mock(FiatPermissionEvaluator)

Expand All @@ -48,7 +50,7 @@ class ManualJudgmentStageSpec extends Specification {
@Unroll
void "should return execution status based on judgmentStatus"() {
given:
def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization)
def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository)

when:
def result = task.execute(new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", context))
Expand All @@ -73,7 +75,7 @@ class ManualJudgmentStageSpec extends Specification {
new UserPermission().addResources([new Role('foo')]).setAdmin(isAdmin).view
}

def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization)
def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository)

when:
def stage = new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", context)
Expand All @@ -97,7 +99,7 @@ class ManualJudgmentStageSpec extends Specification {

void "should only send notifications for supported types"() {
given:
def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization)
def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository)

when:
def result = task.execute(new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", [notifications: [
Expand All @@ -118,7 +120,7 @@ class ManualJudgmentStageSpec extends Specification {
@Unroll
void "if deprecated notification configuration is in use, only send notifications for awaiting judgment state"() {
given:
def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization)
def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository)

when:
def result = task.execute(new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", [
Expand Down Expand Up @@ -199,7 +201,7 @@ class ManualJudgmentStageSpec extends Specification {
@Unroll
void "should retain unknown fields in the notification context"() {
given:
def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization)
def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository)

def slackNotification = new Notification(type: "slack")
slackNotification.setOther("customMessage", "hello slack")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,44 @@ public void updateStageContext(@Nonnull StageExecution stage) {
});
}

@Override
public void updateStageOthers(@Nonnull StageExecution stage) {
RedisClientDelegate delegate = getRedisDelegate(stage);
String key = executionKey(stage);
String contextKey = format("stage.%s.others", stage.getId());
delegate.withCommandsClient(
c -> {
try {
c.hset(key, contextKey, mapper.writeValueAsString(stage.getOthers()));
} catch (JsonProcessingException e) {
throw new StageSerializationException(
format(
"Failed serializing stage, executionId: %s, stageId: %s",
stage.getExecution().getId(), stage.getId()),
e);
}
});
}

@Override
public void deleteStageOthers(@Nonnull StageExecution stage) {
RedisClientDelegate delegate = getRedisDelegate(stage);
String key = executionKey(stage);
String contextKey = format("stage.%s.others", stage.getId());
delegate.withCommandsClient(
c -> {
try {
c.hdel(key, contextKey, mapper.writeValueAsString(stage.getOthers()));
} catch (JsonProcessingException e) {
throw new StageSerializationException(
format(
"Failed serializing stage, executionId: %s, stageId: %s",
stage.getExecution().getId(), stage.getId()),
e);
}
});
}

@Override
public void removeStage(@Nonnull PipelineExecution execution, @Nonnull String stageId) {
RedisClientDelegate delegate = getRedisDelegate(execution);
Expand Down Expand Up @@ -936,6 +974,11 @@ protected PipelineExecution buildExecution(
} else {
stage.setOutputs(emptyMap());
}
if (map.get(prefix + "others") != null) {
stage.setOthers(mapper.readValue(map.get(prefix + "others"), MAP_STRING_TO_OBJECT));
} else {
stage.setOthers(emptyMap());
}
if (map.get(prefix + "tasks") != null) {
stage.setTasks(mapper.readValue(map.get(prefix + "tasks"), LIST_OF_TASKS));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ class SqlExecutionRepository(
storeStage(stage)
}

override fun updateStageOthers(stage: StageExecution) {
storeStage(stage)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just boiling down to saving the entire stage ... why bother introducing "others" (which is likely going to be confusing) instead of just storing in the stage relevant stage context.

}

override fun deleteStageOthers(stage: StageExecution) {
val others = mapOf<String, Object>()
stage.others = others;
storeStage(stage)
}

override fun removeStage(execution: PipelineExecution, stageId: String) {
validateHandledPartitionOrThrow(execution)

Expand Down