Skip to content

Instantly share code, notes, and snippets.

@filipinascimento
Created May 26, 2021 16:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save filipinascimento/9c8688722cdef9fd8d3ff64a73e9e18e to your computer and use it in GitHub Desktop.
Save filipinascimento/9c8688722cdef9fd8d3ff64a73e9e18e to your computer and use it in GitHub Desktop.
from time import sleep
import multiprocessing
import numpy as np
createWTRC = True
online = True
flipImg = True # feito em cpu
options = {
#"frame_size_reduction": 25,
}
imgSize = (600, 600)
dtype = 'uint8'
def process(s_arr):
import uvicorn
from vidgear.gears.asyncio import WebGear_RTC
import uvicorn, asyncio, cv2
from av import VideoFrame
from aiortc import VideoStreamTrack
from vidgear.gears.asyncio.helper import reducer
class Custom_RTCServer(VideoStreamTrack):
"""
Custom Media Server using OpenCV, an inherit-class
to aiortc's VideoStreamTrack.
"""
def __init__(self, source=None):
super().__init__()
self.image = np.random.randint(
0, 255, (imgSize[0], imgSize[1], 3),
dtype=np.uint8)
self.ang = 0
async def recv(self):
pts, time_base = await self.next_timestamp()
self.image = np.frombuffer(s_arr, dtype).reshape(
(imgSize[0], imgSize[1], 3)).astype('uint8')
if flipImg:
# self.image = np.fliplr(self.image)
self.image = np.flipud(self.image)
av_frame = VideoFrame.from_ndarray(self.image)
av_frame.pts = pts
av_frame.time_base = time_base
return av_frame
def terminate(self):
try:
if not (self.stream is None):
self.stream.release()
self.stream = None
except AttributeError:
pass
web = WebGear_RTC(logging=True, **options)
web.config["server"] = Custom_RTCServer()
uvicorn.run(web(), host="localhost", port=8000)
web.shutdown()
if __name__ == '__main__':
import vtk
from vtk.util.numpy_support import vtk_to_numpy
from os.path import join as pjoin
from fury import actor, window, colormap as cmap
import numpy as np
from fury.data.fetcher import fetch_viz_wiki_nw
arr = np.random.randint(0, 255, size=imgSize[0]*imgSize[1]*3).astype(dtype)
arr = np.ones(imgSize[0]*imgSize[1]*3).astype(dtype)
print(arr.shape)
s_arr = multiprocessing.RawArray('B', arr)
arr = None
if createWTRC:
p = multiprocessing.Process(target=process, args=(s_arr,))
p.start()
createRTMP = False
def rtmp(s_arr):
pass
if createRTMP:
p = multiprocessing.Process(target=rtmp, args=(s_arr,))
p.start()
np_array = np.frombuffer(s_arr, dtype)
print('\n', np_array.shape, '\n')
files, folder = fetch_viz_wiki_nw()
categories_file, edges_file, positions_file = sorted(files.keys())
positions = np.loadtxt(pjoin(folder, positions_file))
categories = np.loadtxt(pjoin(folder, categories_file), dtype=str)
edges = np.loadtxt(pjoin(folder, edges_file), dtype=int)
category2index = {category: i
for i, category in enumerate(np.unique(categories))}
index2category = np.unique(categories)
categoryColors = cmap.distinguishable_colormap(nb_colors=len(index2category))
colors = np.array([categoryColors[category2index[category]]
for category in categories])
radii = 1 + np.random.rand(len(positions))
edgesPositions = []
edgesColors = []
for source, target in edges:
edgesPositions.append(np.array([positions[source], positions[target]]))
edgesColors.append(np.array([colors[source], colors[target]]))
edgesPositions = np.array(edgesPositions)
edgesColors = np.average(np.array(edgesColors), axis=1)
sphere_actor = actor.sphere(centers=positions,
colors=colors,
radii=radii*0.5,
theta=8,
phi=8,
)
lines_actor = actor.line(edgesPositions,
colors=edgesColors,
opacity=0.1,
)
scene = window.Scene()
# scene.add(lines_actor)
scene.add(sphere_actor)
scene.set_camera(position=(0, 0, 1000), focal_point=(0.0, 0.0, 0.0),
view_up=(0.0, 0.0, 0.0))
showm = window.ShowManager(scene, reset_camera=False, size=(
imgSize[0], imgSize[1]), order_transparent=False,
# multi_samples=8
)
#render_window = showm.vtkRenderWindow()
render_window = showm.window
if online:
#render_window.SetOffScreenRendering(1)
#render_window.AddRenderer(scene)
#render_window.SetSize(, imgSize[1])
window_to_image_filter = vtk.vtkWindowToImageFilter()
window_to_image_filter.SetInput(render_window)
def callback(caller, timerevent):
#scene.GetActiveCamera().Azimuth(2)
window_to_image_filter.Update()
window_to_image_filter.Modified()
vtk_image = window_to_image_filter.GetOutput()
#h, w, _ = vtk_image.GetDimensions()
vtk_array = vtk_image.GetPointData().GetScalars()
#components = vtk_array.GetNumberOfComponents()
vtkarr = vtk_to_numpy(vtk_array).flatten().astype(dtype)
#print(vtkarr.shape, type(vtkarr), vtkarr.dtype)
np_array[:] = vtkarr
#render_window.Render()
#np_array[:] = np.ones(imgSize[0]*imgSize[1]*3, dtype=dtype)
#np_array[:] = np.ones(imgSize[0]*imgSize[1]*3, dtype=dtype)
showm.initialize()
# Run every 16 milliseconds
showm.add_timer_callback(True, 16, callback)
showm.start()
else:
render_window.AddRenderer(scene)
def callback(caller, timerevent):
scene.GetActiveCamera().Azimuth(2)
render_window.Render()
showm.initialize()
showm.add_timer_callback(True, 16, callback)
showm.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment