Skip to content

Instantly share code, notes, and snippets.

@caiaffa
Last active June 1, 2019 19:59
Show Gist options
  • Save caiaffa/d6b768a607195700169017520c65f166 to your computer and use it in GitHub Desktop.
Save caiaffa/d6b768a607195700169017520c65f166 to your computer and use it in GitHub Desktop.
import sys
from contextlib import contextmanager
import paramiko
sys.tracebacklimit = 0
class FtpClient:
def __init__(self, hostname, username, password, port=22):
self.hostname = hostname
self.username = username
self.password = password
self.port = port
@contextmanager
def connect(self):
ftp = paramiko.SSHClient()
error = None
try:
ftp.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)
ftp.load_system_host_keys()
ftp.connect(
hostname=self.hostname,
username=self.username,
password=self.password,
port=self.port
)
ftp = ftp.open_sftp()
yield ftp
except Exception as _error:
error = _error
finally:
ftp.close()
if error:
raise error
class SftpClient:
_connection = None
def __init__(self, hostname, username, password, port=22):
self.hostname = hostname
self.username = username
self.password = password
self.port = port
self.create_connection(self.hostname, self.username, self.password, self.port)
@classmethod
def create_connection(cls, hostname, username, password, port):
try:
# Connect SFTP client
sftp = paramiko.SSHClient()
sftp.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)
sftp.load_system_host_keys()
sftp.connect(hostname=hostname, username=username, password=password, port=port)
sftp = sftp.open_sftp()
cls._connection = sftp
except Exception as error:
print(f'An error occurred creating SFTP client: {error}')
def close(self):
self._connection.close()
import paramiko
import time
class SftpClient:
def __init__(self, hostname, username, password, port=22):
self.hostname = hostname
self.username = username
self.password = password
self.port = port
self.ssh_client = None
self.retry = 3
self.connect()
self.sftp = self.ssh_client.open_sftp()
def connect(self):
while self.ssh_client is None:
if self.retry != 3:
time.sleep(5)
self.connect_ssh()
self.retry -= 1
if not self.retry:
raise Exception('It was not possible to stabilish an ftp connection')
def connect_ssh(self):
try:
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)
self.ssh_client.load_system_host_keys()
self.ssh_client.connect(
hostname=self.hostname, username=self.username, password=self.password, port=self.port
)
except paramiko.ssh_exception.NoValidConnectionsError as _error:
self.ssh_client = None
except Exception as _error:
raise _error
def close(self):
if self.ssh_client:
self.sftp.close()
self.ssh_client.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
import paramiko
class SftpClient:
def __init__(self, hostname, username, password, port=22):
self.hostname = hostname
self.username = username
self.password = password
self.port = port
self.ssh_client = self.connect_ssh()
self.sftp = self.ssh_client.open_sftp()
def connect_ssh(self):
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy)
ssh_client.load_system_host_keys()
ssh_client.connect(hostname=self.hostname, username=self.username, password=self.password, port=self.port)
return ssh_client
def list_dir(self, path='.'):
return self.sftp.listdir(path)
def download(self, remote_path, local_path):
self.sftp.get(remote_path, local_path)
def close(self):
self.sftp.close()
self.ssh_client.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment