Skip to content

Instantly share code, notes, and snippets.

@azinazadi
Created April 1, 2022 20:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save azinazadi/ef2a655810b3320e9d879103ca723018 to your computer and use it in GitHub Desktop.
Save azinazadi/ef2a655810b3320e9d879103ca723018 to your computer and use it in GitHub Desktop.
Python SFTP client that downloads files from a remote host using concurrent threads
import os
from collections import deque
from stat import S_IMODE, S_ISDIR, S_ISREG
from threading import Condition, Thread
from time import sleep

import pysftp
def get_r_portable(remotedir, localdir, num_threads=10, *,
                   host='pi.local', username='zinoo', password='poi'):
    """Download every new or updated file under ``remotedir`` into ``localdir``.

    Opens up to ``num_threads`` SFTP connections in parallel, asks
    ``traverse`` which local directories must exist and which
    ``(remote, local)`` file pairs need fetching, creates the directories and
    then copies the files with one worker thread per open connection.

    Args:
        remotedir: Remote directory to mirror.
        localdir: Local destination directory; created if it is missing.
        num_threads: Number of SFTP connections / worker threads to attempt.
        host: SFTP server to connect to.
        username: Login name used for every connection.
        password: Password used for every connection.

    Raises:
        ConnectionError: If not a single SFTP connection could be opened.
    """
    print(remotedir)
    cnopts = pysftp.CnOpts()
    # NOTE(security): hostkeys = None switches off host-key verification, so
    # the server's identity is not checked. Kept for backward compatibility.
    cnopts.hostkeys = None

    sftps = []

    def _s(i):
        """Open connection number ``i`` and register it in ``sftps``."""
        print(str(i) + " Connecting")
        try:
            c = pysftp.Connection(host, username=username, password=password,
                                  cnopts=cnopts)
        except Exception as exc:  # thread boundary: report, keep the others
            print(f"{i} Connection failed: {exc}")
            return
        print(str(i) + " Connected")
        sftps.append(c)

    connectors = [Thread(target=_s, args=(i,)) for i in range(num_threads)]
    for t in connectors:
        t.start()
    # Wait for every attempt so the number of usable connections is known
    # before any worker indexes into ``sftps``.
    for t in connectors:
        t.join()
    if not sftps:
        raise ConnectionError(f"could not open any SFTP connection to {host}")

    try:
        # Only ask for as many traversal workers as there are connections.
        dirq, fileq = traverse(sftps, remotedir, localdir, len(sftps))

        os.makedirs(localdir, exist_ok=True)
        for d in dirq:
            # Tolerates directories that already exist, but no longer hides
            # real failures such as permission errors.
            os.makedirs(d, exist_ok=True)

        def copy_files(sf):
            """Fetch queued files over ``sf`` until the queue is empty."""
            while True:
                try:
                    # pop() + IndexError avoids the check-then-pop race
                    # between workers sharing the same queue.
                    remote, local = fileq.pop()
                except IndexError:
                    return
                try:
                    sf.get(remote, local, preserve_mtime=True)
                except Exception as exc:  # keep the worker alive for the rest
                    print(f"remaining ( {len(fileq)} ): Failed: {local}: {exc}")
                    continue
                print(f"remaining ( {len(fileq)} ): Copied: {local}")

        workers = [Thread(target=copy_files, args=(sf,)) for sf in sftps]
        for t in workers:
            t.start()
        # wait for the threads to complete
        for t in workers:
            t.join()
    finally:
        for c in sftps:
            c.close()
def traverse(sftps: "list[pysftp.Connection]", remotedir, localdir, num_threads=10):
    """Walk ``remotedir`` with several worker threads and plan the download.

    Worker ``i`` uses ``sftps[i]``. The list may still be filling up while
    this function runs (connections opened by other threads); a worker whose
    connection has not appeared yet simply waits.

    Args:
        sftps: Connection objects providing ``listdir(path)`` and
            ``stat(path)``.
        remotedir: Remote directory at which the walk starts.
        localdir: Local directory that mirrors ``remotedir``.
        num_threads: Number of worker threads to start.

    Returns:
        A ``(dirq, fileq)`` tuple. ``dirq`` lists the local paths of all
        sub-directories found, parents before their children. ``fileq`` lists
        ``(remotepath, localpath)`` pairs for regular files that are missing
        locally or whose remote mtime is more than one second newer than the
        local copy.

    Raises:
        ValueError: If ``num_threads`` is smaller than 1.
        Exception: The first error raised by a worker while listing or
            stat-ing remote paths is re-raised once all workers have stopped.
    """
    if num_threads < 1:
        raise ValueError("num_threads must be at least 1")

    dirq = []
    fileq = []
    pending = deque([(remotedir, localdir)])
    errors = []
    cond = Condition()  # guards pending, dirq, fileq, errors and the flags
    in_flight = 0       # directories currently being listed by some worker
    finished = False

    def needs_download(remote_stat, localpath):
        """Return True when the local copy is absent or older than remote."""
        try:
            local_mtime = os.path.getmtime(localpath)
        except OSError:
            # No local copy yet (or it is unreadable): fetch it.
            return True
        return remote_stat.st_mtime > local_mtime + 1

    def scan(sf, i, rd, ld):
        """List one remote directory; return (sub-directories, stale files)."""
        subdirs = []
        files = []
        for entry in sf.listdir(rd):
            remotepath = rd + "/" + entry
            localpath = os.path.join(ld, entry)
            print(str(i) + " : " + remotepath)
            entry_stat = sf.stat(remotepath)
            mode = entry_stat.st_mode
            if S_ISDIR(mode):
                subdirs.append((remotepath, localpath))
            elif S_ISREG(mode) and needs_download(entry_stat, localpath):
                files.append((remotepath, localpath))
        return subdirs, files

    def tr(i):
        """Worker loop: take a directory, scan it, publish the results."""
        nonlocal in_flight, finished
        while True:
            with cond:
                announced = False
                while not finished and (not pending or len(sftps) <= i):
                    if not announced:
                        print(str(i) + " waiting")
                        announced = True
                    # Timed wait: ``sftps`` can grow without anyone
                    # notifying this condition, so poll for it.
                    cond.wait(0.1)
                if finished:
                    return
                rd, ld = pending.popleft()
                in_flight += 1

            failure = None
            subdirs, files = [], []
            try:
                subdirs, files = scan(sftps[i], i, rd, ld)
            except Exception as exc:  # thread boundary: hand over to caller
                failure = exc

            with cond:
                in_flight -= 1
                if failure is not None:
                    errors.append(failure)
                    finished = True
                else:
                    dirq.extend(local for _, local in subdirs)
                    pending.extend(subdirs)
                    fileq.extend(files)
                    # Done only when nothing is queued AND nobody is still
                    # scanning (a scan in progress may add more work).
                    if not pending and in_flight == 0:
                        finished = True
                cond.notify_all()

    threads = [Thread(target=tr, args=(i,)) for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    if errors:
        raise errors[0]
    return dirq, fileq
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment