feat(batchUpdate): Pipeline config batch update (#4773)
* feat(pipeline executions/orca): Added code to save multiple pipelines at once to the SQL database.

This is part of: spinnaker/spinnaker#6147.

Enhanced SavePipelineTask.java to:

- Accept a list of pipelines as JSON, in addition to a single pipeline.
- Validate all of the pipelines.
- Call the front50 service to save the list of pipelines.

Enhanced Front50Service.groovy to:

- Add a new REST API (POST /pipelines/batchUpdate) that accepts a list of pipelines as JSON.

* feat(pipeline executions/orca): Support for bulk saving pipelines

* feat(front50): Add okhttp timeouts for front50 service

Adds configuration properties for readTimeout, writeTimeout, and connectTimeout on the Front50 OkHttp service.

* bug(front50): add GroovyJson dependency

A test was failing without it. The test code that uses it was added back when we were on Groovy 2; after the upgrade to Groovy 3, I believe it's required as an explicit dependency.

* refactor(front50): remove old timeout config value from orca.yml

* refactor(front50): change RetrofitError to SpinnakerHttpException

---------

Co-authored-by: sanopsmx <[email protected]>
Co-authored-by: Arifullah Pattan <[email protected]>
Co-authored-by: David Byron <[email protected]>
Co-authored-by: Richard Timpson <[email protected]>
5 people committed Aug 21, 2024
1 parent 4c9d513 commit 1d85e7c
Showing 7 changed files with 238 additions and 55 deletions.
1 change: 1 addition & 0 deletions orca-front50/orca-front50.gradle
@@ -38,6 +38,7 @@ dependencies {
testImplementation(project(":orca-test-groovy"))
testImplementation(project(":orca-pipelinetemplate"))
testImplementation("com.github.ben-manes.caffeine:guava")
testImplementation("org.codehaus.groovy:groovy-json")
testRuntimeOnly("net.bytebuddy:byte-buddy")
}

@@ -74,6 +74,9 @@ interface Front50Service {
@POST("/pipelines")
Response savePipeline(@Body Map pipeline, @Query("staleCheck") boolean staleCheck)

@POST("/pipelines/batchUpdate")
Response savePipelines(@Body List<Map<String, Object>> pipelines, @Query("staleCheck") boolean staleCheck)

@PUT("/pipelines/{pipelineId}")
Response updatePipeline(@Path("pipelineId") String pipelineId, @Body Map pipeline)
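
As a quick orientation to the new savePipelines method above, here is a minimal sketch (not part of this commit) of how a caller might exercise the batch endpoint. The Front50Service instance, the pipeline contents, and the staleCheck value are illustrative assumptions.

import com.netflix.spinnaker.orca.front50.Front50Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import retrofit.client.Response;

class Front50BatchSaveSketch {
  static Response bulkSave(Front50Service front50Service) {
    // Placeholder pipeline definition; a real pipeline map also carries stages, triggers, etc.
    Map<String, Object> pipeline = new HashMap<>();
    pipeline.put("application", "myapp");
    pipeline.put("name", "deploy-to-prod");

    List<Map<String, Object>> pipelines = new ArrayList<>();
    pipelines.add(pipeline);

    // Issues POST /pipelines/batchUpdate?staleCheck=false with the list as the JSON body.
    return front50Service.savePipelines(pipelines, false);
  }
}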

@@ -29,6 +29,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.retrofit.RetrofitConfiguration
import com.netflix.spinnaker.orca.retrofit.logging.RetrofitSlf4jLog
import groovy.transform.CompileStatic
import okhttp3.OkHttpClient
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.context.properties.EnableConfigurationProperties
@@ -42,6 +43,8 @@ import retrofit.RequestInterceptor
import retrofit.RestAdapter
import retrofit.converter.JacksonConverter

import java.util.concurrent.TimeUnit

import static retrofit.Endpoints.newFixedEndpoint

@Configuration
@@ -71,11 +74,17 @@ class Front50Configuration {
}

@Bean
Front50Service front50Service(Endpoint front50Endpoint, ObjectMapper mapper) {
Front50Service front50Service(Endpoint front50Endpoint, ObjectMapper mapper, Front50ConfigurationProperties front50ConfigurationProperties) {
OkHttpClient okHttpClient = clientProvider.getClient(new DefaultServiceEndpoint("front50", front50Endpoint.getUrl()));
okHttpClient = okHttpClient.newBuilder()
.readTimeout(front50ConfigurationProperties.okhttp.readTimeoutMs, TimeUnit.MILLISECONDS)
.writeTimeout(front50ConfigurationProperties.okhttp.writeTimeoutMs, TimeUnit.MILLISECONDS)
.connectTimeout(front50ConfigurationProperties.okhttp.connectTimeoutMs, TimeUnit.MILLISECONDS)
.build();
new RestAdapter.Builder()
.setRequestInterceptor(spinnakerRequestInterceptor)
.setEndpoint(front50Endpoint)
.setClient(new Ok3Client(clientProvider.getClient(new DefaultServiceEndpoint("front50", front50Endpoint.getUrl()))))
.setClient(new Ok3Client(okHttpClient))
.setLogLevel(retrofitLogLevel)
.setLog(new RetrofitSlf4jLog(Front50Service))
.setConverter(new JacksonConverter(mapper))
@@ -33,4 +33,15 @@ public class Front50ConfigurationProperties {
* <p>When true: GET /pipelines/triggeredBy/{pipelineId}/{status} When false: GET /pipelines
*/
boolean useTriggeredByEndpoint = true;

OkHttpConfigurationProperties okhttp = new OkHttpConfigurationProperties();

@Data
public static class OkHttpConfigurationProperties {
int readTimeoutMs = 10000;

int writeTimeoutMs = 10000;

int connectTimeoutMs = 10000;
}
}
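
A sketch of how these timeouts might be configured in orca.yml, assuming the new properties bind under a front50.okhttp prefix via Front50ConfigurationProperties (the prefix is an assumption, not shown in this diff); the values are simply the defaults introduced here.

front50:
  okhttp:
    readTimeoutMs: 10000
    writeTimeoutMs: 10000
    connectTimeoutMs: 10000
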
@@ -24,7 +24,6 @@ import com.netflix.spinnaker.orca.exceptions.PipelineTemplateValidationException
import com.netflix.spinnaker.orca.api.pipeline.ExecutionPreprocessor
import com.netflix.spinnaker.orca.front50.DependentPipelineStarter
import com.netflix.spinnaker.orca.front50.Front50Service
import com.netflix.spinnaker.orca.front50.config.Front50Configuration
import com.netflix.spinnaker.orca.front50.config.Front50ConfigurationProperties
import com.netflix.spinnaker.orca.listeners.ExecutionListener
import com.netflix.spinnaker.orca.listeners.Persister
@@ -15,7 +15,10 @@
*/
package com.netflix.spinnaker.orca.front50.tasks;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spinnaker.kork.retrofit.exceptions.SpinnakerHttpException;
import com.netflix.spinnaker.orca.api.pipeline.RetryableTask;
import com.netflix.spinnaker.orca.api.pipeline.TaskResult;
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus;
@@ -25,6 +28,7 @@
import com.netflix.spinnaker.orca.front50.pipeline.SavePipelineStage;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +40,7 @@
@Component
public class SavePipelineTask implements RetryableTask {

private Logger log = LoggerFactory.getLogger(getClass());
private final Logger log = LoggerFactory.getLogger(getClass());

@Autowired
SavePipelineTask(
@@ -60,62 +64,90 @@ public TaskResult execute(StageExecution stage) {
"Front50 is not enabled, no way to save pipeline. Fix this by setting front50.enabled: true");
}

if (!stage.getContext().containsKey("pipeline")) {
throw new IllegalArgumentException("pipeline context must be provided");
}
Map<String, Object> pipeline = new HashMap<>();
List<Map<String, Object>> pipelines = new ArrayList<>();

boolean isSavingMultiplePipelines =
(boolean) stage.getContext().getOrDefault("isSavingMultiplePipelines", false);

boolean isBulkSavingPipelines =
(boolean) stage.getContext().getOrDefault("isBulkSavingPipelines", false);

Map<String, Object> pipeline;
if (!(stage.getContext().get("pipeline") instanceof String)) {
pipeline = (Map<String, Object>) stage.getContext().get("pipeline");
boolean staleCheck =
(Boolean) Optional.ofNullable(stage.getContext().get("staleCheck")).orElse(false);

if (isBulkSavingPipelines) {
if (!stage.getContext().containsKey("pipelines")) {
throw new IllegalArgumentException(
"pipelines context must be provided when saving multiple pipelines");
}
pipelines = (List<Map<String, Object>>) stage.decodeBase64("/pipelines", List.class);
log.info(
"Bulk saving the following pipelines: {}",
pipelines.stream().map(p -> p.get("name")).collect(Collectors.toList()));
} else {
pipeline = (Map<String, Object>) stage.decodeBase64("/pipeline", Map.class);
if (!stage.getContext().containsKey("pipeline")) {
throw new IllegalArgumentException(
"pipeline context must be provided when saving a single pipeline");
}
if (!(stage.getContext().get("pipeline") instanceof String)) {
pipeline = (Map<String, Object>) stage.getContext().get("pipeline");
} else {
pipeline = (Map<String, Object>) stage.decodeBase64("/pipeline", Map.class);
}
pipelines.add(pipeline);
log.info("Saving single pipeline {}", pipeline.get("name"));
}

if (!pipeline.containsKey("index")) {
Map<String, Object> existingPipeline = fetchExistingPipeline(pipeline);
if (existingPipeline != null) {
pipeline.put("index", existingPipeline.get("index"));
// Preprocess pipelines before saving
for (Map<String, Object> pipe : pipelines) {
if (!pipe.containsKey("index")) {
Map<String, Object> existingPipeline = fetchExistingPipeline(pipe);
if (existingPipeline != null) {
pipe.put("index", existingPipeline.get("index"));
}
}
}
String serviceAccount = (String) stage.getContext().get("pipeline.serviceAccount");
if (serviceAccount != null) {
updateServiceAccount(pipeline, serviceAccount);
}
final Boolean isSavingMultiplePipelines =
(Boolean)
Optional.ofNullable(stage.getContext().get("isSavingMultiplePipelines")).orElse(false);
final Boolean staleCheck =
(Boolean) Optional.ofNullable(stage.getContext().get("staleCheck")).orElse(false);
if (stage.getContext().get("pipeline.id") != null
&& pipeline.get("id") == null
&& !isSavingMultiplePipelines) {
pipeline.put("id", stage.getContext().get("pipeline.id"));

// We need to tell front50 to regenerate cron trigger id's
pipeline.put("regenerateCronTriggerIds", true);
}
String serviceAccount = (String) stage.getContext().get("pipeline.serviceAccount");
if (serviceAccount != null) {
updateServiceAccount(pipe, serviceAccount);
}

if (stage.getContext().get("pipeline.id") != null
&& pipe.get("id") == null
&& !isSavingMultiplePipelines) {
pipe.put("id", stage.getContext().get("pipeline.id"));

pipelineModelMutators.stream()
.filter(m -> m.supports(pipeline))
.forEach(m -> m.mutate(pipeline));
// We need to tell front50 to regenerate cron trigger id's
pipe.put("regenerateCronTriggerIds", true);
}

pipelineModelMutators.stream().filter(m -> m.supports(pipe)).forEach(m -> m.mutate(pipe));
}

Response response = front50Service.savePipeline(pipeline, staleCheck);
Response response;
if (isBulkSavingPipelines) {
response = front50Service.savePipelines(pipelines, staleCheck);
} else {
response = front50Service.savePipeline(pipeline, staleCheck);
}

Map<String, Object> outputs = new HashMap<>();
outputs.put("notification.type", "savepipeline");
outputs.put("application", pipeline.get("application"));
outputs.put("pipeline.name", pipeline.get("name"));
outputs.put("application", stage.getContext().get("application"));

Map<String, Object> saveResult = new HashMap<>();
try {
Map<String, Object> savedPipeline =
(Map<String, Object>) objectMapper.readValue(response.getBody().in(), Map.class);
outputs.put("pipeline.id", savedPipeline.get("id"));
saveResult = (Map<String, Object>) objectMapper.readValue(response.getBody().in(), Map.class);
} catch (Exception e) {
log.error("Unable to deserialize saved pipeline, reason: ", e.getMessage());
log.error("Unable to deserialize save pipeline(s) result, reason: ", e);
}

if (pipeline.containsKey("id")) {
outputs.put("pipeline.id", pipeline.get("id"));
}
if (isBulkSavingPipelines) {
outputs.put("bulksave", saveResult);
} else {
outputs.put("pipeline.name", pipeline.get("name"));
outputs.put("pipeline.id", saveResult.getOrDefault("id", pipeline.getOrDefault("id", "")));
}

final ExecutionStatus status;
@@ -161,14 +193,16 @@ private void updateServiceAccount(Map<String, Object> pipeline, String serviceAc
}

private Map<String, Object> fetchExistingPipeline(Map<String, Object> newPipeline) {
String applicationName = (String) newPipeline.get("application");
String newPipelineID = (String) newPipeline.get("id");
if (!StringUtils.isEmpty(newPipelineID)) {
return front50Service.getPipelines(applicationName).stream()
.filter(m -> m.containsKey("id"))
.filter(m -> m.get("id").equals(newPipelineID))
.findFirst()
.orElse(null);
if (StringUtils.isNotEmpty(newPipelineID)) {
try {
return front50Service.getPipeline(newPipelineID);
} catch (SpinnakerHttpException e) {
// Return a null if pipeline with expected id not found
if (e.getResponseCode() == HTTP_NOT_FOUND) {
log.debug("Existing pipeline with id {} not found. Returning null.", newPipelineID);
}
}
}
return null;
}
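
For context on the new bulk path in SavePipelineTask, here is a minimal sketch (not part of this commit) of the stage context shape that routes execution through the batch branch. The application name and the encoding helper are illustrative; it assumes, consistent with the existing single-pipeline path that uses stage.decodeBase64("/pipeline", Map.class), that the "pipelines" value is a base64-encoded JSON array.

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

class BulkSaveContextSketch {
  static Map<String, Object> buildContext(String pipelineListJson) {
    // pipelineListJson is a JSON array of pipeline maps, e.g. produced by an ObjectMapper.
    Map<String, Object> context = new HashMap<>();
    context.put("application", "myapp");         // placeholder application name
    context.put("isBulkSavingPipelines", true);  // routes SavePipelineTask to the batch branch
    context.put("staleCheck", false);            // forwarded to front50 as a query parameter
    // SavePipelineTask reads the list via stage.decodeBase64("/pipelines", List.class),
    // so the context carries the list as a base64-encoded string.
    context.put(
        "pipelines",
        Base64.getEncoder().encodeToString(pipelineListJson.getBytes(StandardCharsets.UTF_8)));
    return context;
  }
}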
