Skip to content

Instantly share code, notes, and snippets.

@bin-san

bin-san/main.py Secret

Created June 24, 2024 11:02
Show Gist options
  • Save bin-san/ede5d876aba43eff290a776915659345 to your computer and use it in GitHub Desktop.
Save bin-san/ede5d876aba43eff290a776915659345 to your computer and use it in GitHub Desktop.
A NiceGUI UI demo for broadcasting videos with multi-user control
import os
from pydantic import BaseModel
from nicegui import ui, app, events
from uuid import uuid4
from time import time_ns
from typing import Literal
# from sandipan import simplify_file_size
from starlette.requests import Request
# Base URL used when building shareable "join" links.
website = 'http://localhost:8000/'

# Make sure the upload directory exists. Only the "already there" case is
# expected; any other failure (permissions, read-only disk, ...) should surface
# instead of being silently swallowed.
try:
    os.mkdir('./vid')
except FileExistsError:
    pass
def simplify_file_size(file_size: int):
    """Return *file_size* (in bytes) as a short string such as ``'12MB'``.

    The size is divided by 1000 until it drops below 1000 or the largest
    known unit is reached; the fractional part is discarded.
    """
    # SI decimal units in ascending order (zetta comes before yotta).
    prefixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
    current_prefix = 0
    # Stop at the last unit so the index can never run past the list.
    while file_size >= 1000 and current_prefix < len(prefixes) - 1:
        # Floor division keeps integer inputs exact for very large sizes.
        file_size //= 1000
        current_prefix += 1
    return f'{int(file_size)}{prefixes[current_prefix]}'
# Expose the local ./vid directory under the /vid URL prefix.
app.add_media_files('/vid', './vid')
# storage startup: make sure the shared 'rooms' mapping exists before any page
# or endpoint indexes into it.
if app.storage.general.get('rooms') is None:
    app.storage.general['rooms'] = {}
def create_new_room(room_name: str) -> str:
    """Register a new room named *room_name* and return its generated code.

    The room is stored in the shared general storage and its code is appended
    to the current user's ``owned_rooms`` list, which must already exist.
    """
    # A random UUID plus a nanosecond timestamp serves as the room code.
    room_code = f'{uuid4()}-{time_ns()}'

    fresh_room = {
        'name': room_name,
        'posts': [],  # posts published in the room
        'command': [],  # latest video command as a list
        'linear-command': '',  # the same command flattened to a string
        'current_playing_video': 0,  # reference of the currently playing video
    }
    app.storage.general['rooms'][room_code] = fresh_room

    app.storage.user['owned_rooms'].append(room_code)
    return room_code
class WorkerMessage(BaseModel):
    """Outcome of a room operation, as returned by ``join_an_existing_room``."""

    # True when the operation succeeded.
    status: bool
    # Text shown to the user describing the outcome.
    message: str
    # Name of the room concerned; a placeholder unless the caller sets it.
    room_name: str = 'Another world'
    # Not set by any code visible in this file.
    file_src: str = ''
def join_an_existing_room(room_code: str) -> WorkerMessage:
    """Add *room_code* to the current user's joined rooms.

    Returns a ``WorkerMessage`` whose ``status`` is False when the room is
    unknown, owned by the user, or already joined; on success it also carries
    the room's name.
    """
    room = app.storage.general['rooms'].get(room_code)
    if not room:
        return WorkerMessage(status=False, message='Room does not exists')

    # Both membership lists must exist before they are consulted below.
    for list_key in ('owned_rooms', 'joined_rooms'):
        if not app.storage.user.get(list_key):
            app.storage.user[list_key] = []

    if room_code in app.storage.user.get('owned_rooms'):
        return WorkerMessage(status=False, message='Can not join to owned room')
    if room_code in app.storage.user.get('joined_rooms'):
        return WorkerMessage(status=False, message='Already joined to this room')

    # Record the membership in the user's storage.
    app.storage.user['joined_rooms'].append(room_code)
    return WorkerMessage(
        status=True,
        message='Successfully Joined',
        room_name=room['name'],
    )
def delete_room(room_code):
    """Remove *room_code* from the current user's lists.

    A joined room is only removed from the user's ``joined_rooms`` list (the
    user leaves it). A room the user owns is removed from ``owned_rooms`` and
    deleted from the shared general storage. Unknown codes and missing lists
    are ignored.
    """
    user = app.storage.user

    joined_rooms = user.get('joined_rooms')
    if joined_rooms and room_code in joined_rooms:
        joined_rooms.remove(room_code)

    owned_rooms = user.get('owned_rooms')
    if owned_rooms and room_code in owned_rooms:
        owned_rooms.remove(room_code)
        # Only the owner may delete the shared room record; otherwise any
        # participant could destroy the room for everyone else.
        app.storage.general['rooms'].pop(room_code, None)
def room_card(room_name, room_code):
    """Build the card shown for one room in the room lists and return it.

    The card shows the room name, a delete button, a button that opens the
    room page, and controls that copy the room code or a join link to the
    clipboard.
    """
    # Local import: the module-level import block does not provide json.
    import json

    def copy_to_clipboard(text, confirmation):
        # json.dumps yields a correctly escaped JavaScript string literal, so
        # the text cannot break out of the generated script.
        ui.run_javascript(f'navigator.clipboard.writeText({json.dumps(text)})')
        ui.notify(confirmation)

    with ui.card().classes('w-full') as x:
        with ui.row().classes('w-full'):
            ui.label(room_name).style('font-size: 2rem')
            ui.space()

            def del_handler():
                delete_room(room_code)
                x.delete()

            ui.button(icon='delete', on_click=del_handler).props('flat color=red')
            ui.button(
                icon='arrow_forward',
                on_click=lambda: ui.navigate.to(f'/room/{room_code}'),
            ).props('rounded color=accent')
        with ui.row().classes('w-full'):
            def copy_code_to_clipboard():
                copy_to_clipboard(room_code, 'Code copied to clipboard')

            def copy_link_to_clipboard():
                # `website` is read at click time, so a later reassignment of
                # the module-level value is honoured.
                link = f"{website.rstrip('/')}/join/{room_code}"
                copy_to_clipboard(link, 'Link copied to clipboard')

            ui.chip(room_code, color='inversePrimary', on_click=copy_code_to_clipboard)
            ui.space()
            with ui.row():
                ui.button(
                    'copy code', icon='content_copy', on_click=copy_code_to_clipboard,
                ).props('flat rounded')
                ui.button(
                    'share link', icon='link', on_click=copy_link_to_clipboard,
                ).props('flat rounded color=green')
    return x
@ui.page('/join/{room_code}')
def join_with_room_code_link(room_code):
    """Handle a shared join link: try to join the room, then go to the home page."""
    outcome = join_an_existing_room(room_code)
    # Green for a successful join, red for any refusal.
    ui.notify(outcome.message, color='green' if outcome.status else 'red')
    return ui.navigate.to('/')
@ui.page('/')
def index():
    """Home page: lists the user's owned and joined rooms.

    Also provides the dialogs for creating a new room and for joining an
    existing one by code, reachable through a floating action button.
    """
    # user storage setup
    if app.storage.user.get('owned_rooms') is None:
        app.storage.user['owned_rooms'] = []
    if app.storage.user.get('joined_rooms') is None:
        app.storage.user['joined_rooms'] = []

    def list_rooms(room_codes):
        """Render a card for every listed room that still exists."""
        rooms = app.storage.general['rooms']
        for room_code in room_codes:
            room = rooms.get(room_code)
            if room is None:
                # The room is gone from the shared storage (e.g. deleted by
                # its owner); skip it instead of failing the whole page.
                continue
            room_card(room['name'], room_code)

    ui.page_title('Party Rooms')
    with ui.column().classes('fullscreen'):
        with ui.row().classes('full-width shadow-4 bg-primary text-white').style('padding: 1rem'):
            ui.label('🎉 Party Rooms').style('font-size:1.5rem')

        # Create UI: to create a room
        with ui.dialog() as create_form:
            with ui.card():
                ui.label('Create a new party 🎉').style('font-size: 1.5rem')
                crf_inp = ui.input('Party Name', placeholder='Marvel Movie Night', validation={
                    'Party name must be between 3-20': lambda v: 3 <= len(v) <= 20
                }).classes('w-full')

                def create_btn_handler():
                    if not crf_inp.validate():
                        return
                    rn1 = crf_inp.value
                    rc1 = create_new_room(rn1)
                    with own_rooms_scroll_area:
                        room_card(rn1, rc1)
                    own_rooms_scroll_area.scroll_to(percent=100)
                    tab_panels.set_value(owned_tab_panel)
                    create_form.set_value(False)

                ui.button('Create', on_click=create_btn_handler).classes('w-full')

        # Join UI: to enter room code for join
        with ui.dialog() as join_form:
            with ui.card():
                ui.label('Join an existing party 🥳').style('font-size: 1.5rem')
                jfinp = ui.input('Enter Room Code', validation={
                    'Enter a valid code': lambda v: len(v) != 0
                }).classes('w-full')

                def join_form_handler():
                    if not jfinp.validate():
                        return
                    rmcode = jfinp.value
                    worker_msg = join_an_existing_room(rmcode)
                    if not worker_msg.status:
                        ui.notify(worker_msg.message, color='red')
                        return
                    with joined_rooms_scroll_area:
                        room_card(worker_msg.room_name, rmcode)
                    joined_rooms_scroll_area.scroll_to(percent=100)
                    ui.notify(worker_msg.message)
                    # Show the list the new card was just added to.
                    tab_panels.set_value(joined_tab_panel)
                    join_form.set_value(False)

                ui.button('Join', on_click=join_form_handler).classes('w-full')

        # floating action button
        with ui.page_sticky(position='bottom-right').style('z-index:1000'):
            with ui.element('q-fab').props('icon=add direction=up').style('margin:2rem'):
                ui.element('q-fab-action').props('label=Create').on(
                    'click', lambda: create_form.set_value(True))
                ui.element('q-fab-action').props('label=Join').on(
                    'click', lambda: join_form.set_value(True))

        with ui.tabs().props('inline-label align=justify').classes('w-full') as tabs:
            tab1 = ui.tab('owned', 'Owned', 'person')
            tab2 = ui.tab('joined', 'Joined', 'public')
        with ui.tab_panels(tabs, value=tab1).classes('full-width full-height') as tab_panels:
            # owned rooms list
            with ui.tab_panel('owned') as owned_tab_panel:
                with ui.scroll_area().classes('full-height') as own_rooms_scroll_area:
                    list_rooms(app.storage.user['owned_rooms'])
            # list of rooms joined by user
            with ui.tab_panel('joined') as joined_tab_panel:
                with ui.scroll_area().classes('full-height') as joined_rooms_scroll_area:
                    list_rooms(app.storage.user['joined_rooms'])
class ValueWrapper(BaseModel):
    """Small record pairing a boolean flag with file details.

    Not referenced by any other code visible in this file.
    """

    # Boolean flag, False by default.
    value: bool = False
    # File name fields; their exact roles are not shown here (presumably the
    # original and the stored name of an upload, to be confirmed).
    file_name: str = ''
    file_encoded_name: str = ''
    # Source path or URL of the file.
    file_src: str = ''
class GeneralStorageRoomPost(BaseModel):
    """Schema for a post record: a source plus a length.

    Not referenced by any other code visible in this file.
    """

    # Source path or URL of the posted media.
    src: str
    # Length of the media; the unit is not shown here (TODO confirm).
    length: int = 0
# The video elements on a room page post their playback commands to this
# endpoint from the frontend; the handler validates each command and stores it
# in the general storage for the room.
@app.post('/post-command')
async def handle_posted_command(request: Request):
    """Store the latest video command of a room.

    Expects the form fields ``room_code``, ``cmd`` (``play``, ``pause`` or
    ``seek``), ``vid`` (index of the video on the page) and ``pos`` (playback
    position). Incomplete or invalid submissions are ignored so that a
    malformed request cannot raise inside the handler.
    """
    form = await request.form()
    room_code = form.get('room_code')
    cmd = form.get('cmd')
    vid = form.get('vid')
    pos = form.get('pos')
    if not (room_code and cmd and vid and pos):
        return

    room = app.storage.general['rooms'].get(room_code)
    if room is None or cmd not in ('play', 'pause', 'seek'):
        return

    try:
        vid = int(vid)
        pos = float(pos)
    except (TypeError, ValueError):
        return
    if vid < 0:
        # A negative index would silently address a video from the end of the list.
        return

    # 'command' is written first: readers react to a change of
    # 'linear-command' and then read 'command'.
    room['command'] = [cmd, vid, pos]
    room['linear-command'] = f'{cmd}-{vid}-{pos}'
@ui.page('/room/{room_code}')
def party_room(room_code):
    """Render the page of one party room.

    The page lists the room's posts, lets the user upload new ones, and keeps
    video playback in sync: every play/pause/seek on a video is posted to
    ``/post-command``, and each open page applies the room's latest stored
    command to its own video elements.
    """
    room = app.storage.general['rooms'].get(room_code)
    if room is None:
        # Unknown or deleted room: show a message instead of raising KeyError.
        # Everything below may therefore rely on room_code being an existing key.
        ui.label('Room not found').style('font-size: 1.5rem')
        ui.button('Back to rooms', on_click=lambda: ui.navigate.to('/'))
        return

    class UploadWorkerMessage(BaseModel):
        """State of one upload/post while it moves through the form."""
        status: bool = False
        message: str = ''
        file_src: str = ''
        file_type: str = ''
        file_name: str = ''
        file_size: str = ''
        room_code: str = ''
        caption: str = ''

    def upload_video_to_room(e: events.UploadEventArguments, file_upload_status: UploadWorkerMessage):
        """Write the uploaded file into ./vid and record the result in *file_upload_status*."""
        # Keep only the last path component so a crafted file name cannot
        # place the file outside ./vid.
        file_name = os.path.basename(e.name.replace('\\', '/'))
        unique_file_name = f'{uuid4()}-{time_ns()}-{file_name}'
        file_src = f'./vid/{room_code}-{unique_file_name}'
        try:
            with open(file_src, 'wb') as f:
                f.write(e.content.read())
                # After read() the stream position equals the number of bytes read.
                file_size = simplify_file_size(e.content.tell())
        except OSError as err:
            print(err)
            file_upload_status.status = False
            file_upload_status.message = str(err)
            ui.notify('File upload failed', color='red')
            return
        ui.notify('File successfully uploaded', color='green')
        file_upload_status.status = True
        file_upload_status.file_src = file_src
        file_upload_status.file_type = e.type
        file_upload_status.file_name = file_name
        file_upload_status.file_size = file_size

    def create_new_post_in_the_room(post_upload_status: UploadWorkerMessage):
        """Append the post described by *post_upload_status* to the room's stored posts."""
        app.storage.general['rooms'][post_upload_status.room_code]['posts'].append(
            {
                'src': post_upload_status.file_src,
                'type': post_upload_status.file_type,
                'caption': post_upload_status.caption,
                'file_name': post_upload_status.file_name,
                'file_size': post_upload_status.file_size,
            }
        )

    def command_js(cmd: str, video_index: int) -> str:
        """Return the JS event handler that reports *cmd* for one video to /post-command."""
        return f'''
        (event)=>{{
            let f = new FormData();
            f.append('room_code', '{room_code}');
            f.append('cmd', '{cmd}');
            f.append('vid', {video_index});
            f.append('pos', event.target.currentTime);
            fetch('/post-command', {{
                method: 'POST', body: f
            }});
        }}
        '''

    # ui for new post
    def post_card(post_upload_status: UploadWorkerMessage):
        """Render one post; video posts are registered for the command system."""
        card = ui.card().style('width: 90vmin; margin-bottom: 2rem')
        with card:
            ui.label(post_upload_status.caption).style('font-size:1.5rem')
            if post_upload_status.file_type.startswith('video'):
                # The position in video_reference_list identifies the video in commands.
                video_index = len(video_reference_list)
                video = ui.video(post_upload_status.file_src).classes('w-full')
                # DOM event name -> command name sent to the server.
                for event_name, cmd in (('play', 'play'), ('pause', 'pause'), ('seeked', 'seek')):
                    video.on(event_name, js_handler=command_js(cmd, video_index))
                video_reference_list.append(video)
            with ui.row().classes('w-full'):
                ui.chip(post_upload_status.file_type).props('outline')
                ui.chip(post_upload_status.file_size).props('outline color=secondary')
                ui.space()
                ui.button(icon='delete').props('rounded flat color=red')

    # file upload form ui
    def file_form(file_type: Literal['video', 'image', 'audio']):
        """Open a dialog to upload a file of *file_type* and publish it as a post."""
        d = ui.dialog(value=True)
        with d:
            with ui.card().classes('absolute-center').classes('w-full'):
                ui.label(f'Post an {file_type.title()}').style('font-size: 1.5rem')
                post_upload_status = UploadWorkerMessage(status=False, message='', room_code=room_code)

                def pu_h(e: events.UploadEventArguments):
                    upload_video_to_room(e, post_upload_status)
                    # Posting only becomes possible once the file is stored.
                    if post_upload_status.status:
                        post_btn.enable()

                ui.upload(
                    on_upload=pu_h,
                    auto_upload=True,
                    label=f'Select {file_type} file').classes('w-full')
                post_input = ui.input('Caption', placeholder='An amazing post').classes('w-full')

                def post_btn_h():
                    # verify the upload has been done successfully
                    if not post_upload_status.status:
                        ui.notify('Can not create post', color='red')
                        print(post_upload_status.message)
                        return
                    post_upload_status.caption = post_input.value
                    create_new_post_in_the_room(post_upload_status)
                    ui.notify('Successfully created post.')
                    # updating the post container
                    with post_container:
                        post_card(post_upload_status)
                    post_container_scroll_area.scroll_to(percent=100)
                    d.set_value(False)

                post_btn = ui.button('Post', on_click=post_btn_h).classes('w-full')
                post_btn.disable()

    # video command system
    video_reference_list = []  # references to the video elements on this page

    def cmd_play_at(video_reference_index: int, video_position: float):
        video_reference_list[video_reference_index].seek(video_position)
        video_reference_list[video_reference_index].play()
        print(f'Playing at {video_position}')

    def cmd_pause_at(video_reference_index: int, video_position: float):
        video_reference_list[video_reference_index].seek(video_position)
        video_reference_list[video_reference_index].pause()
        print(f'Pausing at {video_position}')

    def cmd_seek_to(video_reference_index: int, video_position: float):
        video_reference_list[video_reference_index].seek(video_position)
        print(f'Seeked to {video_position}')

    video_command_references = {
        'play': cmd_play_at,
        'pause': cmd_pause_at,
        'seek': cmd_seek_to
    }

    # Last command applied on THIS page. It is kept per page rather than in
    # the per-user storage because every open page owns its own video
    # elements and must apply each command itself.
    last_command = ''

    room_name = room['name']
    ui.page_title(room_name)
    with ui.column().classes('fullscreen'):
        with ui.row(align_items='center').classes('full-width shadow-4 bg-primary text-white').style('padding: 1rem'):
            ui.label(f'🎉 {room_name} 🎉').style('font-size:1.5rem')
            ui.space()

            def monitor_cmd(v):
                """Binding hook: apply the room's command when it changed; return *v* unchanged."""
                nonlocal last_command
                live_room = app.storage.general['rooms'].get(room_code)
                if live_room is None:
                    # The room was deleted while this page is open.
                    return v
                current_cmd = live_room.get('linear-command', '')
                if current_cmd != last_command:
                    print(current_cmd)
                    try:
                        cmd, vid, pos = live_room['command']
                        video_command_references[cmd](vid, pos)
                    except Exception as err:
                        # Deliberately broad: a malformed command or missing
                        # video must not raise out of the binding callback.
                        print(err)
                    last_command = current_cmd
                return v

            # The chip shows the current command; the binding's transform
            # doubles as the hook that applies commands on this page.
            ui.chip().bind_text_from(
                app.storage.general['rooms'][room_code], 'linear-command', backward=monitor_cmd)
            ui.space()
            ui.button('Exit').props('icon-right=logout flat color=inversePrimary rounded')

        # action button to post
        with ui.page_sticky('bottom-right').style('z-index:1000'):
            with ui.element('q-fab').props('icon=post_add direction=left').style('margin:2rem'):
                ui.element('q-fab-action').props('icon=movie label=Video').on(
                    'click', lambda: file_form('video'))
                ui.element('q-fab-action').props('icon=image label=Image').on(
                    'click', lambda: file_form('image'))
                ui.element('q-fab-action').props('icon=music_note label=Audio').on(
                    'click', lambda: file_form('audio'))
                ui.element('q-fab-action').props('icon="img:https://upload.wikimedia.org/wikipedia/commons/0/09/YouTube_full-color_icon_%282017%29.svg" label="youtube"')

        with ui.scroll_area().classes('full-width full-height') as post_container_scroll_area:
            with ui.row() as post_container:
                # Each stored post is a dict with the keys
                # 'src', 'type', 'caption', 'file_name' and 'file_size'.
                for post in app.storage.general['rooms'][room_code]['posts']:
                    post_card(
                        UploadWorkerMessage(
                            room_code=room_code,
                            caption=post['caption'],
                            file_type=post['type'],
                            file_src=post['src'],
                            file_name=post.get('file_name', ''),
                            file_size=post['file_size'],
                        )
                    )
# Server start-up: listen on all interfaces on a fixed port.
host = '0.0.0.0'
port = 8999
# Overrides the `website` value assigned near the top of the file; join links
# are built from this base URL.
# NOTE(review): hard-coded public IP address; confirm it matches the deployment.
website = f'http://223.229.180.169:{port}/'
# NOTE(review): storage_secret is hard-coded in source; consider loading it
# from the environment.
ui.run(host=host, port=port, show=False, storage_secret='bin-san-2002', favicon='🎉')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment