Skip to content

Instantly share code, notes, and snippets.

@Matjaz-B
Last active June 4, 2023 21:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Matjaz-B/d1c632371851df8399e6341272dcbfaa to your computer and use it in GitHub Desktop.
Save Matjaz-B/d1c632371851df8399e6341272dcbfaa to your computer and use it in GitHub Desktop.
click clean-up after abort
import asyncio
import click
async def do_some_work(duration):
    """Simulate a long-running operation whose clean-up must always run.

    Prints progress messages, waits ``duration`` seconds with
    ``asyncio.sleep`` and prints a clean-up message on the way out.

    Args:
        duration: How long to wait, in seconds.

    Ordinary exceptions raised while working are reported on stdout and
    swallowed.  Cancellation and ``KeyboardInterrupt`` are not swallowed,
    but the clean-up still runs before they propagate to the caller.
    """
    print("Function performs some initialization, runs some command, waits and then stops")
    ## some init functions
    ## start operation
    try:
        print("Working...")
        await asyncio.sleep(duration)
        print("Work done")
    except Exception as e:
        print(f"do_some_work exception {e}")
    finally:
        # The clean-up lives in ``finally`` because an abort reaches this
        # coroutine as asyncio.CancelledError, which is a BaseException on
        # Python 3.8+ and therefore bypasses ``except Exception`` above.
        # Code placed after the try statement would be skipped in that case.
        ## clean up - stop operation
        print("Clean up done")
# CLI entry point: runs do_some_work() for --duration seconds on a private
# asyncio event loop.
# NOTE: documented with comments instead of a docstring on purpose, because
# click would display a docstring as the command's --help text.
@click.command()
@click.option('--duration', type=click.IntRange(1, 1000), default=10, help='Duration in [s]')
def worker_cli(duration):
    print(f'Will work for {duration} [s]')
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        task = loop.create_task(do_some_work(duration))
        try:
            loop.run_until_complete(task)
        except KeyboardInterrupt:
            # Ctrl+C only interrupts the loop; the task itself is still
            # pending.  Cancel it and let the loop run it to completion so
            # the coroutine can unwind and perform its own clean-up.
            task.cancel()
            try:
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                pass
            # Re-raise so the caller (click) still sees the interruption.
            raise
    except Exception as e:
        print(f"worker_cli exception {e}")
    finally:
        # Always release the loop's resources and do not leave a closed loop
        # registered as the current one.
        asyncio.set_event_loop(None)
        loop.close()
# Script entry point: invoke the click command only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    worker_cli()
import time
import click
class Runner:
    """Toy resource that reports each stage of its lifecycle on stdout.

    Usable as a context manager: entering returns the instance itself and
    leaving prints a clean-up banner without suppressing any exception.
    """

    def __init__(self, delay):
        """Store ``delay``, the number of seconds ``action`` will sleep."""
        print("init called")
        self.delay = delay

    def __enter__(self):
        """Mark the instance as initialised and hand it to the ``with`` body."""
        print("Enter contex")
        self.init = 1
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Print the clean-up banner; exceptions are never suppressed."""
        print("\r\n*** Cleaning up ***\r\n")
        return None

    def action(self, a):
        """Print twice ``a`` and the configured delay, then block for that delay."""
        doubled = a * 2
        print(f'much action: {doubled}')
        pause = self.delay
        print(f'delaying for {pause}')
        time.sleep(pause)

    def __del__(self):
        """Announce that the object is being finalised."""
        print("deleted")
# CLI example that ties a Runner's lifetime to the click context.
# NOTE: documented with comments instead of a docstring on purpose, because
# click would display a docstring as the command's --help text.
@click.command()
@click.option('--duration', type=click.IntRange(1, 1000), default=10, help='Duration in [s]')
@click.pass_context
def worker_contex(ctx, duration):
    print(f'Will work for {duration} [s]')
    runner = Runner(duration)
    # with_resource() enters the runner like a ``with`` statement would and
    # leaves it again when the click context is torn down.
    active = ctx.with_resource(runner)
    active.action(1)
# CLI example that registers a clean-up callback on the click context and
# then blocks for --duration seconds.
# NOTE: documented with comments instead of a docstring on purpose, because
# click would display a docstring as the command's --help text.
@click.command()
@click.option('--duration', type=click.IntRange(1, 1000), default=10, help='Duration in [s]')
@click.pass_context
def worker_cli(ctx, duration):
    def completed():
        print("*** Cleaning up ***")

    # Explicit registration; equivalent to decorating ``completed`` with
    # ``@ctx.call_on_close``.
    ctx.call_on_close(completed)

    print(f'Will work for {duration} [s]')
    print(f"Delaying for {duration}")
    time.sleep(duration)
# Script entry point: exactly one of the two example commands is run; swap
# which call is commented out to try the other one.
if __name__ == "__main__":
    # example with adding a callback...
    worker_cli()
    # example with context (Runner registered via ctx.with_resource)
    # worker_contex()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment