@andynog
Last active July 5, 2023 18:41
Generate a report about the genesis file size per Cosmos chain
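"""Generate a report about the genesis file size per Cosmos chain.

The script walks the chain registry at https://chains.cosmos.directory/, downloads
each chain's published genesis file (unpacking .zip, .gz and .tar.gz archives) into
./genesis/<chain>/genesis.json, and prints a markdown table of file sizes plus the
average size across chains.

Rough usage sketch (assumes Python 3 with the third-party 'requests' and
'prettytable' packages installed, e.g. via `pip install requests prettytable`; the
script filename below is illustrative, it is not specified in the gist):

    python genesis_report.py
"""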
import glob
import gzip
import json
import os
import shutil
import tarfile
import zipfile
from datetime import datetime
from urllib.parse import urljoin, urlparse

import requests
from prettytable import PrettyTable

# shared table instance, populated and printed by print_table()
x = PrettyTable()

def get_chain_info(chain):
    url = 'https://chains.cosmos.directory/' + chain + '/chain'
    response = requests.get(url)
    return response.json()

def get_chains():
    url = 'https://chains.cosmos.directory/'
    response = requests.get(url)
    return response.json()

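# Note: the response shapes assumed below come from how this script uses them, not
# from a formal API spec -- get_chains() is expected to return an object with a
# 'chains' list whose entries carry a 'chain_name', and get_chain_info() an object
# with a 'codebase'->'genesis'->'genesis_url' field (see the __main__ block).
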
def check_if_downloaded(name):
    # a chain counts as downloaded if genesis/<name>/genesis.json already exists
    dir_path = os.path.join(os.getcwd(), 'genesis', name)
    genesis_file_path = os.path.join(dir_path, 'genesis.json')
    return os.path.exists(genesis_file_path)

def fetch_genesis(genesis_location, name):
    genesis_file_path = None
    try:
        # remove any querystrings or parameters
        genesis_location = urljoin(genesis_location, urlparse(genesis_location).path)
        # if this is a GitHub "blob" page, switch to the raw file location
        if '/blob/' in genesis_location:
            genesis_location = genesis_location.replace('/blob/', '/raw/')
        response = requests.get(genesis_location)
    except Exception:
        print('failed to download genesis from', genesis_location, 'for', name, 'chain')
        return None
    else:
        if response.status_code == 200:
            if not genesis_location.endswith('.json'):
                # check type of compression
                if genesis_location.endswith('.tar.gz'):
                    print('compressed .tar.gz', genesis_location)
                    save_tar_gz_file(name, response.content)
                elif genesis_location.endswith('.gz'):
                    print('compressed .gz', genesis_location)
                    save_gz_file(name, response.content)
                elif genesis_location.endswith('.zip'):
                    print('compressed .zip', genesis_location)
                    save_zip_file(name, response.content)
                else:
                    # not an archive; check whether the body is JSON anyway
                    try:
                        json.loads(response.text)
                        print('other is json', genesis_location)
                    except json.JSONDecodeError:
                        print('failed to parse other as json', genesis_location)
                    genesis_file_path = save_genesis_file(name, response.content)
            else:
                try:
                    json.loads(response.text)
                    print('json', genesis_location)
                except json.JSONDecodeError:
                    print('failed to parse as json', genesis_location)
                genesis_file_path = save_genesis_file(name, response.content)
        else:
            print('error fetching with http status', response.status_code, 'for chain', name)
    return genesis_file_path

def save_genesis_file(name, content):
    dir_path = os.path.join(os.getcwd(), 'genesis', name)
    genesis_file_path = os.path.join(dir_path, 'genesis.json')
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    with open(genesis_file_path, 'wb') as f:
        f.write(content)
    print('saved genesis for', name)
    return genesis_file_path

def save_zip_file(name, content):
    dir_path = os.path.join(os.getcwd(), 'compressed', name)
    compressed_genesis_file_path = os.path.join(dir_path, 'genesis.zip')
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    with open(compressed_genesis_file_path, 'wb') as f:
        f.write(content)
    with zipfile.ZipFile(compressed_genesis_file_path, 'r') as zip_ref:
        zip_ref.extractall(dir_path)
    print('uncompressed zip', compressed_genesis_file_path)
    unzipped_genesis_file_path = os.path.join(dir_path, 'genesis.json')
    with open(unzipped_genesis_file_path, 'rb') as f:
        data = f.read()
    save_genesis_file(name, data)
    print('saved compressed genesis for', name)
    return compressed_genesis_file_path

def save_tar_gz_file(name, content):
    dir_path = os.path.join(os.getcwd(), 'compressed', name)
    compressed_genesis_file_path = os.path.join(dir_path, 'genesis.tar.gz')
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    with open(compressed_genesis_file_path, 'wb') as f:
        f.write(content)
    with tarfile.open(compressed_genesis_file_path, 'r') as tar_file:
        tar_file.extractall(dir_path)
    print('uncompressed tar', compressed_genesis_file_path)
    unzipped_genesis_file_path = os.path.join(dir_path, 'genesis.json')
    if not os.path.exists(unzipped_genesis_file_path):
        # fall back to any file in the directory whose name ends with genesis.json
        for genesis in glob.glob(dir_path + "/*genesis.json"):
            unzipped_genesis_file_path = genesis
    with open(unzipped_genesis_file_path, 'rb') as f:
        data = f.read()
    save_genesis_file(name, data)
    print('saved compressed genesis for', name)
    return compressed_genesis_file_path

def save_gz_file(name, content):
    dir_path = os.path.join(os.getcwd(), 'compressed', name)
    compressed_genesis_file_path = os.path.join(dir_path, 'genesis.gz')
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    with open(compressed_genesis_file_path, 'wb') as f:
        f.write(content)
    unzipped_genesis_file_path = os.path.join(dir_path, 'genesis.json')
    with gzip.open(compressed_genesis_file_path, 'r') as gzip_file, \
            open(unzipped_genesis_file_path, 'wb') as unzipped_file:
        shutil.copyfileobj(gzip_file, unzipped_file)
    print('uncompressed gz', unzipped_genesis_file_path)
    with open(unzipped_genesis_file_path, 'rb') as f:
        data = f.read()
    save_genesis_file(name, data)
    print('saved compressed genesis for', name)
    return compressed_genesis_file_path

def get_file_size(genesis_path):
    # file size converted from bytes to MiB (bytes / 1024^2)
    return os.stat(genesis_path).st_size / (1024 * 1024)

def get_local_genesis_info():
    local_genesis_info = []
    genesis_root = os.path.join(os.getcwd(), 'genesis')
    for root, dirs, files in os.walk(genesis_root):
        for name in files:
            file_path = os.path.join(root, name)
            chain = os.path.basename(root)
            local_genesis_info.append({'name': chain, 'file': file_path})
    return local_genesis_info

def print_table():
    print('\ngenesis info as of', datetime.now().strftime('%d/%b/%Y, %H:%M:%S'))
    total_size = 0
    num_genesis = 0
    local_genesis_info = get_local_genesis_info()
    x.field_names = ['chain', 'genesis size (MB)']
    for gi in local_genesis_info:
        num_genesis += 1
        size = get_file_size(gi['file'])
        total_size = total_size + size
        x.add_row([gi['name'], '{:.3f}'.format(size)])
    x.align['chain'] = 'l'
    x.align['genesis size (MB)'] = 'r'
    x.sortby = 'genesis size (MB)'
    x.sort_key = lambda row: float(row[0])
    x.reversesort = True
    print(to_markdown_table(x))
    # print(x.get_string(sortby='genesis size (MB)', reversesort=True, sort_key=lambda row: float(row[0])))
    avg = total_size / num_genesis
    print('\naverage genesis size is', '{:.3f}'.format(avg), 'MB across', num_genesis, 'chains\n')

def to_markdown_table(pt):
    _junc = pt.junction_char
    if _junc != "|":
        pt.junction_char = "|"
    # drop the top and bottom border lines, and the outermost character of each row
    markdown = [row[1:-1] for row in pt.get_string().split("\n")[1:-1]]
    pt.junction_char = _junc
    return "\n".join(markdown)

if __name__ == '__main__':
    chains_response = get_chains()
    for chain in chains_response['chains']:
        chain_name = chain['chain_name']
        if not check_if_downloaded(chain_name):
            chain_info = get_chain_info(chain_name)
            genesis_url = chain_info['codebase']['genesis']['genesis_url']
            if not genesis_url:
                print('no genesis information for', chain_name, 'chain')
            else:
                fetch_genesis(genesis_url, chain_name)
        else:
            print('already downloaded genesis for', chain_name)
    print_table()
    print('finished!')