Skip to content

Instantly share code, notes, and snippets.

@devmessias
Created August 25, 2021 12:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devmessias/aade53a647cae602aa2398cd0decba18 to your computer and use it in GitHub Desktop.
Save devmessias/aade53a647cae602aa2398cd0decba18 to your computer and use it in GitHub Desktop.
vtk_queue thread
from queue import Queue
import queue
from threading import Thread
import numpy as np
import time
import vtk
import threading
queue_of_functions = Queue()
_FINISH_SEAMPHORE = 'FINISH'
class VTKExample:
def __init__(self):
self._started = False
pass
def start(self):
renderer = vtk.vtkRenderer()
renderWindow = vtk.vtkRenderWindow()
renderWindow.AddRenderer(renderer)
renderWindowInteractor = vtk.vtkRenderWindowInteractor()
renderWindowInteractor.SetRenderWindow(renderWindow)
renderWindowInteractor.GetInteractorStyle().SetCurrentStyleToTrackballCamera()
cone = vtk.vtkConeSource()
mapper = vtk.vtkPolyDataMapper()
actor = vtk.vtkActor()
mapper.SetInputConnection(cone.GetOutputPort())
actor.SetMapper(mapper)
renderer.AddActor(actor)
renderer.ResetCamera()
renderWindow.Render()
self.renderer = renderer
self.renderWindow = renderWindow
self.renderWindowInteractor = renderWindowInteractor
self._started = True
self.run()
def rotate(self, rotValue):
if not self._started:
return
self.renderer.GetActiveCamera().Azimuth(rotValue)
def run(self):
while self.renderWindowInteractor.GetDone() is False:
start = time.perf_counter()
self.renderWindowInteractor.ProcessEvents()
action = ''
try:
func = queue_of_functions.get(False)
func()
except queue.Empty:
pass
#print('is empty')
else:
pass
# do other stuff here
self.renderWindow.Render()
end = time.perf_counter()
# throttle to 100fps to avoid busy wait
timePerFrame = 0.01
if end - start < timePerFrame:
time.sleep(timePerFrame - (end - start))
if action == 'FINISH':
print('Finalize stuff')
#break
vtkobj = VTKExample()
def start():
vtkobj.start()
vtkobj.run()
def update_rotation(vtkobj):
rotValue = 0
for i in range(100):
rotValue+=1
queue_of_functions.put(lambda : vtkobj.rotate(rotValue))
time.sleep(.1)
#queue_of_functions.put(_FINISH_SEAMPHORE)
t = time.time()
t1 = threading.Thread(target=start, args=())
t2 = threading.Thread(target=update_rotation, args=(vtkobj,))
t1.start()
t2.start()
# doing other stuff here non blocking
for i in range(1000):
print(f'doing {i}')
time.sleep(.1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment