Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Notion task manager toy example
import datetime
import random
import time
from multiprocessing import Lock
from notion.client import *
from notion.block import *
mutex = Lock()
token_v2 = "<redacted>"
client = NotionClient(token_v2=token_v2, start_monitoring=False)
page = client.get_block("")
log = client.get_block("")
collection = client.get_block("").collection
log_lines = []
happy_results = {
"Delete all data": "The data was all deleted.",
"Fold paper airplane": "Your airplane is the BEST!",
"Feed the penguins": "Penguins are suffoncified.",
sad_results = {
"Delete all data": ["Insufficient permissions.", "Computer says no.", "I'm sorry, Dave..."],
"Fold paper airplane": ["Spontaneous combustion.", "OWWWW PAPERCUT!!!", "Oops, that was your diploma."],
"Feed the penguins": ["Penguins have a tummyache. :(", "We ran out of food.", "Polar bear invasion!"],
def write_to_log(line):
global log_lines
line = "{}\t{}".format("%H:%M:%S"), line)
log_lines = log_lines[-10:]
log.title = "\n".join(log_lines)
def sleep_with_updates(record, start_time, sleep=0.001, interval=0.3):
while sleep > 0:
record.elapsed = int(time.time() - start_time)
time.sleep(sleep if sleep < interval else interval)
sleep -= interval
def run_job(record):
start_time = time.time()
record.result = ""
record.elapsed = 0
record.status = "Starting"
write_to_log("Starting task '{}'...".format(record.task_name))
sleep_with_updates(record, start_time, random.randint(1, 3))
record.status = "Active"
write_to_log("Task '{}' is now running!".format(record.task_name))
sleep_with_updates(record, start_time, random.randint(3, 6))
diceroll = random.randint(0, 6)
sadness = sad_results[record.task_name]
if diceroll >= len(sadness):
record.result = happy_results[record.task_name]
record.status = "Completed"
write_to_log("Completed '{}'. {}".format(record.task_name, record.result))
record.result = sadness[diceroll]
record.status = "Error"
write_to_log("Error running '{}': {}".format(record.task_name, record.result))
def row_callback(record, changes):
with mutex:
start = record.start
record.start = False
if start:
if record.status in [None, "Completed", "Error"]:
def register_row_callbacks(collection):
for row in collection.get_rows():
row.add_callback(row_callback, callback_id="row_callback")
def collection_callback(record, difference, changes):
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment