Skip to content

Instantly share code, notes, and snippets.

@mkanoor
Created October 14, 2025 16:31
Show Gist options
  • Select an option

  • Save mkanoor/6cdc90973f6c918909bf2da55291f1b9 to your computer and use it in GitHub Desktop.

Select an option

Save mkanoor/6cdc90973f6c918909bf2da55291f1b9 to your computer and use it in GitHub Desktop.
{
"name": "sample-data-processing-workflow",
"description": "A sample workflow demonstrating data processing with approval steps",
"yaml_definition": {
"schemaVersion": "1.0.0",
"version": 1,
"metadata": {
"name": "sample-data-processing-workflow",
"description": "A sample workflow demonstrating data processing with approval steps",
"tags": [
"data-processing",
"sample",
"approval"
],
"owner": "data-team",
"timeout": "PT2H"
},
"triggers": [
{
"type": "manual"
}
],
"inputs": {
"dataSource": {
"type": "string",
"description": "Source URL for data to process",
"required": true
},
"batchSize": {
"type": "integer",
"description": "Number of records to process in each batch",
"default": 100,
"minimum": 1,
"maximum": 1000
},
"approverEmail": {
"type": "string",
"description": "Email of the person who should approve processing",
"required": true
}
},
"variables": {
"processingThreshold": 500,
"maxRetries": 3
},
"secrets": {
"api_token": {
"secretId": "data-api-token",
"type": "bearer_token"
}
},
"workflow": {
"activities": [
{
"id": "fetch_data",
"name": "Fetch Data from API",
"type": "task",
"task": {
"executor": "api",
"config": {
"method": "GET",
"url": "https://api.example.com/data",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer ${secrets.api_token}"
},
"queryParams": {
"source": "${input.dataSource}",
"limit": "${input.batchSize}"
}
},
"outputs": {
"records": "$.data.records",
"totalCount": "$.data.total_count"
}
},
"timeout": "PT10M",
"retryPolicy": {
"maxAttempts": 3,
"backoff": "exponential",
"initialInterval": "PT5S",
"retryableErrors": ["NETWORK_ERROR", "TIMEOUT", "500"]
}
},
{
"id": "approval_check",
"name": "Approval for Large Dataset Processing",
"type": "task",
"condition": "${fetch_data.output.totalCount > variables.processingThreshold}",
"requiresApproval": true,
"approval": {
"approvers": ["${input.approverEmail}"],
"prompt": "Dataset contains ${fetch_data.output.totalCount} records. Approve processing?",
"timeout": "PT1H",
"onTimeout": "reject",
"metadata": {
"recordCount": "${fetch_data.output.totalCount}",
"dataSource": "${input.dataSource}",
"estimatedProcessingTime": "PT30M"
}
},
"task": {
"executor": "script",
"config": {
"language": "python",
"code": "print('Approval granted for processing')"
}
}
},
{
"id": "process_data",
"name": "Process Data with AI Agent",
"type": "task",
"task": {
"executor": "agentic",
"config": {
"agent": "mcp://data-processing-agent",
"tools": ["data_validator", "data_transformer", "quality_checker"],
"model": "claude-3-sonnet",
"prompt": "Process the provided data records: validate format, transform to standard schema, and check quality. Return
processed results with quality metrics."
},
"inputs": {
"rawData": "${fetch_data.output.records}",
"batchSize": "${input.batchSize}"
},
"outputs": {
"processedRecords": "$.processed_data",
"qualityScore": "$.quality_metrics.score",
"errors": "$.errors"
}
},
"timeout": "PT30M",
"retryPolicy": {
"maxAttempts": 2,
"backoff": "fixed",
"initialInterval": "PT1M"
}
},
{
"id": "parallel_outputs",
"name": "Generate Outputs in Parallel",
"type": "parallel",
"branches": [
{
"id": "save_to_database",
"name": "Save to Database",
"type": "task",
"task": {
"executor": "connector",
"config": {
"connectorId": "postgres-db",
"operation": "bulk_insert",
"parameters": {
"table": "processed_data",
"records": "${process_data.output.processedRecords}"
}
},
"outputs": {
"insertedCount": "$.rows_affected"
}
}
},
{
"id": "generate_report",
"name": "Generate Summary Report",
"type": "task",
"task": {
"executor": "script",
"config": {
"language": "python",
"code": "import json\nfrom datetime import datetime\n\ndata = json.loads(input_data)\nreport = {\n 'timestamp':
datetime.now().isoformat(),\n 'records_processed': len(data['records']),\n 'quality_score': data['quality_score'],\n 'source':
data['source']\n}\nprint(json.dumps(report))"
},
"inputs": {
"input_data": {
"records": "${process_data.output.processedRecords}",
"quality_score": "${process_data.output.qualityScore}",
"source": "${input.dataSource}"
}
},
"outputs": {
"report": "$"
}
}
}
]
},
{
"id": "join_results",
"name": "Wait for All Outputs",
"type": "join",
"join": {
"branches": ["save_to_database", "generate_report"],
"strategy": "all",
"timeout": "PT10M",
"aggregateOutputs": true
}
}
]
}
},
"is_enabled": true
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment