This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# In-memory Repository | |
from redbird.repos import MemoryRepo | |
repo = MemoryRepo() | |
# SQL Repository | |
from redbird.repos import SQLRepo | |
from sqlalchemy import create_engine | |
repo = SQLRepo(engine=create_engine('sqlite:///path/to/database.db'), table="my_items") | |
# Mongo Repository |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create a Pydantic model | |
from pydantic import BaseModel | |
class MyItem(BaseModel): | |
id: str | |
name: str | |
age: int | |
# In-memory Repository | |
from redbird.repos import MemoryRepo |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create some new items to the repo | |
repo.add(Item(id="a", name="Jack", age=30)) | |
repo.add(Item(id="b", name="John", age=30)) | |
repo.add(Item(id="c", name="James", age=40)) | |
# Read one item from the repo | |
repo.filter_by(age=30).first() | |
# (returns one item) | |
# Read multiple items from the repo |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rocketry import Rocketry | |
app = Rocketry() | |
@app.task('daily') | |
def do_things(): | |
... | |
if __name__ == "__main__": | |
app.run() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rocketry.args import Return | |
@app.task("daily after 07:00") | |
def do_first(): | |
... | |
return 'Hello World' | |
@app.task("after task 'do_first'") | |
def do_second(arg=Return('do_first')): | |
# arg contains the value of the task do_first's return |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@app.task("every 10 seconds") | |
def do_continuously(): | |
... | |
@app.task("daily after 07:00") | |
def do_daily_after_seven(): | |
... | |
@app.task("hourly & time of day between 22:00 and 06:00") | |
def do_hourly_at_night(): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@app.task("daily", execution="main") | |
def do_unparallel(): | |
... | |
@app.task("daily", execution="thread") | |
def do_on_separate_thread(): | |
... | |
@app.task("daily", execution="process") | |
def do_on_separate_process(): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rocketry.args import Session | |
@app.task("daily", execution="main") | |
def manipulate_runtime(session=Session()): | |
# Modify tasks | |
for task in session.tasks: | |
task.disable = True | |
# Call the scheduler to restart | |
session.restart() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import uvicorn | |
from api import app as app_fastapi | |
from scheduler import app as app_rocketry | |
class Server(uvicorn.Server): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import FastAPI | |
app = FastAPI() | |
@app.get("/") | |
def read_root(): | |
return {"Hello": "World"} | |
@app.get("/items/{item_id}") | |
def read_item(item_id: int): |
OlderNewer