Skip to content

Instantly share code, notes, and snippets.

@mguijarr
Created March 28, 2020 16:13
Show Gist options
  • Save mguijarr/aaed379574c0fc51639a19d8c4c5ab39 to your computer and use it in GitHub Desktop.
Save mguijarr/aaed379574c0fc51639a19d8c4c5ab39 to your computer and use it in GitHub Desktop.
Dummy Tango device + gevent thread + writer thread example using TgGevent
from tango.server import run
from tango.server import Device
from tango.server import attribute, command
import threading # assuming threading is **not** monkey-patched
from gevent._threading import Queue # the genuine, unpatched Queue class (thread-safe)
import gevent
import gevent.event
import TgGevent
class WriterThread(threading.Thread):
def __init__(self, job_queue):
super().__init__()
self.job_queue = job_queue
def run(self):
print("hello, I am the writing thread", threading.get_ident())
while True:
job_desc, data = self.job_queue.get()
if job_desc == "CREATE":
print("writing: should create new file", data)
elif job_desc == "WRITE":
print("writing: should write", data)
elif job_desc == "CLOSE":
print("writing: should close")
break
class SessionListener:
def __init__(self, session_name):
self.writer_tasks = Queue() #this is the thread-safe queue that will be shared btw listening thread and writing thread
self.writer_thread = WriterThread(self.writer_tasks) #create the writing thread
self.task = gevent.spawn(self.listen, session_name) #start scan listening greenlet
self._listening_event = gevent.event.Event()
def wait_listening_started(self):
return self._listening_event.wait()
def listen(self, session_name):
self.writer_thread.start()
print("Ok, listening to", session_name, "in thread id=", threading.get_ident())
self._listening_event.set() # synchronise with caller
i = 0
self.writer_tasks.put(("CREATE","/data/blablabla/file.h5"))
while True:
gevent.sleep(0.5) #simulate communication with redis via BLISS API
i+=1
self.writer_tasks.put(("WRITE", i))
if i == 10:
# let's say scan is done
self.writer_tasks.put(("CLOSE", None))
self.writer_thread.join()
break
def scan_done(self):
return self.task.ready()
class NexusWriterTangoDev(Device):
def __init__(self, *args, **kwargs):
Device.__init__(self, *args, **kwargs)
self.session_listener = None
@command(dtype_in=str)
def start_listening(self, session_name):
# make an instance of SessionListener in another thread
self.session_listener = TgGevent.get_proxy(SessionListener, session_name)
self.session_listener.wait_listening_started()
@command(dtype_out=bool)
def check_is_running(self):
return self.session_listener and not self.session_listener.scan_done()
if __name__ == "__main__":
run((NexusWriterTangoDev,))
# corresponding YML config:
#- device:
# - class: NexusWriterTangoDev
# tango_name: id00/tango/nx_writer
# personal_name: test
# server: nexus_writer_test_tango_dev
#
# Start with:
# python nexus_writer_test_tango_dev.py test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment