Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple Camunda example: Handling external tasks with PHP
<?xml version="1.0" encoding="UTF-8"?>
<!--
  BPMN 2.0 model of the "Asset Ingestion" process.

  Flow: start event, "Extract metadata", exclusive gateway on the process
  variable "mediatype", optional "Create thumbnail image" for images, merging
  gateway, "Ingest asset", end event. All three service tasks are external
  tasks identified by their camunda:topic.
-->
<bpmn:definitions
    xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL"
    xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
    xmlns:di="http://www.omg.org/spec/DD/20100524/DI"
    xmlns:dc="http://www.omg.org/spec/DD/20100524/DC"
    xmlns:camunda="http://camunda.org/schema/1.0/bpmn"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    id="Definitions_0w41w6f"
    targetNamespace="http://bpmn.io/schema/bpmn"
    exporter="Camunda Modeler"
    exporterVersion="1.16.2">

  <!-- Executable process definition -->
  <bpmn:process id="asset-ingestion" name="Asset Ingestion" isExecutable="true">
    <bpmn:startEvent id="StartEvent_1" name="Asset to be ingested">
      <bpmn:outgoing>SequenceFlow_16kf6eb</bpmn:outgoing>
    </bpmn:startEvent>
    <bpmn:sequenceFlow id="SequenceFlow_16kf6eb" sourceRef="StartEvent_1" targetRef="extract-metadata" />

    <!-- External task, topic "asset-extract-metadata" -->
    <bpmn:serviceTask id="extract-metadata" name="Extract metadata" camunda:type="external" camunda:topic="asset-extract-metadata">
      <bpmn:incoming>SequenceFlow_16kf6eb</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_1ns7sj4</bpmn:outgoing>
    </bpmn:serviceTask>

    <!-- Splitting gateway: both outgoing flows carry a condition on "mediatype" -->
    <bpmn:exclusiveGateway id="ExclusiveGateway_1j6qgpz" name="Is it an image file?">
      <bpmn:incoming>SequenceFlow_1ns7sj4</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_02lackp</bpmn:outgoing>
      <bpmn:outgoing>SequenceFlow_0bbvz28</bpmn:outgoing>
    </bpmn:exclusiveGateway>
    <bpmn:sequenceFlow id="SequenceFlow_1ns7sj4" sourceRef="extract-metadata" targetRef="ExclusiveGateway_1j6qgpz" />
    <bpmn:sequenceFlow id="SequenceFlow_02lackp" name="yes" sourceRef="ExclusiveGateway_1j6qgpz" targetRef="create-thumbnail">
      <bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">${mediatype == 'image'}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>

    <!-- External task, topic "asset-create-thumbnail"; only reached on the "yes" flow -->
    <bpmn:serviceTask id="create-thumbnail" name="Create thumbnail image" camunda:type="external" camunda:topic="asset-create-thumbnail">
      <bpmn:incoming>SequenceFlow_02lackp</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_1ralpb9</bpmn:outgoing>
    </bpmn:serviceTask>

    <!-- Merging gateway: joins the "no" flow and the thumbnail flow -->
    <bpmn:exclusiveGateway id="ExclusiveGateway_0drgm95">
      <bpmn:incoming>SequenceFlow_0bbvz28</bpmn:incoming>
      <bpmn:incoming>SequenceFlow_1ralpb9</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_0onc369</bpmn:outgoing>
    </bpmn:exclusiveGateway>
    <bpmn:sequenceFlow id="SequenceFlow_0bbvz28" name="no" sourceRef="ExclusiveGateway_1j6qgpz" targetRef="ExclusiveGateway_0drgm95">
      <bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">${mediatype != 'image'}</bpmn:conditionExpression>
    </bpmn:sequenceFlow>
    <bpmn:sequenceFlow id="SequenceFlow_1ralpb9" sourceRef="create-thumbnail" targetRef="ExclusiveGateway_0drgm95" />
    <bpmn:sequenceFlow id="SequenceFlow_0onc369" sourceRef="ExclusiveGateway_0drgm95" targetRef="ingest-asset" />

    <bpmn:endEvent id="EndEvent_1se5hc0" name="Asset ingested">
      <bpmn:incoming>SequenceFlow_0bmlt71</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:sequenceFlow id="SequenceFlow_0bmlt71" sourceRef="ingest-asset" targetRef="EndEvent_1se5hc0" />

    <!-- External task, topic "asset-ingest" -->
    <bpmn:serviceTask id="ingest-asset" name="Ingest asset" camunda:type="external" camunda:topic="asset-ingest">
      <bpmn:incoming>SequenceFlow_0onc369</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_0bmlt71</bpmn:outgoing>
    </bpmn:serviceTask>
  </bpmn:process>

  <!-- Diagram interchange: shape bounds and edge waypoints for the elements above -->
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="asset-ingestion">
      <bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
        <dc:Bounds x="173" y="102" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="163" y="145" width="56" height="27" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_16kf6eb_di" bpmnElement="SequenceFlow_16kf6eb">
        <di:waypoint x="209" y="120" />
        <di:waypoint x="259" y="120" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="ServiceTask_04q8nbp_di" bpmnElement="extract-metadata">
        <dc:Bounds x="259" y="80" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="ExclusiveGateway_1j6qgpz_di" bpmnElement="ExclusiveGateway_1j6qgpz" isMarkerVisible="true">
        <dc:Bounds x="409" y="95" width="50" height="50" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="390" y="152" width="89" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_1ns7sj4_di" bpmnElement="SequenceFlow_1ns7sj4">
        <di:waypoint x="359" y="120" />
        <di:waypoint x="409" y="120" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_02lackp_di" bpmnElement="SequenceFlow_02lackp">
        <di:waypoint x="434" y="145" />
        <di:waypoint x="434" y="250" />
        <di:waypoint x="510" y="250" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="441" y="195" width="18" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="ServiceTask_0u1bkke_di" bpmnElement="create-thumbnail">
        <dc:Bounds x="510" y="210" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="ExclusiveGateway_0drgm95_di" bpmnElement="ExclusiveGateway_0drgm95" isMarkerVisible="true">
        <dc:Bounds x="671" y="95" width="50" height="50" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_0bbvz28_di" bpmnElement="SequenceFlow_0bbvz28">
        <di:waypoint x="459" y="120" />
        <di:waypoint x="671" y="120" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="559" y="102" width="13" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_1ralpb9_di" bpmnElement="SequenceFlow_1ralpb9">
        <di:waypoint x="610" y="250" />
        <di:waypoint x="696" y="250" />
        <di:waypoint x="696" y="145" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_0onc369_di" bpmnElement="SequenceFlow_0onc369">
        <di:waypoint x="721" y="120" />
        <di:waypoint x="791" y="120" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="EndEvent_1se5hc0_di" bpmnElement="EndEvent_1se5hc0">
        <dc:Bounds x="961" y="102" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="943" y="145" width="73" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_0bmlt71_di" bpmnElement="SequenceFlow_0bmlt71">
        <di:waypoint x="891" y="120" />
        <di:waypoint x="961" y="120" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="ServiceTask_0qpthz4_di" bpmnElement="ingest-asset">
        <dc:Bounds x="791" y="80" width="100" height="80" />
      </bpmndi:BPMNShape>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn:definitions>
{
"require": {
"php": ">=7.1.0",
"ext-pcntl": "*",
"endpot/camunda-rest-client": "^1.1"
}
}
<?php
namespace Tistre\CamundaDem;
require __DIR__ . '/vendor/autoload.php';
use Camunda\Entity\Request\ExternalTaskRequest;
use Camunda\Service\ExternalTaskService;
/**
 * Command line worker for Camunda external tasks.
 *
 * Polls the Camunda REST API for external tasks of a single topic and hands
 * each fetched task to the handleTask_* method derived from the topic name.
 */
class Worker
{
    /** @var string Base URL of the Camunda REST API (option --camunda-url) */
    protected $camundaUrl;

    /** @var string Topic to fetch external tasks for (option --task-topic) */
    protected $externalTaskTopic;

    /** @var int Value sent as "lockDuration" with every fetchAndLock request (option --lock-duration) */
    protected $lockDuration = 10000;

    /** @var string Worker ID reported to Camunda; built from the process ID in run() */
    protected $workerId;

    /** @var ExternalTaskService */
    protected $externalTaskService;

    /**
     * Initialize and run in an endless loop
     *
     * Installs the signal handlers, reads the command line options and then
     * polls for external tasks once per second until the process is signalled.
     */
    public function run(): void
    {
        $this->initSignalHandler();
        $this->getOptions();

        $this->workerId = 'worker' . getmypid();
        $this->externalTaskService = new ExternalTaskService($this->camundaUrl);

        while (true) {
            // Run pending signal handlers so that Ctrl+C (SIGINT) or SIGTERM ends the loop
            pcntl_signal_dispatch();

            foreach ($this->fetchExternalTasks() as $externalTask) {
                printf(
                    "Fetched and locked <%s> task <%s> of <%s> process instance <%s>.\n",
                    $externalTask->topicName,
                    $externalTask->id,
                    $externalTask->processDefinitionKey,
                    $externalTask->processInstanceId
                );

                // getOptions() has already verified that the handler for the configured topic exists
                call_user_func([$this, $this->topicNameToMethodName($externalTask->topicName)], $externalTask);
            }

            // Pause between two polls
            usleep(1000000);
        }
    }

    /**
     * Fetch and lock external tasks from Camunda
     *
     * Requests at most one task of the configured topic.
     *
     * @return array Fetched tasks; empty when the service did not return an array
     */
    protected function fetchExternalTasks(): array
    {
        $externalTaskQueryRequest = (new ExternalTaskRequest())
            ->set('topics', [['topicName' => $this->externalTaskTopic, 'lockDuration' => $this->lockDuration]])
            ->set('workerId', $this->workerId)
            ->set('maxTasks', 1);

        $result = $this->externalTaskService->fetchAndLock($externalTaskQueryRequest);

        if (!is_array($result)) {
            $result = [];
        }

        return $result;
    }

    /**
     * Set an external task in Camunda to completed
     *
     * @param object $externalTask
     * @param array $updateVariables Process variables to set, as name => ['value' => ...]
     */
    protected function completeTask($externalTask, array $updateVariables): void
    {
        $externalTaskRequest = (new ExternalTaskRequest())
            ->set('variables', $updateVariables)
            ->set('workerId', $this->workerId);

        $this->externalTaskService->complete($externalTask->id, $externalTaskRequest);

        printf("Completed task <%s>\n", $externalTask->id);
    }

    /**
     * Set an external task in Camunda to failed
     *
     * @param object $externalTask
     * @param string $errorMessage
     * @param int $retries
     * @param int $retryTimeout
     */
    protected function failTask($externalTask, string $errorMessage, int $retries, int $retryTimeout): void
    {
        $externalTaskRequest = (new ExternalTaskRequest())
            ->set('errorMessage', $errorMessage)
            ->set('retries', $retries)
            ->set('retryTimeout', $retryTimeout)
            ->set('workerId', $this->workerId);

        $this->externalTaskService->handleFailure($externalTask->id, $externalTaskRequest);

        printf("Failed task <%s>: '%s'\n", $externalTask->id, $errorMessage);
    }

    /**
     * Get the name of the method to use for handling an external task topic
     *
     * Example: "asset-extract-metadata" becomes "handleTask_asset_extract_metadata".
     *
     * @param string $topicName
     * @return string
     */
    protected function topicNameToMethodName(string $topicName): string
    {
        return 'handleTask_' . strtr($topicName, ['-' => '_']);
    }

    /**
     * Initialize Unix signal handler
     *
     * ... to make sure we quit on Ctrl+C (or SIGTERM) whenever we call pcntl_signal_dispatch().
     */
    protected function initSignalHandler(): void
    {
        pcntl_signal(SIGINT, function () {
            fwrite(STDERR, "Caught SIGINT - exitting\n");
            exit(128 + SIGINT);
        });

        pcntl_signal(SIGTERM, function () {
            fwrite(STDERR, "Caught SIGTERM - exitting\n");
            exit(128 + SIGTERM);
        });
    }

    /**
     * Get command line options
     *
     * Exits with status 1 when a required option is missing or when no handler
     * method exists for the requested topic.
     */
    protected function getOptions(): void
    {
        $usageHelp = 'Usage: php worker.php --camunda-url="http://localhost:8080/engine-rest" --task-topic="asset-ingest"';

        $options = getopt('', ['camunda-url:', 'task-topic:', 'lock-duration:']);

        foreach (['camunda-url', 'task-topic'] as $optionName) {
            if (empty($options[$optionName])) {
                fwrite(STDERR, "Error: Missing option --$optionName.\n");
                fwrite(STDERR, $usageHelp . "\n");
                exit(1);
            }
        }

        $this->camundaUrl = $options['camunda-url'];
        $this->externalTaskTopic = $options['task-topic'];

        $methodName = $this->topicNameToMethodName($this->externalTaskTopic);

        if (!method_exists($this, $methodName)) {
            fwrite(STDERR, "Error: Wrong value for --task-topic. Method $methodName does not exist.\n");
            exit(1);
        }

        // A missing or non-positive --lock-duration keeps the default
        if (isset($options['lock-duration']) && (intval($options['lock-duration']) > 0)) {
            $this->lockDuration = intval($options['lock-duration']);
        }
    }

    /**
     * Read the value of a process variable from a fetched external task
     *
     * @param object $externalTask
     * @param string $name Variable name
     * @return mixed|null The variable value, or null when the variable (or its value) is not set
     */
    protected function getVariableValue($externalTask, string $name)
    {
        if (!isset($externalTask->variables->$name->value)) {
            return null;
        }

        return $externalTask->variables->$name->value;
    }

    /**
     * Get the asset file path from the "path" variable, failing the task if the file is unusable
     *
     * @param object $externalTask
     * @return string|null Path of the existing asset file; null after the task has been set to failed
     */
    protected function requireAssetFilePath($externalTask): ?string
    {
        $assetFilePath = $this->getVariableValue($externalTask, 'path');

        if (!is_string($assetFilePath) || !file_exists($assetFilePath)) {
            $this->failTask(
                $externalTask,
                sprintf('File to ingest not found: <%s>', is_string($assetFilePath) ? $assetFilePath : ''),
                0,
                0
            );

            return null;
        }

        return $assetFilePath;
    }

    /**
     * Tell whether a path is not a regular file or is a file without content
     *
     * @param string $path
     * @return bool
     */
    protected function isMissingOrEmpty(string $path): bool
    {
        // The file may just have been written by an external command; drop any cached stat data
        clearstatcache(true, $path);

        return !is_file($path) || (filesize($path) === 0);
    }

    // Task implementation below - in real life, you'd put them into separate files

    /**
     * Extract asset file metadata using ExifTool
     *
     * Sets the process variables "metadataFilePath", "mediatype" and
     * "mediasubtype". The task is set to failed when the asset file is missing,
     * the temp file cannot be prepared, ExifTool produces no output, or the
     * media type cannot be determined.
     *
     * @param object $externalTask
     */
    protected function handleTask_asset_extract_metadata($externalTask): void
    {
        $assetFilePath = $this->requireAssetFilePath($externalTask);

        if ($assetFilePath === null) {
            return;
        }

        // Prepare unique temp file for ExifTool output
        $reservedPath = tempnam('/tmp', 'asset-metadata-');

        if ($reservedPath === false) {
            $this->failTask($externalTask, 'Could not create temp file for metadata', 0, 0);
            return;
        }

        $metadataFilePath = $reservedPath . '.xml';

        if (!rename($reservedPath, $metadataFilePath)) {
            unlink($reservedPath);
            $this->failTask($externalTask, "Could not create temp file <$metadataFilePath>", 0, 0);
            return;
        }

        // Write ExifTool XML to file
        $cmd = sprintf(
            '/usr/bin/exiftool -X %s > %s',
            escapeshellarg($assetFilePath),
            escapeshellarg($metadataFilePath)
        );

        $exifToolOutput = [];
        $exifToolExitCode = 0;
        exec($cmd, $exifToolOutput, $exifToolExitCode);

        // A non-zero exit code alone is not fatal (it used to be ignored entirely);
        // it only counts as a failure when no metadata was written at all.
        if (($exifToolExitCode !== 0) && $this->isMissingOrEmpty($metadataFilePath)) {
            if (file_exists($metadataFilePath)) {
                unlink($metadataFilePath);
            }

            $this->failTask(
                $externalTask,
                sprintf('ExifTool failed with exit code %d for <%s>', $exifToolExitCode, $assetFilePath),
                0,
                0
            );
            return;
        }

        // Determine media type
        $lines = [];
        $fileExitCode = 0;
        exec(sprintf('/usr/bin/file -b --mime-type %s', escapeshellarg($assetFilePath)), $lines, $fileExitCode);

        $mimeType = isset($lines[0]) ? trim($lines[0]) : '';

        // Without a "type/subtype" string the gateway condition on "mediatype" cannot be evaluated sensibly
        if (($fileExitCode !== 0) || (strpos($mimeType, '/') === false)) {
            if (file_exists($metadataFilePath)) {
                unlink($metadataFilePath);
            }

            $this->failTask($externalTask, "Could not determine media type of <$assetFilePath>", 0, 0);
            return;
        }

        list($mediatype, $mediasubtype) = explode('/', $mimeType);

        // Set Camunda process variables
        $updateVariables = [
            'metadataFilePath' => ['value' => $metadataFilePath],
            'mediatype' => ['value' => $mediatype],
            'mediasubtype' => ['value' => $mediasubtype],
        ];

        $this->completeTask($externalTask, $updateVariables);
    }

    /**
     * Generate a thumbnail image using ImageMagick convert
     *
     * Sets the process variable "thumbnailFilePath". The task is set to failed
     * when the asset file is missing or no thumbnail file was produced.
     *
     * @param object $externalTask
     */
    protected function handleTask_asset_create_thumbnail($externalTask): void
    {
        $assetFilePath = $this->requireAssetFilePath($externalTask);

        if ($assetFilePath === null) {
            return;
        }

        // Prepare unique temp file name for thumbnail image
        $reservedPath = tempnam('/tmp', 'asset-thumbnail-');

        if ($reservedPath === false) {
            $this->failTask($externalTask, 'Could not create temp file for thumbnail', 0, 0);
            return;
        }

        // Only the unique name is needed; convert creates the .jpg file itself
        unlink($reservedPath);
        $thumbnailFilePath = $reservedPath . '.jpg';

        // Call ImageMagick
        $cmd = sprintf(
            '/usr/bin/convert -geometry 400x400 -colorspace RGB %s %s',
            escapeshellarg($assetFilePath),
            escapeshellarg($thumbnailFilePath)
        );

        $convertOutput = [];
        $convertExitCode = 0;
        exec($cmd, $convertOutput, $convertExitCode);

        // Do not hand a thumbnail path to the process that does not point to a real image file
        if ($this->isMissingOrEmpty($thumbnailFilePath)) {
            if (file_exists($thumbnailFilePath)) {
                unlink($thumbnailFilePath);
            }

            $this->failTask(
                $externalTask,
                sprintf('Could not create thumbnail for <%s> (convert exit code %d)', $assetFilePath, $convertExitCode),
                0,
                0
            );
            return;
        }

        // Set Camunda process variables
        $updateVariables = [
            'thumbnailFilePath' => ['value' => $thumbnailFilePath]
        ];

        $this->completeTask($externalTask, $updateVariables);
    }

    /**
     * Simulate asset ingestion
     *
     * Checks that every file referenced by the "path", "metadataFilePath" and
     * "thumbnailFilePath" variables exists (unset or empty variables are
     * skipped), then removes the two temp files and completes the task.
     *
     * @param object $externalTask
     */
    protected function handleTask_asset_ingest($externalTask): void
    {
        $tempFilePaths = [];

        // Validate everything first, so a failed task leaves all files in place
        foreach (['path', 'metadataFilePath', 'thumbnailFilePath'] as $key) {
            $checkPath = $this->getVariableValue($externalTask, $key);

            if (empty($checkPath)) {
                continue;
            }

            if (!file_exists($checkPath)) {
                $this->failTask(
                    $externalTask,
                    "File to ingest not found: <$key> = <$checkPath>",
                    0,
                    0
                );

                return;
            }

            if (($key === 'metadataFilePath') || ($key === 'thumbnailFilePath')) {
                $tempFilePaths[] = $checkPath;
            }
        }

        // We do not really ingest. Clean up temp files instead.
        foreach ($tempFilePaths as $tempFilePath) {
            unlink($tempFilePath);
        }

        $updateVariables = [
            'ingestMessage' => ['value' => 'Not really ingested. This is just as simulation.'],
        ];

        $this->completeTask($externalTask, $updateVariables);
    }
}
// Script entry point: create the worker and poll until the process is signalled
(new Worker())->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.