Created
November 28, 2021 20:18
-
-
Save zikphil/9b0c7d70b789796bcbcc56528e41ea51 to your computer and use it in GitHub Desktop.
AsyncSession support in OSO
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import relationship | |
from sqlalchemy import Column, String, Integer, Boolean, ForeignKey, Enum, Table | |
from sqlalchemy import select | |
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession | |
from oso import Oso | |
from sqlalchemy_oso import register_models | |
from sqlalchemy_oso.session import authorized_async_sessionmaker | |
import asyncio | |
Model = declarative_base(name="Model") | |
class Post(Model): | |
__tablename__ = "posts" | |
id = Column(Integer, primary_key=True) | |
contents = Column(String) | |
access_level = Column(Enum("public", "private"), nullable=False) | |
created_by_id = Column(Integer, ForeignKey("users.id")) | |
created_by = relationship("User") | |
user_manages = Table( | |
"user_manages", | |
Model.metadata, | |
Column("managed_user_id", Integer, ForeignKey("users.id")), | |
Column("manager_user_id", Integer, ForeignKey("users.id")) | |
) | |
class User(Model): | |
__tablename__ = "users" | |
id = Column(Integer, primary_key=True) | |
username = Column(String, nullable=False) | |
is_admin = Column(Boolean, nullable=False, default=False) | |
manages = relationship("User", | |
secondary="user_manages", | |
primaryjoin=(id == user_manages.c.manager_user_id), | |
secondaryjoin=(id == user_manages.c.managed_user_id), | |
backref="managed_by" | |
) | |
async def test(): | |
oso = Oso() | |
register_models(oso, Model) | |
oso.load_files(["/Volumes/Projects/osohq/oso/phil/policy.polar"]) | |
user = User(username='user') | |
manager = User(username='manager', manages=[user]) | |
public_user_post = Post(contents='public_user_post', access_level='public', created_by=user) | |
private_user_post = Post(contents='private_user_post', access_level='private', created_by=user) | |
private_manager_post = Post(contents='private_manager_post', access_level='private', created_by=manager) | |
public_manager_post = Post(contents='public_manager_post', access_level='public', created_by=manager) | |
engine = create_async_engine('sqlite+aiosqlite:///:memory:') | |
async with engine.begin() as conn: | |
await conn.run_sync(Model.metadata.create_all) | |
fixture_session = AsyncSession(bind=engine) | |
fixture_session.add_all([user, manager, public_user_post, private_user_post, private_manager_post, public_manager_post]) | |
await fixture_session.commit() | |
AuthorizedSession = authorized_async_sessionmaker( | |
bind=engine, | |
get_oso=lambda: oso, | |
get_user=lambda: user, | |
get_checked_permissions=lambda: {Post: "read"}, | |
class_=AsyncSession | |
) | |
session = AuthorizedSession() | |
posts = (await session.execute(select(Post))).scalars().all() | |
print([p.contents for p in posts]) >>> ['public_user_post', 'private_user_post', 'public_manager_post'] | |
AuthorizedSession = authorized_async_sessionmaker(bind=engine, get_oso=lambda: oso, get_user=lambda: manager, get_checked_permissions=lambda: { Post: "read" }) | |
manager_session = AuthorizedSession() | |
posts = (await manager_session.execute(select(Post))).scalars().all() | |
print([p.contents for p in posts]) >>> ['public_user_post', 'private_user_post', 'private_manager_post', 'public_manager_post'] | |
await engine.dispose() | |
asyncio.run(test()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment