This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from datetime import datetime, timedelta | |
from typing import Dict, List | |
from contextlib import suppress | |
from pydantic import BaseModel | |
from miniappi import App, content, user_context, app_context, ContextModel | |
class Player(BaseModel):
    """Pydantic model describing a player."""

    # Name identifying the player (required, no default).
    username: str
    # Point total; starts at 0 when not supplied.
    points: int = 0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rocketry.conds import daily, weekly | |
@app.cond()
def is_foo():
    """Custom condition registered through ``app.cond()``.

    Returns:
        The state of the condition. The body is a placeholder: as written,
        ``True or False`` always evaluates to ``True``.
    """
    ...  # Code to determine the state of the condition
    return True or False
@app.task(daily & is_foo)
def do_daily_if_foo():
    """Task registered with the combined condition ``daily & is_foo``.

    The body is a placeholder.
    """
    ...
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
async def do_first():
    """Print two messages, yielding to the event loop between them.

    ``await asyncio.sleep(0)`` suspends this coroutine for one loop
    iteration so that other scheduled coroutines get a chance to run
    before the second block executes.
    """
    print("Running do_first block 1")
    ...

    # Release execution
    await asyncio.sleep(0)

    print("Running do_first block 2")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import FastAPI

# Creating FastAPI application
app = FastAPI()

# Import scheduler.py so we can modify the scheduler
from scheduler import app as app_rocketry

# Module-level handle to the scheduler app's session.
session = app_rocketry.session
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rocketry import Rocketry | |
from rocketry.conds import every, after_success | |
# Creating the Rocketry app
# The config sets "task_execution" to "async" (presumably the default
# execution mode for tasks - confirm against the Rocketry docs).
app = Rocketry(config={"task_execution": "async"})
# Creating some tasks
@app.task(every("10 seconds"))
async def do_things():
    """Print a message; registered with the ``every("10 seconds")`` condition."""
    print("Doing a task")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import FastAPI | |
# FastAPI application instance; the route decorators below register on it.
app = FastAPI()
@app.get("/")
def read_root():
    """Handle GET requests to ``/``.

    Returns:
        A fixed dictionary ``{"Hello": "World"}``.
    """
    return {"Hello": "World"}
@app.get("/items/{item_id}") | |
def read_item(item_id: int): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import uvicorn | |
from api import app as app_fastapi | |
from scheduler import app as app_rocketry | |
class Server(uvicorn.Server): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rocketry.args import Session | |
@app.task("daily", execution="main")
def manipulate_runtime(session=Session()):
    """Disable every task in the session, then restart the scheduler.

    Args:
        session: Declared with a ``Session()`` default; presumably Rocketry
            substitutes the running session at call time - confirm against
            the Rocketry docs.
    """
    # Modify tasks
    for task in session.tasks:
        task.disable = True

    # Call the scheduler to restart
    session.restart()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@app.task("daily", execution="main")
def do_unparallel():
    """Daily task registered with ``execution="main"``.

    Presumably runs without parallelism inside the scheduler itself -
    confirm against the Rocketry docs. The body is a placeholder.
    """
    ...
@app.task("daily", execution="thread")
def do_on_separate_thread():
    """Daily task registered with ``execution="thread"``.

    Presumably runs on its own thread - confirm against the Rocketry docs.
    The body is a placeholder.
    """
    ...
@app.task("daily", execution="process") | |
def do_on_separate_process(): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@app.task("every 10 seconds")
def do_continuously():
    """Task registered with the condition string ``"every 10 seconds"``.

    The body is a placeholder.
    """
    ...
@app.task("daily after 07:00")
def do_daily_after_seven():
    """Task registered with the condition string ``"daily after 07:00"``.

    The body is a placeholder.
    """
    ...
@app.task("hourly & time of day between 22:00 and 06:00") | |
def do_hourly_at_night(): |
NewerOlder