Skip to content

Instantly share code, notes, and snippets.

@costastf
Created April 14, 2018 11:02
Show Gist options
  • Save costastf/a7768ce76f9df0683d58c91f9681565c to your computer and use it in GitHub Desktop.
Save costastf/a7768ce76f9df0683d58c91f9681565c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2.7
# -*- coding: UTF-8 -*-
# File: ScpWrapper
#
__author__ = 'Costas Tyfoxylos <costas.tyf@gmail.com>'
__docformat__ = 'plaintext'
__date__ = '2015-03-17'
import os
import sys
import sh
import logging
reload(sys)
logger_basename = 'ScpWrapper'
logger = logging.getLogger(logger_basename)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
# create formatter
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(log_format)
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
logger.setLevel(logging.INFO)
class ScpWrapper(object):
def __init__(self, user_name, server, key):
# open stdout in unbuffered mode
sys.stdout = os.fdopen(sys.stdout.fileno(), 'wb', 0)
self.user_name = user_name
self.server = server
self.key = self._validate_key(key)
self._aggregated = ''
self._authenticate()
self._scp = sh.scp.bake('-i', self.key,
'-o', 'StrictHostKeyChecking=no')
self._connection = '{user}@{target}'.format(user=self.user_name,
target=self.server)
@staticmethod
def _validate_key(key):
try:
_ = open(key).read()
except IOError:
message = 'Unable to read key file. Cannot continue...'
logger.error(message)
raise SystemExit(message)
return key
def _check_login(self, char, stdin, process):
self._aggregated += char
if self._aggregated.endswith(
'password: ') or self._aggregated.endswith("': "):
# reinstate the normal buffered mode for stdout
reload(sys)
process.kill()
def _authenticate(self):
try:
logger.info('Checking key access')
_ = sh.ssh('-o', 'IdentitiesOnly=yes',
'-o', 'IdentityFile={key}'.format(key=self.key),
'{user}@{server}'.format(user=self.user_name,
server=self.server),
'ls',
_out=self._check_login,
_out_bufsize=0,
_tty_in=True)
# reinstate the normal buffered mode for stdout
except sh.SignalException_9:
logger.error('Failed!')
message = "User doesn't have key access to the server."
logger.error(message)
raise SystemExit(message)
except sh.ErrorReturnCode_255:
message = 'Could not connect to ssh server :{server}.'.format(
server=self.server)
logger.exception(message)
raise SystemExit(message)
return True
def copy(self, source, destination):
destination = destination[:-1] if destination.endswith('/') else destination
try:
destination = '{connection}:{destination}'.format(connection=self._connection,
destination=destination)
logger.info('Trying to copy {source} to {destination}'.format(source=source,
destination=destination))
self._scp(source, destination)
logger.info('Done')
result = True
except Exception:
message = 'Failed to copy {source} to {destination}.'.format(
source=source,
destination=destination)
logger.exception(message)
result = False
return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment