Skip to content

Instantly share code, notes, and snippets.

dcc.Interval(id="interval", interval=1_000),
@callback(Output("time", "children"), Input("interval", "n_intervals"))
def refresh_data_at_interval(interval_trigger):
"""
This simple callback demonstrates how to use the Interval component to update data at a regular interval.
This particular example updates time every second, however, you can subsitute this data query with any acquisition method your product requires.
"""
return dt.datetime.now().strftime("%M:%S")
@app.callback(
Output ("demographics", "figure"),
Input("scatter-x", "value"),
Input ("comparison", "value"),
)
def make_scatter(xaxis, comp):
dfscatter = dbx_utils.get_scatter_data(xaxis, comp)
scatterfig = chart_utils.generate_scatter(dfscatter, xaxis, comp)
return scatterfig
def update_graph(input_value):
## ORM-based SQL Query with dynamic filters in the callback
stmt = select([
sensor_table.columns.MeasurementDateTime,
sensor_table.columns.LongMovingAverage,
sensor_table.columns.SensorLocation
]).where(and_(
sensor_table.columns.SensorMeasurement == input_value
))
## Get a full table with SQL Alchemy
sensor_table = Table("gold_all_sensors", MetaData(bind=engine), autoload=True)
## Set up SQL Alchemy engine
engine = create_engine(
f"databricks+connector://token:{token}@{host_name}:443/{database}",
connect_args={
"http_path": http_path,
},
)
from sqlalchemy.engine import create_engine
## Set up SQL Alchemy engine
engine = create_engine(
f"databricks+connector://token:{token}@{host_name}:443/{database}",
connect_args={
"http_path": http_path,
},
)
def generate_scatter(df, xaxis, comp):
axis_labels = {
"age": "Age (years)",
"height": "Height (inches)",
"weight": "Weight (lbs)",
}
fig = px.scatter(
df,
x=xaxis,
y="risk",
def get_scatter_data(xaxis, comp):
"""
Fetches specified columns and an aggregated column from the silver_users table, returns it as a pandas dataframe
Returns
-------
df : pandas dataframe
basic query of data from Databricks as a pandas dataframe
"""
connection0 = sql.connect(
server_hostname=SERVER_HOSTNAME,
def get_listofusers():
connection3 = sql.connect(
server_hostname=SERVER_HOSTNAME,
http_path=HTTP_PATH,
access_token=ACCESS_TOKEN,
)
cursor3 = connection3.cursor()
cursor3.execute(
f"SELECT DISTINCT userid FROM {DB_NAME}.{USER_TABLE} ORDER BY userid ASC"
)