Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahopkins/c6517ffa5c9d5bc9a1e3fc7fb25ad015 to your computer and use it in GitHub Desktop.
Save ahopkins/c6517ffa5c9d5bc9a1e3fc7fb25ad015 to your computer and use it in GitHub Desktop.
For syncing data across multiple running Sanic processes

For syncing data across multiple running Sanic processes

from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import text
import json
import os
import aioredis
from pprint import pprint as print
PUBSUB_HOST = "localhost"
PUBSUB_PORT = 6379
CHANNEL = "SCHEMA_STORE"
app = Sanic("some_name")
app.config.KEEP_ALIVE_TIMEOUT = 1
async def receiver(app):
print(f"Receiver on {os.getpid()}")
while True:
try:
await app.pubsub.wait_message()
raw = await app.pubsub.get(encoding="utf-8")
except aioredis.errors.ChannelClosedError:
break
else:
print(f">>> PUBSUB rcvd: {raw}")
schema_data = json.loads(raw)
for view, data in schema_data.items():
cls = globals().get(view)
if not view:
print(f"Could not find {view}")
cls.schema = data
@app.listener("before_server_start")
async def setup_redis(app, loop):
app.redis = await aioredis.create_redis_pool(
address=f"redis://{PUBSUB_HOST}:{PUBSUB_PORT}",
minsize=2,
maxsize=500,
encoding="utf-8",
)
channels = await app.redis.subscribe(CHANNEL)
if channels:
app.pubsub = channels[0]
app.add_task(receiver(app))
@app.listener("after_server_stop")
async def close_redis(app, loop):
print(f"CLOSING {CHANNEL}")
await app.redis.unsubscribe(CHANNEL)
app.redis.close()
await app.redis.wait_closed()
class DummyView(HTTPMethodView):
schema = None
def get(self, request):
return text(f"On {os.getpid()}: schema={self.__class__.schema}")
async def post(self, request):
schema = request.json.get("schema")
await request.app.redis.publish(
CHANNEL, json.dumps({self.__class__.__name__: schema})
)
return text(f"Updated {os.getpid()}")
app.add_route(DummyView.as_view(), "/")
if __name__ == "__main__":
app.run(debug=True, workers=4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment