Skip to content

Instantly share code, notes, and snippets.

@Enigmatic331
Created April 26, 2023 10:02
Show Gist options
  • Save Enigmatic331/5d85f73ec4b8e85ace1a2fa36c86790f to your computer and use it in GitHub Desktop.
Save Enigmatic331/5d85f73ec4b8e85ace1a2fa36c86790f to your computer and use it in GitHub Desktop.
# @version ^0.3.7
# (c) Curve.Fi, 2023
"""
@title CurveTricryptoOptimizedWETH
@license MIT
@author Curve.Fi
@notice A Curve AMM pool for 3 unpegged assets (e.g. ETH, BTC, USD).
@dev All prices in the AMM are with respect to the first token in the pool.
"""
from vyper.interfaces import ERC20
implements: ERC20 # <--------------------- AMM contract is also the LP token.
# --------------------------------- Interfaces -------------------------------
interface Math:
def geometric_mean(_x: uint256[N_COINS]) -> uint256: view
def wad_exp(_power: int256) -> uint256: view
def cbrt(x: uint256) -> uint256: view
def reduction_coefficient(
x: uint256[N_COINS], fee_gamma: uint256
) -> uint256: view
def newton_D(
ANN: uint256,
gamma: uint256,
x_unsorted: uint256[N_COINS],
K0_prev: uint256
) -> uint256: view
def get_y(
ANN: uint256,
gamma: uint256,
x: uint256[N_COINS],
D: uint256,
i: uint256,
) -> uint256[2]: view
def get_p(
_xp: uint256[N_COINS], _D: uint256, _A_gamma: uint256[2],
) -> uint256[N_COINS-1]: view
interface WETH:
def deposit(): payable
def withdraw(_amount: uint256): nonpayable
interface Factory:
def admin() -> address: view
def fee_receiver() -> address: view
def views_implementation() -> address: view
interface Views:
def calc_token_amount(
amounts: uint256[N_COINS], deposit: bool, swap: address
) -> uint256: view
def get_dy(
i: uint256, j: uint256, dx: uint256, swap: address
) -> uint256: view
def get_dx(
i: uint256, j: uint256, dy: uint256, swap: address
) -> uint256: view
# ------------------------------- Events -------------------------------------
event Transfer:
sender: indexed(address)
receiver: indexed(address)
value: uint256
event Approval:
owner: indexed(address)
spender: indexed(address)
value: uint256
event TokenExchange:
buyer: indexed(address)
sold_id: uint256
tokens_sold: uint256
bought_id: uint256
tokens_bought: uint256
fee: uint256
packed_price_scale: uint256
event AddLiquidity:
provider: indexed(address)
token_amounts: uint256[N_COINS]
fee: uint256
token_supply: uint256
packed_price_scale: uint256
event RemoveLiquidity:
provider: indexed(address)
token_amounts: uint256[N_COINS]
token_supply: uint256
event RemoveLiquidityOne:
provider: indexed(address)
token_amount: uint256
coin_index: uint256
coin_amount: uint256
approx_fee: uint256
packed_price_scale: uint256
event CommitNewParameters:
deadline: indexed(uint256)
mid_fee: uint256
out_fee: uint256
fee_gamma: uint256
allowed_extra_profit: uint256
adjustment_step: uint256
ma_time: uint256
event NewParameters:
mid_fee: uint256
out_fee: uint256
fee_gamma: uint256
allowed_extra_profit: uint256
adjustment_step: uint256
ma_time: uint256
event RampAgamma:
initial_A: uint256
future_A: uint256
initial_gamma: uint256
future_gamma: uint256
initial_time: uint256
future_time: uint256
event StopRampA:
current_A: uint256
current_gamma: uint256
time: uint256
event ClaimAdminFee:
admin: indexed(address)
tokens: uint256
# ----------------------- Storage/State Variables ----------------------------
WETH20: immutable(address) # <- Address of wrapper contract for native asset.
N_COINS: constant(uint256) = 3
PRECISION: constant(uint256) = 10**18 # <------- The precision to convert to.
A_MULTIPLIER: constant(uint256) = 10000
packed_precisions: uint256
MATH: public(immutable(Math))
coins: public(address[N_COINS])
factory: public(address)
price_scale_packed: uint256 # <------------------------ Internal price scale.
price_oracle_packed: uint256 # <------- Price target given by moving average.
last_prices_packed: uint256
last_prices_timestamp: public(uint256)
initial_A_gamma: public(uint256)
initial_A_gamma_time: public(uint256)
future_A_gamma: public(uint256)
future_A_gamma_time: public(uint256) # <------ Time when ramping is finished.
# This value is 0 (default) when pool is first deployed, and only gets
# populated by block.timestamp + future_time in `ramp_A_gamma` when the
# ramping process is initiated. After ramping is finished
# (i.e. self.future_A_gamma_time < block.timestamp), the variable is left
# and not set to 0.
balances: public(uint256[N_COINS])
D: public(uint256)
xcp_profit: public(uint256)
xcp_profit_a: public(uint256) # <--- Full profit at last claim of admin fees.
virtual_price: public(uint256) # <------ Cached (fast to read) virtual price.
# The cached `virtual_price` is also used internally.
# -------------- Params that affect how price_scale get adjusted -------------
packed_rebalancing_params: public(uint256) # <---------- Contains rebalancing
# parameters allowed_extra_profit, adjustment_step, and ma_time.
future_packed_rebalancing_params: uint256
# ---------------- Fee params that determine dynamic fees --------------------
packed_fee_params: public(uint256) # <---- Packs mid_fee, out_fee, fee_gamma.
future_packed_fee_params: uint256
ADMIN_FEE: constant(uint256) = 5 * 10**9 # <------------- 50% of earned fees.
MIN_FEE: constant(uint256) = 5 * 10**5 # <-------------------------- 0.5 BPS.
MAX_FEE: constant(uint256) = 10 * 10**9
NOISE_FEE: constant(uint256) = 10**5 # <---------------------------- 0.1 BPS.
# ----------------------- Admin params ---------------------------------------
admin_actions_deadline: public(uint256)
ADMIN_ACTIONS_DELAY: constant(uint256) = 3 * 86400
MIN_RAMP_TIME: constant(uint256) = 86400
MIN_A: constant(uint256) = N_COINS**N_COINS * A_MULTIPLIER / 100
MAX_A: constant(uint256) = 1000 * A_MULTIPLIER * N_COINS**N_COINS
MAX_A_CHANGE: constant(uint256) = 10
MIN_GAMMA: constant(uint256) = 10**10
MAX_GAMMA: constant(uint256) = 5 * 10**16
PRICE_SIZE: constant(int128) = 256 / (N_COINS - 1)
PRICE_MASK: constant(uint256) = 2**PRICE_SIZE - 1
# ----------------------- ERC20 Specific vars --------------------------------
name: public(immutable(String[64]))
symbol: public(immutable(String[32]))
decimals: public(constant(uint8)) = 18
version: public(constant(String[8])) = "v2.0.0"
balanceOf: public(HashMap[address, uint256])
allowance: public(HashMap[address, HashMap[address, uint256]])
totalSupply: public(uint256)
nonces: public(HashMap[address, uint256])
EIP712_TYPEHASH: constant(bytes32) = keccak256(
"EIP712Domain(string name,string version,uint256 chainId,address verifyingContract,bytes32 salt)"
)
EIP2612_TYPEHASH: constant(bytes32) = keccak256(
"Permit(address owner,address spender,uint256 value,uint256 nonce,uint256 deadline)"
)
VERSION_HASH: constant(bytes32) = keccak256(version)
NAME_HASH: immutable(bytes32)
CACHED_CHAIN_ID: immutable(uint256)
salt: public(immutable(bytes32))
CACHED_DOMAIN_SEPARATOR: immutable(bytes32)
# ----------------------- Contract -------------------------------------------
@external
def __init__(
_name: String[64],
_symbol: String[32],
_coins: address[N_COINS],
_math: address,
_weth: address,
_salt: bytes32,
packed_precisions: uint256,
packed_A_gamma: uint256,
packed_fee_params: uint256,
packed_rebalancing_params: uint256,
packed_prices: uint256,
):
WETH20 = _weth
MATH = Math(_math)
self.factory = msg.sender
name = _name
symbol = _symbol
self.coins = _coins
self.packed_precisions = packed_precisions # <------- Precisions of coins
# are calculated as 10**(18 - coin.decimals()).
self.initial_A_gamma = packed_A_gamma # <------------------- A and gamma.
self.future_A_gamma = packed_A_gamma
self.packed_rebalancing_params = packed_rebalancing_params # <-- Contains
# rebalancing params: allowed_extra_profit, adjustment_step,
# and ma_exp_time.
self.packed_fee_params = packed_fee_params # <-------------- Contains Fee
# params: mid_fee, out_fee and fee_gamma.
self.price_scale_packed = packed_prices
self.price_oracle_packed = packed_prices
self.last_prices_packed = packed_prices
self.last_prices_timestamp = block.timestamp
self.xcp_profit_a = 10**18
# Cache DOMAIN_SEPARATOR. If chain.id is not CACHED_CHAIN_ID, then
# DOMAIN_SEPARATOR will be re-calculated each time `permit` is called.
# Otherwise, it will always use CACHED_DOMAIN_SEPARATOR.
# see: `_domain_separator()` for its implementation.
NAME_HASH = keccak256(name)
salt = _salt
CACHED_CHAIN_ID = chain.id
CACHED_DOMAIN_SEPARATOR = keccak256(
_abi_encode(
EIP712_TYPEHASH,
NAME_HASH,
VERSION_HASH,
chain.id,
self,
salt,
)
)
log Transfer(empty(address), self, 0) # <------- Fire empty transfer from
# 0x0 to self for indexers to catch.
# ------------------- Token transfers in and out of the AMM ------------------
@payable
@external
def __default__():
if msg.value > 0:
assert WETH20 in self.coins, "dev: ETH not in pool"
@internal
def _transfer_in(
_coin: address,
dx: uint256,
dy: uint256,
mvalue: uint256,
callbacker: address,
callback_sig: bytes32,
sender: address,
receiver: address,
use_eth: bool
):
"""
@notice Transfers `_coin` from `sender` to `self` and calls `callback_sig`
if it is not empty.
@dev The callback sig must have the following args:
sender: address
receiver: address
coin: address
dx: uint256
dy: uint256
@params _coin address of the coin to transfer in.
@params dx amount of `_coin` to transfer into the pool.
@params dy amount of `_coin` to transfer out of the pool.
@params mvalue msg.value if the transfer is ETH, 0 otherwise.
@params callbacker address to call `callback_sig` on.
@params callback_sig signature of the callback function.
@params sender address to transfer `_coin` from.
@params receiver address to transfer `_coin` to.
@params use_eth True if the transfer is ETH, False otherwise.
"""
if use_eth and _coin == WETH20:
assert mvalue == dx, "dev: incorrect eth amount"
else:
assert mvalue == 0, "dev: nonzero eth amount"
if callback_sig == empty(bytes32):
assert ERC20(_coin).transferFrom(
sender, self, dx, default_return_value=True
)
else:
# --------- This part of the _transfer_in logic is only accessible
# by _exchange.
# First call callback logic and then check if pool
# gets dx amounts of _coins[i], revert otherwise.
b: uint256 = ERC20(_coin).balanceOf(self)
raw_call(
callbacker,
concat(
slice(callback_sig, 0, 4),
_abi_encode(sender, receiver, _coin, dx, dy)
)
)
assert ERC20(_coin).balanceOf(self) - b == dx, "dev: callback didn't give us coins"
# ^------ note: dx cannot
# be 0, so the contract MUST receive some _coin.
if _coin == WETH20:
WETH(WETH20).withdraw(dx) # <--------- if WETH was transferred in
# previous step and `not use_eth`, withdraw WETH to ETH.
@internal
def _transfer_out(
_coin: address, _amount: uint256, use_eth: bool, receiver: address
):
"""
@notice Transfer a single token from the pool to receiver.
@dev This function is called by `remove_liquidity` and
`remove_liquidity_one` and `_exchange` methods.
@params _coin Address of the token to transfer out
@params _amount Amount of token to transfer out
@params use_eth Whether to transfer ETH or not
@params receiver Address to send the tokens to
"""
if use_eth and _coin == WETH20:
raw_call(receiver, b"", value=_amount)
else:
if _coin == WETH20:
WETH(WETH20).deposit(value=_amount)
assert ERC20(_coin).transfer(
receiver, _amount, default_return_value=True
)
# -------------------------- AMM Main Functions ------------------------------
@payable
@external
@nonreentrant("lock")
def exchange(
i: uint256,
j: uint256,
dx: uint256,
min_dy: uint256,
use_eth: bool = False,
receiver: address = msg.sender
) -> uint256:
"""
@notice Exchange using wrapped native token by default
@param i Index value for the input coin
@param j Index value for the output coin
@param dx Amount of input coin being swapped in
@param min_dy Minimum amount of output coin to receive
@param use_eth True if the input coin is native token, False otherwise
@param receiver Address to send the output coin to. Default is msg.sender
@return uint256 Amount of tokens at index j received by the `receiver
"""
return self._exchange(
msg.sender,
msg.value,
i,
j,
dx,
min_dy,
use_eth,
receiver,
empty(address),
empty(bytes32)
)
@payable
@external
@nonreentrant('lock')
def exchange_underlying(
i: uint256,
j: uint256,
dx: uint256,
min_dy: uint256,
receiver: address = msg.sender
) -> uint256:
"""
@notice Exchange using native token transfers.
@param i Index value for the input coin
@param j Index value for the output coin
@param dx Amount of input coin being swapped in
@param min_dy Minimum amount of output coin to receive
@param receiver Address to send the output coin to. Default is msg.sender
@return uint256 Amount of tokens at index j received by the `receiver
"""
return self._exchange(
msg.sender,
msg.value,
i,
j,
dx,
min_dy,
True,
receiver,
empty(address),
empty(bytes32)
)
@external
@nonreentrant('lock')
def exchange_extended(
i: uint256,
j: uint256,
dx: uint256,
min_dy: uint256,
use_eth: bool,
sender: address,
receiver: address,
cb: bytes32
) -> uint256:
"""
@notice Exchange with callback method.
@dev This method does not allow swapping in native token, but does allow
swaps that transfer out native token from the pool.
@param i Index value for the input coin
@param j Index value for the output coin
@param dx Amount of input coin being swapped in
@param min_dy Minimum amount of output coin to receive
@param use_eth True if output is native token, False otherwise
@param sender Address to transfer input coin from
@param receiver Address to send the output coin to
@param cb Callback signature
@return uint256 Amount of tokens at index j received by the `receiver`
"""
assert cb != empty(bytes32), "dev: No callback specified"
return self._exchange(
sender, 0, i, j, dx, min_dy, use_eth, receiver, msg.sender, cb
) # callbacker should never be self ------------------^
@payable
@external
@nonreentrant("lock")
def add_liquidity(
amounts: uint256[N_COINS],
min_mint_amount: uint256,
use_eth: bool = False,
receiver: address = msg.sender
) -> uint256:
"""
@notice Adds liquidity into the pool.
@param amounts Amounts of each coin to add.
@param min_mint_amount Minimum amount of LP to mint.
@param use_eth True if native token is being added to the pool.
@param receiver Address to send the LP tokens to. Default is msg.sender
@return uint256 Amount of LP tokens received by the `receiver
"""
A_gamma: uint256[2] = self._A_gamma()
xp: uint256[N_COINS] = self.balances
amountsp: uint256[N_COINS] = empty(uint256[N_COINS])
xx: uint256[N_COINS] = empty(uint256[N_COINS])
d_token: uint256 = 0
d_token_fee: uint256 = 0
old_D: uint256 = 0
assert amounts[0] + amounts[1] + amounts[2] > 0 #dev: no coins to add
self._claim_admin_fees() # <---- Claiming fees reduces virtual_price. So,
# claim fees before adding liquidity; depositor is not micro-rugged.
# --------------------- Get prices, balances -----------------------------
precisions: uint256[N_COINS] = self._unpack(self.packed_precisions)
packed_price_scale: uint256 = self.price_scale_packed
price_scale: uint256[N_COINS-1] = self._unpack_prices(packed_price_scale)
# -------------------------------------- Update balances and calculate xp.
xp_old: uint256[N_COINS] = xp
for i in range(N_COINS):
bal: uint256 = xp[i] + amounts[i]
xp[i] = bal
self.balances[i] = bal
xx = xp
xp[0] *= precisions[0]
xp_old[0] *= precisions[0]
for i in range(1, N_COINS):
xp[i] = unsafe_div(xp[i] * price_scale[i-1] * precisions[i], PRECISION)
xp_old[i] = unsafe_div(
xp_old[i] * unsafe_mul(price_scale[i-1], precisions[i]),
PRECISION
)
# ---------------- transferFrom token into the pool ----------------------
for i in range(N_COINS):
if amounts[i] > 0:
if self.coins[i] == WETH20:
self._transfer_in(
self.coins[i],
amounts[i],
0, # <-----------------------------------
msg.value, # | No callbacks
empty(address), # <----------------------| for
empty(bytes32), # <----------------------| add_liquidity.
msg.sender, # |
empty(address), # <-----------------------
use_eth
)
else:
self._transfer_in(
self.coins[i],
amounts[i],
0,
0, # <----------------- mvalue = 0 if coin is not WETH20.
empty(address),
empty(bytes32),
msg.sender,
empty(address),
False # <-------- use_eth is False if coin is not WETH20.
)
amountsp[i] = xp[i] - xp_old[i]
# -------------------- Calculate LP tokens to mint -----------------------
if self.future_A_gamma_time > block.timestamp: # <--- A_gamma is ramping.
# ----- Recalculate the invariant if A or gamma are undergoing a ramp.
old_D = MATH.newton_D(A_gamma[0], A_gamma[1], xp_old, 0)
else:
old_D = self.D
D: uint256 = MATH.newton_D(A_gamma[0], A_gamma[1], xp, 0)
token_supply: uint256 = self.totalSupply
if old_D > 0:
d_token = token_supply * D / old_D - token_supply
else:
d_token = self.get_xcp(D) # <------------------------- Making initial
# virtual price equal to 1.
assert d_token > 0 # dev: nothing minted
if old_D > 0:
d_token_fee = (
self._calc_token_fee(amountsp, xp) * d_token / 10**10 + 1
)
d_token -= d_token_fee
token_supply += d_token
self.mint(receiver, d_token)
packed_price_scale = self.tweak_price(A_gamma, xp, D, 0)
else:
self.D = D
self.virtual_price = 10**18
self.xcp_profit = 10**18
self.mint(receiver, d_token)
assert d_token >= min_mint_amount, "Slippage"
log AddLiquidity(
receiver, amounts, d_token_fee, token_supply, packed_price_scale
)
return d_token
@external
@nonreentrant("lock")
def remove_liquidity(
_amount: uint256,
min_amounts: uint256[N_COINS],
use_eth: bool = False,
receiver: address = msg.sender
) -> uint256[N_COINS]:
"""
@notice This withdrawal method is very safe, does no complex math since
tokens are withdrawn in balanced proportions. No fees are charged.
@param _amount Amount of LP tokens to burn
@param min_amounts Minimum amounts of tokens to withdraw
@param use_eth Whether to withdraw ETH or not
@param receiver Address to send the withdrawn tokens to
@return uint256[3] Amount of pool tokens received by the `receiver`
"""
amount: uint256 = _amount
balances: uint256[N_COINS] = self.balances
d_balances: uint256[N_COINS] = empty(uint256[N_COINS])
# -------------------------------------------------------- Burn LP tokens.
total_supply: uint256 = self.totalSupply # <------ Get totalSupply before
self.burnFrom(msg.sender, _amount) # ---- reducing it with self.burnFrom.
# There are two cases for withdrawing tokens from the pool.
# Case 1. Withdrawal does not empty the pool.
# In this situation, D is adjusted proportional to the amount of
# LP tokens burnt. ERC20 tokens transferred is proportional
# to : (AMM balance * LP tokens in) / LP token total supply
# Case 2. Withdrawal empties the pool.
# In this situation, all tokens are withdrawn and the invariant
# is reset.
if amount == total_supply: # <----------------------------------- Case 2.
for i in range(N_COINS):
d_balances[i] = balances[i]
self.balances[i] = 0 # <------------------------- Empty the pool.
else: # <-------------------------------------------------------- Case 1.
amount -= 1 # <---- To prevent rounding errors, favor LPs a tiny bit.
for i in range(N_COINS):
d_balances[i] = balances[i] * amount / total_supply
assert d_balances[i] >= min_amounts[i]
self.balances[i] = balances[i] - d_balances[i]
balances[i] = d_balances[i] # <-- Now it's the amounts going out.
D: uint256 = self.D
self.D = D - unsafe_div(D * amount, total_supply) # <----------- Reduce D
# proportional to the amount of tokens leaving. Since withdrawals are
# balanced, this is a simple subtraction. If amount == total_supply,
# D will be 0.
# ---------------------------------- Transfers ---------------------------
for i in range(N_COINS):
self._transfer_out(self.coins[i], d_balances[i], use_eth, receiver)
log RemoveLiquidity(msg.sender, balances, total_supply - _amount)
return d_balances
@external
@nonreentrant("lock")
def remove_liquidity_one_coin(
token_amount: uint256,
i: uint256,
min_amount: uint256,
use_eth: bool = False,
receiver: address = msg.sender
) -> uint256:
"""
@notice Withdraw liquidity in a single token.
Involves fees (lower than swap fees).
@dev This operation also involves an admin fee claim.
@param token_amount Amount of LP tokens to burn
@param i Index of the token to withdraw
@param min_amount Minimum amount of token to withdraw.
@param use_eth Whether to withdraw ETH or not
@param receiver Address to send the withdrawn tokens to
@return Amount of tokens at index i received by the `receiver`
"""
A_gamma: uint256[2] = self._A_gamma()
dy: uint256 = 0
D: uint256 = 0
p: uint256 = 0
xp: uint256[N_COINS] = empty(uint256[N_COINS])
approx_fee: uint256 = 0
# ------------------------------------------------------------------------
dy, D, xp, approx_fee = self._calc_withdraw_one_coin(
A_gamma,
token_amount,
i,
(self.future_A_gamma_time > block.timestamp), # <------- During ramps
) # we need to update D.
assert dy >= min_amount, "Slippage"
# ------------------------- Transfers ------------------------------------
self.balances[i] -= dy
self.burnFrom(msg.sender, token_amount)
self._transfer_out(self.coins[i], dy, use_eth, receiver)
packed_price_scale: uint256 = self.tweak_price(A_gamma, xp, D, 0)
# Safe to use D from _calc_withdraw_one_coin here ---^
log RemoveLiquidityOne(
msg.sender, token_amount, i, dy, approx_fee, packed_price_scale
)
self._claim_admin_fees()
return dy
@external
@nonreentrant("lock")
def claim_admin_fees():
"""
@notice Claim admin fees. Callable by anyone.
"""
self._claim_admin_fees()
# -------------------------- Packing functions -------------------------------
@internal
@view
def _pack(x: uint256[3]) -> uint256:
"""
@notice Packs 3 integers with values <= 10**18 into a uint256
@param x The uint256[3] to pack
@return uint256 Integer with packed values
"""
return shift(x[0], 128) | shift(x[1], 64) | x[2]
@internal
@view
def _unpack(_packed: uint256) -> uint256[3]:
"""
@notice Unpacks a uint256 into 3 integers (values must be <= 10**18)
@param val The uint256 to unpack
@return uint256[3] A list of length 3 with unpacked integers
"""
return [
shift(_packed, -128) & 18446744073709551615,
shift(_packed, -64) & 18446744073709551615,
_packed & 18446744073709551615,
]
@internal
@view
def _pack_prices(prices_to_pack: uint256[N_COINS-1]) -> uint256:
"""
@notice Packs N_COINS-1 prices into a uint256.
@param prices_to_pack The prices to pack
@return uint256 An integer that packs prices
"""
packed_prices: uint256 = 0
p: uint256 = 0
for k in range(N_COINS - 1):
packed_prices = shift(packed_prices, PRICE_SIZE)
p = prices_to_pack[N_COINS - 2 - k]
assert p < PRICE_MASK
packed_prices = p | packed_prices
return packed_prices
@internal
@view
def _unpack_prices(_packed_prices: uint256) -> uint256[2]:
"""
@notice Unpacks N_COINS-1 prices from a uint256.
@param _packed_prices The packed prices
@return uint256[2] Unpacked prices
"""
unpacked_prices: uint256[N_COINS-1] = empty(uint256[N_COINS-1])
packed_prices: uint256 = _packed_prices
for k in range(N_COINS - 1):
unpacked_prices[k] = packed_prices & PRICE_MASK
packed_prices = shift(packed_prices, -PRICE_SIZE)
return unpacked_prices
# ---------------------- AMM Internal Functions -------------------------------
@internal
def _exchange(
sender: address,
mvalue: uint256,
i: uint256,
j: uint256,
dx: uint256,
min_dy: uint256,
use_eth: bool,
receiver: address,
callbacker: address,
callback_sig: bytes32
) -> uint256:
assert i != j, "dev: coin index out of range"
assert dx > 0, "dev: do not exchange 0 coins"
A_gamma: uint256[2] = self._A_gamma()
xp: uint256[N_COINS] = self.balances
precisions: uint256[N_COINS] = self._unpack(self.packed_precisions)
dy: uint256 = 0
y: uint256 = xp[j] # <----------------- if j > N_COINS, this will revert.
x0: uint256 = xp[i] # <--------------- if i > N_COINS, this will revert.
xp[i] = x0 + dx
self.balances[i] = xp[i]
packed_price_scale: uint256 = self.price_scale_packed
price_scale: uint256[N_COINS - 1] = self._unpack_prices(
packed_price_scale
)
xp[0] *= precisions[0]
for k in range(1, N_COINS):
xp[k] = unsafe_div(
xp[k] * price_scale[k - 1] * precisions[k],
PRECISION
) # <-------- Safu to do unsafe_div here since PRECISION is not zero.
prec_i: uint256 = precisions[i]
# ----------- Update invariant if A, gamma are undergoing ramps ---------
t: uint256 = self.future_A_gamma_time
if t > block.timestamp:
x0 *= prec_i
if i > 0:
x0 = unsafe_div(x0 * price_scale[i - 1], PRECISION)
x1: uint256 = xp[i] # <------------------ Back up old value in xp ...
xp[i] = x0 # |
self.D = MATH.newton_D(A_gamma[0], A_gamma[1], xp, 0) # |
xp[i] = x1 # <-------------------------------------- ... and restore.
# ----------------------- Calculate dy and fees --------------------------
D: uint256 = self.D
prec_j: uint256 = precisions[j]
y_out: uint256[2] = MATH.get_y(A_gamma[0], A_gamma[1], xp, D, j)
dy = xp[j] - y_out[0]
xp[j] -= dy
dy -= 1
if j > 0:
dy = dy * PRECISION / price_scale[j - 1]
dy /= prec_j
fee: uint256 = unsafe_div(self._fee(xp) * dy, 10**10)
dy -= fee # <--------------------- Subtract fee from the outgoing amount.
assert dy >= min_dy, "Slippage"
y -= dy
self.balances[j] = y # <----------- Update pool balance of outgoing coin.
y *= prec_j
if j > 0:
y = unsafe_div(y * price_scale[j - 1], PRECISION)
xp[j] = y # <------------------------------------------------- Update xp.
# ---------------------- Do Transfers in and out -------------------------
########################## TRANSFER IN <-------
self._transfer_in(
self.coins[i], dx, dy, mvalue,
callbacker, callback_sig, # <-------- Callback method is called here.
sender, receiver, use_eth,
)
########################## -------> TRANSFER OUT
self._transfer_out(self.coins[j], dy, use_eth, receiver)
# ------ Tweak price_scale with good initial guess for newton_D ----------
packed_price_scale = self.tweak_price(A_gamma, xp, 0, y_out[1])
log TokenExchange(sender, i, dx, j, dy, fee, packed_price_scale)
return dy
@internal
def tweak_price(
A_gamma: uint256[2],
_xp: uint256[N_COINS],
new_D: uint256,
K0_prev: uint256 = 0,
) -> uint256:
"""
@notice Tweaks price_oracle, last_price and conditionally adjusts
price_scale. This is called whenever there is an unbalanced
liquidity operation: _exchange, add_liquidity, or
remove_liquidity_one_coin.
@dev Contains main liquidity rebalancing logic, by tweaking `price_scale`.
@param A_gamma Array of A and gamma parameters.
@param _xp Array of current balances.
@param new_D New D value.
@param K0_prev Initial guess for `newton_D`.
"""
# ---------------------------- Read storage ------------------------------
rebalancing_params: uint256[3] = self._unpack(
self.packed_rebalancing_params
) # <---------- Contains: allowed_extra_profit, adjustment_step, ma_time.
price_oracle: uint256[N_COINS - 1] = self._unpack_prices(
self.price_oracle_packed
)
last_prices: uint256[N_COINS - 1] = self._unpack_prices(
self.last_prices_packed
)
packed_price_scale: uint256 = self.price_scale_packed
price_scale: uint256[N_COINS - 1] = self._unpack_prices(
packed_price_scale
)
total_supply: uint256 = self.totalSupply
old_xcp_profit: uint256 = self.xcp_profit
old_virtual_price: uint256 = self.virtual_price
last_prices_timestamp: uint256 = self.last_prices_timestamp
# ----------------------- Update MA if needed ----------------------------
if last_prices_timestamp < block.timestamp:
# The moving average price oracle is calculated using the last_price
# of the trade at the previous block, and the price oracle logged
# before that trade. This can happen only once per block.
# ------------------ Calculate moving average params -----------------
alpha: uint256 = MATH.wad_exp(
-convert(
unsafe_div(
(block.timestamp - last_prices_timestamp) * 10**18,
rebalancing_params[2] # <----------------------- ma_time.
),
int256,
)
)
for k in range(N_COINS - 1):
price_oracle[k] = unsafe_div(
min(last_prices[k], 2 * price_oracle[k]) * (10**18 - alpha) +
price_oracle[k] * alpha, # ^-------- Cap spot price into EMA.
10**18
)
self.price_oracle_packed = self._pack_prices(price_oracle)
self.last_prices_timestamp = block.timestamp # <---- Store timestamp.
# price_oracle is used further on to calculate its vector
# distance from price_scale. This distance is used to calculate
# the amount of adjustment to be done to the price_scale.
# ------------------ If new_D is set to 0, calculate it ------------------
D_unadjusted: uint256 = new_D
if new_D == 0: # <--------------------------- _exchange sets new_D to 0.
D_unadjusted = MATH.newton_D(A_gamma[0], A_gamma[1], _xp, K0_prev)
# ----------------------- Calculate last_prices --------------------------
last_prices = MATH.get_p(_xp, D_unadjusted, A_gamma)
for k in range(N_COINS - 1):
last_prices[k] = unsafe_div(last_prices[k] * price_scale[k], 10**18)
self.last_prices_packed = self._pack_prices(last_prices)
# ---------- Update profit numbers without price adjustment first --------
xp: uint256[N_COINS] = empty(uint256[N_COINS])
xp[0] = unsafe_div(D_unadjusted, N_COINS)
for k in range(N_COINS - 1):
xp[k + 1] = D_unadjusted * 10**18 / (N_COINS * price_scale[k])
# ------------------------- Update xcp_profit ----------------------------
xcp_profit: uint256 = 10**18
virtual_price: uint256 = 10**18
if old_virtual_price > 0:
xcp: uint256 = MATH.geometric_mean(xp)
virtual_price = 10**18 * xcp / total_supply
xcp_profit = unsafe_div(
old_xcp_profit * virtual_price,
old_virtual_price
) # <---------------- Safu to do unsafe_div as old_virtual_price > 0.
# If A and gamma are not undergoing ramps (t < block.timestamp),
# ensure new virtual_price is not less than old virtual_price,
# else the pool suffers a loss.
if self.future_A_gamma_time < block.timestamp:
assert virtual_price > old_virtual_price, "Loss"
self.xcp_profit = xcp_profit
# ------------ Rebalance liquidity if there's enough profits to adjust it:
if virtual_price * 2 - 10**18 > xcp_profit + 2 * rebalancing_params[0]:
# allowed_extra_profit --------^
# ------------------- Get adjustment step ----------------------------
# Calculate the vector distance between price_scale and
# price_oracle.
norm: uint256 = 0
ratio: uint256 = 0
for k in range(N_COINS - 1):
ratio = unsafe_div(price_oracle[k] * 10**18, price_scale[k])
# unsafe_div because we did safediv before ----^
if ratio > 10**18:
ratio = unsafe_sub(ratio, 10**18)
else:
ratio = unsafe_sub(10**18, ratio)
norm = unsafe_add(norm, ratio**2)
norm = isqrt(norm) # <-------------------- isqrt is not in base 1e18.
adjustment_step: uint256 = max(
rebalancing_params[1], unsafe_div(norm, 5)
) # ^------------------------------------- adjustment_step.
if norm > adjustment_step: # <---------- We only adjust prices if the
# vector distance between price_oracle and price_scale is
# large enough. This check ensures that no rebalancing
# occurs if the distance is low i.e. the pool prices are
# pegged to the oracle prices.
# ------------------------------------- Calculate new price scale.
p_new: uint256[N_COINS - 1] = empty(uint256[N_COINS - 1])
for k in range(N_COINS - 1):
p_new[k] = unsafe_div(
price_scale[k] * unsafe_sub(norm, adjustment_step)
+ adjustment_step * price_oracle[k],
norm
) # <- norm is non-zero and gt adjustment_step; unsafe = safe
# ---------------- Update stale xp (using price_scale) with p_new.
xp = _xp
for k in range(N_COINS - 1):
xp[k + 1] = unsafe_div(_xp[k + 1] * p_new[k], price_scale[k])
# unsafe_div because we did safediv before ----^
# ------------------------------------------ Update D with new xp.
D: uint256 = MATH.newton_D(A_gamma[0], A_gamma[1], xp, 0)
for k in range(N_COINS):
frac: uint256 = xp[k] * 10**18 / D # <----- Check validity of
assert (frac > 10**16 - 1) and (frac < 10**20 + 1) # p_new.
xp[0] = D / N_COINS
for k in range(N_COINS - 1):
xp[k + 1] = D * 10**18 / (N_COINS * p_new[k]) # <---- Convert
# xp to real prices.
# ---------- Calculate new virtual_price using new xp and D. Reuse
# `old_virtual_price` (but it has new virtual_price).
old_virtual_price = unsafe_div(
10**18 * MATH.geometric_mean(xp), total_supply
) # <----- unsafe_div because we did safediv before (if vp>1e18)
# ---------------------------- Proceed if we've got enough profit.
if (
old_virtual_price > 10**18 and
2 * old_virtual_price - 10**18 > xcp_profit
):
packed_price_scale = self._pack_prices(p_new)
self.D = D
self.virtual_price = old_virtual_price
self.price_scale_packed = packed_price_scale
return packed_price_scale
# --------- price_scale was not adjusted. Update the profit counter and D.
self.D = D_unadjusted
self.virtual_price = virtual_price
return packed_price_scale
@internal
def _claim_admin_fees():
"""
@notice Claims admin fees and sends it to fee_receiver set in the factory.
"""
A_gamma: uint256[2] = self._A_gamma()
xcp_profit: uint256 = self.xcp_profit # <---------- Current pool profits.
xcp_profit_a: uint256 = self.xcp_profit_a # <- Profits at previous claim.
total_supply: uint256 = self.totalSupply
# Do not claim admin fees if:
# 1. insufficient profits accrued since last claim, and
# 2. there are less than 10**18 (or 1 unit of) lp tokens, else it can lead
# to manipulated virtual prices.
if xcp_profit <= xcp_profit_a or total_supply < 10**18:
return
# Claim tokens belonging to the admin here. This is done by 'gulping'
# pool tokens that have accrued as fees, but not accounted in pool's
# `self.balances` yet: pool balances only account for incoming and
# outgoing tokens excluding fees. Following 'gulps' fees:
coins: address[N_COINS] = self.coins
for i in range(N_COINS):
if coins[i] == WETH20:
self.balances[i] = self.balance
else:
self.balances[i] = ERC20(coins[i]).balanceOf(self)
# If the pool has made no profits, `xcp_profit == xcp_profit_a`
# and the pool gulps nothing in the previous step.
vprice: uint256 = self.virtual_price
# Admin fees are calculated as follows.
# 1. Calculate accrued profit since last claim. `xcp_profit`
# is the current profits. `xcp_profit_a` is the profits
# at the previous claim.
# 2. Take out admin's share, which is hardcoded at 5 * 10**9.
# (50% => half of 100% => 10**10 / 2 => 5 * 10**9).
# 3. Since half of the profits go to rebalancing the pool, we
# are left with half; so divide by 2.
fees: uint256 = unsafe_div(
unsafe_sub(xcp_profit, xcp_profit_a) * ADMIN_FEE, 2 * 10**10
)
# ------------------------------ Claim admin fees by minting admin's share
# of the pool in LP tokens.
receiver: address = Factory(self.factory).fee_receiver()
if receiver != empty(address) and fees > 0:
frac: uint256 = vprice * 10**18 / (vprice - fees) - 10**18
claimed: uint256 = self.mint_relative(receiver, frac)
xcp_profit -= fees * 2
self.xcp_profit = xcp_profit
log ClaimAdminFee(receiver, claimed)
# ------------------------------------------- Recalculate D b/c we gulped.
D: uint256 = MATH.newton_D(A_gamma[0], A_gamma[1], self.xp(), 0)
self.D = D
# ------------------- Recalculate virtual_price following admin fee claim.
# In this instance we do not check if current virtual price is greater
# than old virtual price, since the claim process can result
# in a small decrease in pool's value.
self.virtual_price = 10**18 * self.get_xcp(D) / self.totalSupply
self.xcp_profit_a = xcp_profit # <------------ Cache last claimed profit.
@internal
@view
def xp() -> uint256[N_COINS]:
result: uint256[N_COINS] = self.balances
packed_prices: uint256 = self.price_scale_packed
precisions: uint256[N_COINS] = self._unpack(self.packed_precisions)
result[0] *= precisions[0]
for i in range(1, N_COINS):
p: uint256 = (packed_prices & PRICE_MASK) * precisions[i]
result[i] = result[i] * p / PRECISION
packed_prices = shift(packed_prices, -PRICE_SIZE)
return result
@view
@internal
def _A_gamma() -> uint256[2]:
t1: uint256 = self.future_A_gamma_time
A_gamma_1: uint256 = self.future_A_gamma
gamma1: uint256 = A_gamma_1 & 2**128 - 1
A1: uint256 = shift(A_gamma_1, -128)
if block.timestamp < t1:
# --------------- Handle ramping up and down of A --------------------
A_gamma_0: uint256 = self.initial_A_gamma
t0: uint256 = self.initial_A_gamma_time
t1 -= t0
t0 = block.timestamp - t0
t2: uint256 = t1 - t0
A1 = (shift(A_gamma_0, -128) * t2 + A1 * t0) / t1
gamma1 = ((A_gamma_0 & 2**128 - 1) * t2 + gamma1 * t0) / t1
return [A1, gamma1]
@internal
@view
def _fee(xp: uint256[N_COINS]) -> uint256:
fee_params: uint256[3] = self._unpack(self.packed_fee_params)
f: uint256 = MATH.reduction_coefficient(xp, fee_params[2])
return unsafe_div(
fee_params[0] * f + fee_params[1] * (10**18 - f),
10**18
)
@internal
@view
def get_xcp(D: uint256) -> uint256:
x: uint256[N_COINS] = empty(uint256[N_COINS])
x[0] = D / N_COINS
packed_prices: uint256 = self.price_scale_packed # <-- No precisions here
# because we don't switch to "real" units.
for i in range(1, N_COINS):
x[i] = D * 10**18 / (N_COINS * (packed_prices & PRICE_MASK))
packed_prices = shift(packed_prices, -PRICE_SIZE)
return MATH.geometric_mean(x)
@view
@internal
def _calc_token_fee(amounts: uint256[N_COINS], xp: uint256[N_COINS]) -> uint256:
# fee = sum(amounts_i - avg(amounts)) * fee' / sum(amounts)
fee: uint256 = unsafe_div(
unsafe_mul(self._fee(xp), N_COINS),
unsafe_mul(4, unsafe_sub(N_COINS, 1))
)
S: uint256 = 0
for _x in amounts:
S += _x
avg: uint256 = unsafe_div(S, N_COINS)
Sdiff: uint256 = 0
for _x in amounts:
if _x > avg:
Sdiff += unsafe_sub(_x, avg)
else:
Sdiff += unsafe_sub(avg, _x)
return fee * Sdiff / S + NOISE_FEE
@internal
@view
def _calc_withdraw_one_coin(
A_gamma: uint256[2],
token_amount: uint256,
i: uint256,
update_D: bool,
) -> (uint256, uint256, uint256[N_COINS], uint256):
token_supply: uint256 = self.totalSupply
assert token_amount <= token_supply, "dev: token amount more than supply"
assert i < N_COINS, "dev: coin out of range"
xx: uint256[N_COINS] = self.balances
precisions: uint256[N_COINS] = self._unpack(self.packed_precisions)
xp: uint256[N_COINS] = precisions
D0: uint256 = 0
# -------------------------- Calculate D0 and xp -------------------------
price_scale_i: uint256 = PRECISION * precisions[0]
packed_prices: uint256 = self.price_scale_packed
xp[0] *= xx[0]
for k in range(1, N_COINS):
p: uint256 = (packed_prices & PRICE_MASK)
if i == k:
price_scale_i = p * xp[i]
xp[k] = unsafe_div(xp[k] * xx[k] * p, PRECISION)
packed_prices = shift(packed_prices, -PRICE_SIZE)
if update_D: # <-------------- D is updated if pool is undergoing a ramp.
D0 = MATH.newton_D(A_gamma[0], A_gamma[1], xp, 0)
else:
D0 = self.D
D: uint256 = D0
# -------------------------------- Fee Calc ------------------------------
# Charge fees on D. Roughly calculate xp[i] after withdrawal and use that
# to calculate fee. Precision is not paramount here: we just want a
# behavior where the higher the imbalance caused the more fee the AMM
# charges.
# xp is adjusted assuming xp[0] ~= xp[1] ~= x[2], which is usually not the
# case -------------------------------------------------------------------
# |
xp_imprecise: uint256[N_COINS] = xp # |
xp_imprecise[i] -= xp[i] * N_COINS * token_amount / D # <----------------
fee: uint256 = self._fee(xp_imprecise)
dD: uint256 = token_amount * D / token_supply
D_fee: uint256 = fee * dD / (2 * 10**10) + 1 # <-------- Actual fee on D.
approx_fee: uint256 = N_COINS * D_fee * xx[i] / D # <---------- Calculate
# `approx_fee` (assuming balanced state) in ith token.
D -= (dD - D_fee) # <----------------------------------- Charge fee on D.
# ------------------------------------------------------------------------
# --------------------------------- Calculate `y_out`` with `(D - D_fee)`.
y: uint256 = MATH.get_y(A_gamma[0], A_gamma[1], xp, D, i)[0]
dy: uint256 = (xp[i] - y) * PRECISION / price_scale_i
xp[i] = y
return dy, D, xp, approx_fee
# ------------------------ ERC20 functions -----------------------------------
@internal
def _approve(_owner: address, _spender: address, _value: uint256):
self.allowance[_owner][_spender] = _value
log Approval(_owner, _spender, _value)
@internal
def _transfer(_from: address, _to: address, _value: uint256):
assert _to not in [self, empty(address)]
self.balanceOf[_from] -= _value
self.balanceOf[_to] += _value
log Transfer(_from, _to, _value)
@view
@internal
def _domain_separator() -> bytes32:
if chain.id != CACHED_CHAIN_ID:
return keccak256(
_abi_encode(
EIP712_TYPEHASH,
NAME_HASH,
VERSION_HASH,
chain.id,
self,
salt,
)
)
return CACHED_DOMAIN_SEPARATOR
@external
def transferFrom(_from: address, _to: address, _value: uint256) -> bool:
"""
@dev Transfer tokens from one address to another.
@param _from address The address which you want to send tokens from
@param _to address The address which you want to transfer to
@param _value uint256 the amount of tokens to be transferred
@return bool True on successul transfer. Reverts otherwise.
"""
_allowance: uint256 = self.allowance[_from][msg.sender]
if _allowance != max_value(uint256):
self._approve(_from, msg.sender, _allowance - _value)
self._transfer(_from, _to, _value)
return True
@external
def transfer(_to: address, _value: uint256) -> bool:
"""
@dev Transfer token for a specified address
@param _to The address to transfer to.
@param _value The amount to be transferred.
@return bool True on successful transfer. Reverts otherwise.
"""
self._transfer(msg.sender, _to, _value)
return True
@external
def approve(_spender: address, _value: uint256) -> bool:
"""
@notice Allow `_spender` to transfer up to `_value` amount
of tokens from the caller's account.
@dev Non-zero to non-zero approvals are allowed, but should
be used cautiously. The methods increaseAllowance + decreaseAllowance
are available to prevent any front-running that may occur.
@param _spender The account permitted to spend up to `_value` amount of
caller's funds.
@param _value The amount of tokens `_spender` is allowed to spend.
@return bool Success
"""
self._approve(msg.sender, _spender, _value)
return True
@external
def increaseAllowance(_spender: address, _add_value: uint256) -> bool:
"""
@notice Increase the allowance granted to `_spender`.
@dev This function will never overflow, and instead will bound
allowance to max_value(uint256). This has the potential to grant an
infinite approval.
@param _spender The account to increase the allowance of.
@param _add_value The amount to increase the allowance by.
@return bool Success
"""
cached_allowance: uint256 = self.allowance[msg.sender][_spender]
allowance: uint256 = unsafe_add(cached_allowance, _add_value)
if allowance < cached_allowance: # <-------------- Check for an overflow.
allowance = max_value(uint256)
if allowance != cached_allowance:
self._approve(msg.sender, _spender, allowance)
return True
@external
def decreaseAllowance(_spender: address, _sub_value: uint256) -> bool:
"""
@notice Decrease the allowance granted to `_spender`.
@dev This function will never underflow, and instead will bound
allowance to 0.
@param _spender The account to decrease the allowance of.
@param _sub_value The amount to decrease the allowance by.
@return bool Success.
"""
cached_allowance: uint256 = self.allowance[msg.sender][_spender]
allowance: uint256 = unsafe_sub(cached_allowance, _sub_value)
if cached_allowance < allowance: # <------------- Check for an underflow.
allowance = 0
if allowance != cached_allowance:
self._approve(msg.sender, _spender, allowance)
return True
@external
def permit(
_owner: address,
_spender: address,
_value: uint256,
_deadline: uint256,
_v: uint8,
_r: bytes32,
_s: bytes32,
) -> bool:
"""
@notice Permit `_spender` to spend up to `_value` amount of `_owner`'s
tokens via a signature.
@dev In the event of a chain fork, replay attacks are prevented as
domain separator is recalculated. However, this is only if the
resulting chains update their chainId.
@param _owner The account which generated the signature and is granting an
allowance.
@param _spender The account which will be granted an allowance.
@param _value The approval amount.
@param _deadline The deadline by which the signature must be submitted.
@param _v The last byte of the ECDSA signature.
@param _r The first 32 bytes of the ECDSA signature.
@param _s The second 32 bytes of the ECDSA signature.
@return bool Success.
"""
assert _owner != empty(address), "dev: invalid owner"
assert block.timestamp <= _deadline, "dev: permit expired"
nonce: uint256 = self.nonces[_owner]
digest: bytes32 = keccak256(
concat(
b"\x19\x01",
self._domain_separator(),
keccak256(
_abi_encode(
EIP2612_TYPEHASH, _owner, _spender, _value, nonce, _deadline
)
),
)
)
assert ecrecover(digest, _v, _r, _s) == _owner, "dev: invalid signature"
self.nonces[_owner] = unsafe_add(nonce, 1) # <-- Unsafe add is safe here.
self._approve(_owner, _spender, _value)
return True
@internal
def mint(_to: address, _value: uint256) -> bool:
"""
@dev Mint an amount of the token and assigns it to an account.
This encapsulates the modification of balances such that the
proper events are emitted.
@param _to The account that will receive the created tokens.
@param _value The amount that will be created.
@return bool Success.
"""
self.totalSupply += _value
self.balanceOf[_to] += _value
log Transfer(empty(address), _to, _value)
return True
@internal
def mint_relative(_to: address, frac: uint256) -> uint256:
"""
@dev Increases supply by factor of (1 + frac/1e18) and mints it for _to
@param _to The account that will receive the created tokens.
@param frac The fraction of the current supply to mint.
@return uint256 Amount of tokens minted.
"""
supply: uint256 = self.totalSupply
d_supply: uint256 = supply * frac / 10**18
if d_supply > 0:
self.totalSupply = supply + d_supply
self.balanceOf[_to] += d_supply
log Transfer(empty(address), _to, d_supply)
return d_supply
@internal
def burnFrom(_to: address, _value: uint256) -> bool:
"""
@dev Burn an amount of the token from a given account.
@param _to The account whose tokens will be burned.
@param _value The amount that will be burned.
@return bool Success.
"""
self.totalSupply -= _value
self.balanceOf[_to] -= _value
log Transfer(_to, empty(address), _value)
return True
# ------------------------- AMM View Functions -------------------------------
@external
@view
def fee_receiver() -> address:
"""
@notice Returns the address of the admin fee receiver.
@return address Fee receiver.
"""
return Factory(self.factory).fee_receiver()
@external
@view
def calc_token_amount(amounts: uint256[N_COINS], deposit: bool) -> uint256:
"""
@notice Calculate LP tokens minted or to be burned for depositing or
removing `amounts` of coins
@dev Includes fee.
@param amounts Amounts of tokens being deposited or withdrawn
@param deposit True if it is a deposit action, False if withdrawn.
@return uint256 Amount of LP tokens deposited or withdrawn.
"""
view_contract: address = Factory(self.factory).views_implementation()
return Views(view_contract).calc_token_amount(amounts, deposit, self)
@external
@view
def get_dy(i: uint256, j: uint256, dx: uint256) -> uint256:
"""
@notice Get amount of coin[j] tokens received for swapping in dx amount of coin[i]
@dev Includes fee.
@param i index of input token. Check pool.coins(i) to get coin address at ith index
@param j index of output token
@param dx amount of input coin[i] tokens
@return uint256 Exact amount of output j tokens for dx amount of i input tokens.
"""
view_contract: address = Factory(self.factory).views_implementation()
return Views(view_contract).get_dy(i, j, dx, self)
@external
@view
def get_dx(i: uint256, j: uint256, dy: uint256) -> uint256:
"""
@notice Get amount of coin[i] tokens to input for swapping out dy amount
of coin[j]
@dev This is an approximate method, and returns estimates close to the input
amount. Expensive to call on-chain.
@param i index of input token. Check pool.coins(i) to get coin address at
ith index
@param j index of output token
@param dy amount of input coin[j] tokens received
@return uint256 Approximate amount of input i tokens to get dy amount of j tokens.
"""
view_contract: address = Factory(self.factory).views_implementation()
return Views(view_contract).get_dx(i, j, dy, self)
@external
@view
@nonreentrant("lock")
def lp_price() -> uint256:
"""
@notice Calculates the current price of the LP token w.r.t coin at the
0th index
@return uint256 LP price.
"""
price_oracle: uint256[N_COINS-1] = self._unpack_prices(
self.price_oracle_packed
)
return (
3 * self.virtual_price * MATH.cbrt(price_oracle[0] * price_oracle[1])
) / 10**24
@external
@view
@nonreentrant("lock")
def get_virtual_price() -> uint256:
"""
@notice Calculates the current virtual price of the pool LP token.
@dev Not to be confused with `self.virtual_price` which is a cached
virtual price.
@return uint256 Virtual Price.
"""
return 10**18 * self.get_xcp(self.D) / self.totalSupply
@external
@view
@nonreentrant("lock")
def price_oracle(k: uint256) -> uint256:
"""
@notice Returns the oracle price of the coin at index `k` w.r.t the coin
at index 0.
@dev The oracle is an exponential moving average, with a periodicity
determined by `self.ma_time`. The aggregated prices are cached state
prices (dy/dx) calculated AFTER the latest trade.
@param k The index of the coin.
@return uint256 Price oracle value of kth coin.
"""
price_oracle: uint256 = self._unpack_prices(self.price_oracle_packed)[k]
last_prices_timestamp: uint256 = self.last_prices_timestamp
if last_prices_timestamp < block.timestamp: # <------------ Update moving
# average if needed.
last_prices: uint256 = self._unpack_prices(self.last_prices_packed)[k]
ma_time: uint256 = self._unpack(self.packed_rebalancing_params)[2]
alpha: uint256 = MATH.wad_exp(
-convert(
(block.timestamp - last_prices_timestamp) * 10**18 / ma_time,
int256,
)
)
return (
min(last_prices, 2 * price_oracle) * (10**18 - alpha) +
price_oracle * alpha # ^---------------- Cap spot price into EMA.
) / 10**18
return price_oracle
@external
@view
def last_prices(k: uint256) -> uint256:
"""
@notice Returns last price of the coin at index `k` w.r.t the coin
at index 0.
@param k The index of the coin.
@return uint256 Last logged price of coin.
"""
return self._unpack_prices(self.last_prices_packed)[k]
@external
@view
def price_scale(k: uint256) -> uint256:
"""
@notice Returns the price scale of the coin at index `k` w.r.t the coin
at index 0.
@dev Price scale determines the price band around which liquidity is
concentrated.
@param k The index of the coin.
@return uint256 Price scale of coin.
"""
return self._unpack_prices(self.price_scale_packed)[k]
@external
@view
def fee() -> uint256:
"""
@notice Returns the fee charged by the pool at current state.
@dev Not to be confused with the fee charged at liquidity action, since
there the fee is calculated on `xp` AFTER liquidity is added or
removed.
@return uint256 fee bps.
"""
return self._fee(self.xp())
@view
@external
def calc_withdraw_one_coin(token_amount: uint256, i: uint256) -> uint256:
"""
@notice Calculates output tokens with fee
@param token_amount LP Token amount to burn
@param i token in which liquidity is withdrawn
@return uint256 Amount of ith tokens received for burning token_amount LP tokens.
"""
return self._calc_withdraw_one_coin(
self._A_gamma(),
token_amount,
i,
(self.future_A_gamma_time > block.timestamp)
)[0]
@external
@view
def calc_token_fee(
amounts: uint256[N_COINS], xp: uint256[N_COINS]
) -> uint256:
"""
@notice Returns the fee charged on the given amounts for add_liquidity.
@param amounts The amounts of coins being added to the pool.
@param xp The current balances of the pool multiplied by coin precisions.
@return uint256 Fee charged.
"""
return self._calc_token_fee(amounts, xp)
@view
@external
def A() -> uint256:
"""
@notice Returns the current pool amplification parameter.
@return uint256 A param.
"""
return self._A_gamma()[0]
@view
@external
def gamma() -> uint256:
"""
@notice Returns the current pool gamma parameter.
@return uint256 gamma param.
"""
return self._A_gamma()[1]
@view
@external
def mid_fee() -> uint256:
"""
@notice Returns the current mid fee
@return uint256 mid_fee value.
"""
return self._unpack(self.packed_fee_params)[0]
@view
@external
def out_fee() -> uint256:
"""
@notice Returns the current out fee
@return uint256 out_fee value.
"""
return self._unpack(self.packed_fee_params)[1]
@view
@external
def fee_gamma() -> uint256:
"""
@notice Returns the current fee gamma
@return uint256 fee_gamma value.
"""
return self._unpack(self.packed_fee_params)[2]
@view
@external
def allowed_extra_profit() -> uint256:
"""
@notice Returns the current allowed extra profit
@return uint256 allowed_extra_profit value.
"""
return self._unpack(self.packed_rebalancing_params)[0]
@view
@external
def adjustment_step() -> uint256:
"""
@notice Returns the current adjustment step
@return uint256 adjustment_step value.
"""
return self._unpack(self.packed_rebalancing_params)[1]
@view
@external
def ma_time() -> uint256:
"""
@notice Returns the current moving average time in seconds
@dev To get time in seconds, the parameter is multipled by ln(2)
@return uint256 ma_time value.
"""
return self._unpack(self.packed_rebalancing_params)[2] * 693 / 1000
@view
@external
def precisions() -> uint256[N_COINS]: # <-------------- For by view contract.
"""
@notice Returns the precisions of each coin in the pool.
@return uint256[3] precisions of coins.
"""
return self._unpack(self.packed_precisions)
@external
@view
def fee_calc(xp: uint256[N_COINS]) -> uint256: # <----- For by view contract.
"""
@notice Returns the fee charged by the pool at current state.
@param xp The current balances of the pool multiplied by coin precisions.
@return uint256 Fee value.
"""
return self._fee(xp)
@view
@external
def DOMAIN_SEPARATOR() -> bytes32:
"""
@notice EIP712 domain separator.
@return bytes32 Domain Separator set for the current chain.
"""
return self._domain_separator()
# ------------------------- AMM Admin Functions ------------------------------
@external
def ramp_A_gamma(
future_A: uint256, future_gamma: uint256, future_time: uint256
):
"""
@notice Initialise Ramping A and gamma parameter values linearly.
@dev Only accessible by factory admin, and only
@param future_A The future A value.
@param future_gamma The future gamma value.
@param future_time The timestamp at which the ramping will end.
"""
assert msg.sender == Factory(self.factory).admin(), "dev: only owner"
assert block.timestamp > self.initial_A_gamma_time + (MIN_RAMP_TIME - 1), "dev: ramp undergoing"
assert future_time > block.timestamp + MIN_RAMP_TIME - 1 # dev: insufficient time
A_gamma: uint256[2] = self._A_gamma()
initial_A_gamma: uint256 = shift(A_gamma[0], 128)
initial_A_gamma = initial_A_gamma | A_gamma[1]
assert future_A > MIN_A - 1
assert future_A < MAX_A + 1
assert future_gamma > MIN_GAMMA - 1
assert future_gamma < MAX_GAMMA + 1
ratio: uint256 = 10**18 * future_A / A_gamma[0]
assert ratio < 10**18 * MAX_A_CHANGE + 1
assert ratio > 10**18 / MAX_A_CHANGE - 1
ratio = 10**18 * future_gamma / A_gamma[1]
assert ratio < 10**18 * MAX_A_CHANGE + 1
assert ratio > 10**18 / MAX_A_CHANGE - 1
self.initial_A_gamma = initial_A_gamma
self.initial_A_gamma_time = block.timestamp
future_A_gamma: uint256 = shift(future_A, 128)
future_A_gamma = future_A_gamma | future_gamma
self.future_A_gamma_time = future_time
self.future_A_gamma = future_A_gamma
log RampAgamma(
A_gamma[0],
future_A,
A_gamma[1],
future_gamma,
block.timestamp,
future_time,
)
@external
def stop_ramp_A_gamma():
"""
@notice Stop Ramping A and gamma parameters immediately.
@dev Only accessible by factory admin.
"""
assert msg.sender == Factory(self.factory).admin(), "dev: only owner"
A_gamma: uint256[2] = self._A_gamma()
current_A_gamma: uint256 = shift(A_gamma[0], 128)
current_A_gamma = current_A_gamma | A_gamma[1]
self.initial_A_gamma = current_A_gamma
self.future_A_gamma = current_A_gamma
self.initial_A_gamma_time = block.timestamp
self.future_A_gamma_time = block.timestamp
# ------ Now (block.timestamp < t1) is always False, so we return saved A.
log StopRampA(A_gamma[0], A_gamma[1], block.timestamp)
@external
def commit_new_parameters(
_new_mid_fee: uint256,
_new_out_fee: uint256,
_new_fee_gamma: uint256,
_new_allowed_extra_profit: uint256,
_new_adjustment_step: uint256,
_new_ma_time: uint256,
):
"""
@notice Commit new parameters.
@dev Only accessible by factory admin.
@param _new_mid_fee The new mid fee.
@param _new_out_fee The new out fee.
@param _new_fee_gamma The new fee gamma.
@param _new_allowed_extra_profit The new allowed extra profit.
@param _new_adjustment_step The new adjustment step.
@param _new_ma_time The new ma time. ma_time is time_in_seconds/ln(2).
"""
assert msg.sender == Factory(self.factory).admin(), "dev: only owner"
assert self.admin_actions_deadline == 0, "dev: active action"
_deadline: uint256 = block.timestamp + ADMIN_ACTIONS_DELAY
self.admin_actions_deadline = _deadline
# ----------------------------- Set fee params ---------------------------
new_mid_fee: uint256 = _new_mid_fee
new_out_fee: uint256 = _new_out_fee
new_fee_gamma: uint256 = _new_fee_gamma
current_fee_params: uint256[3] = self._unpack(self.packed_fee_params)
if new_out_fee < MAX_FEE + 1:
assert new_out_fee > MIN_FEE - 1 # dev: fee is out of range
else:
new_out_fee = current_fee_params[1]
if new_mid_fee > MAX_FEE:
new_mid_fee = current_fee_params[0]
assert new_mid_fee <= new_out_fee # dev: mid-fee is too high
if new_fee_gamma < 10**18:
assert new_fee_gamma > 0 # dev: fee_gamma out of range [1 .. 10**18]
else:
new_fee_gamma = current_fee_params[2]
self.future_packed_fee_params = self._pack(
[new_mid_fee, new_out_fee, new_fee_gamma]
)
# ----------------- Set liquidity rebalancing parameters -----------------
new_allowed_extra_profit: uint256 = _new_allowed_extra_profit
new_adjustment_step: uint256 = _new_adjustment_step
new_ma_time: uint256 = _new_ma_time
current_rebalancing_params: uint256[3] = self._unpack(self.packed_rebalancing_params)
if new_allowed_extra_profit > 10**18:
new_allowed_extra_profit = current_rebalancing_params[0]
if new_adjustment_step > 10**18:
new_adjustment_step = current_rebalancing_params[1]
if new_ma_time < 872542: # <----- Calculated as: 7 * 24 * 60 * 60 / ln(2)
assert new_ma_time > 86 # dev: MA time should be longer than 60/ln(2)
else:
new_ma_time = current_rebalancing_params[2]
self.future_packed_rebalancing_params = self._pack(
[new_allowed_extra_profit, new_adjustment_step, new_ma_time]
)
# ---------------------------------- LOG ---------------------------------
log CommitNewParameters(
_deadline,
new_mid_fee,
new_out_fee,
new_fee_gamma,
new_allowed_extra_profit,
new_adjustment_step,
new_ma_time,
)
@external
@nonreentrant("lock")
def apply_new_parameters():
"""
@notice Apply committed parameters.
@dev Only callable after admin_actions_deadline.
"""
assert block.timestamp >= self.admin_actions_deadline, "dev: insufficient time"
assert self.admin_actions_deadline != 0, "dev: no active action"
self.admin_actions_deadline = 0
packed_fee_params: uint256 = self.future_packed_fee_params
self.packed_fee_params = packed_fee_params
packed_rebalancing_params: uint256 = self.future_packed_rebalancing_params
self.packed_rebalancing_params = packed_rebalancing_params
rebalancing_params: uint256[3] = self._unpack(packed_rebalancing_params)
fee_params: uint256[3] = self._unpack(packed_fee_params)
log NewParameters(
fee_params[0],
fee_params[1],
fee_params[2],
rebalancing_params[0],
rebalancing_params[1],
rebalancing_params[2],
)
@external
def revert_new_parameters():
"""
@notice Revert committed parameters
@dev Only accessible by factory admin. Setting admin_actions_deadline to 0
ensures a revert in apply_new_parameters.
"""
assert msg.sender == Factory(self.factory).admin(), "dev: only owner"
self.admin_actions_deadline = 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment