Skip to content

Instantly share code, notes, and snippets.

@jcheng5
Last active June 21, 2023 00:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jcheng5/427de09573816c4ce3a8c6ec1839e7c0 to your computer and use it in GitHub Desktop.
Save jcheng5/427de09573816c4ce3a8c6ec1839e7c0 to your computer and use it in GitHub Desktop.
Debounce and throttle for Shiny for Python

Sketch of debounce/throttle decorators for Shiny for Python

Drop ratelimit.py into your app directory to use in your own Shiny for Python app. See app.py for an example of these decorators in action.

Why is this a gist instead of a PR? Because to implement this "properly" for Shiny for Python, this would need to be beefed up to include type annotations, support for async, and unit/integration tests, which would all be more effort than this has taken so far. (To be clear, we want to do all that, but in the meantime, here's this gist.)

from shiny import Inputs, Outputs, Session, App, reactive, render, ui
from ratelimit import debounce, throttle
app_ui = ui.page_fluid(
ui.input_action_button("click", "Increment"),
ui.output_ui("output"),
)
def server(input: Inputs, output: Outputs, session: Session):
@debounce(2)
@reactive.Calc
def debounced_click():
return input.click()
@throttle(2)
@reactive.Calc
def throttled_click():
return input.click()
@output
@render.ui
def output():
return ui.div(
"Raw clicks: ",
input.click(),
ui.br(),
"Debounced clicks: ",
debounced_click(),
ui.br(),
"Throttled clicks: ",
throttled_click(),
)
app = App(app_ui, server)
import functools
import time
from shiny import reactive
def debounce(delay_secs):
def wrapper(f):
when = reactive.Value(None)
trigger = reactive.Value(0)
@reactive.Calc
def cached():
"""
Just in case f isn't a reactive calc already, wrap it in one. This ensures
that f() won't execute any more than it needs to.
"""
return f()
@reactive.Effect(priority=102)
def primer():
"""
Whenever cached() is invalidated, set a new deadline for when to let
downstream know--unless cached() invalidates again
"""
try:
cached()
except Exception:
...
finally:
when.set(time.time() + delay_secs)
@reactive.Effect(priority=101)
def timer():
"""
Watches changes to the deadline and triggers downstream if it's expired; if
not, use invalidate_later to wait the necessary time and then try again.
"""
deadline = when()
if deadline is None:
return
time_left = deadline - time.time()
if time_left <= 0:
# The timer expired
with reactive.isolate():
when.set(None)
trigger.set(trigger() + 1)
else:
reactive.invalidate_later(time_left)
@reactive.Calc
@reactive.event(trigger, ignore_none=False)
@functools.wraps(f)
def debounced():
return cached()
return debounced
return wrapper
def throttle(delay_secs):
def wrapper(f):
last_signaled = reactive.Value(None)
last_triggered = reactive.Value(None)
trigger = reactive.Value(0)
@reactive.Calc
def cached():
return f()
@reactive.Effect(priority=102)
def primer():
try:
cached()
except Exception:
...
finally:
last_signaled.set(time.time())
@reactive.Effect(priority=101)
def timer():
if last_triggered() is not None and last_signaled() < last_triggered():
return
now = time.time()
if last_triggered() is None or (now - last_triggered()) >= delay_secs:
last_triggered.set(now)
with reactive.isolate():
trigger.set(trigger() + 1)
else:
reactive.invalidate_later(delay_secs - (now - last_triggered()))
@reactive.Calc
@reactive.event(trigger, ignore_none=False)
@functools.wraps(f)
def throttled():
return cached()
return throttled
return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment