Skip to content

Instantly share code, notes, and snippets.

View Miksus's full-sized avatar

Mikael Koli Miksus

View GitHub Profile
@Miksus
Miksus / redengine_pipelining.py
Last active July 12, 2022 21:19
Red Engine pipelining examples
from rocketry.args import Return
@app.task("daily after 07:00")
def do_first():
...
return 'Hello World'
@app.task("after task 'do_first'")
def do_second(arg=Return('do_first')):
# arg contains the value of the task do_first's return
from rocketry import Rocketry
app = Rocketry()
@app.task('daily')
def do_things():
...
if __name__ == "__main__":
app.run()
# Create some new items to the repo
repo.add(Item(id="a", name="Jack", age=30))
repo.add(Item(id="b", name="John", age=30))
repo.add(Item(id="c", name="James", age=40))
# Read one item from the repo
repo.filter_by(age=30).first()
# (returns one item)
# Read multiple items from the repo
# Create a Pydantic model
from pydantic import BaseModel
class MyItem(BaseModel):
id: str
name: str
age: int
# In-memory Repository
from redbird.repos import MemoryRepo
# In-memory Repository
from redbird.repos import MemoryRepo
repo = MemoryRepo()
# SQL Repository
from redbird.repos import SQLRepo
from sqlalchemy import create_engine
repo = SQLRepo(engine=create_engine('sqlite:///path/to/database.db'), table="my_items")
# Mongo Repository