Skip to content

Commit

Permalink
fix(queue): Manual Judgment propagation (#4474)
Browse files Browse the repository at this point in the history
Manual Judgment was always propagated if the MJ stage was the first one in the pipeline, regardless of the value of `propagateAuthenticationContext`.

The code for backtracking through stages to find an `AuthenticatedStage` provider would improperly return any stage that did not have an ancestor, ignoring criteria for selecting a stage that should be propagating authentication.

Additionally, code in `AuthenticationAware` would mutate a `SKIPPED` MJ stage, copying the `LastModifiedDetails` value from the "real" MJ stage ancestor (or the root stage, as described above).  This mutation is not necessary, and `AuthenticationAware` can just ignore it as it traverses ancestors of the current stage.
  • Loading branch information
jcavanagh committed Jun 20, 2023
1 parent 7a0b18d commit ba9ac0d
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,33 +60,16 @@ interface AuthenticationAware {
fun retrieveAuthenticatedUser(stage: StageExecution) : PipelineExecution.AuthenticationDetails? {
return stageNavigator
.ancestors(stage)
.firstOrNull { it.stageBuilder is AuthenticatedStage } ?.let{
(it.stageBuilder as AuthenticatedStage).authenticatedUser(solveAuthStages(it.stage)).orElse(null)
.firstOrNull {
it.stageBuilder is AuthenticatedStage &&
it.stage.isManualJudgmentType &&
!it.stage.status.isSkipped &&
it.stage.withPropagateAuthentication()
}?.let{
val authStage = it.stage
return if(authStage != null) {
(it.stageBuilder as AuthenticatedStage).authenticatedUser(authStage).orElse(null)
} else null
}
}


// When a first valid candidate is found in the ancestors chain is returned
// until the ancestor chain was iterated completely at the pipeline beginning
fun backtrackStages(stage: StageExecution): StageExecution {
if (stage.isManualJudgmentType &&
!stage.status.isSkipped &&
stage.withPropagateAuthentication()) {
return stage;
}
val previousStage = if (stageNavigator.ancestors(stage).size > 1) stageNavigator.ancestors(stage).get(1).stage else null
return if (previousStage == null) stage else backtrackStages(previousStage)
}

//Next method will look by a possible stage with authentication propagated in case that previous
//stage brokes the auth chain, iterating the stage ancestors. By the moment only MJ stages approved with
//auth propagated are considerated as candidates
fun solveAuthStages(stage: StageExecution): StageExecution {
if (stage.isManualJudgmentType()) {
val result = backtrackStages(stage)
stage.lastModified = result.lastModified
return result
}
return stage
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.DefaultStageResolver
import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution.LastModifiedDetails
import com.netflix.spinnaker.orca.api.test.pipeline
import com.netflix.spinnaker.orca.api.test.stage
import com.netflix.spinnaker.orca.echo.pipeline.ManualJudgmentStage
import com.netflix.spinnaker.orca.pipeline.WaitStage
import com.netflix.spinnaker.orca.pipeline.util.StageNavigator
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.lifecycle.CachingMode
import org.jetbrains.spek.subject.SubjectSpek
import org.junit.jupiter.api.Assertions.assertEquals
import org.springframework.beans.factory.ObjectProvider
import java.time.Instant
import java.util.*

class AuthenticationAwareTest : SubjectSpek<AuthenticationAware> ({
val sdbProvider : ObjectProvider<Collection<StageDefinitionBuilder>> = mock()
whenever(sdbProvider.getIfAvailable(any())).thenReturn(listOf(
ManualJudgmentStage(),
WaitStage()
))
val stageNavigator = StageNavigator(DefaultStageResolver(sdbProvider))

subject(CachingMode.GROUP) {
object : AuthenticationAware {
override val stageNavigator: StageNavigator
get() = stageNavigator
}
}

// Arg defines which named stages get which propagation setting:
// {
// mj1: true,
// mj2: false,
// mj3: null,
// ...
// }
fun makePipeline(propagateConfig: Map<String, Boolean?>) : PipelineExecution {
// Set up a base context to be copied around
val ctx : MutableMap<String, Any> = mutableMapOf()
ctx["judgmentStatus"] = "continue"

// Creates a pipeline that looks like this
// MJ 1 - Wait 1 - MJ 2 - Wait 2
// \ MJ 3 - Wait 3 - MJ 4 (skipped manually) - Wait 4
//
// Covering the following important cases:
// - A root stage being MJ
// - Multiple MJs in a row
// - Propagating or not
// - Different users approving different MJ stages
//
// All MJ stages are approved by "propagateUserX" where X is the MJ stage number above
// Except for MJ 4, which is skipped manually and always has no lastModifiedDetails
return pipeline {
stage {
refId = "1"
name = "mj1"
type = "manualJudgment"
status = ExecutionStatus.SUCCEEDED
context = ctx.toMutableMap().also {
it["lastModifiedBy"] = "propagatedUser1"
if(propagateConfig["mj1"] != null) {
it["propagateAuthenticationContext"] = propagateConfig["mj1"]!!
}
}
lastModified = LastModifiedDetails().also {
it.user = "propagatedUser1"
it.lastModifiedTime = Instant.now().toEpochMilli()
}
}
stage {
refId = "2"
name = "wait1"
type = "wait"
status = ExecutionStatus.SUCCEEDED
requisiteStageRefIds = listOf("1")
}
stage {
refId = "3"
name = "mj2"
type = "manualJudgment"
status = ExecutionStatus.SUCCEEDED
requisiteStageRefIds = listOf("2")
context = ctx.toMutableMap().also {
it["lastModifiedBy"] = "propagatedUser2"
if(propagateConfig["mj2"] != null) {
it["propagateAuthenticationContext"] = propagateConfig["mj2"]!!
}
}
lastModified = LastModifiedDetails().also {
it.user = "propagatedUser2"
it.lastModifiedTime = Instant.now().toEpochMilli()
}
}
stage {
refId = "4"
name = "wait2"
type = "wait"
requisiteStageRefIds = listOf("3")
}
stage {
refId = "5"
name = "mj3"
type = "manualJudgment"
status = ExecutionStatus.SUCCEEDED
requisiteStageRefIds = listOf("1")
context = ctx.toMutableMap().also {
it["lastModifiedBy"] = "propagatedUser3"
if(propagateConfig["mj3"] != null) {
it["propagateAuthenticationContext"] = propagateConfig["mj3"]!!
}
}
lastModified = LastModifiedDetails().also {
it.user = "propagatedUser3"
it.lastModifiedTime = Instant.now().toEpochMilli()
}
}
stage {
refId = "6"
name = "wait3"
type = "wait"
status = ExecutionStatus.SUCCEEDED
requisiteStageRefIds = listOf("5")
}
stage {
refId = "7"
name = "mj4"
type = "manualJudgment"
status = ExecutionStatus.SKIPPED
requisiteStageRefIds = listOf("6")
context = ctx.toMutableMap().also {
ctx["manualSkip"] = true
}
lastModified = null
}
stage {
refId = "8"
name = "wait4"
type = "wait"
status = ExecutionStatus.SUCCEEDED
requisiteStageRefIds = listOf("7")
}
}
}

describe("Propagates authentication from manual judgment stages if configured") {
it("Finds the propagated user in wait1 after root mj1") {
val mjp = makePipeline(mapOf("mj1" to true))
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("2"))
assertEquals("propagatedUser1", stageUser?.user)
}

it("Finds no propagated user in wait1 after root mj1") {
val mjp = makePipeline(mapOf("mj1" to false))
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("2"))
assertEquals(null, stageUser?.user, )
}

it("Finds no propagated user in wait1 after root mj1 when unset") {
val mjp = makePipeline(mapOf("mj1" to null))
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("2"))
assertEquals(null, stageUser?.user)
}

it("Finds propagated user in wait2 when mj1 & mj2 are set") {
val mjp = makePipeline(mapOf("mj1" to true, "mj2" to true))
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("4"))
assertEquals("propagatedUser2", stageUser?.user)
}

it("Finds propagated user in wait2 when mj1 only is set") {
val mjp = makePipeline(mapOf("mj1" to true))
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("4"))
assertEquals("propagatedUser1", stageUser?.user)
}

it("Finds propagated user in wait3 when mj1 & mj3 are set") {
val mjp = makePipeline(mapOf("mj1" to true, "mj3" to true))
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("6"))
assertEquals("propagatedUser3", stageUser?.user)
}

it("Finds propagated user in wait3 when mj1 only is set") {
val mjp = makePipeline(mapOf("mj1" to true))
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("6"))
assertEquals("propagatedUser1", stageUser?.user)
}

it("Finds propagated user in wait4 when mj1 & mj3 are set") {
val mjp = makePipeline(mapOf("mj1" to true, "mj3" to true))
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("8"))
assertEquals("propagatedUser3", stageUser?.user)
}

it("Finds propagated user in wait4 when mj1 only is set") {
val mjp = makePipeline(mapOf("mj1" to true))
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("8"))
assertEquals("propagatedUser1", stageUser?.user)
}

it("Finds no propagated user in wait4 when none are set") {
val mjp = makePipeline(mapOf())
val stageUser = subject.retrieveAuthenticatedUser(mjp.stageByRef("8"))
assertEquals(null, stageUser?.user)
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -1848,73 +1848,6 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
}
}
}

describe("when a previous stage was a skipped manual judgment stage with auth propagated") {
given("a stage with a manual judgment type and auth propagated") {
val timeout = Duration.ofMinutes(5)
val lastModifiedUser = StageExecution.LastModifiedDetails()
lastModifiedUser.user = "user"
lastModifiedUser.allowedAccounts = listOf("user")

val pipeline = pipeline {
stage {
refId = "1"
type = manualJudgmentStage.type
manualJudgmentStage.plan(this)
status = SUCCEEDED
context["propagateAuthenticationContext"] = true
context["judgmentStatus"] = "continue"
lastModified = lastModifiedUser
}
}

val stageSkipped : StageExecution = stage {
refId = "2"
type = manualJudgmentStage.type
manualJudgmentStage.plan(this)
context["manualSkip"] = true
status = SKIPPED
requisiteStageRefIds = setOf("1")
}

val stageSpy = spy(stageSkipped)

val stageResult1 : com.netflix.spinnaker.orca.pipeline.util.StageNavigator.Result = mock()
whenever(stageResult1.stage) doReturn pipeline.stageByRef("1")
whenever(stageResult1.stageBuilder) doReturn manualJudgmentStage
val stageResult2 : com.netflix.spinnaker.orca.pipeline.util.StageNavigator.Result = mock()
whenever(stageResult2.stage) doReturn stageSpy
whenever(stageResult2.stageBuilder) doReturn manualJudgmentStage
pipeline.stages.add(stageSpy)

val stage = pipeline.stageByRef("2")
val message = RunTask(pipeline.type, pipeline.id, "foo", stage.id, "1", DummyTask::class.java)


beforeGroup {
tasks.forEach { whenever(it.extensionClass) doReturn it::class.java }
taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage }
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis()
whenever(stageNavigator.ancestors(stage)) doReturn listOf(stageResult2, stageResult1)
}

afterGroup(::resetMocks)

action("the handler receives a message") {
subject.handle(message)
}

it("marks the task as skipped") {
verify(queue).push(CompleteTask(message, SKIPPED))
}

it("verify if last authenticated user was retrieved from candidate stage") {
assertEquals(stageSpy.lastModified, lastModifiedUser)
}

}
}
})

data class BackOff(
Expand Down

0 comments on commit ba9ac0d

Please sign in to comment.