Skip to content

Instantly share code, notes, and snippets.

View Miksus's full-sized avatar

Mikael Koli Miksus

View GitHub Profile
# --- In-memory repository ---
from redbird.repos import MemoryRepo

repo = MemoryRepo()

# --- SQL repository (rebinds ``repo``) ---
from redbird.repos import SQLRepo
from sqlalchemy import create_engine

repo = SQLRepo(
    engine=create_engine("sqlite:///path/to/database.db"),
    table="my_items",
)

# --- Mongo repository ---
# Create a Pydantic model
from pydantic import BaseModel
class MyItem(BaseModel):
    """Pydantic model describing one item: an ``id``, a ``name`` and an ``age``."""

    id: str
    name: str
    age: int
# In-memory Repository
from redbird.repos import MemoryRepo
# Add some new items to the repo.
# The items are instances of the ``MyItem`` model declared in this file.
repo.add(MyItem(id="a", name="Jack", age=30))
repo.add(MyItem(id="b", name="John", age=30))
repo.add(MyItem(id="c", name="James", age=40))

# Read one item from the repo
repo.filter_by(age=30).first()
# (returns one item)

# Read multiple items from the repo
from rocketry import Rocketry

app = Rocketry()


@app.task('daily')
def do_things():
    """Placeholder task registered on ``app`` with the ``'daily'`` condition."""
    ...


if __name__ == "__main__":
    # Start the app only when this file is executed as a script.
    app.run()
@Miksus
Miksus / redengine_pipelining.py
Last active July 12, 2022 21:19
Red Engine pipelining examples
from rocketry.args import Return
@app.task("daily after 07:00")
def do_first():
    """Task registered with the ``daily after 07:00`` condition.

    Returns:
        The string ``'Hello World'``.
    """
    ...
    return 'Hello World'
@app.task("after task 'do_first'")
def do_second(arg=Return('do_first')):
    """Task registered with the ``after task 'do_first'`` condition.

    Args:
        arg: Declared with the default ``Return('do_first')``.
    """
    # arg contains the return value of the task do_first
    ...
@Miksus
Miksus / redengine_scheduling.py
Created July 3, 2022 12:57
Red Engine (time) scheduling example
@app.task("every 10 seconds")
def do_continuously():
    """Placeholder task registered with the ``every 10 seconds`` condition."""
    ...
@app.task("daily after 07:00")
def do_daily_after_seven():
    """Placeholder task registered with the ``daily after 07:00`` condition."""
    ...
@app.task("hourly & time of day between 22:00 and 06:00")
def do_hourly_at_night():
    """Placeholder task registered with an hourly, 22:00-06:00 condition string."""
    ...
@Miksus
Miksus / redengine_execution.py
Created July 4, 2022 20:22
Example of the execution options
@app.task("daily", execution="main")
def do_unparallel():
    """Placeholder daily task registered with ``execution="main"``."""
    ...
@app.task("daily", execution="thread")
def do_on_separate_thread():
    """Placeholder daily task registered with ``execution="thread"``."""
    ...
@app.task("daily", execution="process")
def do_on_separate_process():
    """Placeholder daily task registered with ``execution="process"``."""
    ...
@Miksus
Miksus / redengine_metatask.py
Last active July 12, 2022 21:19
A metatask example of Red Engine
from rocketry.args import Session
@app.task("daily", execution="main")
def manipulate_runtime(session=Session()):
    """Flag every task in the session and then restart it.

    Args:
        session: Declared with the default ``Session()``; the body reads
            ``session.tasks`` and calls ``session.restart()``.
    """
    # Modify tasks
    for task in session.tasks:
        # NOTE(review): Rocketry's task attribute appears to be documented as
        # ``disabled`` -- confirm that ``disable`` is the intended name.
        task.disable = True

    # Call the scheduler to restart (once, after all tasks were modified)
    session.restart()
import asyncio
import logging
import uvicorn
from api import app as app_fastapi
from scheduler import app as app_rocketry
class Server(uvicorn.Server):
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
    """Handle ``GET /`` and return a fixed ``{"Hello": "World"}`` mapping."""
    return {"Hello": "World"}
@app.get("/items/{item_id}")
def read_item(item_id: int):