Created
January 19, 2023 00:56
-
-
Save darkerego/860d9b9f5ef72a4e740d3bedb335a94a to your computer and use it in GitHub Desktop.
Enumerating all the token pairs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DexLoader:
    """Write objects to JSON files and read ``Dex`` configs back from them."""

    def dump(self, obj, path):
        """Serialise ``obj`` as JSON into the file at ``path``.

        The file is opened in write mode, so any existing content is
        replaced.
        """
        with open(path, 'w') as handle:
            json.dump(obj, handle)

    def load(self, path):
        """Build a ``Dex`` from the JSON document stored at ``path``.

        The parsed document is expanded into keyword arguments, so it must
        be a JSON object whose keys are ``Dex`` field names.  Nested values
        are passed through as the plain types produced by ``json.load``.
        """
        with open(path, 'r') as handle:
            fields = json.load(handle)
        return Dex(**fields)
class Factory(NamedTuple):
    """Immutable record describing a Uniswap-v2 factory contract config."""

    # Router this factory belongs to.
    router: str
    # Address of the factory contract.
    address: str
    # Human-readable label for the factory / DEX.
    name: str
class _DexFields(NamedTuple):
    """Field layout of :class:`Dex` (kept separate so ``Dex`` can customise
    construction, which a direct ``NamedTuple`` class cannot do)."""

    name: str
    router_address: str
    factory: "Factory"
    pools: list


class Dex(_DexFields):
    """
    Object representing a Dex config.

    Fields, in positional order: ``name``, ``router_address``, ``factory``
    and ``pools``.  ``pools`` is optional; when it is omitted (or ``None``)
    every instance receives its *own* empty list.  A literal ``[]`` default
    on the tuple field would be a single list shared by every ``Dex``, so
    appending a pool to one DEX would silently add it to all of them.
    """

    __slots__ = ()

    def __new__(cls, name, router_address, factory, pools=None):
        """Create a Dex, allocating a fresh ``pools`` list when none is given."""
        if pools is None:
            pools = []
        return super().__new__(cls, name, router_address, factory, pools)
class TokenPair(NamedTuple):
    """Immutable description of a two-token pair and its pair contract."""

    # Addresses of the two tokens.
    token0: str
    token1: str
    # Address of the pair contract.
    address: str
    # Decimal places of token0 / token1.
    decimals_0: int
    decimals_1: int
    # Ticker symbols of token0 / token1.
    symbol_0: str
    symbol_1: str
    # Factory the pair was looked up on.
    factory: Factory

    @property
    def name(self):
        """Return the pair label ``"<symbol_0>_<symbol_1>"``."""
        return f'{self.symbol_0}_{self.symbol_1}'
class LiquidityPool(NamedTuple):
    """Immutable wrapper around the ``TokenPair`` a pool trades."""

    # The token pair held by this pool.
    pair: TokenPair

    @property
    def name(self):
        """Return ``"<symbol_0>_<symbol_1>"`` built from the wrapped pair."""
        pair = self.pair
        return f'{pair.symbol_0}_{pair.symbol_1}'
class _DexPairFields(NamedTuple):
    """Field layout of :class:`DexPair` (kept separate so ``DexPair`` can
    customise construction, which a direct ``NamedTuple`` class cannot do)."""

    dex1: "Dex"
    dex2: "Dex"
    pool_pairs: list


class DexPair(_DexPairFields):
    """Two DEXes plus the pools they have in common.

    Fields, in positional order: ``dex1``, ``dex2`` and ``pool_pairs``.
    ``pool_pairs`` is optional; each instance gets its own empty list when
    it is omitted, instead of all instances sharing one default list.
    """

    __slots__ = ()

    def __new__(cls, dex1, dex2, pool_pairs=None):
        """Create a DexPair, allocating a fresh ``pool_pairs`` list if needed."""
        if pool_pairs is None:
            pool_pairs = []
        return super().__new__(cls, dex1, dex2, pool_pairs)

    def generate_all_pool_pairs(self):
        """Append a ``PoolPair`` for every pool whose pair name exists on both DEXes.

        Pools are matched on ``pool.pair.name``.  A combination is added at
        most once: entries are identified by the two pair-contract
        addresses, so calling this method repeatedly does not grow
        ``pool_pairs`` with duplicates.  (A plain membership test cannot do
        this because a newly built ``PoolPair`` never compares equal to a
        stored one.)
        """
        seen = {
            (existing.pool1.pair.address, existing.pool2.pair.address)
            for existing in self.pool_pairs
        }
        for pool in self.dex1.pools:
            for other in self.dex2.pools:
                if other.pair.name != pool.pair.name:
                    continue
                key = (pool.pair.address, other.pair.address)
                if key in seen:
                    continue
                seen.add(key)
                self.pool_pairs.append(PoolPair(pool, other))

    @property
    def name(self):
        """Return ``"<dex1.name>_<dex2.name>"``."""
        return self.dex1.name + '_' + self.dex2.name

    def random_pool_pair(self):
        """Return a random entry of ``pool_pairs``.

        Raises:
            IndexError: if ``pool_pairs`` is empty.
        """
        return random.choice(self.pool_pairs)
class InitializedLiquidityPool:
    """A liquidity pool bound to a web3 connection, caching its reserves.

    This is deliberately a plain class.  It declares no dataclass fields and
    has a hand-written ``__init__``, so decorating it with ``@dataclass``
    would only add a generated ``__eq__`` that treats every instance as
    equal (and makes instances unhashable).
    """

    def __init__(self, pool: "LiquidityPool", w3: "web3.Web3"):
        """Copy the pair from ``pool`` and start with both reserves at 0.

        :param pool: object exposing the token pair as ``pool.pair``
        :param w3: web3 instance used by :meth:`update_reserves`
        """
        self.pair = pool.pair
        self.reserve_0 = 0
        self.reserve_1 = 0
        self.w3 = w3
        self.name = '%s_%s' % (self.pair.symbol_0, self.pair.symbol_1)

    def update_reserves(self):
        """Fetch ``getReserves()`` from the pair contract and cache the result.

        The first two returned values are stored as integers in
        ``reserve_0`` and ``reserve_1``.
        """
        pair_contract = self.w3.eth.contract(
            to_checksum_address(self.pair.address), abi=abi_lib.UNI_V2_PAIR)
        reserves = pair_contract.functions.getReserves().call()
        self.reserve_0 = int(reserves[0])
        self.reserve_1 = int(reserves[1])

    def reserves(self):
        """Return the cached ``(reserve_0, reserve_1)`` without decimal scaling."""
        return self.reserve_0, self.reserve_1

    def _scaled_reserves(self):
        """Return both cached reserves divided by ``10 ** decimals``."""
        raw_0, raw_1 = self.reserves()
        return (raw_0 / (10 ** self.pair.decimals_0),
                raw_1 / (10 ** self.pair.decimals_1))

    def quote_price(self):
        """Return scaled reserve_0 divided by scaled reserve_1.

        :raises ZeroDivisionError: if ``reserve_1`` is 0 (e.g. before the
            first :meth:`update_reserves`)
        """
        r0, r1 = self._scaled_reserves()
        return r0 / r1

    def base_price(self):
        """Return scaled reserve_1 divided by scaled reserve_0.

        :raises ZeroDivisionError: if ``reserve_0`` is 0 (e.g. before the
            first :meth:`update_reserves`)
        """
        r0, r1 = self._scaled_reserves()
        return r1 / r0

    def calculate_swap(self, qty=0, max_impact=0.01, token=0, fee=0.003):
        """
        Calculate the amount received and the resulting pool price for a
        swap against the cached reserves, keeping the constant product
        ``r0 * r1`` fixed.

        Worked example (fee = 0.003), selling 10,000 USDC (token 0):

            Pool before:  USDC = 2,000,000   ETH = 1,000
            Constant product = 2,000,000,000 (stays the same)
            Pool price before = 2,000 USDC per ETH
            USDC after = 2,009,970 (10,000 less the 0.3% fee is added)
            ETH after  = 995.0397 (constant product / new USDC amount)
            ETH received = 4.9603 (old ETH amount - new ETH amount)
            Pool price after = 2,019.99 USDC per ETH
            Price impact = 1.00 (percent)

        NOTE(review): the second return value is the pool price *after* the
        swap, not the average execution price (qty / received) -- confirm
        which one callers expect.

        :param qty: amount of the sell token, in whole-token units; when 0
            it is sized from ``max_impact`` so that the swap takes the
            fraction ``max_impact`` of the bought token's reserve
        :param max_impact: fraction used to size ``qty`` when ``qty`` is 0
        :param token: index of the sell token; 0 sells token 0, any other
            value sells token 1
        :param fee: fraction of ``qty`` withheld before it reaches the pool
        :return: (amount received, pool price after the swap, price impact
            in percent, qty); prices are sold-token reserve divided by
            bought-token reserve
        :raises ZeroDivisionError: if a cached reserve is 0
        """
        r0, r1 = self._scaled_reserves()
        constant_product = r0 * r1
        if token == 0:
            reserve_in, reserve_out = r0, r1
        else:
            reserve_in, reserve_out = r1, r0
        old_price = reserve_in / reserve_out
        if qty == 0:
            qty = reserve_in * max_impact / ((1 - max_impact) * (1 - fee))
        new_reserve_in = reserve_in + (qty * (1 - fee))
        # x * y = k: the bought-token reserve follows from the unchanged product.
        new_reserve_out = constant_product / new_reserve_in
        new_price = new_reserve_in / new_reserve_out
        price_impact = ((new_price / old_price) - 1) * 100
        received = reserve_out - new_reserve_out
        return received, new_price, price_impact, qty

    def _as_dict(self):
        """Return the instance's attribute dictionary (the live ``__dict__``)."""
        return self.__dict__
class PoolPair:
    """Two pools grouped together, with an order quantity and a timestamp."""

    def __init__(self, pool1: InitializedLiquidityPool, pool2: InitializedLiquidityPool):
        """Store both pools, zero the order quantity and stamp the creation time."""
        self.pool1, self.pool2 = pool1, pool2
        self.order_qty: int = 0
        self.updated_at: float = time.time()

    @property
    def name(self):
        """Return ``"<pool1.name>_<pool2.name>"``."""
        return '_'.join((self.pool1.name, self.pool2.name))

    def update_reserves(self):
        """Refresh pool1, then pool2, then record the refresh time."""
        for member in (self.pool1, self.pool2):
            member.update_reserves()
        self.updated_at = time.time()

    def _as_dict(self):
        """Return the pair's state as a dict (pool objects are not converted)."""
        return dict(
            pool1=self.pool1,
            pool2=self.pool2,
            order_qty=self.order_qty,
            updated_at=self.updated_at,
            name=self.name,
        )
class ConfigCreate:
    """Build Dex / Factory / TokenPair objects from a configuration dict.

    The config keys read here are ``'routers'`` (items with ``'address'``
    and ``'dex'``), ``'amounts'`` (items with ``'sym'``, ``'asset'`` and
    ``'decimals'``) and ``'tokens'`` (items with ``'sym'``, ``'address'``
    and ``'decimals'``).

    NOTE(review): ``self.w3``, ``get_factory``, ``wrap_contract_call``,
    ``create_pool`` and ``add_pool_to_dex`` are used below but are not
    defined in this class as shown -- presumably supplied elsewhere
    (subclass, mixin or a part of the file not included); confirm.
    """

    _ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"

    def __init__(self, config: dict):
        """Store ``config`` and create the result lists filled by create_configs()."""
        self.config = config
        self.dexes = []
        # create_configs() appends to these; without them it fails with
        # AttributeError on the first router / pair.
        self.factories = []
        self.pairs = []
        self.pools = []

    def get_pair(self, token1, token2, factory, dec_1, dec_2, sym_1, sym_2):
        """Look up the pair contract for two tokens on ``factory``.

        Both token addresses are checksummed and passed to the factory's
        ``getPair`` in the order given.

        NOTE(review): the returned TokenPair keeps the caller's token order
        (token0 = ``token1`` argument).  An earlier sort-then-swap step here
        always restored the caller's order, so it was removed as a no-op.
        If token0/decimals_0 are meant to follow the pair contract's own
        token ordering, the decimals and symbols must be reordered together
        with the addresses -- confirm the intent.

        :return: a ``TokenPair``, or ``None`` when the lookup yields an
            empty value or the zero address
        """
        token1 = to_checksum_address(token1)
        token2 = to_checksum_address(token2)
        contract = self.w3.eth.contract(factory.address, abi=abi_lib.UNI_V2_FACTORY)
        pair_address = self.wrap_contract_call(contract.functions.getPair(token1, token2))
        if not pair_address or pair_address == self._ZERO_ADDRESS:
            print('Pair does not exist!')
            return None
        print(pair_address)
        return TokenPair(token1, token2, pair_address, dec_1, dec_2,
                         sym_1, sym_2, factory)

    def create_configs(self):
        """Populate ``factories``, ``dexes``, ``pairs`` and ``pools`` from the config.

        One Factory and one Dex are created per configured router.  Then,
        for every (amounts entry, tokens entry) combination with different
        addresses, the pair is looked up on every factory; each pair found
        is recorded, turned into a pool and attached to its DEX.  Finally
        every Dex is printed.
        """
        print('[~] Enumerating Routers ... ')
        for router in self.config.get('routers'):
            router_address = to_checksum_address(router.get('address'))
            router_name = router.get('dex')
            factory_addr = self.get_factory(router_address)
            # Factory.router is an address string, not the raw config entry.
            _factory = Factory(router_address, factory_addr, router_name)
            self.factories.append(_factory)
            self.dexes.append(Dex(router_name, router_address, _factory))
        print('[~] Enumerating token pairs ... ')
        for base in self.config.get('amounts'):
            base_symbol = base.get('sym')
            base_address = base.get('asset')
            base_decimals = base.get('decimals')
            for quote in self.config.get('tokens'):
                quote_symbol = quote.get('sym')
                quote_address = quote.get('address')
                if base_address == quote_address:
                    # A token is never paired with itself.
                    continue
                print(f'Creating {base_symbol}:{quote_symbol}')
                quote_decimals = quote.get('decimals')
                for factory_ in self.factories:
                    _pair = self.get_pair(token1=base_address, token2=quote_address, factory=factory_,
                                          dec_1=base_decimals, dec_2=quote_decimals, sym_1=base_symbol,
                                          sym_2=quote_symbol)
                    if not _pair:
                        continue
                    self.pairs.append(_pair)
                    _pool = self.create_pool(_pair)
                    self.pools.append(_pool)
                    self.add_pool_to_dex(_pool)
        for _dex in self.dexes:
            # Call _asdict(); printing the bound method shows no data.
            print(_dex.name, _dex._asdict())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment