Created
September 6, 2019 05:30
-
-
Save tranductam2802/e061893a3714619311343c3c36a4b41b to your computer and use it in GitHub Desktop.
Funny SSH client has written by Python for learning paramiko library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from termcolor import * | |
from sys import stdin | |
import colorama | |
import paramiko as ssh | |
import sys | |
import time | |
import os | |
HOST = '' | |
USER = '' | |
RSA_KEY_LOCATION = '' | |
TIMEOUT = 300 | |
READ_TIMEOUT = 2 | |
class AllowAllKeys(ssh.MissingHostKeyPolicy): | |
def missing_host_key(self, client, hostname, key): | |
return | |
def display(line): | |
colorama.init() | |
print(line, end = '', flush=True) | |
def error(line): | |
display(line) | |
sys.exit(0) | |
def prepare(client): | |
channel = client.invoke_shell() | |
ssh_stdin = channel.makefile('wb') | |
ssh_stdout = channel.makefile('rb') | |
return ssh_stdin, ssh_stdout | |
if __name__ == "__main__": | |
# Connection settings | |
client = ssh.SSHClient() | |
client.load_system_host_keys() | |
client.load_host_keys(os.path.expanduser('~/.ssh/known_hosts')) | |
client.set_missing_host_key_policy(AllowAllKeys()) | |
rsa_key = ssh.RSAKey.from_private_key_file(RSA_KEY_LOCATION) | |
# Connectings | |
try: | |
client.connect(hostname=HOST, port=22, username=USER, pkey=rsa_key, timeout=TIMEOUT) | |
except (ssh.SSHException, ssh.AuthenticationException): | |
error('Authentication failed for host: {0}'.format(HOST)) | |
except ssh.NoValidConnectionsError: | |
error('Host "{0}" not found'.format(HOST)) | |
except TimeoutError: | |
error('Host range "{0}" not found'.format(HOST)) | |
# Prepare pipeline | |
ssh_stdin, ssh_stdout = prepare(client) | |
command = None | |
try: | |
# Applciation start | |
while True: | |
line_buffer = '' | |
while not ssh_stdout.channel.exit_status_ready(): | |
# Timeout settings | |
end = time.time() + READ_TIMEOUT | |
while len(ssh_stdout.channel.in_buffer) == 0: | |
if time.time() > end: | |
error('Connection timeout') | |
# Read data one by one | |
binary = ssh_stdout.read(1) | |
# Convert binary data to UTF-8 | |
line_buffer += binary.decode("utf-8") | |
# Process response data | |
if line_buffer.endswith('\n'): | |
# Message always end by '\r\n' or '\n' | |
raw_line = line_buffer.rstrip() | |
if raw_line == 'logout': | |
# Logout message always close channels | |
raise Exception('Logout') | |
elif command is None or command.rstrip() != raw_line: | |
# Command data also input to log | |
display(line_buffer) | |
line_buffer = '' | |
elif line_buffer.endswith('$ '): | |
# Command ready message alway end by '$ ' | |
if command is not None and command.startswith('cd'): | |
# cd command's reponse message had the same command ready message | |
command = '\n' | |
# Write one more dummy command for update new one | |
ssh_stdin.write('{}'.format(command)) | |
line_buffer = '' | |
else: | |
# Display command ready message | |
display(line_buffer) | |
break | |
command = stdin.readline() | |
ssh_stdin.write('{}'.format(command)) | |
except Exception as e: | |
# Catch message from logout feature | |
print('Bye!', end = '', flush=True) | |
finally: | |
# Always close connection in the end | |
ssh_stdout.close() | |
ssh_stdin.close() | |
client.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment