Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for multiple connection pools in Orca #4619

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023 Armory, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config

/**
 * Logical connection-pool names used to route SQL operations.
 *
 * @property type the pool name string passed to `withPool(...)` when
 *           selecting which datasource a query runs against.
 */
enum class ConnectionPools(val type: String) {
  /** Pool used for read-only queries. */
  READ("read"),

  /** Pool used for writes (and as the default pool). */
  WRITE("write"),
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package com.netflix.spinnaker.orca.sql.pipeline.persistence

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.kork.sql.routing.withPool
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import java.sql.ResultSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.spinnaker.orca.sql.pipeline.persistence

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.kork.core.RetrySupport
import com.netflix.spinnaker.kork.exceptions.ConfigurationException
import com.netflix.spinnaker.kork.exceptions.SystemException
Expand Down Expand Up @@ -50,6 +51,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.Execu
import com.netflix.spinnaker.orca.pipeline.persistence.UnpausablePipelineException
import com.netflix.spinnaker.orca.pipeline.persistence.UnresumablePipelineException
import de.huxhorn.sulky.ulid.SpinULID
import org.checkerframework.checker.units.qual.C
import java.lang.System.currentTimeMillis
import java.security.SecureRandom
import org.jooq.DSLContext
Expand Down Expand Up @@ -89,7 +91,7 @@ class SqlExecutionRepository(
private val retryProperties: RetryProperties,
private val batchReadSize: Int = 10,
private val stageReadSize: Int = 200,
private val poolName: String = "default",
private val poolName: String = ConnectionPools.WRITE.type,
private val interlink: Interlink? = null,
private val executionRepositoryListeners: Collection<ExecutionRepositoryListener> = emptyList()
) : ExecutionRepository, ExecutionStatisticsRepository {
Expand Down Expand Up @@ -245,7 +247,7 @@ class SqlExecutionRepository(
}

override fun isCanceled(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return jooq.fetchExists(
jooq.selectFrom(type.tableName)
.where(id.toWhereCondition())
Expand Down Expand Up @@ -341,25 +343,26 @@ class SqlExecutionRepository(
private fun cleanupOldDeletedExecutions() {
// Note: this runs as part of a delete operation but is not critical (best effort cleanup)
// Hence it doesn't need to be in a transaction and we "eat" the exceptions here

try {
val idsToDelete = jooq
.select(field("id"))
.from(table("deleted_executions"))
.where(field("deleted_at").lt(timestampSub(now(), 1, DatePart.DAY)))
.fetch(field("id"), Int::class.java)

// Perform chunked delete in the rare event that there are many executions to clean up
idsToDelete
.chunked(25)
.forEach { chunk ->
jooq
.deleteFrom(table("deleted_executions"))
.where(field("id").`in`(*chunk.toTypedArray()))
.execute()
}
} catch (e: Exception) {
log.error("Failed to cleanup some deleted_executions", e)
withPool(poolName) {
try {
val idsToDelete = jooq
.select(field("id"))
.from(table("deleted_executions"))
.where(field("deleted_at").lt(timestampSub(now(), 1, DatePart.DAY)))
.fetch(field("id"), Int::class.java)

// Perform chunked delete in the rare event that there are many executions to clean up
idsToDelete
.chunked(25)
.forEach { chunk ->
jooq
.deleteFrom(table("deleted_executions"))
.where(field("id").`in`(*chunk.toTypedArray()))
.execute()
}
} catch (e: Exception) {
log.error("Failed to cleanup some deleted_executions", e)
}
}
}

Expand All @@ -380,7 +383,7 @@ class SqlExecutionRepository(
}

private fun retrieve(type: ExecutionType, criteria: ExecutionCriteria, partition: String?): Observable<PipelineExecution> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = jooq.selectExecutions(
type,
fields = selectFields() + field("status"),
Expand Down Expand Up @@ -409,7 +412,7 @@ class SqlExecutionRepository(
}

override fun retrievePipelinesForApplication(application: String): Observable<PipelineExecution> =
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
Observable.from(
fetchExecutions { pageSize, cursor ->
selectExecutions(PIPELINE, pageSize, cursor) {
Expand All @@ -426,7 +429,7 @@ class SqlExecutionRepository(
// When not filtering by status, provide an index hint to ensure use of `pipeline_config_id_idx` which
// fully satisfies the where clause and order by. Without, some lookups by config_id matching thousands
// of executions triggered costly full table scans.
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = if (criteria.statuses.isEmpty() || criteria.statuses.size == ExecutionStatus.values().size) {
jooq.selectExecutions(
PIPELINE,
Expand Down Expand Up @@ -470,7 +473,7 @@ class SqlExecutionRepository(
criteria: ExecutionCriteria,
sorter: ExecutionComparator?
): MutableList<PipelineExecution> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return jooq.selectExecutions(
ORCHESTRATION,
conditions = {
Expand Down Expand Up @@ -508,7 +511,7 @@ class SqlExecutionRepository(
}

override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val execution = jooq.selectExecution(ORCHESTRATION)
.where(
field("id").eq(
Expand Down Expand Up @@ -536,7 +539,7 @@ class SqlExecutionRepository(
}

override fun retrievePipelineForCorrelationId(correlationId: String): PipelineExecution {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val execution = jooq.selectExecution(PIPELINE)
.where(
field("id").eq(
Expand Down Expand Up @@ -575,7 +578,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllApplicationNames(type: ExecutionType?): List<String> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
Expand All @@ -598,7 +601,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllApplicationNames(type: ExecutionType?, minExecutions: Int): List<String> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
Expand All @@ -624,7 +627,7 @@ class SqlExecutionRepository(
}

override fun countActiveExecutions(): ActiveExecutionsReport {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1))

val orchestrationsQuery = jooq.selectCount()
Expand Down Expand Up @@ -653,7 +656,7 @@ class SqlExecutionRepository(
buildTimeEndBoundary: Long,
executionCriteria: ExecutionCriteria
): List<PipelineExecution> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = jooq.selectExecutions(
PIPELINE,
conditions = {
Expand Down Expand Up @@ -716,7 +719,7 @@ class SqlExecutionRepository(
}

override fun hasExecution(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return jooq.selectCount()
.from(type.tableName)
.where(id.toWhereCondition())
Expand All @@ -725,7 +728,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllExecutionIds(type: ExecutionType): MutableList<String> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
return jooq.select(field("id")).from(type.tableName).fetch("id", String::class.java)
}
}
Expand All @@ -745,7 +748,7 @@ class SqlExecutionRepository(
): Pair<String, String?> {
if (isULID(id)) return Pair(id, null)

withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val ts = (timestamp ?: System.currentTimeMillis())
val row = ctx.select(field("id"))
.from(table)
Expand Down Expand Up @@ -971,7 +974,7 @@ class SqlExecutionRepository(
id: String,
forUpdate: Boolean = false
): PipelineExecution? {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = ctx.selectExecution(type).where(id.toWhereCondition())
if (forUpdate) {
select.forUpdate()
Expand All @@ -986,7 +989,7 @@ class SqlExecutionRepository(
cursor: String?,
where: ((SelectJoinStep<Record>) -> SelectConditionStep<Record>)? = null
): Collection<PipelineExecution> {
withPool(poolName) {
withPool(ConnectionPools.READ.type) {
val select = jooq.selectExecutions(
type,
conditions = {
Expand Down