Skip to content

Instantly share code, notes, and snippets.

@tsurdilo
Created June 15, 2022 23:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tsurdilo/370c52e1b9de44b0ed7a13ecadcc8eaf to your computer and use it in GitHub Desktop.
Save tsurdilo/370c52e1b9de44b0ed7a13ecadcc8eaf to your computer and use it in GitHub Desktop.
package io.temporal.samples.hello;
import io.temporal.activity.*;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.*;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ChildWorkflowFailure;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.*;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
public class HellFromUser {
// Marker string for a cancellation request.
// NOTE(review): nothing in the visible part of this class reads it - confirm it is still needed.
public static final String CANCEL_CONST = "CANCEL";

// Task queue name used both for the worker and for starting the parent workflow.
static final String TASK_QUEUE = "HelloCancellationScopeTaskQueue";

// Workflow id assigned to the parent workflow execution.
static final String WORKFLOW_ID = "HelloCancellationScopeWorkflow";

// Activity timing budget (seconds, going by the constant names).
// NOTE(review): none of these three are referenced by the visible activity options - confirm.
private static final int ACTIVITY_MAX_SLEEP_SECONDS = 30;
private static final int ACTIVITY_MAX_CLEANUP_SECONDS = 5;
private static final int ACTIVITY_START_TO_CLOSE_TIMEOUT =
    10 + ACTIVITY_MAX_SLEEP_SECONDS + ACTIVITY_MAX_CLEANUP_SECONDS;

// One activity invocation is scheduled per entry by the child workflow.
private static final String[] greetings = {
  "Hallo1", "Hallo2", "Hallo3", "Hallo4", "Hallo5", "Hallo6",
  "Hallo7", "Hallo8", "Hallo9", "Hallo10", "Hallo11", "Hallo12",
  "Hallo13", "Hallo14", "Hallo15", "Hallo16", "Hallo17", "Hallo18",
  "Hallo19", "Hallo20", "Hallo21", "Hallo22", "Hallo23", "Hallo24"
};
/**
 * Contract of the parent workflow: a single workflow entry point plus a signal that asks the
 * running workflow to cancel the work it has started.
 */
@WorkflowInterface
public interface ParentWorkflow {

  /**
   * Workflow entry point.
   *
   * @param name caller-supplied argument
   * @return a status string describing how the workflow finished
   */
  @WorkflowMethod
  String runParentWorkflow(String name);

  /** Signal handler; delivered to a running workflow to request cancellation of its work. */
  @SignalMethod
  void cancelWorkflow();
}
/** Contract of the child workflow that the parent workflow starts. */
@WorkflowInterface
public interface ChildWorkflow {

  /**
   * Workflow entry point of the child.
   *
   * @param parentName argument passed in by the parent
   * @return a status string describing how the child finished
   */
  @WorkflowMethod
  String runChildWorkflow(String parentName);
}
/**
 * Child workflow implementation: schedules one {@code greetingActivity} per entry of
 * {@code greetings} in parallel and waits for all of them.
 *
 * <p>If any activity fails (for example because cancellation reached the activities), cleanup is
 * performed and the {@link ActivityFailure} is rethrown.
 */
public static class ChildWorkflowImpl implements ChildWorkflow {
  private final Logger logger = Workflow.getLogger(ChildWorkflowImpl.class);

  /**
   * Runs all greeting activities in parallel and waits for them.
   *
   * @param parentName argument supplied by the parent; not read by this implementation
   * @return {@code "RAN CHILD WORKFLOW SUCCESSFULLY"} when every activity completed
   * @throws ActivityFailure rethrown after cleanup when waiting on the activities fails
   */
  @Override
  public String runChildWorkflow(String parentName) {
    // NOTE(review): the activity implementation in this file loops up to 100 x 500 ms, which is
    // longer than this 30 s start-to-close timeout when no cancellation arrives - confirm intent.
    final GreetingActivities activities =
        Workflow.newActivityStub(
            GreetingActivities.class,
            ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                // wait until the activity has acknowledged the cancellation
                .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
                .setHeartbeatTimeout(Duration.ofSeconds(3))
                .build());

    logger.info("Inside child workflow: Start activities");

    List<Promise<String>> results = new ArrayList<>();
    // Schedule all activities in parallel inside their own scope. This method never cancels the
    // scope itself; cancellation is expected to arrive from the enclosing workflow scope.
    CancellationScope activityScope =
        Workflow.newCancellationScope(
            () -> {
              for (String greeting : greetings) {
                results.add(Async.function(activities::greetingActivity, greeting, "dummy"));
              }
            });
    activityScope.run();

    try {
      Promise.allOf(results).get();
      return "RAN CHILD WORKFLOW SUCCESSFULLY";
    } catch (ActivityFailure e) {
      logger.info("In Child, received ActivityFailure.");
      logger.info("In Child, activity cause: " + e.getCause().getClass().getName());
      // Cleanup has to run in a detached scope: once this workflow has been cancelled, blocking
      // calls such as Workflow.sleep in the (already cancelled) enclosing scope fail right away
      // instead of actually waiting, so the cleanup would be skipped.
      Workflow.newDetachedCancellationScope(() -> Workflow.sleep(Duration.ofSeconds(10))).run();
      throw e;
    }
  }
}
/** Activities that the child workflow invokes. */
@ActivityInterface
public interface GreetingActivities {

  /**
   * Produces a greeting for the given name.
   *
   * @param greeting greeting word to use
   * @param name who is being greeted
   * @return the text produced by the activity
   */
  String greetingActivity(String greeting, String name);
}
/**
 * Parent workflow implementation. Starts the child workflow inside a cancellation scope, waits
 * for the {@code cancelWorkflow} signal (or for the child to finish on its own), cancels the scope
 * when signalled, and then reports how the child ended.
 */
public static class ParentWorkflowImpl implements ParentWorkflow {
  public ParentWorkflowImpl() {}

  // Set to true by the cancelWorkflow signal. Once true, the scope that runs the child workflow
  // (which in turn has scheduled the activities) is cancelled.
  private boolean cancelled = false;

  // Workflow logger instead of System.out so that messages are not repeated during replay.
  private final Logger logger = Workflow.getLogger(ParentWorkflowImpl.class);

  // Completion of the child workflow; assigned when the cancellation scope body runs.
  private Promise<Void> childPromise;

  /**
   * Workflow entry point.
   *
   * @param name caller-supplied argument; not read by this implementation
   * @return {@code "WORKFLOW COMPLETE"} when the child completed normally, {@code "Parent
   *     done..."} when the child ended with a {@link ChildWorkflowFailure}
   */
  @Override
  public String runParentWorkflow(String name) {
    logger.info("Inside Parent workflow: inside runParentGreeting method");

    ChildWorkflow cwf =
        Workflow.newChildWorkflowStub(
            ChildWorkflow.class,
            ChildWorkflowOptions.newBuilder()
                // wait until the child has finished handling the cancellation
                .setCancellationType(ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());

    // The scope starts the child asynchronously and then waits only until the child execution
    // has been started, not until it has finished.
    CancellationScope scope =
        Workflow.newCancellationScope(
            () -> {
              childPromise =
                  Async.procedure(cwf::runChildWorkflow, "ChildWorkflowGreetingArgument");
              Workflow.getWorkflowExecution(cwf).get();
            });
    scope.run();
    logger.info("Inside Parent workflow: scope run started");

    // Wait for the cancel signal, but also wake up when the child ends by itself; otherwise the
    // parent would stay blocked forever if no signal is ever sent.
    Workflow.await(() -> cancelled || childPromise.isCompleted());
    if (cancelled) {
      logger.info("RECEIVED CANCEL");
      scope.cancel();
      logger.info("Inside Parent workflow: scope cancel done");
    }

    try {
      childPromise.get();
      return "WORKFLOW COMPLETE";
    } catch (ChildWorkflowFailure e) {
      logger.info("In Parent, received ChildWorkflowFailure");
      logger.info("In Parent, cause: " + e.getCause().getClass().getName());
      // do some cleanup work; only the inner scope was cancelled, not this workflow itself
      logger.info("In Parent - performing some cleanup...");
      Workflow.sleep(Duration.ofSeconds(2));
      return "Parent done...";
    }
  }

  /** Signal handler: records that cancellation was requested so the main method can react. */
  @Override
  public void cancelWorkflow() {
    logger.info("CANCEL SIGNAL METHOD CALLED");
    cancelled = true;
  }
}
/**
 * Activity implementation that simulates long-running work: up to 100 rounds of "heartbeat, then
 * sleep 500 ms".
 */
public static class GreetingActivitiesImpl implements GreetingActivities {

  /**
   * Simulates work while heartbeating.
   *
   * @param greeting greeting word; also used in the console messages
   * @param name who is being greeted
   * @return {@code greeting + " " + name + "!"} when all rounds finished, or a "done cleanup"
   *     message when heartbeating reported that the activity should stop
   */
  @Override
  public String greetingActivity(String greeting, String name) {
    // Get the activity execution context
    ActivityExecutionContext context = Activity.getExecutionContext();
    for (int i = 0; i < 100; i++) {
      try {
        // heartbeat is where a stop request (e.g. cancellation) surfaces as an exception
        context.heartbeat(i);
        sleep(500);
      } catch (ActivityCompletionException e) {
        System.out.println("Activity for " + greeting + " cancelled");
        // complete any cleanup
        System.out.println("start cleanup " + greeting);
        System.out.println("End cleanup" + greeting);
        // NOTE(review): returning normally here reports a result instead of propagating the
        // cancellation; rethrowing e after cleanup is the more common pattern - confirm intent.
        return "Activity for " + greeting + " done cleanup..returning";
      }
    }
    // return results of activity invocation
    System.out.println("Activity for " + greeting + " completed");
    return greeting + " " + name + "!";
  }

  /**
   * Sleeps for the given time. Never throws: if the thread is interrupted, the sleep ends early
   * and the interrupt flag is restored so that callers can still observe the interruption.
   *
   * @param millis time to sleep in milliseconds
   */
  public static void sleep(int millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ee) {
      // Do not lose the interruption; code further up may need to react to it.
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Demo driver: starts a worker for both workflows and the activities, starts the parent workflow,
 * sends the cancel signal after ten seconds, waits for the workflow result and exits.
 *
 * @param args unused
 */
public static void main(String[] args) {
  WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
  WorkflowClient client = WorkflowClient.newInstance(service);
  WorkerFactory factory = WorkerFactory.newInstance(client);
  Worker worker = factory.newWorker(TASK_QUEUE);
  // TODO
  // Worker worker =
  // factory.newWorker(
  // TASK_QUEUE,
  // WorkerOptions.newBuilder().setMaxConcurrentActivityExecutionSize(2).build());
  worker.registerWorkflowImplementationTypes(ParentWorkflowImpl.class, ChildWorkflowImpl.class);
  worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
  factory.start();

  // Create the workflow client stub. It is used to start our workflow execution asynchronously.
  ParentWorkflow workflow =
      client.newWorkflowStub(
          ParentWorkflow.class,
          WorkflowOptions.newBuilder()
              .setWorkflowId(WORKFLOW_ID)
              .setTaskQueue(TASK_QUEUE)
              .build());

  System.out.println("Starting workflow");
  WorkflowClient.execute(workflow::runParentWorkflow, "World");
  System.out.println("WORKFLOW IN PROGRESS");

  System.out.println("sleep start...");
  try {
    Thread.sleep(10 * 1000);
  } catch (InterruptedException e) {
    // Keep the interrupt visible and stop the demo instead of signalling from an interrupted
    // thread.
    Thread.currentThread().interrupt();
    System.err.println("Interrupted before the cancel signal was sent; shutting down");
    factory.shutdown();
    return;
  }
  System.out.println("sleep end...");

  System.out.println("About to cancel PARENT WORKFLOW");
  workflow.cancelWorkflow();

  // Wait for the parent to finish its cancellation handling and report how it ended.
  try {
    String result = WorkflowStub.fromTyped(workflow).getResult(String.class);
    System.out.println("Workflow result: " + result);
  } catch (WorkflowFailedException e) {
    System.err.println("Workflow did not complete successfully: " + e.getMessage());
  }

  // The worker threads would otherwise keep the JVM alive after the demo is done.
  System.exit(0);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment