Skip to content

Instantly share code, notes, and snippets.

@richardingham
Created January 24, 2014 15:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save richardingham/0a58a291bad2e3b9009f to your computer and use it in GitHub Desktop.
Save richardingham/0a58a291bad2e3b9009f to your computer and use it in GitHub Desktop.
Control sequence for single step hydrolysis
from octopus.transport.basic import serial, tcp
from octopus.manufacturer import vapourtec, knauer, thalesnano, vici, mt
from octopus.data.manipulation import Max, Min
from octopus.sequence.control import StateMonitor
from octopus.sequence.util import Looping, Dependent
from octopus.image import source, tracker
from octopus.runtime import *
title("Zirconia Hydrolysis")
id("zirconia-hydrolysis")
r = vapourtec.R2R4(tcp("192.168.15.156", 9002))
ir = mt.ICIR(tcp("192.168.15.3", 8124), stream_names = ["product"])
#
# Experiment Parameters
#
ir_start_trigger = variable(100, "start", "start")
ir_low_trigger = variable(80, "low", "low")
vapourtec_flow_rate = 200
column_dead_volume = 5000
sms_phone_no = "447941123456"
#
# Logic for SMS notification
#
smsnotifier = sms.ClockworkSMS("enter-api-key-here")
def sms_notify (msg):
msg = "2 step hydrog experiment: %s" % msg
smsnotifier.notify(sms_phone_no, msg)
#
# Logic to discard output when parameters are not within acceptable levels.
#
# StateFlip is a monitor which observes a set of expressions. If one of these
# tests fails, then fail_fn is called. When all of the expressions pass again,
# pass_fn is called. This sequence runs in a loop.
class StateFlip (Looping, Dependent):
interval = 0.5
def __init__ (self, pass_fn = None, fail_fn = None):
Dependent.__init__(self)
Looping.__init__(self)
# Depending on the implementation of Python,
# __builtins__ can be a module or its dict.
try:
b_set = __builtins__.set
except AttributeError:
b_set = __builtins__["set"]
self.tests = b_set()
self.pass_fn = pass_fn
self.fail_fn = fail_fn
self._step = None
self._ok = False
def add (self, test):
self.tests.add(test)
def remove (self, test):
self.tests.discard(test)
def _iterate (self):
step = None
if (self._ok) and (not all(self.tests)):
try:
step = self.fail_fn()
except TypeError:
pass
self._ok = False
elif (not self._ok) and all(self.tests):
try:
step = self.pass_fn()
except TypeError:
pass
self._ok = True
if step is not None:
if self._step is not None:
try:
self._step.cancel()
self._step.log -= self.log
except:
pass
step.log += self.log
step.reset()
step.run()
self._step = step
def _cancel (self):
Looping._cancel(self)
try:
self._step.log -= self.log
return d.append(self._step.cancel())
except:
pass
#
# State Monitor Definition
#
collect_monitor = StateFlip(
pass_fn = sequence(
set(r.output, "collect"),
log("Vapourtec parameters OK"),
),
fail_fn = sequence(
set(r.output, "waste"),
log("Vapourtec parameters not OK"),
)
)
# Collection should resume as soon as IR response is OK.
collect_monitor.add(ir.product > ir_low_trigger)
# Temperature and pressure must have been OK for one column volume for collection to
# be allowed.
collect_monitor.add(Min(r.heater3.temp, column_dead_volume / vapourtec_flow_rate * 60) > 95)
collect_monitor.add(Max(r.pump2.airlock, column_dead_volume / vapourtec_flow_rate * 60) < 10000)
# The Vapourtec R2 publishes an "airlock" parameter which goes very high
# if there is a drop in pressure - usually this indicates an air bubble
# in the pump.
vapourtec_monitor = StateMonitor()
vapourtec_monitor.add(r.pump2.airlock < 10000)
vapourtec_monitor.step = sequence(
log("Airlock problem in R2"),
call(sms_notify, "Airlock problem in R2"),
wait("5m") # Send SMS max once every 5 min!
)
#
# Experiment control sequence
#
long_wait = wait("8h")
monitored_sequence = sequence(
log("Injecting reagent"),
set(r.pump2.input, "reagent"),
long_wait,
log("Stop injecting reagent"),
set(r.pump2.input, "solvent")
)
monitored_sequence.dependents.add(vapourtec_monitor)
monitored_sequence.dependents.add(collect_monitor)
rxn = sequence(
log("Starting up..."),
set(r.pressure_limit, 25000),
set(r.heater3.target, 100),
set(r.pump2.target, vapourtec_flow_rate),
set(r.power, "on"),
wait_until(r.heater3.temp > 95),
wait_until(ir.product > ir_start_trigger),
set(r.output, "collect"),
monitored_sequence,
wait_until(ir.product < ir_low_trigger),
wait("20m"),
set(r.power, "off"),
log("Complete")
)
run(rxn)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment