Skip to content

Instantly share code, notes, and snippets.

@mminella
Created May 12, 2016 19:58
Show Gist options
  • Save mminella/1057aa906db62d795d7072c0b07544c6 to your computer and use it in GitHub Desktop.
Save mminella/1057aa906db62d795d7072c0b07544c6 to your computer and use it in GitHub Desktop.
CloudFoundryTaskLauncher
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.deployer.spi.cloudfoundry;
import org.cloudfoundry.client.CloudFoundryClient;
import org.cloudfoundry.client.v3.servicebindings.CreateServiceBindingRequest;
import org.cloudfoundry.client.v2.spaces.ListSpacesRequest;
import org.cloudfoundry.client.v3.Relationship;
import org.cloudfoundry.client.v3.applications.Application;
import org.cloudfoundry.client.v3.applications.CreateApplicationRequest;
import org.cloudfoundry.client.v3.applications.DeleteApplicationRequest;
import org.cloudfoundry.client.v3.applications.ListApplicationDropletsRequest;
import org.cloudfoundry.client.v3.applications.ListApplicationDropletsResponse;
import org.cloudfoundry.client.v3.applications.ListApplicationsRequest;
import org.cloudfoundry.client.v3.applications.ListApplicationsResponse;
import org.cloudfoundry.client.v3.droplets.Droplet;
import org.cloudfoundry.client.v3.droplets.GetDropletRequest;
import org.cloudfoundry.client.v3.packages.CreatePackageRequest;
import org.cloudfoundry.client.v3.packages.GetPackageRequest;
import org.cloudfoundry.client.v3.packages.Package;
import org.cloudfoundry.client.v3.packages.StagePackageRequest;
import org.cloudfoundry.client.v3.packages.UploadPackageRequest;
import org.cloudfoundry.client.v3.tasks.CancelTaskRequest;
import org.cloudfoundry.client.v3.tasks.CreateTaskRequest;
import org.cloudfoundry.client.v3.tasks.GetTaskRequest;
import org.cloudfoundry.client.v3.tasks.GetTaskResponse;
import org.cloudfoundry.client.v3.tasks.Task;
import org.cloudfoundry.operations.CloudFoundryOperations;
import org.cloudfoundry.operations.services.ServiceInstance;
import org.cloudfoundry.util.PaginationUtils;
import org.cloudfoundry.util.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
import org.springframework.cloud.deployer.spi.task.LaunchState;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.deployer.spi.task.TaskStatus;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.cloudfoundry.util.DelayUtils.exponentialBackOff;
import static org.cloudfoundry.util.tuple.TupleUtils.function;
import static org.springframework.util.StringUtils.commaDelimitedListToSet;
/**
* @author Greg Turnquist
* @author Michael Minella
*/
public class CloudFoundryTaskLauncher implements TaskLauncher {
/** Logger used for error reporting by this launcher. */
private static final Logger logger = LoggerFactory
.getLogger(CloudFoundryTaskLauncher.class);
/** Low-level client used for the applications, packages, droplets, tasks, spaces and service-binding calls. */
private final CloudFoundryClient client;
/** Operations facade; in this class it is only used to list service instances. */
private final CloudFoundryOperations operations;
/** Deployer configuration; supplies the default set of services to bind via {@code getServices()}. */
private final CloudFoundryDeployerProperties properties;
/**
* Key looked up in the request's environment properties; its value is parsed as a comma-delimited
* list of service names to bind in addition to the configured defaults.
*/
public static final String SERVICES_PROPERTY_KEY = "spring.cloud.deployer.cloudfoundry.defaults.services";
/**
 * Creates a launcher that talks to Cloud Foundry through the supplied collaborators.
 *
 * @param client low-level Cloud Foundry client
 * @param operations Cloud Foundry operations facade
 * @param properties deployer configuration
 */
public CloudFoundryTaskLauncher(CloudFoundryClient client, CloudFoundryOperations operations, CloudFoundryDeployerProperties properties) {
    this.properties = properties;
    this.operations = operations;
    this.client = client;
}
/**
 * Triggers the cancellation pipeline built by {@link #asyncCancel(String)}.
 * The pipeline is only subscribed to; its outcome is not inspected here.
 *
 * @param id identifier handed to {@link #asyncCancel(String)}
 */
@Override
public void cancel(String id) {
    Mono<Void> cancellation = asyncCancel(id);
    cancellation.subscribe();
}
/**
 * Kicks off the launch pipeline built by {@link #asyncLaunch(AppDeploymentRequest)}.
 *
 * @param request the deployment request describing the task to launch
 * @return the name of the request's definition (not the id emitted by the pipeline); it is
 * returned without waiting for the reactor pipeline to complete
 */
@Override
public String launch(AppDeploymentRequest request) {
    Mono<String> pipeline = asyncLaunch(request);
    // Subscribe only: this blocking-style API does NOT wait for the async operations to finish.
    pipeline.subscribe();
    return request.getDefinition().getName();
}
/**
 * Resolves the current status for the given id by waiting on {@link #asyncStatus(String)}.
 *
 * @param id identifier handed to {@link #asyncStatus(String)}
 * @return the status produced by the async lookup, waited for with a 30 second timeout
 */
@Override
public TaskStatus status(String id) {
    Mono<TaskStatus> lookup = asyncStatus(id);
    return lookup.get(Duration.ofSeconds(30));
}
/**
 * Builds the pipeline that cancels a task: lists the first page of applications named
 * {@code id}, takes the single match (if any) and issues a cancel request.
 *
 * NOTE(review): the matched application's id is passed on as the {@code taskId} of the
 * cancel request - confirm that an application id is what the tasks API expects here.
 *
 * @param id name used to look up the application
 * @return a {@link Mono} signalling completion of the cancel pipeline
 */
Mono<Void> asyncCancel(String id) {
    ListApplicationsRequest byName = ListApplicationsRequest.builder()
        .name(id)
        .page(1)
        .build();

    return client.applicationsV3()
        .list(byName)
        .log("stream.listApplications")
        .flatMap(page -> Flux.fromIterable(page.getResources()))
        .log("stream.applications")
        .singleOrEmpty()
        .log("stream.singleOrEmpty")
        .map(Application::getId)
        .log("stream.taskIds")
        .then(resolvedId -> {
            CancelTaskRequest cancellation = CancelTaskRequest.builder()
                .taskId(resolvedId)
                .build();
            return client.tasks()
                .cancel(cancellation)
                .log("stream.cancelTask");
        })
        .after();
}
/**
 * Builds the full launch pipeline: deploy the application, bind its services, then
 * create the task.
 *
 * @param request the deployment request describing the task to launch
 * @return a {@link Mono} emitting the value produced by {@link #launchTask(String)}
 */
Mono<String> asyncLaunch(AppDeploymentRequest request) {
    Mono<String> deployed = deploy(request)
        .log("stream.deploy");

    Mono<String> bound = deployed
        .then(deployedAppId -> bindServices(request, deployedAppId))
        .log("stream.bindServices");

    return bound
        .then(boundAppId -> launchTask(boundAppId))
        .log("stream.launched");
}
/**
 * Binds every service instance whose name is selected by {@link #servicesToBind(AppDeploymentRequest)}
 * to the given application.
 *
 * The result is derived from completion of all bindings rather than from {@code single()},
 * so the pipeline also succeeds when no service or more than one service has to be bound.
 *
 * @param request the deployment request (may name additional services to bind)
 * @param applicationId id of the application to bind the services to
 * @return a {@link Mono} emitting {@code applicationId} once all bindings have been created
 */
Mono<String> bindServices(AppDeploymentRequest request, String applicationId) {
    // Resolve the wanted names once instead of rebuilding the set for every listed instance.
    Set<String> wanted = servicesToBind(request);

    return operations.services()
        .listInstances()
        .log("stream.serviceInstances")
        .filter(instance -> wanted.contains(instance.getName()))
        .log("stream.filteredInstances")
        .map(ServiceInstance::getId)
        .log("stream.serviceInstanceId")
        .flatMap(serviceInstanceId -> client.serviceBindingsV3()
            .create(CreateServiceBindingRequest.builder()
                .relationships(CreateServiceBindingRequest.Relationships.builder()
                    .application(Relationship.builder().id(applicationId).build())
                    .serviceInstance(Relationship.builder().id(serviceInstanceId).build())
                    .build())
                .type(CreateServiceBindingRequest.ServiceBindingType.APP)
                .build())
            .log("created"))
        .log("stream.serviceBindingCreated")
        // Wait for all bindings to complete (zero, one or many), then hand the application id on.
        .after()
        .after(() -> Mono.just(applicationId))
        .log("stream.applicationId");
}
/**
 * Collects the names of the services to bind: the configured defaults plus the
 * comma-delimited names found under {@link #SERVICES_PROPERTY_KEY} in the request's
 * environment properties.
 *
 * @param request the deployment request
 * @return a new mutable set holding the union of both sources
 */
private Set<String> servicesToBind(AppDeploymentRequest request) {
    Set<String> names = new HashSet<>(properties.getServices());
    String requested = request.getEnvironmentProperties().get(SERVICES_PROPERTY_KEY);
    names.addAll(commaDelimitedListToSet(requested));
    return names;
}
/**
 * Builds the pipeline that resolves the status for {@code id}: lists the first page of
 * applications with that name, expects exactly one match, fetches the task and maps its
 * state to a {@link TaskStatus}.
 *
 * Any failure in the pipeline is logged (with its stack trace) and reported as
 * {@link LaunchState#unknown} instead of being propagated.
 *
 * NOTE(review): the matched application's id is passed on as the {@code taskId} of the
 * get-task request - confirm that an application id is what the tasks API expects here.
 *
 * @param id name used to look up the application
 * @return a {@link Mono} emitting the resolved status, or an {@code unknown} status on error
 */
Mono<TaskStatus> asyncStatus(String id) {
    return client.applicationsV3()
        .list(ListApplicationsRequest.builder()
            .name(id)
            .page(1)
            .build())
        .log("stream.listApplications")
        .flatMap(response -> Flux.fromIterable(response.getResources()))
        .single()
        .map(Application::getId)
        .then(taskId -> client.tasks()
            .get(GetTaskRequest.builder()
                .taskId(taskId)
                .build()))
        .map(this::mapTaskToStatus)
        .otherwise(throwable -> {
            // Pass the throwable itself so the cause and stack trace are kept;
            // getMessage() alone may be null and loses the origin of the failure.
            logger.error("Unable to determine the status of task '{}'", id, throwable);
            return Mono.just(new TaskStatus(id, LaunchState.unknown, null));
        });
}
/**
* Create a new application using supplied {@link AppDeploymentRequest}: create the application,
* create and upload a package for it, wait for the package, stage a droplet and wait for the droplet.
*
* Each step pairs its own result with the application id (via {@code and(Mono.just(applicationId))})
* so that the id can be threaded through the whole chain.
*
* @param request the deployment request providing the application name, space and bits
* @return {@link Mono} containing the id of the newly created application (the droplet id is dropped
* in the final step)
*/
Mono<String> createAndUploadApplication(AppDeploymentRequest request) {
return createApplication(request.getDefinition().getName(), getSpaceId(request))
// (packageId, applicationId)
.then(applicationId -> createPackage(applicationId)
.and(Mono.just(applicationId)))
.then(function((packageId, applicationId) -> uploadPackage(packageId, request)
.and(Mono.just(applicationId))))
.then(function((packageId, applicationId) -> waitForPackageProcessing(client, packageId)
.and(Mono.just(applicationId))))
// from here on the first tuple element is the droplet id
.then(function((packageId, applicationId) -> createDroplet(packageId)
.and(Mono.just(applicationId))))
.then(function((dropletId, applicationId) -> waitForDropletProcessing(client, dropletId)
.and(Mono.just(applicationId))))
// only the application id is handed back to the caller
.map(function((dropletId, applicationId) -> applicationId));
}
/**
 * Polls the droplet until its state is no longer {@code "PENDING"}.
 *
 * The lookup is repeated while it yields nothing, with the back-off produced by
 * {@code exponentialBackOff(10s, 1min, 10min)} and a repeat bound of 50
 * (presumably minimum delay, maximum delay and overall timeout - confirm against DelayUtils).
 *
 * @param cloudFoundryClient client used to fetch the droplet
 * @param dropletId id of the droplet to wait for
 * @return a {@link Mono} emitting {@code dropletId} once the droplet left the pending state
 */
private static Mono<String> waitForDropletProcessing(CloudFoundryClient cloudFoundryClient, String dropletId) {
    GetDropletRequest lookup = GetDropletRequest.builder()
        .dropletId(dropletId)
        .build();

    return cloudFoundryClient.droplets()
        .get(lookup)
        .log("stream.waitingForDroplet")
        .where(droplet -> !droplet.getState().equals("PENDING"))
        .repeatWhenEmpty(50, exponentialBackOff(Duration.ofSeconds(10), Duration.ofMinutes(1), Duration.ofMinutes(10)))
        .map(droplet -> dropletId);
}
/**
 * Polls the package until its state equals {@code "READY"}.
 *
 * The lookup is repeated while it yields nothing, with the back-off produced by
 * {@code exponentialBackOff(5s, 1min, 10min)} and a repeat bound of 50
 * (presumably minimum delay, maximum delay and overall timeout - confirm against DelayUtils).
 *
 * @param cloudFoundryClient client used to fetch the package
 * @param packageId id of the package to wait for
 * @return a {@link Mono} emitting {@code packageId} once the package is ready
 */
private static Mono<String> waitForPackageProcessing(CloudFoundryClient cloudFoundryClient, String packageId) {
    GetPackageRequest lookup = GetPackageRequest.builder()
        .packageId(packageId)
        .build();

    return cloudFoundryClient.packages()
        .get(lookup)
        .where(pkg -> pkg.getState().equals("READY"))
        .repeatWhenEmpty(50, exponentialBackOff(Duration.ofSeconds(5), Duration.ofMinutes(1), Duration.ofMinutes(10)))
        .map(pkg -> packageId);
}
/**
 * Creates a new Cloud Foundry application with the given name in the space whose id is
 * emitted by {@code spaceId}.
 *
 * @param name name of the application to create
 * @param spaceId publisher of the id of the target space
 * @return a {@link Mono} emitting the id of the created application
 */
Mono<String> createApplication(String name, Mono<String> spaceId) {
    return spaceId
        .flatMap(resolvedSpaceId -> {
            Relationship space = Relationship.builder()
                .id(resolvedSpaceId)
                .build();
            CreateApplicationRequest creation = CreateApplicationRequest.builder()
                .name(name)
                .relationship("space", space)
                .build();
            return client.applicationsV3().create(creation);
        })
        .single()
        .log("stream.createApplication")
        .map(Application::getId)
        .log("stream.getApplicationId");
}
/**
 * Creates a {@code BITS} package for the given application.
 *
 * @param applicationId id of the application that owns the package
 * @return a {@link Mono} emitting the id of the created package
 */
Mono<String> createPackage(String applicationId) {
    CreatePackageRequest creation = CreatePackageRequest.builder()
        .applicationId(applicationId)
        .type(CreatePackageRequest.PackageType.BITS)
        .build();

    return client.packages()
        .create(creation)
        .log("stream.createPackage")
        .map(Package::getId)
        .log("stream.getPackageId");
}
/**
 * Makes sure an application for the request exists and yields its id.
 *
 * If an application with the definition's name is found and has a staged droplet, its id
 * is reused. If it is found without a staged droplet, it is deleted; that branch emits
 * nothing, so the outer fallback then creates and uploads a fresh application. The same
 * fallback applies when no application with that name is found.
 *
 * @param request the deployment request
 * @return {@link Mono} with the applicationId
 */
Mono<String> deploy(AppDeploymentRequest request) {
    String appName = request.getDefinition().getName();
    Mono<String> existing = getApplicationId(client, appName);

    return existing
        .then(existingId -> getReadyApplicationId(client, existingId)
            .otherwiseIfEmpty(deleteExistingApplication(client, existingId)))
        .otherwiseIfEmpty(createAndUploadApplication(request));
}
/**
* Look up the id of the space named by the request's {@code "space"} environment property.
* Exactly one matching space is expected ({@code single()}), and the result is cached.
*
* NOTE(review): the {@code "organization"} property is read and used to start the pipeline, but its
* value is never referenced in the spaces query below - spaces are filtered by name only. Confirm
* whether the lookup should also be restricted to that organization.
*
* @param request the deployment request carrying the {@code "organization"} and {@code "space"} properties
* @return {@link Mono} containing the id of the matching space
*/
Mono<String> getSpaceId(AppDeploymentRequest request) {
return Mono
.just(request.getEnvironmentProperties().get("organization"))
// the lambda parameter 'organization' is unused in the request built here
.flatMap(organization -> PaginationUtils
.requestResources(page -> client.spaces()
.list(ListSpacesRequest.builder()
.name(request.getEnvironmentProperties().get("space"))
.page(page)
.build())))
.log("stream.listSpaces")
.single()
.log("stream.space")
.map(ResourceUtils::getId)
.log("stream.spaceId")
.cache()
.log("stream.cacheSpaceId");
}
/**
 * Creates a new {@link Task} for the application, based on the droplet returned by
 * {@link #getDroplet(String)}.
 *
 * @param applicationId id of the application to run the task for
 * @return {@link Mono} containing the value produced by
 * {@link #createTask(ListApplicationDropletsResponse.Resource, String)}
 */
Mono<String> launchTask(String applicationId) {
    Mono<ListApplicationDropletsResponse.Resource> droplet = getDroplet(applicationId);
    return droplet.then(found -> createTask(found, applicationId));
}
/**
 * Creates a task for the given application that runs from the given droplet.
 *
 * NOTE(review): the task name ({@code "timestamp"}) and the start command are hard-coded
 * here; the command is not derived from the droplet's {@code process_types} results.
 *
 * @param resource the droplet the task should run from
 * @param applicationId id of the application that owns the task
 * @return a {@link Mono} emitting the id of the created task
 */
Mono<String> createTask(ListApplicationDropletsResponse.Resource resource, String applicationId) {
    // Diagnostic output goes through the logger and does not dereference the droplet results,
    // so a droplet without "process_types" can no longer fail the launch before the request is sent.
    logger.debug("Creating task for application {} from droplet {} (droplet results: {})",
        applicationId, resource.getId(), resource.getResults());

    return client.tasks()
        .create(CreateTaskRequest.builder()
            .applicationId(applicationId)
            .dropletId(resource.getId())
            .name("timestamp")
            .command("eval exec $PWD/.java-buildpack/open_jdk_jre/bin/java -Xmx2048m -Xms1024m -cp . org.springframework.boot.loader.JarLauncher")
            .build())
        .log("stream.createTask")
        .map(Task::getId)
        .log("stream.taskName");
}
/**
 * Fetches the droplet of the given application; exactly one droplet is expected in the
 * listing ({@code single()}).
 *
 * @param applicationId id of the application whose droplet is wanted
 * @return a {@link Mono} emitting that droplet
 */
Mono<ListApplicationDropletsResponse.Resource> getDroplet(String applicationId) {
    ListApplicationDropletsRequest listing = ListApplicationDropletsRequest.builder()
        .applicationId(applicationId)
        .build();

    return client.applicationsV3()
        .listDroplets(listing)
        .log("stream.listDroplet")
        .flatMap(page -> Flux.fromIterable(page.getResources()))
        .single();
}
/**
 * Uploads the bits of the request's resource into the package identified by {@code packageId}.
 *
 * @param packageId id of the package to upload into
 * @param request the deployment request whose resource provides the bits
 * @return a {@link Mono} emitting the package id reported by the upload, or an error
 * {@link Mono} carrying the {@link IOException} when the resource cannot be opened
 */
Mono<String> uploadPackage(String packageId, AppDeploymentRequest request) {
    UploadPackageRequest upload;
    try {
        upload = UploadPackageRequest.builder()
            .packageId(packageId)
            .bits(request.getResource().getInputStream())
            .build();
    } catch (IOException e) {
        // Opening the resource failed: surface the failure through the pipeline.
        return Mono.error(e);
    }

    return client.packages()
        .upload(upload)
        .log("stream.uploadPackage")
        .map(Package::getId)
        .log("stream.uploadedPackageId");
}
/**
 * Deletes the given application and then continues with an empty {@link Mono}, so that
 * callers can chain an "if empty" fallback after the deletion.
 *
 * @param client client used to issue the delete request
 * @param applicationId id of the application to delete
 * @return the {@link Mono} obtained by continuing with {@code Mono.empty()} after the deletion
 */
private static Mono<String> deleteExistingApplication(CloudFoundryClient client, String applicationId) {
    Mono<Void> deletion = requestDeleteApplication(client, applicationId);
    return deletion.after(Mono::empty);
}
/**
 * Looks up the id of the application with the given name, confining the listing to
 * zero or one match ({@code singleOrEmpty()}).
 *
 * @param client client used to list the applications
 * @param name application name to look for
 * @return {@link Mono} with the application's id, or an empty one when nothing matches
 */
private static Mono<String> getApplicationId(CloudFoundryClient client, String name) {
    Flux<ListApplicationsResponse.Resource> matches = requestListApplications(client, name);
    return matches
        .singleOrEmpty()
        .map(Application::getId);
}
/**
 * Yields the application id when the application has at least one droplet whose state is
 * {@code "STAGED"}; otherwise the result is empty.
 *
 * @param client client used to list the application's droplets
 * @param applicationId id of the application to inspect
 * @return a {@link Mono} emitting {@code applicationId}, or an empty one
 */
private static Mono<String> getReadyApplicationId(CloudFoundryClient client, String applicationId) {
    Flux<ListApplicationDropletsResponse.Resource> droplets = requestApplicationDroplets(client, applicationId);
    return droplets
        .filter(droplet -> "STAGED".equals(droplet.getState()))
        .next()
        .map(staged -> applicationId);
}
/**
 * Lists the droplets of the given application; only the first page is requested.
 *
 * @param client client used to issue the request
 * @param applicationId id of the application whose droplets are listed
 * @return a {@link Flux} of the droplet resources on that page
 */
private static Flux<ListApplicationDropletsResponse.Resource> requestApplicationDroplets(CloudFoundryClient client, String applicationId) {
    ListApplicationDropletsRequest firstPage = ListApplicationDropletsRequest.builder()
        .applicationId(applicationId)
        .page(1)
        .build();

    return client.applicationsV3()
        .listDroplets(firstPage)
        .flatMap(page -> Flux.fromIterable(page.getResources()));
}
/**
 * Issues the delete request for the given application.
 *
 * @param client client used to issue the request
 * @param applicationId id of the application to delete
 * @return the {@link Mono} returned by the client's delete call
 */
private static Mono<Void> requestDeleteApplication(CloudFoundryClient client, String applicationId) {
    DeleteApplicationRequest deletion = DeleteApplicationRequest.builder()
        .applicationId(applicationId)
        .build();

    return client.applicationsV3().delete(deletion);
}
/**
 * Lists the application entries filtered to the provided name. Only the first page of
 * results is requested.
 *
 * @param client client used to issue the request
 * @param name application name to filter by
 * @return {@link Flux} of application resources {@link ListApplicationsResponse.Resource}
 */
private static Flux<ListApplicationsResponse.Resource> requestListApplications(
    CloudFoundryClient client, String name) {
    ListApplicationsRequest firstPage = ListApplicationsRequest.builder()
        .name(name)
        .page(1)
        .build();

    return client.applicationsV3()
        .list(firstPage)
        .log("stream.listApplications")
        .flatMap(page -> Flux.fromIterable(page.getResources()))
        .log("stream.applications");
}
/**
 * Stages the given package, which produces a new {@link Droplet}.
 *
 * @param packageId id of the package to stage
 * @return {@link Mono} containing the {@link Droplet}'s ID.
 */
private Mono<String> createDroplet(String packageId) {
    StagePackageRequest staging = StagePackageRequest.builder()
        .packageId(packageId)
        .build();

    return client.packages()
        .stage(staging)
        .log("stream.stageDroplet")
        .map(Droplet::getId)
        .log("stream.dropletId");
}
/**
 * Translates the state of a Cloud Foundry task into a {@link TaskStatus} carrying the
 * task's id and the corresponding {@link LaunchState}.
 *
 * @param getTaskResponse the task as returned by the client
 * @return the status for that task
 * @throws IllegalStateException when the task reports a state that has no mapping
 */
private TaskStatus mapTaskToStatus(GetTaskResponse getTaskResponse) {
    LaunchState launchState;
    switch (getTaskResponse.getState()) {
        case Task.SUCCEEDED_STATE:
            launchState = LaunchState.complete;
            break;
        case Task.RUNNING_STATE:
            launchState = LaunchState.running;
            break;
        case Task.PENDING_STATE:
            launchState = LaunchState.launching;
            break;
        case Task.CANCELING_STATE:
            launchState = LaunchState.cancelled;
            break;
        case Task.FAILED_STATE:
            launchState = LaunchState.failed;
            break;
        default:
            throw new IllegalStateException(
                "Unsupported CF task state " + getTaskResponse.getState());
    }
    return new TaskStatus(getTaskResponse.getId(), launchState, null);
}
}
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.deployer.spi.cloudfoundry;
import org.cloudfoundry.operations.CloudFoundryOperations;
import org.cloudfoundry.operations.CloudFoundryOperationsBuilder;
import org.cloudfoundry.util.test.TestSubscriber;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.IntegrationTest;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.cloud.deployer.spi.core.AppDefinition;
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
import org.springframework.cloud.deployer.spi.task.LaunchState;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* Runs integration tests for {@link CloudFoundryTaskLauncher}, using the production configuration,
* that may be configured via {@link CloudFoundryDeployerProperties}.
*
* Tests are only run if a successful connection can be made at startup.
*
* @author Eric Bottard
* @author Greg Turnquist
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = CloudFoundryDeployerProperties.class)
@IntegrationTest
public class CloudFoundryTaskLauncherIntegrationTests {
/** Logger for this test class. */
private static final Logger log = LoggerFactory.getLogger(CloudFoundryTaskLauncherIntegrationTests.class);
/** Launcher under test; rebuilt before every test in {@code init()}. */
private CloudFoundryTaskLauncher taskLauncher;
/** Application context; used to resolve the test jar from the classpath. */
@Autowired
ApplicationContext context;
/** Deployer configuration handed to the launcher under test. */
@Autowired
CloudFoundryDeployerProperties properties;
/** Deployment request shared by the tests; rebuilt before every test in {@code init()}. */
AppDeploymentRequest request;
/**
* Execution environments may override this default value to have tests wait longer for a deployment, for example if
* running in an environment that is known to be slow.
* It is overwritten in {@code init()} when the {@code CF_DEPLOYER_TIMEOUT_MULTIPLIER} environment variable is set.
*/
protected double timeoutMultiplier = 1.0D;
/**
 * Prepares the fixture: applies the optional {@code CF_DEPLOYER_TIMEOUT_MULTIPLIER}
 * override, builds the deployment request for the {@code timestamp} definition and
 * creates the launcher under test against the {@code pcfdev-org} / {@code pcfdev-space} target.
 */
@Before
public void init() {
    String multiplierOverride = System.getenv("CF_DEPLOYER_TIMEOUT_MULTIPLIER");
    if (multiplierOverride != null) {
        timeoutMultiplier = Double.parseDouble(multiplierOverride);
    }

    Map<String, String> environment = new HashMap<>();
    environment.put("spring.cloud.deployer.cloudfoundry.defaults.services", "my_mysql");
    environment.put("space", "pcfdev-space");
    environment.put("organization", "pcfdev-org");

    AppDefinition definition = new AppDefinition("timestamp", Collections.emptyMap());
    request = new AppDeploymentRequest(
        definition,
        context.getResource("classpath:batch-job-1.0.0.BUILD-SNAPSHOT.jar"),
        environment);

    CloudFoundryOperations targetedOperations = new CloudFoundryOperationsBuilder()
        .cloudFoundryClient(cfAvailable.getResource())
        .target("pcfdev-org", "pcfdev-space")
        .build();

    taskLauncher = new CloudFoundryTaskLauncher(cfAvailable.getResource(), targetedOperations, properties);
}
/**
 * Asking for the status of a name that is not deployed ({@code "foo"}) must report
 * {@link LaunchState#unknown}.
 */
@Test
public void testNonExistentAppsStatus() {
    LaunchState reported = taskLauncher.status("foo").getState();
    assertThat(reported, is(LaunchState.unknown));
}
/**
 * Launches the test task and checks that the launch pipeline emits a task id.
 *
 * The wait is bounded to 30 minutes (scaled by {@code timeoutMultiplier}) so that a stuck
 * deployment fails the test instead of blocking the build for hours.
 *
 * TODO: once task ids can be resolved through {@code asyncStatus}, also poll the status
 * until the task reports {@code LaunchState.complete}.
 */
@Test
public void testSimpleLaunch() throws InterruptedException {
    long scaledMillis = (long) (Duration.ofMinutes(30).toMillis() * timeoutMultiplier);
    Duration launchTimeout = Duration.ofMillis(scaledMillis);

    String taskId = taskLauncher.asyncLaunch(request).get(launchTimeout);

    assertThat("the launch pipeline should emit the id of the created task", taskId != null);
}
/**
 * Return the timeout to use for repeatedly querying a module while it is being deployed.
 * Default value is one minute: 12 attempts, paused 5 seconds apart (the pause is scaled
 * by {@code timeoutMultiplier}).
 */
protected Attempts deploymentTimeout() {
    int pauseMillis = (int) (5000 * timeoutMultiplier);
    return new Attempts(12, pauseMillis);
}
/**
 * Return the timeout to use for repeatedly querying a module while it is being un-deployed.
 * Default value is 100 seconds: 20 attempts, paused 5 seconds apart (the pause is scaled
 * by {@code timeoutMultiplier}).
 */
protected Attempts undeploymentTimeout() {
    int pauseMillis = (int) (5000 * timeoutMultiplier);
    return new Attempts(20, pauseMillis);
}
/**
 * Describes a polling budget for status queries: how many queries to make and the pause
 * value to use between them.
 * @author Eric Bottard
 */
protected static class Attempts {
    /** Number of queries to make. */
    public final int noAttempts;
    /** Pause value used between queries. */
    public final int pause;

    /**
     * @param noAttempts number of queries to make
     * @param pause pause value used between queries
     */
    public Attempts(int noAttempts, int pause) {
        this.pause = pause;
        this.noAttempts = noAttempts;
    }
}
/**
* Class-level rule whose {@code getResource()} supplies the Cloud Foundry client used to build
* the operations facade and the launcher under test in {@code init()}.
*/
@ClassRule
public static CloudFoundryTestSupport cfAvailable = new CloudFoundryTestSupport();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment