Skip to content

Instantly share code, notes, and snippets.

@zikphil
Created November 28, 2021 20:18
Show Gist options
  • Save zikphil/9b0c7d70b789796bcbcc56528e41ea51 to your computer and use it in GitHub Desktop.
Save zikphil/9b0c7d70b789796bcbcc56528e41ea51 to your computer and use it in GitHub Desktop.
AsyncSession support in OSO
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy import Column, String, Integer, Boolean, ForeignKey, Enum, Table
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from oso import Oso
from sqlalchemy_oso import register_models
from sqlalchemy_oso.session import authorized_async_sessionmaker
import asyncio
# Declarative base class for the ORM models in this script; its metadata is
# also what the `user_manages` association table is registered on.
Model = declarative_base(name="Model")
class Post(Model):
    """A piece of content authored by a user, marked public or private."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    contents = Column(String)

    # Visibility of the post; required, and limited to the two listed values.
    access_level = Column(Enum("public", "private"), nullable=False)

    # Author of the post: the foreign key into ``users`` and the matching
    # ORM relationship to the ``User`` model.
    created_by_id = Column(Integer, ForeignKey("users.id"))
    created_by = relationship("User")
# Association table linking a manager (``manager_user_id``) to a user they
# manage (``managed_user_id``); both columns reference ``users.id``.
user_manages = Table(
    "user_manages",
    Model.metadata,
    Column("managed_user_id", Integer, ForeignKey("users.id")),
    Column("manager_user_id", Integer, ForeignKey("users.id")),
)
class User(Model):
    """An account that can author posts and manage other users."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String, nullable=False)
    is_admin = Column(Boolean, nullable=False, default=False)

    # Self-referential many-to-many through ``user_manages``: ``manages`` holds
    # the users this user is the manager of, and the ``managed_by`` backref
    # exposes the reverse direction on the managed user.
    manages = relationship(
        "User",
        secondary="user_manages",
        primaryjoin=(id == user_manages.c.manager_user_id),
        secondaryjoin=(id == user_manages.c.managed_user_id),
        backref="managed_by",
    )
async def test(policy_path="/Volumes/Projects/osohq/oso/phil/policy.polar"):
    """Demonstrate Oso-authorized reads through an SQLAlchemy ``AsyncSession``.

    Creates an in-memory SQLite database, inserts a regular user, a manager
    who manages that user, and a public and a private post for each of them.
    It then queries ``Post`` through an authorized session twice, once acting
    as the user and once as the manager, and prints the post contents each
    actor is allowed to read.

    Args:
        policy_path: Path of the Polar policy file to load into Oso. Defaults
            to the location used by the original author; pass your own path
            when running elsewhere.
    """
    oso = Oso()
    register_models(oso, Model)
    oso.load_files([policy_path])

    user = User(username='user')
    manager = User(username='manager', manages=[user])

    public_user_post = Post(contents='public_user_post', access_level='public', created_by=user)
    private_user_post = Post(contents='private_user_post', access_level='private', created_by=user)
    private_manager_post = Post(contents='private_manager_post', access_level='private', created_by=manager)
    public_manager_post = Post(contents='public_manager_post', access_level='public', created_by=manager)

    engine = create_async_engine('sqlite+aiosqlite:///:memory:')

    def authorized_session_for(actor):
        """Return a new authorized session that checks "read" on Post for *actor*."""
        factory = authorized_async_sessionmaker(
            bind=engine,
            get_oso=lambda: oso,
            get_user=lambda: actor,
            get_checked_permissions=lambda: {Post: "read"},
            class_=AsyncSession,
        )
        return factory()

    try:
        async with engine.begin() as conn:
            await conn.run_sync(Model.metadata.create_all)

        # The fixture session is intentionally kept open around the authorized
        # queries: `user` and `manager` are handed to Oso afterwards, and
        # closing their session right after the commit would detach them
        # while their attributes are still expired.
        async with AsyncSession(bind=engine) as fixture_session:
            fixture_session.add_all([
                user,
                manager,
                public_user_post,
                private_user_post,
                private_manager_post,
                public_manager_post,
            ])
            await fixture_session.commit()

            # Expected output below is the original author's annotation; it
            # depends on the rules in the policy file, which is not part of
            # this script.
            async with authorized_session_for(user) as session:
                posts = (await session.execute(select(Post))).scalars().all()
                print([p.contents for p in posts])
                # >>> ['public_user_post', 'private_user_post', 'public_manager_post']

            async with authorized_session_for(manager) as manager_session:
                posts = (await manager_session.execute(select(Post))).scalars().all()
                print([p.contents for p in posts])
                # >>> ['public_user_post', 'private_user_post', 'private_manager_post', 'public_manager_post']
    finally:
        # Always release the engine's connections, even if a step above fails.
        await engine.dispose()
# Script entry point: run the async demo coroutine to completion.
asyncio.run(test())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment