Skip to content

Instantly share code, notes, and snippets.

@liuyibo
Created February 1, 2020 13:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save liuyibo/6fb24680a02b3735b5d50c850b59673d to your computer and use it in GitHub Desktop.
Save liuyibo/6fb24680a02b3735b5d50c850b59673d to your computer and use it in GitHub Desktop.
a streamlit app
import time
import importlib
from threading import Thread
from multiprocessing import Process, Queue
import click
import socket
import streamlit
import streamlit.bootstrap as bootstrap
from streamlit.cli import _apply_config_options_from_cli, NEW_VERSION_TEXT
from streamlit.credentials import check_credentials
from streamlit import version
from ripplit.caching import get_cache
class StandaloneServer:
    """Run the Streamlit app in a child process and feed it operations.

    The parent process talks to the child through two multiprocessing
    queues: ``input_queue`` carries ``(action, params)`` messages to the
    child, and ``output_queue`` carries the acknowledgement that releases a
    pending ``wait()`` call.
    """

    def __init__(self):
        """Create both queues and the (not yet started) child process."""
        self.input_queue = Queue()
        self.output_queue = Queue()
        self.process = Process(target=self.serve)
        # Daemon process: it must not keep the parent alive on exit.
        self.process.daemon = True

    def start(self):
        """Start the child process, which then executes ``serve``."""
        self.process.start()

    def serve(self):
        """Entry point of the child process: configure and run Streamlit.

        Raises:
            ImportError: if the ``ripplit.app`` module cannot be located.
        """
        # ``import importlib`` alone does not guarantee that the ``util``
        # submodule is loaded, so import it explicitly before using it.
        import importlib.util

        kwargs = {'browser_serverAddress': socket.getfqdn()}
        _apply_config_options_from_cli(kwargs)

        # Set a global flag indicating that we're "within" streamlit.
        streamlit._is_running_with_streamlit = True

        # Check credentials.
        check_credentials()

        # Notify if streamlit is out of date.
        if version.should_show_new_version_notice():
            click.echo(NEW_VERSION_TEXT)

        # Drain ``input_queue`` in the background while the Streamlit
        # bootstrap call below occupies this thread.
        self.communicate_thread = Thread(target=self.communicate)
        self.communicate_thread.daemon = True
        self.communicate_thread.start()

        spec = importlib.util.find_spec("ripplit.app")
        if spec is None or spec.origin is None:
            raise ImportError("cannot locate the 'ripplit.app' module")
        bootstrap.run(spec.origin, '', [])

    def enqueue_operation(self, opr):
        """Send one operation to the child process.

        Args:
            opr: the operation payload; it is forwarded unchanged to the
                cache's ``enqueue_operation`` on the child side.
        """
        self.input_queue.put(("opr", opr))

    def wait(self):
        """Ask the child to raise its wait flag and block until it answers.

        The acknowledgement is only sent by ``communicate`` once the cache's
        wait flag has been cleared again.
        """
        self.input_queue.put(("wait", 0))
        self.output_queue.get()

    def communicate(self):
        """Forever apply messages from ``input_queue`` to the shared cache.

        Messages whose action is neither ``"opr"`` nor ``"wait"`` are
        ignored.
        """
        while True:
            action, params = self.input_queue.get()
            if action == "opr":
                get_cache().enqueue_operation(params)
            elif action == "wait":
                get_cache().set_wait_flag()
                # Poll until something else clears the flag, then release
                # the caller blocked in ``wait()``.
                while get_cache().has_wait_flag():
                    time.sleep(0.01)
                self.output_queue.put(0)
# Module-level singleton: importing this module creates the server and
# immediately starts its child process.
server = StandaloneServer()
server.start()
# (name, image) pairs buffered by imshow() until they are flushed.
imshow_queue = []


def imshow(name, img):
    """Buffer an image under the given caption.

    Nothing is sent to the server here; the pair is only appended to
    ``imshow_queue``.

    Args:
        name: caption to show with the image.
        img: the image object to display.
    """
    pending = (name, img)
    imshow_queue.append(pending)
def waitKey(v=0):
    """Flush buffered images as one ``image`` operation, then block.

    All pairs collected in ``imshow_queue`` are sent together (images as a
    single list argument, names as the ``caption`` keyword) and the buffer
    is emptied. The call then blocks in ``server.wait()``.

    Args:
        v: unused; accepted but never read.
    """
    if imshow_queue:
        captions = [name for name, _ in imshow_queue]
        imgs = [img for _, img in imshow_queue]
        operation = ("image", [imgs], {'caption': captions})
        server.enqueue_operation(operation)
        imshow_queue.clear()
    server.wait()
def _from_streamlit_delta(method):
def warpper(*args, **kwargs):
return server.enqueue_operation((method.__name__, args, kwargs))
return warpper
# Publish a recording proxy in this module's namespace for every public
# callable attribute of the streamlit module.
for name in dir(streamlit):
    elem = getattr(streamlit, name)
    if name.startswith('_') or not callable(elem):
        continue
    globals()[name] = _from_streamlit_delta(elem)
import time
import threading
import numpy as np
import streamlit as st
from ripplit.caching import get_cache
# Inject page-wide CSS: widen the main block container and shrink its
# padding, keep elements marked "stale" fully opaque, and hide modal
# dialogs together with their backdrop.
st.markdown(
"""
<style>
.reportview-container .main .block-container{
max-width: 2000px !important;
padding: 0rem 1rem !important;
}
.stale-element {
opacity: 1.0 !important;
}
.modal {
display: none !important;
}
.modal-backdrop {
display: none !important;
}
</style>
""",
unsafe_allow_html=True,
)
# Number of cached operations already replayed by apply_oprs().
i = 0


def apply_oprs():
    """Replay the cached operations that have not been rendered yet.

    Each operation is a ``(func_name, args, kwargs)`` triple; the function
    is looked up by name on ``st`` and called with those arguments. The
    module-level counter ``i`` is then advanced to the number of operations
    seen, so later calls only replay newer entries.
    """
    global i
    oprs = cache.get_all_operations()
    pending = oprs[i:]
    for entry in pending:
        func_name, args, kwargs = entry
        getattr(st, func_name)(*args, **kwargs)
    i = len(oprs)
# Shared operation cache obtained from ripplit.caching.
cache = get_cache()
# Inject CSS that greys out the button and blocks pointer events on it.
# The returned element is kept so its content can be replaced later.
button_disable_style = st.markdown(
"""
<style>
.stButton {
cursor: not-allowed !important;
pointer-events: none !important;
opacity: 0.3 !important;
}
</style>
""",
unsafe_allow_html=True,
)
# A click on "continue" resets the cache (operations and wait flag).
continue_clicked = st.button('continue')
if continue_clicked:
    cache.clear()

# Keep rendering incoming operations until the wait flag is raised.
while True:
    if cache.has_wait_flag():
        break
    apply_oprs()
    time.sleep(0.01)

# Render whatever arrived just before the flag was raised, then replace
# the button-disabling style with empty content.
apply_oprs()
button_disable_style.markdown("")
class Cache:
    """Holds the list of recorded operations plus a boolean wait flag."""

    def __init__(self):
        """Start with no operations and the wait flag lowered."""
        self._oprs = []
        self._wait_flag = False

    def set_wait_flag(self):
        """Raise the wait flag."""
        self._wait_flag = True

    def has_wait_flag(self):
        """Return True while the wait flag is raised."""
        return self._wait_flag

    def enqueue_operation(self, opr):
        """Append one operation to the end of the recorded list."""
        self._oprs.append(opr)

    def get_all_operations(self):
        """Return a shallow copy of the recorded operations, oldest first."""
        return self._oprs[:]

    def clear(self):
        """Lower the wait flag, then drop every recorded operation."""
        self._wait_flag = False
        del self._oprs[:]
# The single Cache instance created when this module is imported.
_cache = Cache()


def get_cache():
    """Return the module-level ``Cache`` instance (the same object each call)."""
    return _cache
import inspect
import numpy as np
import ripplit as rp
def main():
    """Demo loop: emit a set of elements per step, pausing after each step.

    For every value in ``range(0, 255, 30)`` this shows the function's own
    source, two buffered images, a JSON object, three colour images, a text
    line, a LaTeX formula and a ``write`` call, then blocks in
    ``rp.waitKey``.
    """
    for level in range(0, 255, 30):
        # The source shown is this very function, re-read on every step.
        own_source = inspect.getsource(main)
        rp.code(own_source, language='python')

        rp.imshow('img1', np.zeros((128, 128), 'uint8') + level)
        rp.imshow('img2', np.zeros((64, 64), 'uint8') + 255 - level)

        rp.json({'123': '456'})

        red = np.zeros((64, 64, 3), 'uint8') + (255, 0, 0)
        green = np.zeros((64, 64, 3), 'uint8') + (0, 255, 0)
        blue = np.zeros((64, 64, 3), 'uint8') + (0, 0, 255)
        rp.image([red, green, blue], caption=['r', 'g', 'b'])

        rp.text(f"Iteration: {level}")
        rp.latex(r'''
a + ar + a r^2 + a r^3 + \cdots + a r^{n-1} =
\sum_{k=0}^{n-1} ar^k =
a \left(\frac{1-r^{n}}{1-r}\right)
''')
        rp.write('1 + 1 = ', 2)
        rp.waitKey(0)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment