Skip to content

Instantly share code, notes, and snippets.

@filipinascimento
Last active May 26, 2021 01:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save filipinascimento/3f370354fe02e99b0ee1cb8b3389b20e to your computer and use it in GitHub Desktop.
# import required libraries
import uvicorn
from vidgear.gears.asyncio import WebGear_RTC
# import necessary libs
import uvicorn, asyncio, cv2
from av import VideoFrame
from aiortc import VideoStreamTrack
from vidgear.gears.asyncio import WebGear_RTC
from vidgear.gears.asyncio.helper import reducer
import numpy as np
import vtk
from vtk.util.numpy_support import vtk_to_numpy
"""
=======================================================
Visualize Interdisciplinary map of the journals network
=======================================================
The goal of this app is to show an overview of the journals network structure
as a complex network. Each journal is shown as a node and their connections
indicates a citation between two of them.
"""
###############################################################################
# First, let's import some useful functions
from os.path import join as pjoin
from fury import actor, window, colormap as cmap
import numpy as np
###############################################################################
# Then let's download some available datasets.
from fury.data.fetcher import fetch_viz_wiki_nw
###############################################################################
# Download the wiki journal-network dataset and locate its three files.
files, folder = fetch_viz_wiki_nw()
categories_file, edges_file, positions_file = sorted(files.keys())
###############################################################################
# Load node positions, per-node category labels and the edge list.
positions = np.loadtxt(pjoin(folder, positions_file))
categories = np.loadtxt(pjoin(folder, categories_file), dtype=str)
edges = np.loadtxt(pjoin(folder, edges_file), dtype=int)
###############################################################################
# Give every category a distinguishable color; each node is colored by the
# category it belongs to.
unique_categories = np.unique(categories)
category_index = {name: idx for idx, name in enumerate(unique_categories)}
palette = cmap.distinguishable_colormap(nb_colors=len(unique_categories))
colors = np.array([palette[category_index[name]] for name in categories])
###############################################################################
# Node sizes: one random radius per node, drawn uniformly from [1, 2).
radii = 1 + np.random.rand(len(positions))
###############################################################################
# Build the edge geometry. Each edge is a two-point segment between the
# positions of its endpoints, and its color is the average of the two
# endpoint colors — i.e. an interpolation between the nodes it connects.
edgesPositions = np.array(
    [np.array([positions[a], positions[b]]) for a, b in edges]
)
edgesColors = np.average(
    np.array([np.array([colors[a], colors[b]]) for a, b in edges]),
    axis=1,
)
###############################################################################
# The data is ready — build the two actors that represent it:
# sphere_actor draws the nodes, lines_actor draws the edges.
sphere_actor = actor.sphere(
    centers=positions,
    colors=colors,
    radii=radii * 0.5,  # shrink the random radii to a pleasant on-screen size
    theta=8,
    phi=8,
)
lines_actor = actor.line(
    edgesPositions,
    colors=edgesColors,
    opacity=0.1,  # faint edges so dense regions stay readable
)
###############################################################################
# Actors must live inside a scene; create one and add both actors to it.
scene = window.Scene()
scene.add(lines_actor)
scene.add(sphere_actor)
###############################################################################
# Optional interactive preview: switch ``interactive`` to True to open a
# local window before the streaming server starts.
interactive = False
if interactive:
    window.show(scene, size=(600, 600))
    # window.record(scene, out_path='journal_networks.png', size=(600, 600))

###############################################################################
# Off-screen rendering setup.  The camera sits on the +z axis looking at the
# origin.  BUGFIX: view_up must be a non-zero vector — a (0, 0, 0) view-up
# leaves the camera orientation degenerate/undefined in VTK — so use +y.
scene.set_camera(position=(0, 0, 1000), focal_point=(0.0, 0.0, 0.0),
                 view_up=(0.0, 1.0, 0.0))

# ``scene`` is handed to AddRenderer, which expects a vtkRenderer (fury's
# Scene appears to derive from it — confirm for the installed fury version).
# Render off-screen at a fixed size; frames are captured below through a
# window-to-image filter and streamed over WebRTC.
render_window = vtk.vtkRenderWindow()
render_window.SetOffScreenRendering(1)
imagesize = (1024, 768)  # (width, height) in pixels
render_window.AddRenderer(scene)
render_window.SetSize(imagesize[0], imagesize[1])

# Filter that converts the render window's framebuffer into vtkImageData.
window_to_image_filter = vtk.vtkWindowToImageFilter()
window_to_image_filter.SetInput(render_window)
# Initialize the WebGear_RTC app without a default video source (a custom
# media server is attached to web.config later).  frame_size_reduction
# shrinks outgoing frames by 25% as a performance tweak.
web = WebGear_RTC(logging=True, frame_size_reduction=25)
# create your own Bare-Minimum Custom Media Server
class Custom_RTCServer(VideoStreamTrack):
    """Custom media server built on aiortc's ``VideoStreamTrack``.

    Serves frames captured from the module-level off-screen VTK render
    window, rotating the scene camera slightly before every frame.
    """

    def __init__(self, source=None):
        # don't forget this line! — initialize the parent track.
        super().__init__()
        # Placeholder frame until the first capture.  BUGFIX: the array must
        # be (height, width, channels) — the order VideoFrame.from_ndarray
        # and the reshape in recv() use — not (width, height, channels).
        self.image = np.random.randint(
            0, 255, (imagesize[1], imagesize[0], 3), dtype=np.uint8
        )
        self.ang = 0

    async def recv(self):
        """A coroutine function that yields `av.frame.Frame`."""
        # don't forget this function!!!
        # get next timestamp (required by aiortc for pacing)
        pts, time_base = await self.next_timestamp()

        # Rotate the camera 2 degrees per frame, then re-capture the window.
        scene.GetActiveCamera().Azimuth(2)
        window_to_image_filter.SetInput(render_window)
        window_to_image_filter.Update()
        window_to_image_filter.Modified()

        vtk_image = window_to_image_filter.GetOutput()
        # GetDimensions() returns (width, height, depth); the original code
        # unpacked these into swapped names, which obscured the reshape below.
        width, height, _ = vtk_image.GetDimensions()
        vtk_array = vtk_image.GetPointData().GetScalars()
        components = vtk_array.GetNumberOfComponents()
        # Rows correspond to image height, hence (height, width, components).
        # NOTE(review): VTK framebuffer rows start at the bottom of the
        # window; if the stream appears upside-down, flip with np.flipud.
        self.image = vtk_to_numpy(vtk_array).reshape(height, width, components)

        # construct `av.frame.Frame` from `numpy.ndarray` and stamp it
        av_frame = VideoFrame.from_ndarray(self.image)
        av_frame.pts = pts
        av_frame.time_base = time_base
        # return `av.frame.Frame`
        return av_frame

    def terminate(self):
        """Gracefully terminates the stream, if one was ever attached."""
        # don't forget this function!!!
        try:
            if self.stream is not None:
                self.stream.release()
                self.stream = None
        except AttributeError:
            # No stream attribute was ever set; nothing to release.
            pass
# assign your custom media server to config with adequate source (for e.g. foo.mp4)
# (no source is needed here: the server renders its own frames from the VTK scene)
web.config["server"] = Custom_RTCServer()
# run this app on Uvicorn server at address http://localhost:8000/
# (this call blocks until the server is stopped, e.g. with Ctrl+C)
uvicorn.run(web(), host="localhost", port=8000)
# close app safely once uvicorn returns, releasing WebGear_RTC resources
web.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment