Skip to content

Instantly share code, notes, and snippets.

@mrcrilly
Last active August 29, 2015 13:57
Show Gist options
  • Save mrcrilly/9770465 to your computer and use it in GitHub Desktop.
Save mrcrilly/9770465 to your computer and use it in GitHub Desktop.
SQLAlchemy Threading
# One engine for the whole process, built from the Flask app configuration.
db_engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])

# `dbo` is a scoped_session registry: per the SQLAlchemy documentation each
# thread that uses it gets its own Session, so the request thread and the
# collector threads do not share one.
dbo = scoped_session(
    sessionmaker(
        bind=db_engine,
        autoflush=False,
        autocommit=False,
    )
)
class ThreadedCollector(threading.Thread):
    """Worker thread that drains a queue of devices and stores CPU stats.

    Each item taken from the queue is passed to ``compute_stats``. The
    worker exits as soon as the queue is empty, so the queue is expected to
    be filled before the worker is started.

    :param collector: object exposing ``stat(device, <int>)``; used to fetch
        the statistics for one device.
    :param queue: a ``Queue.Queue`` holding the devices to process.
    """

    def __init__(self, collector, queue):
        threading.Thread.__init__(self)
        self.collector = collector
        self.queue = queue

    def run(self):
        """Process queued devices until the queue is empty, then return."""
        while True:
            try:
                # Non-blocking get: raises Queue.Empty instead of waiting.
                device = self.queue.get(False)
            except Queue.Empty:
                # Nothing left to do; end the thread cleanly instead of
                # dying with an unhandled exception.
                return
            try:
                self.compute_stats(device)
            finally:
                # Always balance the get() so a queue.join() cannot hang
                # when compute_stats raises.
                self.queue.task_done()

    def compute_stats(self, device):
        """Fetch CPU stats for ``device`` and store the epochs not yet saved.

        Does nothing when ``device`` is falsy or the collector returns no
        stats. New rows are added to the session and committed once.
        """
        if not device:
            return

        # This simply calls a method that does a GET against a remote API.
        # NOTE(review): the meaning of 1755 is not visible here - confirm.
        cpustats = self.collector.stat(device, 1755)
        if not cpustats:
            return

        stats = cpustats['stats']
        for epoch in stats:
            query = dbo.query(CPUStatistic).filter(
                CPUStatistic.epoch == epoch
            ).filter(
                CPUStatistic.device == device
            )
            if query.first():
                # Already stored for this device/epoch; skip it.
                continue
            dbo.add(CPUStatistic(epoch, stats[epoch], device))

        # Committing here in one step so as not to batter the DB.
        dbo.commit()
class APICollector(restful.Resource):
    """REST resource that starts a background collection of device CPU stats.

    ``GET`` connects to the remote API, queues the matching ``Device`` rows
    and starts ``WORKER_COUNT`` ``ThreadedCollector`` threads to process
    them.
    """

    # Number of worker threads started per request.
    WORKER_COUNT = 10

    def __init__(self):
        # 0 means the queue has no size limit.
        self.queue = Queue.Queue(0)

    @ath.login_required
    def get(self):
        """Start the collection workers.

        :returns: ``('Done', 200)`` once the workers have been started, or
            ``(None, 404)`` when the collector found no devices or no
            organisations.
        """
        self.collector = collector.Collector(
            app.config['SL_API_USER'],
            app.config['SL_API_PASS'],
            app.config['SL_API_URI'],
        )
        self.collector.connect()
        self.collector.collectdevices()
        self.collector.collectorganisations()

        if not (self.collector.devices and self.collector.organisations):
            return None, 404

        # ...
        # ..
        for dev in self.collector.devices:
            # NOTE(review): `d` is loaded through this thread's session and
            # then used by worker threads with their own sessions; confirm
            # that sharing the ORM object across sessions is safe here.
            d = dbo.query(Device).filter(
                Device.name == dev['description']
            ).first()
            self.queue.put(d)

        for _ in xrange(self.WORKER_COUNT):
            t = ThreadedCollector(self.collector, self.queue)
            # Workers are left non-daemon and are not joined, so this
            # request returns before the collection has finished.
            t.start()

        return 'Done', 200
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment