Skip to content

Instantly share code, notes, and snippets.

@ahopkins
Created May 7, 2023 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahopkins/08f6f30c586f73efd1fddf1e93e299c7 to your computer and use it in GitHub Desktop.
Save ahopkins/08f6f30c586f73efd1fddf1e93e299c7 to your computer and use it in GitHub Desktop.
Hello, world.
// Get the latest version of this content from: https://github.com/livereload/livereload-js/raw/master/dist/livereload.js

Sanic Live Reload

Uses livereload.js to run a live reload server with Sanic. Startup, with auto-reload enabled. In this example, we also are adding an extra directory to listen to:

sanic server:app --dev -R ./content

Run the example. Open http://127.0.0.1:8000/ in a browser with a livereload extension installed. Make a change to ./content/hello.txt and save. Your browser should now update to the new text.

from multiprocessing import Manager
from pathlib import Path
from queue import Empty, Queue
from asyncio import sleep
from sanic import Sanic, Request, Websocket, file
import ujson
app = Sanic("TestApp")
app.ext.add_constant("content_path", Path(__file__).parent / "content")
@app.get("/")
async def handler(request: Request, content_path: Path):
return await file(content_path / "hello.txt")
@app.main_process_start
async def main_process_start(app: Sanic):
app.ctx.manager = Manager()
app.shared_ctx.reload_queue = app.ctx.manager.Queue()
@app.main_process_ready
async def main_process_ready(app: Sanic):
app.manager.manage(
"Livereload",
run_reload_server,
{"reload_queue": app.shared_ctx.reload_queue},
)
@app.main_process_stop
async def main_process_stop(app: Sanic):
app.ctx.manager.shutdown()
@app.before_server_start
async def before_server_start(app: Sanic):
app.shared_ctx.reload_queue.put("reload")
class Livereload:
SERVER_NAME = "Reloader"
HELLO = {
"command": "hello",
"protocols": [
"http://livereload.com/protocols/official-7",
],
"serverName": SERVER_NAME,
}
def __init__(self, reload_queue: Queue):
self.reload_queue = reload_queue
self.app = Sanic(self.SERVER_NAME)
self.app.static(
"/livereload.js", Path(__file__).parent / "livereload.js"
)
self.app.add_websocket_route(
self.livereload_handler, "/livereload", name="livereload"
)
self.app.add_task(self._listen_to_queue())
self.app.config.EVENT_AUTOREGISTER = True
def run(self):
self.app.run(port=35729, single_process=True)
async def _listen_to_queue(self):
while True:
try:
self.reload_queue.get_nowait()
except Empty:
await sleep(0.5)
continue
await self.app.dispatch("livereload.file.reload")
async def livereload_handler(self, request: Request, ws: Websocket):
await ws.recv()
await ws.send(ujson.dumps(self.HELLO))
while True:
await request.app.event("livereload.file.reload")
await ws.send(ujson.dumps({"command": "reload", "path": "..."}))
def run_reload_server(reload_queue: Queue):
Livereload(reload_queue).run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment