Skip to content

Instantly share code, notes, and snippets.

@damp11113
Last active March 17, 2024 15:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save damp11113/610c516f7236752366d320b78b2ec618 to your computer and use it in GitHub Desktop.
Save damp11113/610c516f7236752366d320b78b2ec618 to your computer and use it in GitHub Desktop.
V1.5: a simple Python SSH server built with Paramiko. The full version is available at https://github.com/damp11113/PyserSSH
import os
import subprocess
import paramiko
import socket
import sys
import threading
from damp11113 import pyversion, osversion, cmd, list2str
import pyfiglet
# Directory containing this script; the RSA host key file is looked up beside it.
CWD = os.path.dirname(os.path.realpath(__file__))
_host_key_path = os.path.join(CWD, 'private_key.pem')
HOSTKEY = paramiko.RSAKey(filename=_host_key_path)

# Value returned by damp11113's osversion(cuser=True); used verbatim in the
# shell prompt (presumably an OS/user description -- confirm against the library).
os_version = osversion(cuser=True)

# ANSI-coloured banner: printed once on the server console at start-up and
# sent to every client ahead of its welcome message.
system_banner = "".join([
    "\033[36mPyserSSH V1.5 \033[0m\n",
    "\033[33m!!Warning!! This is Testing Version of PyserSSH \033[0m\n",
    "\033[35mUse Putty for best experience \033[0m\n",
])
print(system_banner)
class Server(paramiko.ServerInterface):
    """Paramiko server policy used for each incoming SSH connection.

    Only ``session`` channels are accepted, a single hard-coded
    username/password pair is authenticated, and every PTY and shell
    request is granted.
    """

    def __init__(self):
        # Created per connection; nothing in this file sets or waits on it.
        self.event = threading.Event()
        # Username of the authenticated client; stays None until
        # check_auth_password() succeeds.
        self.current_user = None

    def check_channel_request(self, kind, chanid):
        """Allow ``session`` channels and refuse every other channel kind."""
        if kind == 'session':
            return paramiko.OPEN_SUCCEEDED
        return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED

    def check_auth_password(self, username, password):
        """Validate a password login.

        Returns ``paramiko.AUTH_SUCCESSFUL`` for the built-in account and
        ``paramiko.AUTH_FAILED`` for anything else, recording the username
        on success so the shell can build its prompt.
        """
        # NOTE(review): credentials are hard-coded test values; replace them
        # before exposing this server beyond a test environment.
        if username == 'admin' and password == 'test':
            self.current_user = username
            return paramiko.AUTH_SUCCESSFUL
        # Reject explicitly instead of falling off the end and returning None.
        return paramiko.AUTH_FAILED

    def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
        """Grant every pseudo-terminal request, whatever its geometry or modes."""
        return True

    def check_channel_shell_request(self, channel):
        """Grant every shell request."""
        return True
# Address the listening socket is bound to ('0.0.0.0' = every IPv4 interface).
server = '0.0.0.0'
# TCP port the SSH server listens on.
ssh_port = 2222
# Commands entered by clients; module-level, so it is shared by every session
# for the lifetime of the process (shown and cleared by the "history" command).
history = []
def replace_enter_with_crlf(input_string):
    """Return *input_string* with every line break written as CRLF.

    Terminals attached through a PTY need ``\\r\\n`` to start a new line at
    column zero. Existing ``\\r\\n`` pairs are collapsed to ``\\n`` first so
    text that is already CRLF-terminated is not turned into ``\\r\\r\\n``;
    the conversion is therefore idempotent.

    Args:
        input_string: Text to convert.

    Returns:
        The converted text; strings without line breaks are returned unchanged.
    """
    return input_string.replace('\r\n', '\n').replace('\n', '\r\n')
def _send_prompt(chan, user):
    """Send the ``<user>@<os_version>> `` shell prompt to the client."""
    prompt = f'{user}@{os_version}> '
    chan.send(replace_enter_with_crlf(prompt).encode('utf-8'))


def _run_host_command(chan, cmdcommand):
    """Run *cmdcommand* through the host shell and stream its stdout to *chan*.

    stderr is discarded (the original piped it and never read it, which can
    block a child that writes a lot of diagnostics). The ``with`` block closes
    the pipe and waits for the child even if sending fails part-way.
    """
    # NOTE(review): shell=True executes exactly what the authenticated client
    # typed, with this process's privileges. That is the purpose of the "cmd"
    # command, but it makes the credentials the only line of defence.
    with subprocess.Popen(cmdcommand, shell=True, stdout=subprocess.PIPE,
                          stderr=subprocess.DEVNULL, universal_newlines=True) as process:
        for line in process.stdout:
            try:
                chan.send(replace_enter_with_crlf("\n" + line.strip()))
            except Exception as e:
                # Best effort: keep draining so the child can finish.
                print(f"Error sending message: {str(e)}")


def _execute_command(chan, full_command):
    """Run one non-empty command line typed by the client.

    Returns:
        False when the client logged out (the channel has been closed),
        True when the shell should print a new prompt and keep going.
    """
    if full_command == "exit":
        chan.send(replace_enter_with_crlf("\nLogout"))
        chan.close()
        return False

    if full_command == "about":
        about_message = (
            "PyserSSH (Beta)\n"
            "Version 1.5\n"
            "Author: damp11113\n"
            f"Runtime: Python {pyversion(fullversion=True)}"
        )
        chan.send(replace_enter_with_crlf("\n" + about_message))
    elif full_command.startswith("history"):
        if full_command.endswith(" clear"):
            history.clear()
            chan.send(replace_enter_with_crlf("\nHistory Cleaned"))
        else:
            chan.send(replace_enter_with_crlf("\n" + list2str(history)))
    elif full_command.startswith("cmd"):
        if full_command.startswith("cmd "):
            # Slice off the prefix rather than split(): a command that itself
            # contains "cmd " must reach the shell intact.
            _run_host_command(chan, full_command[len("cmd "):])
        else:
            chan.send(replace_enter_with_crlf("\n for use cmd 'cmd <command args>'"))
    else:  # unknown command
        chan.send(replace_enter_with_crlf(f"\nCommand '{full_command}' not found"))

    history.append(full_command)
    return True


def _shell_loop(chan, user):
    """Echo keystrokes, assemble them into lines and execute each line.

    Every chunk returned by ``recv`` is treated as one keystroke, as in the
    original implementation. Returns when the client disconnects or logs out.
    """
    typlist = []  # keystrokes typed since the last Enter
    while True:
        command = chan.recv(8192).decode('utf-8', errors='replace')
        if not command:
            break  # empty read: the client closed the channel

        print(typlist)  # server-console trace kept from the original
        if command.strip() in ('\x7f', '\x08'):
            # Backspace/Delete: drop the last keystroke, if there is one.
            if typlist:
                typlist.pop()
                chan.send(command.strip())
        else:
            typlist.append(command)
            chan.send(command)  # echo back so the client sees what it typed
            print(command)
            print(typlist)
            if command == '\r':
                # Enter: everything before the trailing '\r' is the command.
                print(typlist[:-1])
                full_command = ''.join(typlist[:-1])
                typlist = []
                print(full_command)
                if full_command != '' and not _execute_command(chan, full_command):
                    break
                chan.send("\r\n")
                _send_prompt(chan, user)

        if chan.closed:
            print('User disconnected')
            break


def _serve_client(client, addr):
    """Run the SSH handshake and the interactive shell for one accepted socket.

    The transport is always closed on the way out, whether the session ended
    normally, no channel was opened, or an error occurred.
    """
    print(f"Accepted connection from {addr[0]}:{addr[1]}")
    transport = paramiko.Transport(client)
    try:
        transport.add_server_key(HOSTKEY)
        transport.default_window_size = 2147483647
        # Push re-keying far out so it does not interrupt long sessions.
        transport.packetizer.REKEY_BYTES = pow(2, 40)
        transport.packetizer.REKEY_PACKETS = pow(2, 40)

        # Local name on purpose: the module-level ``server`` holds the bind address.
        ssh_server = Server()
        transport.start_server(server=ssh_server)

        chan = transport.accept(20)
        if chan is None:
            # Authentication failed or no channel was opened within 20 seconds.
            print('*** No channel.')
            return
        print('[+] Authenticated!')

        user = ssh_server.current_user
        welcome_banner = (
            f"Welcome {user} to {osversion(fullversion=True)} \033[0m\n\n"
            f"{pyfiglet.figlet_format('DPCloudev', font='usaflag', width=100)} \033[0m"
        )
        chan.sendall(replace_enter_with_crlf(system_banner + welcome_banner))
        chan.send("\r\n")
        _send_prompt(chan, user)

        try:
            _shell_loop(chan, user)
        except KeyboardInterrupt:
            # Ctrl+C on the server console while a client is connected drops
            # that client only; the accept loop keeps running (as before).
            chan.close()
    finally:
        transport.close()  # also closes any channel still open on it


# Listening socket. Clients are served one at a time: the accept loop does not
# return to accept() until the current session has ended.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((server, ssh_port))
sock.listen(100)

while True:
    print("Listening for connections...")
    try:
        client, addr = sock.accept()
    except Exception as e:
        print('Listen failed: ' + str(e))
        continue

    try:
        _serve_client(client, addr)
    except Exception as e:
        # A failing session must not take the whole server down.
        print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment