Skip to content

Instantly share code, notes, and snippets.

@takenoto
Last active September 5, 2023 11:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save takenoto/c9204067875f691a10d9220845a793f0 to your computer and use it in GitHub Desktop.
Save takenoto/c9204067875f691a10d9220845a793f0 to your computer and use it in GitHub Desktop.
pinn_la_casei2023 - Reactor Equations - 2023-09-09
# OBS importante
# Fiz algumas modificações na estrutura da função loss (versão 4) para incluir punições por valores abaixo de 0 ou acima de um limite máximo.
# As versões que constam abaixo são as tradicionais, para ficar mais fácil de ler. A nova versão já está no código com a função ODEPreparer
# Equacionamento
rX = (X * mu_max * S / (K_S + S)) * f_x_calc_func() * h_p_calc_func()
rP = alpha * rX + beta * X
rS = -(1 / Y_PS) * rP - ms * X
dVdt_calc = f_in - f_out
# Loss
loss_X = dXdt * V - (V * rX + (f_in * inlet.X - f_out * X))
loss_P = dPdt * V - (V * rP + (f_in * inlet.P - f_out * P))
loss_S = dSdt * V - (V * rS + (f_in * inlet.S - f_out * S))
loss_V = dVdt - dVdt_calc
# O "initial state" de cada reator
altiok_models_to_run = [get_altiok2006_params().get(2)]
# Parâmetros de processo (será usado em todos)
eq_params = altiok_models_to_run[0]
initial_state = CSTRState(
volume=np.array([5]),
X=eq_params.Xo,
P=eq_params.Po,
S=eq_params.So,
)
initial_state_cstr = CSTRState(
volume=np.array([1]),
X=eq_params.Xo,
P=eq_params.Po,
S=eq_params.So,
)
# Serve pra fed-batch
initial_state_fed_batch = CSTRState(
volume=np.array([1]),
X=eq_params.Xo,
P=eq_params.Po,
S=eq_params.So,
)
process_params_feed_off = ProcessParams(
max_reactor_volume=5,
inlet=ConcentrationFlow(
volume=0.0,
X=eq_params.Xo,
P=eq_params.Po,
S=eq_params.So,
),
t_final=10.2,
)
Xo=1.15
Po=6
So=21.4
mu_max=0.265
K_S=0.72
alpha=3.3
beta=0.06
Y_PS=0.682
ms=0.03
f=0.5
h=0.5
Pm=90
Xm=8
process_params_feed_fb = ProcessParams(
max_reactor_volume=5,
inlet=ConcentrationFlow(
volume=2, # L/h
X=eq_params.Xo,
P=eq_params.Po,
S=eq_params.So,
),
t_final=10.2,
)
process_params_feed_cstr = ProcessParams(
max_reactor_volume=5,
inlet=ConcentrationFlow(
volume=1,
X=eq_params.Xo,
P=eq_params.Po,
S=eq_params.So,
),
t_final=24 * 4,
)
# 2023-12-08 mudei h_p_calc, estava fazendo a avaliação greater_equal pro X mds
# import tensorflow as tf
from deepxde.backend import tf
import deepxde as dde
from domain.params.solver_params import SolverParams
from domain.params.process_params import ProcessParams
from domain.params.altiok_2006_params import Altiok2006Params
import keras.backend as K
from domain.reactor.cstr_state import CSTRState
# A rede tem 1 entrada e 4 saídas, então deve ser do tipo
# [1] + [X] + [4]
# Onde X são as hidden layers
class ODEPreparer:
def __init__(
self,
solver_params: SolverParams,
eq_params: Altiok2006Params,
process_params: ProcessParams,
initial_state: CSTRState,
f_out_value_calc,
):
self.solver_params = solver_params
self.eq_params = eq_params
self.process_params = process_params
self.f_out_value_calc = f_out_value_calc
self.initial_state = initial_state
def prepare(self):
def ode_system(x, y):
"""
Order of outputs:
X, P, S, V
"""
# ---------------------
# PARAMETERS AND UTILITY
# ---------------------
mu_max = self.eq_params.mu_max
K_S = self.eq_params.K_S
alpha = self.eq_params.alpha
beta = self.eq_params.beta
Y_PS = self.eq_params.Y_PS
ms = self.eq_params.ms
f = self.eq_params.f
h = self.eq_params.h
Pm = self.eq_params.Pm
Xm = self.eq_params.Xm
inlet = self.process_params.inlet
process_params = self.process_params
solver_params = self.solver_params
f_out_value_calc = self.f_out_value_calc
inputSimulationType = self.solver_params.inputSimulationType
outputSimulationType = self.solver_params.outputSimulationType
initial_state = self.initial_state
scaler = solver_params.non_dim_scaler
# ------------------------
# ---- DECLARING AS "N" --
# ------------------------
N_nondim = {
"X": X_nondim,
"P": P_nondim,
"S": S_nondim,
"V": V_nondim,
"dXdt": dX_dt_nondim,
"dPdt": dP_dt_nondim,
"dSdt": dS_dt_nondim,
"dVdt": dV_dt_nondim,
}
# Nondim to dim
N = {type: scaler.fromNondim(N_nondim, type) for type in N_nondim}
X = N["X"]
P = N["P"]
S = N["S"]
V = N["V"]
dXdt = N["dXdt"]
dPdt = N["dPdt"]
dSdt = N["dSdt"]
dVdt = N["dVdt"]
# ------------------------
# ---- INLET AND OUTLET --
# ------------------------
f_in = inlet.volume
f_out = f_out_value_calc(
max_reactor_volume=process_params.max_reactor_volume,
f_in_v=f_in,
volume=V,
)
# Declara a List da loss pra já deixar guardado e ir adicionando
# conforme for sendo validado
loss_pde = []
# --------------------------
# Nondim Equations. Daqui pra baixo X,P,S, V (variáveis) etc já tá tudo
# adimensionalizado.
# Ok, são 2 problemas
# 1) Quando X>Xm
# 2) Quando X=Xm, porque 0^algo também dá NaN (https://github.com/tensorflow/tensorflow/issues/16271)
# Por algum motivo todas as tentativas anteriores, com keras.switch,
# tf.wherenão funcionaram
# Só funciona se fizer a atribuição do valor de X ou P por fora
# e usar esses novos valores nos cáculos.
# Se fizer um tf.where tendo como condicional o próprio X ou o valor da expressão não ser NaN, não presta, não adianta.
def f_x_calc_func():
loss_version = solver_params.get_loss_version_for_type("X")
X_for_calc = X
if loss_version > 2:
X_for_calc = tf.where(
tf.less(X, Xm), X, tf.ones_like(X) * 0.9999 * Xm
)
value = tf.pow(1 - X_for_calc / Xm, f)
return value
def h_p_calc_func():
loss_version = solver_params.get_loss_version_for_type("P")
P_for_calc = P
if loss_version > 2:
P_for_calc = tf.where(
tf.less(P, Pm), P, tf.ones_like(P) * 0.9999 * Pm
)
value = tf.pow(
1 - (P_for_calc / Pm),
h,
)
return value
rX = (X * mu_max * S / (K_S + S)) * f_x_calc_func() * h_p_calc_func()
rP = alpha * rX + beta * X
rS = -(1 / Y_PS) * rP - ms * X
# -----------------------
# Calculating the loss
# Procura cada variável registrada como de saída e
# adiciona o cálculo da sua função como componente da loss
# Zerar reações na marra para testar modelo sem reação
# rX = 0
# rP = 0
# rS = 0
for o in outputSimulationType.order:
# Pode dar NaN quando predizer valores abaixo de 0
# Então evite divisões!!!! Por isso o V vem multiplicando no fim...
loss_version = solver_params.get_loss_version_for_type(o)
if o == "X":
loss_derivative = dXdt * V - (V * rX + (f_in * inlet.X - f_out * X))
loss_X = 0
if loss_version == 4:
# if X<0, return X
# else if X>Xm, return X
# else return loss_X
loss_maxmin = tf.where(
tf.less(X, 0),
X,
tf.where(tf.greater(X, Xm), X - Xm, tf.zeros_like(X)),
)
loss_derivative_abs = tf.abs(loss_derivative)
loss_maxmin_abs = tf.abs(loss_maxmin)
loss_X = loss_derivative_abs + loss_maxmin_abs
elif loss_version <= 3:
loss_X = loss_derivative
loss_pde.append(loss_X)
elif o == "P":
loss_derivative = dPdt * V - (V * rP + (f_in * inlet.P - f_out * P))
loss_P = 0
if loss_version == 4:
loss_maxmin = tf.where(
tf.less(P, 0),
P,
tf.where(tf.greater(P, Pm), P - Pm, tf.zeros_like(P)),
)
loss_derivative_abs = tf.abs(loss_derivative)
loss_maxmin_abs = tf.abs(loss_maxmin)
loss_P = loss_derivative_abs + loss_maxmin_abs
elif loss_version <= 3:
loss_P = loss_derivative
loss_pde.append(loss_P)
elif o == "S":
loss_derivative = dSdt * V - (V * rS + (f_in * inlet.S - f_out * S))
loss_S = 0
if loss_version == 4:
loss_maxmin = tf.where(
tf.less(S, 0),
S,
tf.where(
tf.greater(S, initial_state.S[0]),
S - initial_state.S[0],
tf.zeros_like(S),
),
)
loss_derivative_abs = tf.abs(loss_derivative)
loss_maxmin_abs = tf.abs(loss_maxmin)
loss_S = loss_derivative_abs + loss_maxmin_abs
elif loss_version <= 3:
loss_S = loss_derivative
loss_pde.append(loss_S)
elif o == "V":
dVdt_calc = f_in - f_out
loss_derivative = dVdt - dVdt_calc
loss_V = 0
if loss_version == 4:
loss_maxmin = tf.where(
tf.less(V, 0),
V,
tf.zeros_like(V),
)
loss_derivative_abs = tf.abs(loss_derivative)
loss_maxmin_abs = tf.abs(loss_maxmin)
loss_V = loss_derivative_abs + loss_maxmin_abs
elif loss_version <= 3:
loss_V = loss_derivative
loss_pde.append(loss_V)
return loss_pde
return ode_system
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment