Skip to content

Instantly share code, notes, and snippets.

@seb-835
Created November 5, 2021 07:40
Show Gist options
  • Save seb-835/c495fafe0307927e4ed7cd587fa355a2 to your computer and use it in GitHub Desktop.
Save seb-835/c495fafe0307927e4ed7cd587fa355a2 to your computer and use it in GitHub Desktop.
zeebe-worker crash reproducer
from typing import Dict
from pyzeebe import ZeebeWorker, Job, create_insecure_channel, create_secure_channel
from urllib.parse import urlparse
import traceback
import sys
import asyncio
import logging
import argparse
import time
import datetime
# Decorators to add functionality before doing tasks.
def enhanced_variables(job: Job) -> Job:
logging.info(job)
job.variables["function"]=job.custom_headers.get("function", "")
job.variables["parameters"]=job.custom_headers.get("parameters", "")
job.variables["cmdline"]=job.custom_headers.get("cmdline", "")
return job
# Define a custom exception_handler for a task
def exception_handler(exception: Exception, job: Job) -> None:
job.set_failure_status(f"Failed to run task {job.type}. Reason: {exception}")
# My faas_invoke mock function
def faas_invoke(func, data):
# Simulate long time run
logging.info("simulate long run %s" % datetime.datetime.now().time() )
time.sleep(120)
logging.info("long run done %s" % datetime.datetime.now().time() )
return {"OK": 1}
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
logging.info("WORKER RUN")
parser = argparse.ArgumentParser()
parser.add_argument('--zeebeGW', dest="zeebe_gateway", default="http://localhost:26500")
try:
args, unknown = parser.parse_known_args()
gateway = urlparse(args.zeebe_gateway)
if gateway.scheme.lower() == "https":
channel = create_insecure_channel(hostname=gateway.hostname, port=gateway.port)
else:
channel = create_insecure_channel(hostname=gateway.hostname, port=gateway.port)
worker = ZeebeWorker(channel)
worker.before(enhanced_variables)
@worker.task(task_type="faas", exception_handler=exception_handler, timeout_ms=3600000)
def doFaaS(function, cmdline, parameters) -> Dict:
logging.info("DoFaas")
data = { "cmdline": cmdline , "parameters": parameters}
return faas_invoke(function, data)
#FAILURE 1
#loop = asyncio.get_running_loop()
#loop.run_until_complete(worker.work())
#FAILURE 2
#loop = asyncio.new_event_loop()
#loop.run_until_complete(worker.work())
#THIS WORK WITH SUCCESS
#loop = asyncio.get_event_loop()
#loop.run_until_complete(worker.work())
#FAILURE 3
asyncio.run(worker.work())
except Exception as e:
print(e)
print(traceback.format_exc())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment