Created
May 12, 2016 19:58
-
-
Save mminella/1057aa906db62d795d7072c0b07544c6 to your computer and use it in GitHub Desktop.
CloudFoundryTaskLauncher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2016 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.springframework.cloud.deployer.spi.cloudfoundry; | |
import org.cloudfoundry.client.CloudFoundryClient; | |
import org.cloudfoundry.client.v3.servicebindings.CreateServiceBindingRequest; | |
import org.cloudfoundry.client.v2.spaces.ListSpacesRequest; | |
import org.cloudfoundry.client.v3.Relationship; | |
import org.cloudfoundry.client.v3.applications.Application; | |
import org.cloudfoundry.client.v3.applications.CreateApplicationRequest; | |
import org.cloudfoundry.client.v3.applications.DeleteApplicationRequest; | |
import org.cloudfoundry.client.v3.applications.ListApplicationDropletsRequest; | |
import org.cloudfoundry.client.v3.applications.ListApplicationDropletsResponse; | |
import org.cloudfoundry.client.v3.applications.ListApplicationsRequest; | |
import org.cloudfoundry.client.v3.applications.ListApplicationsResponse; | |
import org.cloudfoundry.client.v3.droplets.Droplet; | |
import org.cloudfoundry.client.v3.droplets.GetDropletRequest; | |
import org.cloudfoundry.client.v3.packages.CreatePackageRequest; | |
import org.cloudfoundry.client.v3.packages.GetPackageRequest; | |
import org.cloudfoundry.client.v3.packages.Package; | |
import org.cloudfoundry.client.v3.packages.StagePackageRequest; | |
import org.cloudfoundry.client.v3.packages.UploadPackageRequest; | |
import org.cloudfoundry.client.v3.tasks.CancelTaskRequest; | |
import org.cloudfoundry.client.v3.tasks.CreateTaskRequest; | |
import org.cloudfoundry.client.v3.tasks.GetTaskRequest; | |
import org.cloudfoundry.client.v3.tasks.GetTaskResponse; | |
import org.cloudfoundry.client.v3.tasks.Task; | |
import org.cloudfoundry.operations.CloudFoundryOperations; | |
import org.cloudfoundry.operations.services.ServiceInstance; | |
import org.cloudfoundry.util.PaginationUtils; | |
import org.cloudfoundry.util.ResourceUtils; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest; | |
import org.springframework.cloud.deployer.spi.task.LaunchState; | |
import org.springframework.cloud.deployer.spi.task.TaskLauncher; | |
import org.springframework.cloud.deployer.spi.task.TaskStatus; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import java.io.IOException; | |
import java.time.Duration; | |
import java.util.HashSet; | |
import java.util.Map; | |
import java.util.Set; | |
import static org.cloudfoundry.util.DelayUtils.exponentialBackOff; | |
import static org.cloudfoundry.util.tuple.TupleUtils.function; | |
import static org.springframework.util.StringUtils.commaDelimitedListToSet; | |
/** | |
* @author Greg Turnquist | |
* @author Michael Minella | |
*/ | |
public class CloudFoundryTaskLauncher implements TaskLauncher { | |
private static final Logger logger = LoggerFactory | |
.getLogger(CloudFoundryTaskLauncher.class); | |
private final CloudFoundryClient client; | |
private final CloudFoundryOperations operations; | |
private final CloudFoundryDeployerProperties properties; | |
public static final String SERVICES_PROPERTY_KEY = "spring.cloud.deployer.cloudfoundry.defaults.services"; | |
/**
 * Creates a launcher backed by the given Cloud Foundry collaborators.
 *
 * @param client low-level Cloud Foundry client used for the application, package, droplet and task calls
 * @param operations higher-level operations API, used to list service instances
 * @param properties deployer properties, consulted for the default services to bind
 */
public CloudFoundryTaskLauncher(CloudFoundryClient client, CloudFoundryOperations operations, CloudFoundryDeployerProperties properties) {
    this.properties = properties;
    this.operations = operations;
    this.client = client;
}
@Override | |
public void cancel(String id) { | |
asyncCancel(id).subscribe(); | |
} | |
/** | |
* Set up a reactor pipeline to launch a task. Before launch, check if it exists. If not, deploy. Then launch. | |
* | |
* @param request | |
* @return name of the launched task, returned without waiting for reactor pipeline to complete | |
*/ | |
@Override | |
public String launch(AppDeploymentRequest request) { | |
asyncLaunch(request).subscribe(); | |
/* | |
* The blocking API does NOT wait for async operations to complete before | |
* returning | |
*/ | |
return request.getDefinition().getName(); | |
} | |
/** | |
* Lookup the current status based on task id. | |
* | |
* @param id | |
* @return | |
*/ | |
@Override | |
public TaskStatus status(String id) { | |
return asyncStatus(id).get(Duration.ofSeconds(30)); | |
} | |
Mono<Void> asyncCancel(String id) { | |
return client.applicationsV3() | |
.list(ListApplicationsRequest.builder() | |
.name(id) | |
.page(1) | |
.build()) | |
.log("stream.listApplications") | |
.flatMap(response -> Flux.fromIterable(response.getResources())) | |
.log("stream.applications") | |
.singleOrEmpty() | |
.log("stream.singleOrEmpty") | |
.map(Application::getId) | |
.log("stream.taskIds") | |
.then(taskId -> client.tasks() | |
.cancel(CancelTaskRequest.builder() | |
.taskId(taskId) | |
.build()) | |
.log("stream.cancelTask")) | |
.after(); | |
} | |
Mono<String> asyncLaunch(AppDeploymentRequest request) { | |
return deploy(request) | |
.log("stream.deploy") | |
.then(applicationId -> bindServices(request, applicationId)) | |
.log("stream.bindServices") | |
.then(applicationId -> launchTask(applicationId)) | |
.log("stream.launched"); | |
} | |
Mono<String> bindServices(AppDeploymentRequest request, String applicationId) { | |
return operations.services() | |
.listInstances() | |
.log("stream.serviceInstances") | |
.filter(instance -> servicesToBind(request).contains(instance.getName())) | |
.log("stream.filteredInstances") | |
.map(ServiceInstance::getId) | |
.log("stream.serviceInstanceId") | |
.flatMap(serviceInstanceId -> client.serviceBindingsV3() | |
.create(CreateServiceBindingRequest.builder() | |
.relationships(CreateServiceBindingRequest.Relationships.builder() | |
.application(Relationship.builder().id(applicationId).build()) | |
.serviceInstance(Relationship.builder().id(serviceInstanceId).build()) | |
.build()) | |
.type(CreateServiceBindingRequest.ServiceBindingType.APP) | |
.build()) | |
.log("created")) | |
.log("stream.serviceBindingCreated") | |
.map(a -> applicationId) | |
.log("stream.applicationId") | |
.single(); | |
} | |
private Set<String> servicesToBind(AppDeploymentRequest request) { | |
Set<String> services = new HashSet<>(); | |
services.addAll(properties.getServices()); | |
services.addAll(commaDelimitedListToSet(request.getEnvironmentProperties().get(SERVICES_PROPERTY_KEY))); | |
return services; | |
} | |
Mono<TaskStatus> asyncStatus(String id) { | |
return client.applicationsV3() | |
.list(ListApplicationsRequest.builder() | |
.name(id) | |
.page(1) | |
.build()) | |
.log("stream.listApplications") | |
.flatMap(response -> Flux.fromIterable(response.getResources())) | |
.single() | |
.map(Application::getId) | |
.then(taskId -> client.tasks() | |
.get(GetTaskRequest.builder() | |
.taskId(taskId) | |
.build())) | |
.map(this::mapTaskToStatus) | |
.otherwise(throwable -> { | |
logger.error(throwable.getMessage()); | |
return Mono.just(new TaskStatus(id, LaunchState.unknown, null)); | |
}); | |
} | |
/** | |
* Create a new application using supplied {@link AppDeploymentRequest}. | |
* | |
* @param request | |
* @return {@link Mono} containing the newly created {@link Droplet}'s id | |
*/ | |
Mono<String> createAndUploadApplication(AppDeploymentRequest request) { | |
return createApplication(request.getDefinition().getName(), getSpaceId(request)) | |
.then(applicationId -> createPackage(applicationId) | |
.and(Mono.just(applicationId))) | |
.then(function((packageId, applicationId) -> uploadPackage(packageId, request) | |
.and(Mono.just(applicationId)))) | |
.then(function((packageId, applicationId) -> waitForPackageProcessing(client, packageId) | |
.and(Mono.just(applicationId)))) | |
.then(function((packageId, applicationId) -> createDroplet(packageId) | |
.and(Mono.just(applicationId)))) | |
.then(function((dropletId, applicationId) -> waitForDropletProcessing(client, dropletId) | |
.and(Mono.just(applicationId)))) | |
.map(function((dropletId, applicationId) -> applicationId)); | |
} | |
private static Mono<String> waitForDropletProcessing(CloudFoundryClient cloudFoundryClient, String dropletId) { | |
return cloudFoundryClient.droplets() | |
.get(GetDropletRequest.builder() | |
.dropletId(dropletId) | |
.build()) | |
.log("stream.waitingForDroplet") | |
.where(response -> !response.getState().equals("PENDING")) | |
.repeatWhenEmpty(50, exponentialBackOff(Duration.ofSeconds(10), Duration.ofMinutes(1), Duration.ofMinutes(10))) | |
.map(response -> dropletId); | |
} | |
private static Mono<String> waitForPackageProcessing(CloudFoundryClient cloudFoundryClient, String packageId) { | |
return cloudFoundryClient.packages() | |
.get(GetPackageRequest.builder() | |
.packageId(packageId) | |
.build()) | |
.where(response -> response.getState().equals("READY")) | |
.repeatWhenEmpty(50, exponentialBackOff(Duration.ofSeconds(5), Duration.ofMinutes(1), Duration.ofMinutes(10))) | |
.map(response -> packageId); | |
} | |
/** | |
* Create a new Cloud Foundry application by name | |
* | |
* @param name | |
* @param spaceId | |
* @return applicationId | |
*/ | |
Mono<String> createApplication(String name, Mono<String> spaceId) { | |
return spaceId | |
.flatMap(spaceId2 -> client.applicationsV3() | |
.create(CreateApplicationRequest.builder() | |
.name(name) | |
.relationship("space", Relationship.builder() | |
.id(spaceId2) | |
.build()) | |
.build())) | |
.single() | |
.log("stream.createApplication") | |
.map(Application::getId) | |
.log("stream.getApplicationId"); | |
} | |
/** | |
* Create Cloud Foundry package by applicationId | |
* | |
* @param applicationId | |
* @return packageId | |
*/ | |
Mono<String> createPackage(String applicationId) { | |
return client.packages() | |
.create(CreatePackageRequest.builder() | |
.applicationId(applicationId) | |
.type(CreatePackageRequest.PackageType.BITS) | |
.build()) | |
.log("stream.createPackage") | |
.map(Package::getId) | |
.log("stream.getPackageId"); | |
} | |
/** | |
* Create an application with a package, then upload the bits into a staging. | |
* | |
* @param request | |
* @return {@link Mono} with the applicationId | |
*/ | |
Mono<String> deploy(AppDeploymentRequest request) { | |
return getApplicationId(client, request.getDefinition().getName()) | |
.then(applicationId -> getReadyApplicationId(client, applicationId) | |
.otherwiseIfEmpty(deleteExistingApplication(client, applicationId))) | |
.otherwiseIfEmpty(createAndUploadApplication(request)); | |
} | |
Mono<String> getSpaceId(AppDeploymentRequest request) { | |
return Mono | |
.just(request.getEnvironmentProperties().get("organization")) | |
.flatMap(organization -> PaginationUtils | |
.requestResources(page -> client.spaces() | |
.list(ListSpacesRequest.builder() | |
.name(request.getEnvironmentProperties().get("space")) | |
.page(page) | |
.build()))) | |
.log("stream.listSpaces") | |
.single() | |
.log("stream.space") | |
.map(ResourceUtils::getId) | |
.log("stream.spaceId") | |
.cache() | |
.log("stream.cacheSpaceId"); | |
} | |
/** | |
* Create a new {@link Task} based on applicationId. | |
* | |
* @param applicationId | |
* @return {@link Mono} containing name of the task that was launched | |
*/ | |
Mono<String> launchTask(String applicationId) { | |
return getDroplet(applicationId) | |
.then(droplet -> createTask(droplet, applicationId)); | |
} | |
Mono<String> createTask(ListApplicationDropletsResponse.Resource resource, String applicationId) { | |
System.out.println(">> command = " + ((Map<String, Object>) resource.getResults().get("process_types")).get("web")); | |
return client.tasks() | |
.create(CreateTaskRequest.builder() | |
.applicationId(applicationId) | |
.dropletId(resource.getId()) | |
.name("timestamp") | |
// .command((String) ((Map<String, Object>) resource.getResults().get("process_types")).get("web")) | |
// .command("eval exec $PWD/.java-buildpack/open_jdk_jre/bin/java -Xmx2048m -Xms1024m -cp . org.springframework.boot.loader.JarLauncher --spring.profiles.active=cloud") | |
.command("eval exec $PWD/.java-buildpack/open_jdk_jre/bin/java -Xmx2048m -Xms1024m -cp . org.springframework.boot.loader.JarLauncher") | |
.build()) | |
.log("stream.createTask") | |
.map(Task::getId) | |
.log("stream.taskName"); | |
} | |
Mono<ListApplicationDropletsResponse.Resource> getDroplet(String applicationId) { | |
return client.applicationsV3() | |
.listDroplets(ListApplicationDropletsRequest.builder() | |
.applicationId(applicationId) | |
.build()) | |
.log("stream.listDroplet") | |
.flatMap(response -> Flux.fromIterable(response.getResources())) | |
.single(); | |
} | |
/** | |
* Upload bits to a Cloud Foundry application by packageId. | |
* | |
* @param packageId | |
* @param request | |
* @return packageId | |
*/ | |
Mono<String> uploadPackage(String packageId, AppDeploymentRequest request) { | |
try { | |
return client.packages() | |
.upload(UploadPackageRequest.builder() | |
.packageId(packageId) | |
.bits(request.getResource().getInputStream()) | |
.build()) | |
.log("stream.uploadPackage") | |
.map(Package::getId) | |
.log("stream.uploadedPackageId"); | |
} catch (IOException e) { | |
return Mono.error(e); | |
} | |
} | |
private static Mono<String> deleteExistingApplication(CloudFoundryClient client, String applicationId) { | |
return requestDeleteApplication(client, applicationId) | |
.after(Mono::empty); | |
} | |
/** | |
* Look up the applicationId for a given app and confine results to 0 or 1 instance | |
* | |
* @param client | |
* @param name | |
* @return {@link Mono} with the application's id | |
*/ | |
private static Mono<String> getApplicationId(CloudFoundryClient client, String name) { | |
return requestListApplications(client, name) | |
.singleOrEmpty() | |
.map(Application::getId); | |
} | |
private static Mono<String> getReadyApplicationId(CloudFoundryClient client, String applicationId) { | |
return requestApplicationDroplets(client, applicationId) | |
.filter(resource -> "STAGED" .equals(resource.getState())) | |
.next() | |
.map(resource -> applicationId); | |
} | |
private static Flux<ListApplicationDropletsResponse.Resource> requestApplicationDroplets(CloudFoundryClient client, String applicationId) { | |
return client.applicationsV3() | |
.listDroplets(ListApplicationDropletsRequest.builder() | |
.applicationId(applicationId) | |
.page(1) | |
.build()) | |
.flatMap(response -> Flux.fromIterable(response.getResources())); | |
} | |
private static Mono<Void> requestDeleteApplication(CloudFoundryClient client, String applicationId) { | |
return client.applicationsV3() | |
.delete(DeleteApplicationRequest.builder() | |
.applicationId(applicationId) | |
.build()); | |
} | |
/** | |
* List ALL application entries filtered to the provided name | |
* | |
* @param client | |
* @param name | |
* @return {@link Flux} of application resources {@link ListApplicationsResponse.Resource} | |
*/ | |
private static Flux<ListApplicationsResponse.Resource> requestListApplications( | |
CloudFoundryClient client, String name) { | |
return client.applicationsV3() | |
.list(ListApplicationsRequest.builder() | |
.name(name) | |
.page(1) | |
.build()) | |
.log("stream.listApplications") | |
.flatMap(response -> Flux.fromIterable(response.getResources())) | |
.log("stream.applications"); | |
} | |
/** | |
* Create a new {@link Droplet} based upon packageId. | |
* | |
* @param packageId | |
* @return {@link Mono} containing the {@link Droplet}'s ID. | |
*/ | |
private Mono<String> createDroplet(String packageId) { | |
return client.packages() | |
.stage(StagePackageRequest.builder() | |
.packageId(packageId) | |
.build()) | |
.log("stream.stageDroplet") | |
.map(Droplet::getId) | |
.log("stream.dropletId"); | |
} | |
private TaskStatus mapTaskToStatus(GetTaskResponse getTaskResponse) { | |
switch (getTaskResponse.getState()) { | |
case Task.SUCCEEDED_STATE: | |
return new TaskStatus(getTaskResponse.getId(), LaunchState.complete, null); | |
case Task.RUNNING_STATE: | |
return new TaskStatus(getTaskResponse.getId(), LaunchState.running, null); | |
case Task.PENDING_STATE: | |
return new TaskStatus(getTaskResponse.getId(), LaunchState.launching, null); | |
case Task.CANCELING_STATE: | |
return new TaskStatus(getTaskResponse.getId(), LaunchState.cancelled, null); | |
case Task.FAILED_STATE: | |
return new TaskStatus(getTaskResponse.getId(), LaunchState.failed, null); | |
default: | |
throw new IllegalStateException( | |
"Unsupported CF task state " + getTaskResponse.getState()); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2016 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.springframework.cloud.deployer.spi.cloudfoundry; | |
import org.cloudfoundry.operations.CloudFoundryOperations; | |
import org.cloudfoundry.operations.CloudFoundryOperationsBuilder; | |
import org.cloudfoundry.util.test.TestSubscriber; | |
import org.junit.Before; | |
import org.junit.ClassRule; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.test.IntegrationTest; | |
import org.springframework.boot.test.SpringApplicationConfiguration; | |
import org.springframework.cloud.deployer.spi.core.AppDefinition; | |
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest; | |
import org.springframework.cloud.deployer.spi.task.LaunchState; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; | |
import java.time.Duration; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.Map; | |
import static org.hamcrest.CoreMatchers.is; | |
import static org.hamcrest.MatcherAssert.assertThat; | |
/** | |
* Runs integration tests for {@link CloudFoundryTaskLauncher}, using the production configuration, | |
* that may be configured via {@link CloudFoundryDeployerProperties}. | |
* | |
* Tests are only run if a successful connection can be made at startup. | |
* | |
* @author Eric Bottard | |
* @author Greg Turnquist | |
*/ | |
@RunWith(SpringJUnit4ClassRunner.class) | |
@SpringApplicationConfiguration(classes = CloudFoundryDeployerProperties.class) | |
@IntegrationTest | |
public class CloudFoundryTaskLauncherIntegrationTests { | |
private static final Logger log = LoggerFactory.getLogger(CloudFoundryTaskLauncherIntegrationTests.class); | |
private CloudFoundryTaskLauncher taskLauncher; | |
@Autowired | |
ApplicationContext context; | |
@Autowired | |
CloudFoundryDeployerProperties properties; | |
AppDeploymentRequest request; | |
/** | |
* Execution environments may override this default value to have tests wait longer for a deployment, for example if | |
* running in an environment that is known to be slow. | |
*/ | |
protected double timeoutMultiplier = 1.0D; | |
@Before | |
public void init() { | |
String multiplier = System.getenv("CF_DEPLOYER_TIMEOUT_MULTIPLIER"); | |
if (multiplier != null) { | |
timeoutMultiplier = Double.parseDouble(multiplier); | |
} | |
Map<String, String> envProperties = new HashMap<>(); | |
envProperties.put("organization", "pcfdev-org"); | |
envProperties.put("space", "pcfdev-space"); | |
envProperties.put("spring.cloud.deployer.cloudfoundry.defaults.services", "my_mysql"); | |
request = new AppDeploymentRequest( | |
new AppDefinition("timestamp", Collections.emptyMap()), | |
context.getResource("classpath:batch-job-1.0.0.BUILD-SNAPSHOT.jar"), | |
envProperties); | |
CloudFoundryOperations cloudFoundryOperations = new CloudFoundryOperationsBuilder() | |
.cloudFoundryClient(cfAvailable.getResource()) | |
.target("pcfdev-org", "pcfdev-space") | |
.build(); | |
taskLauncher = new CloudFoundryTaskLauncher(cfAvailable.getResource(), cloudFoundryOperations, properties); | |
} | |
@Test | |
public void testNonExistentAppsStatus() { | |
assertThat(taskLauncher.status("foo").getState(), is(LaunchState.unknown)); | |
} | |
@Test | |
public void testSimpleLaunch() throws InterruptedException { | |
TestSubscriber<String> subscriber = new TestSubscriber<>(); | |
String taskId = taskLauncher.asyncLaunch(request).get(30000000); | |
// System.out.println(">> taskId"); | |
// | |
// TaskStatus status = taskLauncher.asyncStatus(taskId).get(); | |
// | |
// while (status.getState().equals(LaunchState.unknown)) { | |
// Thread.sleep(5000); | |
// status = taskLauncher.asyncStatus(taskId).get(); | |
// } | |
// | |
// assertEquals(LaunchState.complete, status.getState()); | |
// exists.subscribe(subscriber | |
// .assertCount(1) | |
// .assertEquals(false)); | |
subscriber.verify(Duration.ofDays(1)); | |
} | |
/** | |
* Return the timeout to use for repeatedly querying a module while it is being deployed. | |
* Default value is one minute, being queried every 5 seconds. | |
*/ | |
protected Attempts deploymentTimeout() { | |
return new Attempts(12, (int) (5000 * timeoutMultiplier)); | |
} | |
/** | |
* Return the timeout to use for repeatedly querying a module while it is being un-deployed. | |
* Default value is one minute, being queried every 5 seconds. | |
*/ | |
protected Attempts undeploymentTimeout() { | |
return new Attempts(20, (int) (5000 * timeoutMultiplier)); | |
} | |
/** | |
* Represents a timeout for querying status, with repetitive queries until a certain number have been made. | |
* @author Eric Bottard | |
*/ | |
protected static class Attempts { | |
public final int noAttempts; | |
public final int pause; | |
public Attempts(int noAttempts, int pause) { | |
this.noAttempts = noAttempts; | |
this.pause = pause; | |
} | |
} | |
@ClassRule | |
public static CloudFoundryTestSupport cfAvailable = new CloudFoundryTestSupport(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment