Skip to content

Instantly share code, notes, and snippets.

@DarcJC
Last active August 2, 2021 11:49
Show Gist options
  • Save DarcJC/4d2cfefe0efbf70015233bb886b24e70 to your computer and use it in GitHub Desktop.
Save DarcJC/4d2cfefe0efbf70015233bb886b24e70 to your computer and use it in GitHub Desktop.
Casbin Policy Auto Reloader
"""
Current version(1.1.7) of AsyncCasbin doesn't support any async orm adapter,
it means adapter such as casbin_tortoise_adapter wouldn't work with the SyncedEnforcer class
(you will recieve "... was never awaited" if you use it ).
So we could just use common Enforcer, and reload the policy periodically(the strategy SyncedEnforcer choose).
"""
import casbin
import casbin_tortoise_adapter
_adapter = casbin_tortoise_adapter.TortoiseAdapter()
enforcer = casbin.Enforcer(_your_model, _adapter)
class PolicyLoaderThread(threading.Thread):
def __init__(self, interval=10):
super(PolicyLoaderThread, self).__init__(daemon=True)
self._interval = interval
self._stop = threading.Event()
self._io_loop = asyncio.get_event_loop()
@staticmethod
async def task():
await enforcer.load_policy()
def run(self):
self._io_loop.create_task(self.task()) # loading policy at startup
while not self.stopped:
self._stop.wait(self._interval)
if self.stopped:
"""
Stopping threading when _stop was set.
You could change to continue if you want to restart it. Clearing the _stop flag to restart it.
"""
break
self._io_loop.create_task(self.task())
def stop(self):
self._stop.set()
self._io_loop.close()
@property
def stopped(self):
return self._stop.is_set()
_thread = PolicyLoaderThread()
_thread.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment