-
-
Save bin-san/ede5d876aba43eff290a776915659345 to your computer and use it in GitHub Desktop.
A Nicegui UI Demo for broadcasting videos with multiuser control
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from pydantic import BaseModel | |
from nicegui import ui, app, events | |
from uuid import uuid4 | |
from time import time_ns | |
from typing import Literal | |
# from sandipan import simplify_file_size | |
from starlette.requests import Request | |
website = 'http://localhost:8000/' | |
try: | |
os.mkdir('./vid') | |
except: | |
pass | |
def simplify_file_size(file_size: int): | |
prefixes = ['B','KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'YB', 'ZB'] | |
current_prefix = 0 | |
while file_size >= 1000 and current_prefix < len(prefixes): | |
file_size/=1000 | |
current_prefix+=1 | |
return f'{int(file_size)}{prefixes[current_prefix]}' | |
app.add_media_files('/vid', './vid') | |
# storage startup | |
if app.storage.general.get('rooms') is None: | |
app.storage.general['rooms'] = {} | |
def create_new_room(room_name:str)->str: | |
room_code = f'{uuid4()}-{time_ns()}' | |
# # creating directories | |
# try: | |
# os.mkdir(f'./rooms/{room_code}') | |
# except Exception as e: | |
# ui.notify('Room creation failed') | |
# print(e) | |
# raise e | |
# updating virtual storage | |
app.storage.general['rooms'][room_code] = { | |
'name': room_name, | |
'posts': [], # list of posts in the room, | |
'command': [], # for video command system, list of int: current command on the room, | |
'linear-command': '', # string version | |
'current_playing_video': 0, # video refernce of current playing video | |
} | |
app.storage.user['owned_rooms'].append(room_code) | |
return room_code | |
class WorkerMessage(BaseModel): | |
status: bool | |
message: str | |
room_name: str = 'Another world' | |
file_src: str = '' | |
def join_an_existing_room(room_code: str)->WorkerMessage: | |
r = app.storage.general['rooms'].get(room_code) | |
if not r: | |
return WorkerMessage(status=False, message='Room does not exists') | |
if not app.storage.user.get('owned_rooms'): | |
app.storage.user['owned_rooms'] = [] | |
if not app.storage.user.get('joined_rooms'): | |
app.storage.user['joined_rooms'] = [] | |
if room_code in app.storage.user.get('owned_rooms'): | |
return WorkerMessage(status=False, message='Can not join to owned room') | |
if room_code in app.storage.user.get('joined_rooms'): | |
return WorkerMessage(status=False, message='Already joined to this room') | |
# update user data | |
app.storage.user['joined_rooms'].append(room_code) | |
return WorkerMessage(status=True, message='Successfully Joined', room_name=r['name']) | |
def delete_room(room_code): | |
try: | |
# user | |
app.storage.user['joined_rooms'].remove(room_code) | |
except: | |
pass | |
try: | |
app.storage.user['owned_rooms'].remove(room_code) | |
except: | |
pass | |
# general | |
try: | |
app.storage.general['rooms'].pop(room_code) | |
except: | |
pass | |
def room_card(room_name, room_code): | |
with ui.card().classes('w-full') as x: | |
with ui.row().classes('w-full'): | |
ui.label(room_name).style('font-size: 2rem') | |
ui.space() | |
def del_handler(): | |
delete_room(room_code) | |
x.delete() | |
ui.button(icon='delete', on_click=del_handler).props('flat color=red') | |
ui.button(icon='arrow_forward', on_click=lambda: ui.navigate.to(f'/room/{room_code}') | |
).props('rounded color=accent') | |
with ui.row().classes('w-full'): | |
def copy_code_to_clipboard(): | |
ui.run_javascript(f'navigator.clipboard.writeText("{room_code}")') | |
ui.notify('Code copied to clipboard') | |
def copy_link_to_clipboard(): | |
ui.run_javascript(f'navigator.clipboard.writeText("{website.rstrip('/')}/join/{room_code}")') | |
ui.notify('Link copied to clipboard') | |
ui.chip(room_code, color='inversePrimary', on_click=copy_code_to_clipboard) | |
ui.space() | |
with ui.row(): | |
ui.button( | |
'copy code', icon='content_copy', on_click=copy_code_to_clipboard).props('flat rounded') | |
ui.button( | |
'share link', icon='link', on_click=copy_link_to_clipboard).props('flat rounded color=green') | |
return x | |
@ui.page('/join/{room_code}') | |
def join_with_room_code_link(room_code): | |
workerMsg = join_an_existing_room(room_code) | |
if workerMsg.status: | |
ui.notify(workerMsg.message, color='green') | |
else: | |
ui.notify(workerMsg.message, color='red') | |
return ui.navigate.to('/') | |
@ui.page('/') | |
def index(): | |
# user storage setup | |
if app.storage.user.get('owned_rooms') is None: | |
app.storage.user['owned_rooms'] = [] | |
if app.storage.user.get('joined_rooms') is None: | |
app.storage.user['joined_rooms'] = [] | |
ui.page_title('Party Rooms') | |
with ui.column().classes('fullscreen'): | |
with ui.row().classes('full-width shadow-4 bg-primary text-white').style('padding: 1rem'): | |
ui.label('🎉 Party Rooms').style('font-size:1.5rem') | |
# Create UI: to create a room | |
with ui.dialog() as create_form: | |
with ui.card(): | |
ui.label('Create a new party 🎉').style('font-size: 1.5rem') | |
crf_inp = ui.input('Party Name', placeholder='Marvel Movie Night', validation={ | |
'Party name must be between 3-20': lambda v: 3 <= len(v) <= 20 | |
}).classes('w-full') | |
def create_btn_handler(): | |
if not crf_inp.validate(): | |
return | |
rn1 = crf_inp.value | |
rc1 = create_new_room(rn1) | |
with own_rooms_scroll_area: | |
room_card(rn1, rc1) | |
own_rooms_scroll_area.scroll_to(percent=100) | |
tab_panels.set_value(owned_tab_panel) | |
create_form.set_value(False) | |
ui.button('Create', on_click=create_btn_handler).classes('w-full') | |
# Join UI: to enter room code for join | |
with ui.dialog() as join_form: | |
with ui.card(): | |
ui.label('Join an existing party 🥳').style('font-size: 1.5rem') | |
jfinp = ui.input('Enter Room Code', validation={ | |
'Enter a valid code': lambda v: len(v) != 0 | |
}).classes('w-full') | |
def join_form_handler(): | |
if not jfinp.validate(): | |
return | |
rmcode = jfinp.value | |
worker_msg = join_an_existing_room(rmcode) | |
if not worker_msg.status: | |
ui.notify(worker_msg.message, color='red') | |
return | |
# here | |
with joined_rooms_scroll_area: | |
room_card(worker_msg.room_name, rmcode) | |
joined_rooms_scroll_area.scroll_to(percent=100) | |
ui.notify(worker_msg.message) | |
tab_panels.set_value(owned_tab_panel) | |
join_form.set_value(False) | |
ui.button('Join', on_click=join_form_handler).classes('w-full') | |
# floating action button | |
with ui.page_sticky(position='bottom-right').style('z-index:1000'): | |
with ui.element('q-fab').props('icon=add direction=up').style('margin:2rem'): | |
ui.element('q-fab-action').props('label=Create').on('click', lambda: create_form.set_value(True)) | |
ui.element('q-fab-action').props('label=Join').on('click', lambda: join_form.set_value(True)) | |
with ui.tabs().props('inline-label align=justify').classes('w-full') as tabs: | |
tab1 = ui.tab('owned', 'Owned', 'person') | |
tab2 = ui.tab('joined', 'Joined', 'public') | |
with ui.tab_panels(tabs, value=tab1).classes('full-width full-height') as tab_panels: | |
# owned rooms list | |
with ui.tab_panel('owned') as owned_tab_panel: | |
with ui.scroll_area().classes('full-height') as own_rooms_scroll_area: | |
for room_code in app.storage.user['owned_rooms']: | |
d = app.storage.general['rooms'][room_code] | |
room_name = d['name'] | |
room_card(room_name, room_code) | |
# list of rooms joined by user | |
with ui.tab_panel('joined') as joined_tab_panel: | |
with ui.scroll_area().classes('full-height') as joined_rooms_scroll_area: | |
for room_code in app.storage.user['joined_rooms']: | |
d = app.storage.general['rooms'][room_code] | |
room_name = d['name'] | |
room_card(room_name, room_code) | |
class ValueWrapper(BaseModel): | |
value:bool = False | |
file_name:str = '' | |
file_encoded_name:str = '' | |
file_src: str = '' | |
class GeneralStorageRoomPost(BaseModel): | |
src: str | |
length: int = 0 | |
# the video on the page post the command to an endpoint from frontend | |
# then this api handles the data and update it in the general storage | |
@app.post('/post-command') | |
async def handle_posted_command(request: Request): | |
form = await request.form() | |
room_code = form.get('room_code') | |
cmd = form.get('cmd') | |
vid = form.get('vid') | |
pos = form.get('pos') | |
if room_code and cmd and vid and pos: | |
vid = int(vid) | |
pos = float(pos) | |
app.storage.general['rooms'][room_code]['command'] = [cmd, vid, pos] | |
app.storage.general['rooms'][room_code]['linear-command'] = f'{cmd}-{vid}-{pos}' | |
@ui.page('/room/{room_code}') | |
def party_room(room_code): | |
class UploadWorkerMessage(BaseModel): | |
status: bool = False | |
message: str = '' | |
file_src: str = '' | |
file_type: str = '' | |
file_name: str = '' | |
file_size: str = '' | |
room_code: str = '' | |
caption: str = '' | |
def upload_video_to_room(e: events.UploadEventArguments, file_upload_status: UploadWorkerMessage): | |
file_name = e.name | |
unique_file_name = f'{uuid4()}-{time_ns()}-{file_name}' | |
file_src = f'./vid/{room_code}-{unique_file_name}' | |
try: | |
with open(file_src, 'wb') as f: | |
f.write(e.content.read()) | |
file_size = simplify_file_size(e.content.tell()) | |
except Exception as e: | |
print(e) | |
file_upload_status.status = False | |
return | |
ui.notify('File successfully uploaded', color='green') | |
file_upload_status.status = True | |
file_upload_status.file_src = file_src | |
file_upload_status.file_type = e.type | |
file_upload_status.file_name = file_name | |
file_upload_status.file_size = file_size | |
def create_new_post_in_the_room(post_upload_status: UploadWorkerMessage): | |
# update virtual storage | |
app.storage.general['rooms'][post_upload_status.room_code]['posts'].append( | |
{ | |
'src': post_upload_status.file_src, | |
'type': post_upload_status.file_type, | |
'caption': post_upload_status.caption, | |
'file_name': post_upload_status.file_name, | |
'file_size': post_upload_status.file_size | |
} | |
) | |
# ui for new post | |
def post_card(post_upload_status: UploadWorkerMessage): | |
card = ui.card().style('width: 90vmin; margin-bottom: 2rem') | |
with card: | |
ui.label(post_upload_status.caption).style('font-size:1.5rem') | |
if post_upload_status.file_type.startswith('video'): | |
vrflength = len(video_reference_list) | |
video_reference_list.append( | |
ui.video(post_upload_status.file_src).classes('w-full').on( | |
'play', | |
js_handler=f''' | |
(event)=>{{ | |
let f = new FormData(); | |
f.append('room_code', '{room_code}'); | |
f.append('cmd', 'play'); | |
f.append('vid', {vrflength}); | |
f.append('pos', event.target.currentTime); | |
fetch('/post-command', {{ | |
method: 'POST', body: f | |
}}); | |
}} | |
''' | |
).on( | |
'pause', | |
js_handler=f''' | |
(event)=>{{ | |
let f = new FormData(); | |
f.append('room_code', '{room_code}'); | |
f.append('cmd', 'pause'); | |
f.append('vid', {vrflength}); | |
f.append('pos', event.target.currentTime); | |
fetch('/post-command', {{ | |
method: 'POST', body: f | |
}}); | |
}} | |
''' | |
).on( | |
'seeked', | |
js_handler=f''' | |
(event)=>{{ | |
let f = new FormData(); | |
f.append('room_code', '{room_code}'); | |
f.append('cmd', 'seek'); | |
f.append('vid', {vrflength}); | |
f.append('pos', event.target.currentTime); | |
fetch('/post-command', {{ | |
method: 'POST', body: f | |
}}); | |
}} | |
''' | |
) | |
) | |
with ui.row().classes('w-full'): | |
ui.chip(post_upload_status.file_type).props('outline') | |
ui.chip(post_upload_status.file_size).props('outline color=secondary') | |
ui.space() | |
ui.button(icon='delete').props('rounded flat color=red') | |
# file upload form ui | |
def file_form(file_type: Literal['video', 'image', 'audio']): | |
d = ui.dialog(value=True) | |
with d: | |
with ui.card().classes('absolute-center').classes('w-full'): | |
ui.label(f'Post an {file_type.title()}').style('font-size: 1.5rem') | |
post_upload_status = UploadWorkerMessage(status=False, message='', room_code=room_code) | |
def pu_h(e: events.UploadEventArguments): | |
upload_video_to_room(e, post_upload_status) | |
post_btn.enable() | |
post_upload = ui.upload( | |
on_upload=pu_h, | |
auto_upload=True, | |
label=f'Select {file_type} file').classes('w-full') | |
post_input = ui.input('Caption', placeholder='An amazing post').classes('w-full') | |
def post_btn_h(): | |
# verify the upload has been done successfuly | |
if not post_upload_status.status: | |
ui.notify('Can not create post', color='red') | |
print(post_upload_status.message) | |
return | |
post_upload_status.caption = post_input.value | |
create_new_post_in_the_room(post_upload_status) | |
ui.notify('Successfully created post.') | |
# updating the post container | |
with post_container: | |
post_card(post_upload_status) | |
post_container_scroll_area.scroll_to(percent=100) | |
d.set_value(False) | |
post_btn = ui.button('Post', on_click=post_btn_h).classes('w-full') | |
post_btn.disable() | |
# video command system | |
video_reference_list = [] # to hold the references of the vidoes on the page | |
def cmd_play_at(video_reference_index: int, video_position: float): | |
video_reference_list[video_reference_index].seek(video_position) | |
video_reference_list[video_reference_index].play() | |
print(f'Playing at {video_position}') | |
# ui.video() | |
def cmd_pause_at(video_reference_index: int, video_position: float): | |
video_reference_list[video_reference_index].seek(video_position) | |
video_reference_list[video_reference_index].pause() | |
print(f'Pausing at {video_position}') | |
def cmd_seek_to(video_reference_index: int, video_position: float): | |
video_reference_list[video_reference_index].seek(video_position) | |
print(f'Seeked to {video_position}') | |
video_command_references = { | |
'play': cmd_play_at, | |
'pause': cmd_pause_at, | |
'seek': cmd_seek_to | |
} | |
# continuously monitor the command | |
app.storage.user['last-command'] = '' | |
ui.notify(app.storage.user) | |
room_name = app.storage.general['rooms'][room_code]['name'] | |
ui.page_title(room_name) | |
with ui.column().classes('fullscreen'): | |
with ui.row(align_items='center').classes('full-width shadow-4 bg-primary text-white').style('padding: 1rem'): | |
ui.label(f'🎉 {room_name} 🎉').style('font-size:1.5rem') | |
ui.space() | |
def monitor_cmd(v): | |
try: | |
current_cmd = app.storage.general['rooms'][room_code]['linear-command'] | |
except: | |
return v | |
if not app.storage.user.get('last-command'): | |
app.storage.user['last-command'] = '' | |
if app.storage.user['last-command'] != current_cmd: | |
print(current_cmd) | |
# execute the command | |
cmd, vid, pos = app.storage.general['rooms'][room_code]['command'] | |
try: | |
video_command_references[cmd](vid, pos) | |
except Exception as e: | |
print(e) | |
app.storage.user['last-command'] = current_cmd | |
return v | |
ui.chip().bind_text_from(app.storage.general['rooms'][room_code], 'linear-command', backward=monitor_cmd) | |
ui.space() | |
ui.button('Exit').props('icon-right=logout flat color=inversePrimary rounded') | |
# action button to post | |
with ui.page_sticky('bottom-right').style('z-index:1000'): | |
with ui.element('q-fab').props('icon=post_add direction=left').style('margin:2rem'): | |
ui.element('q-fab-action').props('icon=movie label=Video').on('click', lambda: file_form('video')) | |
ui.element('q-fab-action').props('icon=image label=Image').on('click', lambda: file_form('image')) | |
ui.element('q-fab-action').props('icon=music_note label=Audio').on('click', lambda: file_form('audio')) | |
ui.element('q-fab-action').props('icon="img:https://upload.wikimedia.org/wikipedia/commons/0/09/YouTube_full-color_icon_%282017%29.svg" label="youtube"') | |
with ui.scroll_area().classes('full-width full-height') as post_container_scroll_area: | |
with ui.row() as post_container: | |
''' | |
{ | |
'src': file_src, | |
'type': file_type, | |
'caption': post_caption, | |
'file_name': file_name, | |
'file_size': file_size | |
} | |
''' | |
for i in app.storage.general['rooms'][room_code]['posts']: | |
post_card( | |
UploadWorkerMessage( | |
room_code=room_code, | |
caption=i['caption'], | |
file_type=i['type'], | |
file_src=i['src'], | |
file_size=i['file_size'] | |
) | |
) | |
host = '0.0.0.0' | |
port = 8999 | |
website = f'http://223.229.180.169:{port}/' | |
ui.run(host=host, port=port, show=False, storage_secret='bin-san-2002', favicon='🎉') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment