Skip to content

Instantly share code, notes, and snippets.

@darkerego
Created January 19, 2023 00:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save darkerego/86455d4a18f5f30d5944a9d91c2d93ad to your computer and use it in GitHub Desktop.
Save darkerego/86455d4a18f5f30d5944a9d91c2d93ad to your computer and use it in GitHub Desktop.
Enumerating all the token pairs
class DexLoader:
def dump(self, obj, path):
with open(path, 'w') as f:
json.dump(obj, f)
def load(self, path):
with open(path, 'r') as f:
_dex = Dex(**json.load(f))
return _dex
class Factory(NamedTuple):
"""
Object to hold a uniswap v2 Factory contract config
"""
router: str
address: str
name: str
# @dataclasses.dataclass
class Dex(NamedTuple):
"""
Object representing a Dex config
"""
name: str
router_address: str
factory: Factory
pools: list = []
class TokenPair(NamedTuple):
token0: str
token1: str
address: str
decimals_0: int
decimals_1: int
symbol_0: str
symbol_1: str
factory: Factory
@property
def name(self):
return '%s_%s' % (self.symbol_0, self.symbol_1)
class LiquidityPool(NamedTuple):
pair: TokenPair
@property
def name(self):
return '%s_%s' % (self.pair.symbol_0, self.pair.symbol_1)
class DexPair(NamedTuple):
dex1: Dex
dex2: Dex
pool_pairs: list = []
def generate_all_pool_pairs(self):
for pool in self.dex1.pools:
for _pool in self.dex2.pools:
if _pool.pair.name == pool.pair.name:
pool_pair = PoolPair(pool, _pool)
if not self.pool_pairs.__contains__(pool_pair):
self.pool_pairs.append(pool_pair)
@property
def name(self):
return self.dex1.name + '_' + self.dex2.name
def random_pool_pair(self):
return random.choice(self.pool_pairs)
@dataclasses.dataclass
class InitializedLiquidityPool:
def __init__(self, pool: LiquidityPool, w3: web3.Web3):
self.pair = pool.pair
self.reserve_0 = 0
self.reserve_1 = 0
self.w3 = w3
self.name = '%s_%s' % (self.pair.symbol_0, self.pair.symbol_1)
def update_reserves(self):
pair_address = self.pair.address
pair_contract = self.w3.eth.contract(to_checksum_address(pair_address), abi=abi_lib.UNI_V2_PAIR)
reserves = pair_contract.functions.getReserves().call()
self.reserve_0 = int(reserves[0])
self.reserve_1 = int(reserves[1])
def reserves(self):
token0_reserve = self.reserve_0 # / (10 ** pair.decimals_0)
token1_reserve = self.reserve_1 # / (10 ** pair.decimals_1)
return token0_reserve, token1_reserve
def quote_price(self):
token0_reserve = self.reserve_0
token1_reserve = self.reserve_1
price = (token0_reserve / 10 ** self.pair.decimals_0) / (token1_reserve / self.pair.decimals_1)
return price
def base_price(self):
token0_reserve = self.reserve_0
token1_reserve = self.reserve_1
price = (token1_reserve / self.pair.decimals_1) / (token0_reserve / 10 ** self.pair.decimals_0)
return price
def calculate_swap(self, qty=0, max_impact=0.01, token=0):
"""
Calculate amount received and execution price for a swap
with price impact.
Pool info
USDC = 2,000,000
ETH = 1,000
Constant Product = 2,000,000,000
Market Price = 2,000
First example, 10,000 USDC for ETH
After swap
USDC = 2,010,000 (because we added 10,000 to the pool)
Constant Product = 2,000,000,000 (stays the same)
ETH = 995.024 (constant product / new usdc amount)
ETH recieved = 4.976 (old eth amount - new eth amount)
Price paid per ETH = 2009.64 USDC
ETH recieved = 4.976 (old eth amount - new eth amount)
Price impact = 0.48%
:param qty: of sell token
:param token: index of sell token
:return: amount received, price paid, price impact, qty
"""
fee = 0.003
r0, r1 = self.reserves()
r0 = (r0 / (10 ** self.pair.decimals_0))
r1 = (r1 / (10 ** self.pair.decimals_1))
constant_product = r0 * r1
token0_price = r1 / r0
token1_price = r0 / r1
if qty == 0 and token == 0:
qty = r0 * max_impact / ((1 - max_impact) * (1 - fee))
if qty == 0 and token == 1:
qty = r1 * max_impact / ((1 - max_impact) * (1 - fee))
if token == 0:
old_price = token1_price
new_r0 = r0 + (qty * (1 - fee))
new_constant_product = constant_product + (1 - (qty * (1 - fee)))
new_r1 = new_constant_product / new_r0
new_price = new_r0 / new_r1
price_impact = ((new_price / old_price) - 1) * 100
received = r1 - new_r1
return received, new_price, price_impact, qty
else:
old_price = token0_price
new_r1 = r1 + (qty * (1 - fee))
new_constant_product = constant_product + (1 - (qty * (1 - fee)))
new_r0 = new_constant_product / new_r1
new_price = new_r1 / new_r0
price_impact = ((new_price / old_price) - 1) * 100
received = r0 - new_r0
return received, new_price, price_impact, qty
def _as_dict(self):
return self.__dict__
class PoolPair:
def __init__(self, pool1: InitializedLiquidityPool, pool2: InitializedLiquidityPool):
self.pool1 = pool1
self.pool2 = pool2
self.order_qty: int = 0
self.updated_at: float = time.time()
@property
def name(self):
return self.pool1.name + '_' + self.pool2.name
def update_reserves(self):
self.pool1.update_reserves()
self.pool2.update_reserves()
self.updated_at = time.time()
def _as_dict(self):
return {'pool1': self.pool1, 'pool2': self.pool2, 'order_qty': self.order_qty, 'updated_at': self.updated_at,
'name': self.name}
class ConfigCreate:
def __init__(self, config: dict):
self.config = config
self.dexes = []
def get_pair(self, token1, token2, factory, dec_1, dec_2, sym_1, sym_2):
token1 = to_checksum_address(token1)
token2 = to_checksum_address(token2)
_original_pair_order = [token1, token2]
pair_order = sorted([token1, token2])
if pair_order != _original_pair_order:
token1 = pair_order[1]
token2 = pair_order[0]
else:
token1 = pair_order[0]
token2 = pair_order[1]
contract = self.w3.eth.contract(factory.address, abi=abi_lib.UNI_V2_FACTORY)
pair_address = self.wrap_contract_call(contract.functions.getPair(token1, token2))
if pair_address == "0x0000000000000000000000000000000000000000" or not pair_address:
print('Pair does not exist!')
return None
print(pair_address)
return TokenPair(token1, token2, pair_address, dec_1, dec_2,
sym_1, sym_2, factory)
def create_configs(self):
print('[~] Enumerating Routers ... ')
for router in self.config.get('routers'):
router_address = to_checksum_address(router.get('address'))
router_name = router.get('dex')
factory_addr = self.get_factory(router_address)
_factory = Factory(router, factory_addr, router_name)
self.factories.append(_factory)
dex = Dex(router_name, router_address, _factory)
self.dexes.append(dex)
print('[~] Enumerating token pairs ... ')
for base in self.config.get('amounts'):
base_symbol = base.get('sym')
base_address = base.get('asset')
base_decimals = base.get('decimals')
for quote in self.config.get('tokens'):
quote_symbol = quote.get('sym')
quote_address = quote.get('address')
if base_address != quote_address:
print(f'Creating {base_symbol}:{quote_symbol}')
quote_decimals = quote.get('decimals')
for factory_ in self.factories:
_pair = self.get_pair(token1=base_address, token2=quote_address, factory=factory_,
dec_1=base_decimals, dec_2=quote_decimals, sym_1=base_symbol,
sym_2=quote_symbol)
if _pair:
self.pairs.append(_pair)
_pool = self.create_pool(_pair)
self.pools.append(_pool)
self.add_pool_to_dex(_pool)
for _dex in self.dexes:
print(_dex.name, _dex._asdict)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment