Skip to content

Instantly share code, notes, and snippets.

@richardingham
Created January 24, 2014 15:17
Show Gist options
  • Save richardingham/31f6f8efa47771c2ed02 to your computer and use it in GitHub Desktop.
Save richardingham/31f6f8efa47771c2ed02 to your computer and use it in GitHub Desktop.
Control sequence for two-step hydrogenation with reservoir
from octopus.transport.basic import serial, tcp
from octopus.manufacturer import vapourtec, knauer, thalesnano, vici, mt
from octopus.data.manipulation import Max, Min
from octopus.sequence.control import StateMonitor
from octopus.sequence.util import Looping, Dependent
from octopus.image import source, tracker
from octopus.notifier import sms
from octopus.runtime import *
title("Two-step Hydration and Hydrogenation")
id("hydrog-twostep")
r = vapourtec.R2R4(tcp("192.168.15.156", 9002))
k = knauer.K120(serial("/dev/ttyUSB0", baudrate = 9600))
h = thalesnano.HCube(serial("/dev/ttyUSB1", baudrate = 9600))
v = vici.MultiValve(serial("/dev/ttyUSB2", baudrate = 9600))
t = tracker.SingleBlobTracker(source.cv_webcam(0))
ir = mt.ICIR(tcp("192.168.15.3", 8124), stream_names = ["product"])
#
# Experiment Parameters
#
ir_start_trigger = 200
ir_low_trigger = 170
min_height_trigger = 70
start_height_trigger = 100
max_height_trigger = 360
knauer_flow_rate = 100
vapourtec_flow_rate = 100
column_dead_volume = 3000
hcube_dead_volume = 3000
connecting_tube_vol = 1000
sms_phone_no = "447941123456"
#
# Logic for SMS notification
#
smsnotifier = sms.ClockworkSMS("enter-api-key-here")
def sms_notify (msg):
msg = "2 step hydrog experiment: %s" % msg
smsnotifier.notify(sms_phone_no, msg)
#
# Logic to discard output when parameters are not within acceptable levels.
#
class StateFlip (Looping, Dependent):
interval = 0.5
def __init__ (self, pass_fn = None, fail_fn = None):
Looping.__init__(self)
# Depending on the implementation of Python,
# __builtins__ can be a module or its dict.
try:
b_set = __builtins__.set
except AttributeError:
b_set = __builtins__["set"]
self.tests = b_set()
self.pass_fn = pass_fn
self.fail_fn = fail_fn
self._steps = []
self._triggered = False
def add (self, test):
self.tests.add(test)
def remove (self, test):
self.tests.discard(test)
def _iterate (self):
if (not self._triggered) and (not all(self.tests)):
try:
step = self.fail_fn()
except TypeError:
pass
step.reset()
step.run()
self._steps.append(step)
self._triggered = True
elif (not self._triggered) and (not all(self.tests)):
try:
step = self.pass_fn()
except TypeError:
pass
step.reset()
step.run()
self._steps.append(step)
self._triggered = False
def _cancel (self):
from twisted.internet import defer
Looping._cancel(self)
d = []
for step in self._steps:
try:
d.append(step.cancel())
except:
pass
return defer.getherResults(d)
#
# State Monitor Definition
#
output_valve_to_waste = set(v.position, 1)
output_valve_to_collect = set(v.position, 2)
intermediate_valve_to_waste = set(r.output, "waste")
intermediate_valve_to_collect = set(r.output, "collect")
h_cube_monitor = StateFlip(
pass_fn = sequence(
output_valve_to_collect,
log("H-Cube parameters OK"),
),
fail_fn = sequence(
output_valve_to_waste,
log("H-Cube parameters not OK"),
)
)
h_cube_monitor.add(Min(h.column_temperature, hcube_dead_volume / knauer_flow_rate * 60) >= 95)
vapourtec_monitor = StateFlip(
pass_fn = sequence(
intermediate_valve_to_collect,
log("Vapourtec parameters OK"),
),
fail_fn = sequence(
intermediate_valve_to_waste,
log("Vapourtec parameters not OK"),
)
)
vapourtec_monitor.add(Min(r.heater3.temp, column_dead_volume / vapourtec_flow_rate * 60) > 95)
vapourtec_monitor.add(Max(r.pump2.airlock, column_dead_volume / vapourtec_flow_rate * 60) < 10)
vapourtec_monitor.add(ir.product > ir_low_trigger)
system_monitor = StateMonitor()
system_monitor.add(h.system_pressure >= 4)
system_monitor.add(r.pump2.airlock < 10)
system_monitor.step = sequence(
log("Loss of pressure in system"),
call(sms_notify, "Loss of pressure in system"),
wait("5m")
)
#
# Experiment control sequence
#
hydrog = sequence(
wait_until(r.pump2.input == "solvent"),
log("Washing column"),
cancel(vapourtec_monitor),
wait((column_dead_volume / vapourtec_flow_rate * 60) + (connecting_tube_vol / vapourtec_flow_rate * 60)),
parallel(
sequence(
set(r.output, "waste"),
wait("1m"),
set(r.power, "off")
),
sequence(
log("Using up remaining intermediate"),
wait_until(t.height < min_height_trigger),
cancel(h_cube_monitor),
log("End of hydrogenation"),
call(h.stop_release_hydrogen),
set(k.power, "off")
)
)
)
hydrog.dependents.add(h_cube_monitor)
hydrog.dependents.add(vapourtec_monitor)
hydrog.dependents.add(system_monitor)
rxn = sequence(
log("Starting up..."),
set(r.heater3.target, 100),
set(r.pump2.target, 100),
set(h.hydrogen_mode, "full"),
set(h.column_temperature_target, 100),
set(r.power, "on"),
wait_until(r.heater3.temp > 95),
parallel(
sequence(
log("Injecting reagent"),
set(r.pump2.input, "reagent"),
wait("2h"),
log("Stop injecting reagent"),
set(r.pump2.input, "solvent"),
),
sequence(
log("Waiting for conversion to intermediate"),
wait_until(ir.product > ir_start_trigger),
log("Collecting intermediate"),
set(r.output, "collect"),
wait_until(t.height > start_height_trigger),
log("Starting knauer pump"),
set(k.rate, knauer_flow_rate),
set(k.power, "on"),
wait("30s"),
log("Starting hydrogenation"),
call(h.start_hydrogenation),
log("Waiting for temperature"),
wait_until(h.column_temperature >= 99),
log("Waiting for stability"),
wait_until(h.message == "Stable."),
hydrog,
)
)
)
run(rxn)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment