Skip to content

Instantly share code, notes, and snippets.

@BdVade
Last active May 3, 2022 01:09
Show Gist options
  • Save BdVade/6fa19c43e62d3e3a461d6116dabebb47 to your computer and use it in GitHub Desktop.
Save BdVade/6fa19c43e62d3e3a461d6116dabebb47 to your computer and use it in GitHub Desktop.
Two Scripts to compile the volume traded of a token on a pool from it's AMM . 1 through Solscan The other through an RPC Node
import requests
import time
import json
def get_amm_historical_volume_from_rpc_node(amm_address: str):
url = 'https://api.mainnet-beta.solana.com/'
rerun = True
volumes = []
volume = 0
c = 0
body = {
"jsonrpc": "2.0",
"id": 1,
"method": "getSignaturesForAddress",
"params": [
amm_address,
{
"limit": 1000
}
]
}
headers = {
"Content-Type": "application/json",
"Accept": "*/*"
}
while rerun:
try:
response = requests.post(url, data=json.dumps(body), headers=headers)
except requests.exceptions.ConnectionError:
time.sleep(5)
continue
except ConnectionError:
time.sleep(5)
continue
try:
data = response.json().get("result")
except Exception as e:
print(e)
time.sleep(5)
continue
rerun = True if len(data) >= 1000 else False
for item in data:
while True:
try:
item_response = requests.post(url, data=json.dumps({
"jsonrpc": "2.0",
"id": 1,
"method": "getTransaction",
"params": [
item.get("signature"),
"json"
]
}), headers=headers).json()
if item_response.get("result").get("meta").get("err"):
pass
else:
after = item_response.get("result").get("meta").get("postBalances")[0]
before = item_response.get("result").get("meta").get("preBalances")[0]
balance_difference = abs(after-before)
volume += balance_difference
volumes.append(balance_difference)
print(item_response)
break
except Exception as e:
print(f"Attempt to get signature deets. Error is : {e}")
time.sleep(3)
continue
body = {
"jsonrpc": "2.0",
"id": 1,
"method": "getSignaturesForAddress",
"params": [
amm_address,
{
"limit": 1000,
"before": data[-1].get("signature")
}
]
}
c += 1
print(c)
print(volume)
print(f"the volume is {sum(volumes)}")
return volumes
get_amm_historical_volume_from_rpc_node("7XawhbbxtsRcQA8KTkHT9f9nc6d69UwqCDh6U5EEbEmX")
import requests
import time
def get_amm_historical_volume(amm_address: str):
url = f"https://api.solscan.io/amm/txs?address={amm_address}&type=swap&offset=0&limit=10&" \
f"&tx_status=success"
rerun = True
volumes = []
volume = 0
c = 0
while rerun:
try:
response = requests.get(url)
except requests.exceptions.ConnectionError:
time.sleep(5)
continue
except ConnectionError:
time.sleep(5)
continue
try:
data = response.json().get("data")
except Exception as e:
print(e)
time.sleep(5)
continue
rerun = (data.get("hasNext"))
items = data.get("items")
for item in items:
volumes.append(float(item.get("volume")))
volume += float(item.get("volume"))
unix_time = items[-1].get("blockUnixTime")
_id = items[-1].get("_id")
url = f"https://api.solscan.io/amm/txs?address={amm_address}&type=swap&offset=0&limit=10&" \
f"&tx_status=success&before_id={_id}&before_time={unix_time}"
c += 1
print(c)
print(volume)
print(sum(volumes))
return volumes
get_amm_historical_volume("7XawhbbxtsRcQA8KTkHT9f9nc6d69UwqCDh6U5EEbEmX")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment