Created
June 15, 2022 23:16
-
-
Save tsurdilo/370c52e1b9de44b0ed7a13ecadcc8eaf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.temporal.samples.hello; | |
import io.temporal.activity.*; | |
import io.temporal.api.common.v1.WorkflowExecution; | |
import io.temporal.client.*; | |
import io.temporal.failure.ActivityFailure; | |
import io.temporal.failure.ChildWorkflowFailure; | |
import io.temporal.serviceclient.WorkflowServiceStubs; | |
import io.temporal.worker.Worker; | |
import io.temporal.worker.WorkerFactory; | |
import io.temporal.workflow.*; | |
import java.time.Duration; | |
import java.util.ArrayList; | |
import java.util.List; | |
import org.slf4j.Logger; | |
public class HellFromUser { | |
public static final String CANCEL_CONST = "CANCEL"; | |
// Define the task queue name | |
static final String TASK_QUEUE = "HelloCancellationScopeTaskQueue"; | |
// Define our workflow unique id | |
static final String WORKFLOW_ID = "HelloCancellationScopeWorkflow"; | |
private static final int ACTIVITY_MAX_SLEEP_SECONDS = 30; | |
private static final int ACTIVITY_MAX_CLEANUP_SECONDS = 5; | |
private static final int ACTIVITY_START_TO_CLOSE_TIMEOUT = | |
ACTIVITY_MAX_SLEEP_SECONDS + ACTIVITY_MAX_CLEANUP_SECONDS + 10; | |
private static final String[] greetings = | |
new String[] { | |
"Hallo1", "Hallo2", "Hallo3", "Hallo4", "Hallo5", "Hallo6", "Hallo7", "Hallo8", "Hallo9", | |
"Hallo10", "Hallo11", "Hallo12", "Hallo13", "Hallo14", "Hallo15", "Hallo16", "Hallo17", | |
"Hallo18", "Hallo19", "Hallo20", "Hallo21", "Hallo22", "Hallo23", "Hallo24" | |
}; | |
@WorkflowInterface | |
public interface ParentWorkflow { | |
@WorkflowMethod | |
String runParentWorkflow(String name); | |
@SignalMethod | |
void cancelWorkflow(); | |
} | |
@WorkflowInterface | |
public interface ChildWorkflow { | |
@WorkflowMethod | |
String runChildWorkflow(String parentName); | |
} | |
public static class ChildWorkflowImpl implements ChildWorkflow { | |
private Logger logger = Workflow.getLogger(ChildWorkflowImpl.class); | |
@Override | |
public String runChildWorkflow(String parentName) { | |
final GreetingActivities activities = | |
Workflow.newActivityStub( | |
GreetingActivities.class, | |
ActivityOptions.newBuilder() | |
.setStartToCloseTimeout(Duration.ofSeconds(30)) | |
.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) | |
.setHeartbeatTimeout(Duration.ofSeconds(3)) | |
.build()); | |
logger.info("Inside child workflow: Start activities"); | |
List<Promise<String>> results = new ArrayList<>(); | |
// trigger a bunch of parallel activities and wait for their completion | |
CancellationScope activityScope = | |
Workflow.newCancellationScope( | |
() -> { | |
for (String greeting : greetings) { | |
results.add(Async.function(activities::greetingActivity, greeting, "dummy")); | |
} | |
}); | |
activityScope.run(); | |
try { | |
Promise.allOf(results).get(); | |
return "RAN CHILD WORKFLOW SUCCESSFULLY"; | |
} catch (ActivityFailure e) { | |
logger.info("In Child, received ActivityFailure."); | |
logger.info("In Child, activity cause: " + e.getCause().getClass().getName()); | |
// do some cleanup work if needed | |
Workflow.sleep(Duration.ofSeconds(10)); | |
throw e; | |
} | |
} | |
} | |
@ActivityInterface | |
public interface GreetingActivities { | |
String greetingActivity(String greeting, String name); | |
} | |
// Define the workflow implementation which implements our getGreeting workflow method. | |
public static class ParentWorkflowImpl implements ParentWorkflow { | |
public ParentWorkflowImpl() {} | |
// this string will be null but will be set to CANCEL_CONST str when the cancel signal method is | |
// called | |
// once this cancelString is set to CANCEL_CONST, the cancellationscope that is executing will | |
// be cancelled | |
// the cancellation scope is executing child workflow(which inturn has scheduled activities) and | |
// the cancellation | |
// scope should stop its execution | |
private boolean cancelled = false; | |
private Logger logger = Workflow.getLogger(ParentWorkflowImpl.class); | |
private Promise<Void> childPromise; | |
@Override | |
public String runParentWorkflow(String name) { | |
// System.out.println("Inside Parent workflow: inside runParentGreeting method"); | |
logger.info("Inside Parent workflow: inside runParentGreeting method"); | |
/* | |
* Create our CancellationScope. Within this scope we call the workflow | |
* composeGreeting method asynchronously for each of our defined greetings in different | |
* languages. | |
*/ | |
ChildWorkflow cwf = | |
Workflow.newChildWorkflowStub( | |
ChildWorkflow.class, | |
ChildWorkflowOptions.newBuilder() | |
.setCancellationType(ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED) | |
.build()); | |
CancellationScope scope = | |
Workflow.newCancellationScope( | |
() -> { | |
childPromise = | |
Async.procedure(cwf::runChildWorkflow, "ChildWorkflowGreetingArgument"); | |
Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(cwf); | |
childExecution.get(); | |
}); | |
/* | |
* Execute all activities within the CancellationScope. Note that this execution is | |
* non-blocking as the code inside our cancellation scope is also non-blocking. | |
*/ | |
scope.run(); | |
System.out.println("Inside Parent workflow: scope run started"); | |
// wait for the cancel() signal method to be called on the parentworkflow. the signal method | |
// will set the CANCEL_CONST string | |
Workflow.await(() -> cancelled); | |
System.out.println("RECEIVED CANCEL"); | |
scope.cancel(); | |
System.out.println("Inside Parent workflow: scope cancel done"); | |
try { | |
childPromise.get(); | |
return "WORKFLOW COMPLETE"; | |
} catch (ChildWorkflowFailure e) { | |
logger.info("In Parent, received ChildWorkflowFailure"); | |
logger.info("In Parent, cause: " + e.getCause().getClass().getName()); | |
// do some cleanup work | |
logger.info("In Parent - performing some cleanup..."); | |
Workflow.sleep(Duration.ofSeconds(2)); | |
return "Parent done..."; | |
} | |
} | |
public void cancelWorkflow() { | |
System.out.println("CANCEL SIGNAL METHOD CALLED"); | |
cancelled = true; | |
} | |
} | |
public static class GreetingActivitiesImpl implements GreetingActivities { | |
@Override | |
public String greetingActivity(String greeting, String name) { | |
// Get the activity execution context | |
ActivityExecutionContext context = Activity.getExecutionContext(); | |
for (int i = 0; i < 100; i++) { | |
try { | |
context.heartbeat(i); | |
sleep(500); | |
} catch (ActivityCompletionException e) { | |
System.out.println("Activity for " + greeting + " cancelled"); | |
// complete any cleanup | |
System.out.println("start cleanup " + greeting); | |
System.out.println("End cleanup" + greeting); | |
return "Activity for " + greeting + " done cleanup..returning"; | |
} | |
} | |
// return results of activity invocation | |
System.out.println("Activity for " + greeting + " completed"); | |
return greeting + " " + name + "!"; | |
} | |
public static void sleep(int millis) { | |
try { | |
Thread.sleep(millis); | |
} catch (InterruptedException ee) { | |
// Empty | |
} | |
} | |
} | |
public static void main(String[] args) { | |
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); | |
WorkflowClient client = WorkflowClient.newInstance(service); | |
WorkerFactory factory = WorkerFactory.newInstance(client); | |
Worker worker = factory.newWorker(TASK_QUEUE); | |
// TODO | |
// Worker worker = | |
// factory.newWorker( | |
// TASK_QUEUE, | |
// WorkerOptions.newBuilder().setMaxConcurrentActivityExecutionSize(2).build()); | |
worker.registerWorkflowImplementationTypes(ParentWorkflowImpl.class, ChildWorkflowImpl.class); | |
worker.registerActivitiesImplementations(new GreetingActivitiesImpl()); | |
factory.start(); | |
// Create the workflow client stub. It is used to start our workflow execution asynchronously. | |
ParentWorkflow workflow = | |
client.newWorkflowStub( | |
ParentWorkflow.class, | |
WorkflowOptions.newBuilder() | |
.setWorkflowId(WORKFLOW_ID) | |
.setTaskQueue(TASK_QUEUE) | |
.build()); | |
System.out.println("Starting workflow"); | |
WorkflowClient.execute(workflow::runParentWorkflow, "World"); | |
System.out.println("WORKFLOW IN PROGRESS"); | |
System.out.println("sleep start..."); | |
try { | |
Thread.sleep(10 * 1000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
System.out.println("sleep end..."); | |
System.out.println("About to cancel PARENT WORKFLOW"); | |
workflow.cancelWorkflow(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment