Skip to content

Instantly share code, notes, and snippets.

@jsmits
Last active April 18, 2017 07:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jsmits/9197749627d087861d7c27bdf6ae6639 to your computer and use it in GitHub Desktop.
Save jsmits/9197749627d087861d7c27bdf6ae6639 to your computer and use it in GitHub Desktop.
"""Controller code design in Python as guideline for implementing this in Fortran."""
# update: 2017-04-05: PID and switch controllers differ enough from the simple and delta controllers
# that a new code structure plan is justified. In my (Sander's) opinion it's better and more
# future-proof to give each controller its own class and implementation. That way, it's easier
# to add new controller implementations.
# action types
# ============
#
# for weirs and orifices
# ----------------------
# - set_crest_level
# - set_discharge_coefficients: to close/open or partially close/open the structure;
# action value = (left fraction, right fraction); values can be NODATA (-9999)
#
# for culverts, channels and pipes
# --------------------------------
# - set_discharge_coefficients: to close/open or partially close/open the structure;
# action value = (left fraction, right fraction); values can be NODATA (-9999)
#
# for pumps
# ---------
# - set_capacity
# - set_levels: to set start and stop levels; action value = (start level, stop level);
# values can be NODATA (-9999)
#
class BaseController(object):
def __init__(self, start_dt=None, start=None, end=None):
self.start_dt = start_dt
self.start = start
self.end = end
self.is_active = False
def run(self, t):
"""
Implement the start_dt, start, end and is_active check in this class, because
it's generic.
If start_dt and start and end are given, we need to check whether the current t
is in between start and end based on the start_dt. Only then should the subclassed
controller run method be called.
:param t - current time
"""
raise NotImplementedError
class DeltaController(BaseController):
def __init__(
self, condition_variable, condition_delta, condition_dt,
measure_group, target_structure, action_type,
action_value):
# condition atributes
self.condition_variable = condition_variable
self.condition_delta = condition_delta
self.condition_dt = condition_dt
# measure_group
self.measure_group = measure_group
# target attributes
self.target_structure = target_structure
self.action_type = action_type
self.action_value = action_value
self.original_value = None # to be set if condition check is true
def check_condition(self):
# TODO: implement
pass
def run(self, t):
"""
:param t - current time
"""
proceed = super(DeltaController, self).run(t) # checks start and end
# TODO: do the self.is_active check and implement a revert to
# original_value
if proceed and self.check_condition():
self.original_value = self.get_original_value(
self.target_structure, self.action_type)
set_action_value(
self.target_structure, self.action_type, self.action_value)
self.is_active = True
class TableController(BaseController):
# init variables and run method are specific for the table controller
# does not require a condition check
#
# The simple controller (one condition check with:
# variable -> operator -> threshold value) will also be handled by this controller.
#
def __init__(
self, condition_variable, condition_operator, measure_group, target_structure,
action_type, control_table):
# condition atributes
self.condition_variable = condition_variable
self.condition_operator = condition_operator
# the control_table contains one or more rows of
# (threshold_value, target_value) pairs.
self.control_table = control_table
# measure_group
self.measure_group = measure_group
# target attributes
self.target_structure = target_structure
self.action_type # action_value is in the control_table
self.original_value = None # to be set if condition check is true
def get_measured_value(self):
# uses the condition_variable to get the measured value from the
# measure_group
# TODO: implement
pass
def check_simple_condition(self):
measured_value = self.get_measured_value()
# condition_value is the first value of the first and only row in the
# control table
condition_value = self.control_table[0][0]
return eval("{} {} {}".format(
measured_value, self.condition_operator, self.condition_value))
def do_action(self):
if len(self.control_table) == 1 and self.check_simple_condition():
# simple condition
self.original_value = self.get_original_value(
self.target_structure, self.action_type)
action_value = self.control_table[0][1]
self.set_action_value(self.target_structure, self.action_type, action_value)
self.is_active = True
else:
# TODO: determine the target value based on the control table and use it
# to set the target_variable on the target_structure
pass
def run(self, t):
proceed = super(TableController, self).run(t) # checks start and end
# TODO: do the self.is_active check and implement a revert to original_value
if proceed:
self.do_action()
class PIDController(BaseController):
# init variables and run method are specific for the PID controller
# condition check and target values comes from a switch table
def __init__(
self, kp, ki, kd, measure_variable, measure_group, setpoint,
target_structure, action_type):
# PID algorithm coefficients
self.kp = kp
self.ki = ki
self.kd = kd
# measure_group
self.measure_variable = measure_variable
self.measure_group = measure_group
self.setpoint = setpoint
# target attributes
self.target_structure = target_structure # e.g. weir
self.action_type = action_type # e.g. set_crest_level
def get_current_value(self):
# TODO: implement; use self.target_structure and self.action_type
pass
def get_action_value(self):
"""
PID magic goes in here.
Calculate an action_value from kp, ki, kd, measure_variable,
measure_group and setpoint. This action_value will be used to correct
the target_variable on the target_structure. The final action_value is
calculated by adding the this action_value to the current_value.
"""
action_value = 1.33 # TODO: this should be calculated
current_value = self.get_current_value()
action_value = current_value + action_value
return action_value
def do_action(self):
action_value = self.get_action_value()
self.set_action_value(
self.target_structure, self.action_type, action_value)
def run(self, t):
"""
:param t - current time
"""
proceed = super(PIDController, self).run(t) # checks start and end
if proceed:
self.do_action()
self.is_active = True
class TimedController(BaseController):
# init variables and run method are specific for the timed controller
def __init__(self, start_dt, control_table, target_structure, action_type):
self.start_dt = start_dt
# control table consists of rows of 3 columns:
# - start in seconds after 01-01 00:00
# - end in seconds after 01-01 00:00
# - action_value
self.control_table = control_table
# the structure variable to apply the target value to
self.target_structure = target_structure
self.action_type = action_type
def get_normalized_t(self, t):
# figure out where we are in the year in seconds based on:
# - self.start_dt (start datetime or seconds since epoch of model calculation) and the
# - t (seconds since start calculation)
# TODO: implement
pass
def get_action_value(self, t):
t_normalized = self.get_normalized_t(t)
for (start, end, action_value) in self.control_table:
if t_normalized > start and t_normalized <= end:
return action_value
# should never come here
def run(self, t):
"""
:param start_dt - start datetime of the calculation
:param t - current time
"""
proceed = super(TimedController, self).run(t) # checks start and end
if proceed:
action_value = self.get_action_value(t)
self.set_action_value(self.target_structure, self.action_type, action_value)
class MemoryController(BaseController):
"""
Memory controller.
For a memory controller we need two thresholds (an activate and a
deactivate threshold) and only one target structure (network id) and an
action type. The control becomes active when the first threshold is
exceeded. However the control becomes inactive when the second threshold is
underrun.
"""
def __init__(
self, condition_variable, measure_group, thresholds, target_structure,
action_type, action_value):
# condition variables
self.condition_variable = condition_variable
self.measure_group = measure_group
# thresholds is a tuple of activate and deactivate threshold values
self.thresholds = thresholds
# action variables
self.target_structure = target_structure
self.action_type = action_type
self.action_value = action_value
def do_action(self):
# TODO: implement
pass
def run(self, t):
proceed = super(MemoryController, self).run(t) # checks start and end
if proceed:
self.do_action()
controllers = []
# simple controller example
measure_group = MeasureGroup(...) # from input files
target_structure = ... # from input files
action_type = ... # from input files
# one element control_table makes this a simple controller
control_table = [(1.0, 2.1)] # 1.0 is condition_value, 2.1 is action_value
simple_controller = TableController(
condition_variable='s1', condition_operator='>', control_table=control_table,
measure_group=measure_group, target_structure=target_structure,
action_type=action_type)
controllers.append(simple_controller)
# delta controller example
action_value = 1.2
delta_controller = DeltaController(
condition_variable='s1', condition_delta=2.6, condition_dt=85,
measure_group=measure_group, target_structure=target_structure,
action_type=action_type, action_value=action_value)
controllers.append(delta_controller)
# table controller example
control_table = [
# (threshold, action_value)
(0.5, 0.4),
(0.6, 0.3),
(0.8, 0.2),
(1.0, 1.0)
]
table_controller = TableController(
condition_variable='s1', condition_operator='>', control_table=control_table,
measure_group=measure_group, target_structure=target_structure,
action_type=action_type)
controllers.append(table_controller)
# PID controller example
pid_controller = PIDController(
kp=1.0, ki=1.0, kd=1.0, measure_variable='s1', measure_group=measure_group,
setpoint=2.3, target_structure=target_structure, action_type=action_type)
controllers.append(pid_controller)
# timed controller example
start_dt = 1491955200 # epoch time for 2017-04-12 00:00
control_table = [
(0, 10518975, 3.0) # 01-01 - 04-04
(10518975, 21037950, 2.5), # 04-04 - 10-09
(21037950, 31559999, 3.0) # 10-09 - 31-12
]
target_structure = weir
action_type = 'set_crest_level'
timed_controller = TimedController(
start_dt=start_dt, control_table=control_table, target_structure=target_structure,
action_type=action_type)
# memory controller example
condition_variable = 's1'
thresholds = (1.3, 0.2)
target_structure = pipe
action_type = 'set_discharge_coefficients'
action_value = (0.5, -9999) # left discharge fraction: 0.5, right: do nothing
memory_controller = MemoryController(
condition_variable, measure_group, thresholds, target_structure, action_type,
action_value)
# every calculation step, do:
t = current_t
for controller in controllers:
controller.run(t)
@jsmits
Copy link
Author

jsmits commented Apr 4, 2017

img_1178

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment