Skip to content

Instantly share code, notes, and snippets.

@richardingham
Created January 24, 2014 15:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save richardingham/f2117b9dc7504d6e1942 to your computer and use it in GitHub Desktop.
Save richardingham/f2117b9dc7504d6e1942 to your computer and use it in GitHub Desktop.
Control sequence for two-step experiments.
from octopus.transport.basic import serial, tcp
from octopus.manufacturer import vapourtec, knauer, thalesnano, vici, mt
from octopus.data.manipulation import Max, Min
from octopus.sequence.control import StateMonitor
from octopus.sequence.util import Looping, Dependent
from octopus.image import source, tracker
from octopus.runtime import *
r = vapourtec.R2R4(tcp("192.168.15.156", 9002))
h = thalesnano.HCube(tcp("192.168.15.156", 9004))
v = vici.MultiValve(tcp("192.168.15.153", 9006))
k = knauer.K120(tcp("192.168.15.156", 9002))
t = tracker.SingleBlobTracker(source.cv_webcam(0))
ir = mt.ICIR(tcp("192.168.15.242", 8124), stream_names = ["product"])
title("Two-step Hydration and Hydrogenation Experiments")
id("hydrog-twostep-reservoir")
#
# Experiment Parameters
#
ir_start_trigger = 200
ir_low_trigger = 170
min_height_trigger = 70
start_height_trigger = 100
max_height_trigger = 360
knauer_flow_rate = 100
vapourtec_flow_rate = 100
column_dead_volume = 3000
hcube_dead_volume = 3000
connecting_tube_vol = 1000
sms_phone_no = "447941123456"
#
# Logic for SMS notification
#
smsnotifier = sms.ClockworkSMS("enter-api-key-here")
def sms_notify (msg):
msg = "2 step hydrog experiment: %s" % msg
smsnotifier.notify(sms_phone_no, msg)
#
# Define parameters to be displayed on remote monitoring interface.
#
ui(
traces = [{
"title": "Vapourtec Pressures",
"unit": "",
"traces": [r.pressure, r.pump2.pressure],
"maxtime": 30 * 60
}, {
"title": "H-Cube Pressures",
"unit": "",
"traces": [h.inlet_pressure, h.system_pressure],
"maxtime": 30 * 60
}, {
"title": "IR output",
"unit": "mAU",
"traces": [ir.product],
"maxtime": 30 * 60
}],
properties = [
r.power,
r.pump2.target,
r.heater3.temp,
r.output,
t.height,
t.image,
k.rate,
h.hydrogen_mode,
h.system_pressure_target,
h.system_pressure,
h.column_temperature_target,
h.column_temperature,
h.state,
h.message,
v.position
]
)
#
# Logic to discard output when parameters are not within acceptable levels.
#
class StateFlip (Looping, Dependent):
interval = 0.5
def __init__ (self, pass_fn = None, fail_fn = None):
Looping.__init__(self)
# Depending on the implementation of Python,
# __builtins__ can be a module or its dict.
try:
b_set = __builtins__.set
except AttributeError:
b_set = __builtins__["set"]
self.tests = b_set()
self.pass_fn = pass_fn
self.fail_fn = fail_fn
self._steps = []
self._triggered = False
def add (self, test):
self.tests.add(test)
def remove (self, test):
self.tests.discard(test)
def _iterate (self):
if (not self._triggered) and (not all(self.tests)):
try:
step = self.fail_fn()
except TypeError:
pass
step.reset()
step.run()
self._steps.append(step)
self._triggered = True
elif (not self._triggered) and (not all(self.tests)):
try:
step = self.pass_fn()
except TypeError:
pass
step.reset()
step.run()
self._steps.append(step)
self._triggered = False
def _cancel (self):
from twisted.internet import defer
Looping._cancel(self)
d = []
for step in self._steps:
try:
d.append(step.cancel())
except:
pass
return defer.getherResults(d)
#
# State Monitor Definition
#
collect_monitor = StateFlip(
pass_fn = sequence(
set(r.output, "collect"),
log("Vapourtec parameters OK"),
),
fail_fn = sequence(
set(r.output, "waste"),
log("Vapourtec parameters not OK"),
)
)
# Collection should resume as soon as IR response is OK.
collect_monitor.add(ir.product > ir_low_trigger)
# Temperature and pressure must have been OK for one column volume for collection to
# be allowed.
collect_monitor.add(Min(r.heater3.temp, column_dead_volume / vapourtec_flow_rate * 60) > 95)
collect_monitor.add(Max(r.pump2.airlock, column_dead_volume / vapourtec_flow_rate * 60) < 10000)
# The Vapourtec R2 publishes an "airlock" parameter which goes very high
# if there is a drop in pressure - usually this indicates an air bubble
# in the pump.
vapourtec_monitor = StateMonitor()
vapourtec_monitor.add(r.pump2.airlock < 10000)
vapourtec_monitor.step = sequence(
log("Airlock problem in R2"),
call(sms_notify, "Airlock problem in R2"),
wait("5m")
)
hcube_monitor = StateMonitor()
hcube_monitor.add(h.system_pressure >= 4)
hcube_monitor.step = sequence(
log("Pressure loss at H-Cube"),
call(sms_notify, "Pressure loss at H-Cube"),
wait("5m")
)
#
# Generate the control sequence for a hydrogenation reaction with
# a particular set of conditions.
#
_current_state = [25, None, 200] # temp, pressure, rate
def reaction (name, pressure, temp, rate, valve_pos):
global _current_state
old_state, _current_state = _current_state, [temp, pressure, rate]
# Determine which parameters need to change
change_t = (old_state[0] != temp)
change_p = (old_state[1] != pressure)
change_r = (old_state[2] != rate)
# A pressure change requires the H-Cube to be switched off and on again.
if change_p:
# Define a sequence to carry out the pressure change
if pressure == "full":
change_h2 = sequence(
set(h.system_pressure_target, 0),
set(h.hydrogen_mode, "full"),
)
pressure_wait = (h.state == "running")
else:
change_h2 = sequence(
set(h.hydrogen_mode, "controlled"),
set(h.system_pressure_target, pressure),
)
pressure_wait = (h.system_pressure >= pressure)
change_step = sequence(
log("Switch off hydrogen to change pressure"),
call(h.stop_keep_hydrogen),
log("Change pressure to %s ..." % pressure),
change_h2,
log("Change temp to %s" % temp),
set(h.column_temperature_target, temp),
log("Change rate to %s" % rate),
set(k.target, rate),
log("Starting H-Cube"),
parallel(
wait(((hcube_dead_volume / rate) + 1) * 60),
sequence(
call(h.start_hydrogenation),
do_if(h.state != "running",
log("Please check that the H-cube is running OK")
),
wait_until(pressure_wait),
),
),
)
else:
# Changing rate is easy - just wait one column volume
if change_r:
a = sequence(
log("Change rate to %s" % rate),
set(k.target, rate),
log("Wait one dead volume"),
wait(((hcube_dead_volume / rate) + 1) * 60),
)
else:
a = log("Rate not changed")
# Alter temperature - need to wait until temperature has
# changed before waiting a column volume.
if change_t:
if temp > old_state[0]:
# Wait to heat up
temp_wait = h.column_temperature >= temp
else:
# Wait to cool down
temp_wait = h.column_temperature < temp + 5
b = sequence(
log("Change temp to %s" % temp),
set(h.column_temperature_target, temp),
log("Waiting for temp. stability"),
wait_until(temp_wait),
)
else:
b = log("Temp not changed")
# These two can be carried out in parallel
# to minimise waiting.
change_step = parallel(a, b)
return sequence(
log("Checking volume of reservoir before next run"),
do_if(t.height < start_height_trigger, sequence(
log("Stopping H-cube until reservoir refills"),
call(h.stop_keep_hydrogen),
call(hcube_monitor.pause), # Stopping pump will trigger error.
set(k.power, "off"),
wait_until(t.height > start_height_trigger),
log("Restarting"),
set(k.power, "on"),
wait("30s"),
call(hcube_monitor.resume),
)),
log_output(name),
log("Preparing reaction #%s" % name),
change_step,
log("Starting reaction #%s" % name),
wait(((dead_volume / rate) + 1) * 60),
log("Taking a sample now"),
set(v.position, valve_pos),
wait("10m"),
log("Collection complete"),
# Move valve back to waste
set(v.position, 1),
)
#
# Experiment control sequence
#
# A wait timer that will be used in the sequence
# Defining it here as a variable allows it to be
# cancelled from a different part of the sequence.
wait_for_reagent_to_be_used_up = wait("12h")
setup = sequence(
log("Starting up..."),
set(r.pressure_limit, 25000),
set(r.pump2.target, 200),
set(r.heater3.target, 100),
set(r.power, "on"),
wait_until(r.heater3.temp > 95),
)
# Main reaction sequence
rxn = parallel(
sequence(
# Waiting for conversion to intermediate
wait_until(ir.product > ir_start_trigger),
log("Collecting intermediate"),
set(r.output, "collect"),
call(rxn.dependents.add, collect_monitor),
# Flush vapourtec reactor
wait_until(r.pump2.input == "solvent"),
wait((column_dead_volume / vapourtec_flow_rate * 60) + (connecting_tube_vol / vapourtec_flow_rate * 60)),
cancel(collect_monitor),
cancel(vapourtec_monitor),
set(r.output, "waste"),
set(r.heater3.target, 25),
wait("20m"),
set(r.power, "off"),
),
sequence(
log("Injecting reagent"),
set(r.pump2.input, "reagent"),
wait_for_reagent_to_be_used_up,
log("Stop injecting reagent"),
set(r.pump2.input, "solvent"),
),
sequence(
# Wait for reservoir to fill up
wait_until(t.height > start_height_trigger),
set(v.position, 1)
log("Starting knauer pump"),
set(k.rate, 100),
set(k.power, "on"),
wait("30s"),
call(rxn.dependents.add, hcube_monitor),
## First Run:
reaction(1 , "full", 100, 100, 2),
reaction(2 , 20 , 100, 100, 3),
reaction(3 , 20 , 40, 200, 4),
reaction(4 , "full", 40, 100, 5),
reaction(5 , 20 , 40, 200, 6),
reaction(6 , 20 , 40, 100, 7),
reaction(7 , 20 , 100, 200, 8),
reaction(8 , "full", 40, 200, 9),
## Second Run:
#reaction(9 , 20 , 40, 100, 2),
#reaction(10, "full", 100, 100, 3),
#reaction(11, "full", 40, 100, 4),
#reaction(12, "full", 100, 200, 5),
#reaction(13, "full", 100, 200, 6),
#reaction(14, 20 , 100, 100, 7),
#reaction(15, "full", 40, 200, 8),
#reaction(16, 20 , 100, 200, 9),
log("Complete, releasing hydrogen"),
call(h.stop_release_hydrogen),
cancel(wait_for_reagent_to_be_used_up),
log("Flushing"),
set(k.target, 200),
wait("20m"),
set(k.power, "off")
)
)
rxn.dependents.add(vapourtec_monitor)
run(sequence(
setup,
rxn
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment