Skip to content

Instantly share code, notes, and snippets.

@artificialexit
Created May 31, 2013 04:56
Show Gist options
  • Save artificialexit/5683011 to your computer and use it in GitHub Desktop.
Save artificialexit/5683011 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2.7
import sys
sys.path.insert(1, '/xray/progs/Python/libraries')
from threading import Thread
from dcss import Server
from beamline import variables
from beamline import redis
class RedisDHS(Server):
    """Server subclass that mirrors Redis keys into registered strings.

    A background thread subscribes to the ``updates`` pub/sub channel of
    ``beamline.redis``. Each published message's payload is treated as a
    Redis key; when that key has been registered through
    ``stoh_register_string`` its current value is fetched and sent out
    with ``send_xos3`` as an ``htos_set_string_completed`` message.

    NOTE(review): ``send_xos3`` and ``loop`` are not defined in this class
    and are presumably inherited from ``dcss.Server`` -- confirm.
    """

    def __init__(self):
        """Register as the 'redis' server and start the watcher thread."""
        super(RedisDHS, self).__init__('redis')

        # Maps redis key -> string/device name it feeds. This MUST exist
        # before the watcher thread starts: the thread reads it as soon as
        # a message arrives, and an AttributeError there would silently
        # kill the (daemon) thread.
        self.strings = {}

        self.redis_thread = Thread(target=self.redis_watcher)
        # Daemon thread so a blocked listen() never prevents process exit.
        self.redis_thread.daemon = True
        self.redis_thread.start()

    def redis_watcher(self):
        """Listen on the 'updates' channel and push changed registered keys.

        Runs forever in the background thread started by ``__init__``.
        """
        s = redis.pubsub()
        s.subscribe('updates')
        for msg in s.listen():
            # Only 'message' entries are acted on; other entry types
            # yielded by listen() are skipped.
            if msg['type'] != 'message':
                continue
            key = msg['data']
            # Dict membership test; avoids building a key list per message.
            if key in self.strings:
                self.set_redis_string(key)

    def set_redis_string(self, key):
        """Fetch ``key`` from Redis and forward its value, if truthy.

        ``key`` must already be present in ``self.strings``. Nothing is
        sent when the fetched value is falsy (e.g. ``None`` or an empty
        string).
        """
        value = redis.get(key)
        if value:
            self.set_string(self.strings[key], value)

    def set_string(self, string, value):
        """Send an ``htos_set_string_completed`` message for ``string``."""
        self.send_xos3('htos_set_string_completed %s normal %s' % (string, value))

    def stoh_register_string(self, device, redis_key):
        """Associate ``redis_key`` with ``device`` and send its current value.

        NOTE(review): presumably invoked by the ``Server`` base class when
        a ``stoh_register_string`` message is received -- confirm against
        ``dcss.Server``.
        """
        self.strings[redis_key] = device
        self.set_redis_string(redis_key)
if __name__ == '__main__':
    # Script entry point: build the server, turn on its debug flag, then
    # hand control to loop(), which RedisDHS does not define itself
    # (presumably inherited from dcss.Server -- confirm).
    server = RedisDHS()
    server.debug = True
    server.loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment