Skip to content

Instantly share code, notes, and snippets.

@ricardocchaves
Last active March 25, 2021 00:56
Show Gist options
  • Save ricardocchaves/78eb16d9d4680a856bf99d6cfdc04bc8 to your computer and use it in GitHub Desktop.
Save ricardocchaves/78eb16d9d4680a856bf99d6cfdc04bc8 to your computer and use it in GitHub Desktop.
IPFS - Get download progress of reachable peers for a CID
#!/usr/bin/python3
# Eg. command: ./ipfs_progress.py QmV5z4Wv7ENX5KoLuBHAbBVg8Yb9ibFM9xYMWb1gLWn93w
import subprocess as sp
import sys
import threading
import concurrent.futures as thr
import curses
import time
from datetime import datetime
class ProgressRegister:
    """Thread-safe record of which blocks each peer has been reported for.

    Attributes:
        reg: dict mapping a peer string to the list of distinct blocks
            registered for it via ``add``, in first-seen order.
        blocks: the collection passed to the constructor; only its length is
            used here, as the denominator shown by ``__str__``.
    """

    def __init__(self, blocks):
        """Create an empty register for the given collection of blocks."""
        self._lock = threading.Lock()
        self.reg = {}  # {peer: [block, ...]}
        # (peer, block) pairs already stored in self.reg. Gives O(1) duplicate
        # detection instead of scanning the peer's list on every add().
        # Only kept in sync by add(); direct edits to self.reg bypass it.
        self._seen = set()
        self.blocks = blocks

    def add(self, peer, block):
        """Record that ``peer`` was reported for ``block``.

        Repeated (peer, block) pairs are ignored, so each block is counted at
        most once per peer.
        """
        key = (peer, block)
        # Serialise writers: add() is called from several worker threads.
        with self._lock:
            if key in self._seen:
                return
            self._seen.add(key)
            self.reg.setdefault(peer, []).append(block)

    def __str__(self):
        """Return one ``peer: have/total`` line per peer, sorted by peer."""
        # Hold the lock so the dict and its lists are not modified by add()
        # while they are being read.
        with self._lock:
            lines = [
                "{}: {}/{}".format(peer, len(self.reg[peer]), len(self.blocks))
                for peer in sorted(self.reg)
            ]
        return "\n".join(lines).strip()

    def __repr__(self):
        """Same text as ``__str__``."""
        return self.__str__()
def run(args, cmd="ipfs dht findprovs {}"):
    """Run the provider lookup for one block and register every peer found.

    Args:
        args: a ``(block, reg)`` pair. It is a single tuple so the function
            can be used directly with ``Executor.map``. ``reg`` must offer
            ``add(peer, block)``.
        cmd: command template; ``{}`` is replaced by the block and the result
            is split on whitespace to form the argument list (no shell).

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero
            status.
        OSError: the command could not be started.
    """
    block, reg = args
    argv = cmd.format(block).split()
    try:
        # stderr is discarded rather than merged into stdout, so diagnostic
        # messages can never be mistaken for peer IDs.
        out = sp.check_output(argv, stderr=sp.DEVNULL, timeout=3)
    except sp.TimeoutExpired as exc:
        # The lookup may still be running after 3 s; keep whatever it printed
        # so far instead of discarding it. `output` may be None.
        out = exc.output or b""
        # The process was killed mid-stream: drop a trailing fragment that is
        # not terminated by a newline, as it may be a truncated line.
        out = out[: out.rfind(b"\n") + 1]
    for line in out.decode("utf-8", errors="replace").splitlines():
        peer = line.strip()
        # Empty output must not register an empty-string peer.
        if peer:
            reg.add(peer, block)
##### Handling arguments
# Expected invocation: <script> CID [-curses]
USAGE = "USAGE: {} CID -curses".format(sys.argv[0])
print(USAGE)
if len(sys.argv) < 2:  # no arguments
    print("invalid arguments")
    # Non-zero status so shells and calling scripts can detect the failure.
    sys.exit(1)
CID = sys.argv[1]
# Curses output is enabled only when there is exactly one extra argument and
# it contains the text "curses" (e.g. "-curses").
USE_CURSES = len(sys.argv) == 3 and "curses" in sys.argv[2]
##### Listing the blocks referenced by CID
# `ipfs refs -r -u` is run without a shell; only its stdout is captured.
cmd = "ipfs refs -r -u {}".format(CID)
proc = sp.Popen(cmd.split(), stdout=sp.PIPE)
out = proc.communicate()[0].decode("utf-8")
# One entry per non-empty output line (blank lines and the piece after the
# trailing newline are dropped).
blocks = [line for line in out.split("\n") if not len(line) < 1]
if not blocks:
    print("0 blocks on CID {}".format(CID))
    sys.exit(0)
##### Poll providers for every block and display per-peer progress
scr = None  # curses window; stays None in plain-print mode
reg = ProgressRegister(blocks)
try:
    if USE_CURSES:
        ##### Init curses for printing
        scr = curses.initscr()
        curses.noecho()
        curses.cbreak()
        # Once curses owns the terminal, draw through the window rather than
        # print().
        scr.addstr(0, 0, "Loading...")
        scr.refresh()
    else:
        print("Loading...")
    while True:
        ##### Launching DHT FINDPROVS for each block
        args = ((block, reg) for block in blocks)
        # Leaving the with-block waits for all submitted lookups. The map()
        # results are never consumed, so an exception raised inside run()
        # for one block is dropped and does not stop the loop.
        with thr.ThreadPoolExecutor(max_workers=8) as executor:
            executor.map(run, args)
        ##### Display progress
        stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        if USE_CURSES:
            rows = ["{} LEGEND: PeerID: downloadedBlocks / Total".format(stamp)]
            rows.extend("{}".format(reg).split("\n"))
            # Clip to the window: writing past the last row or column makes
            # curses raise an error.
            max_y, max_x = scr.getmaxyx()
            scr.erase()
            for y, row in enumerate(rows[:max_y]):
                scr.addnstr(y, 0, row, max(max_x - 1, 0))
            scr.refresh()
        else:
            print(stamp, reg)
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the endless polling loop.
    pass
finally:
    # Always hand the terminal back in a usable state; any other exception
    # still propagates after this cleanup instead of being hidden.
    if scr is not None:
        curses.echo()
        curses.nocbreak()
        curses.endwin()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment