Last active
March 17, 2024 15:50
-
-
Save damp11113/610c516f7236752366d320b78b2ec618 to your computer and use it in GitHub Desktop.
V1.5 simple python ssh server with paramiko. Full version is here https://github.com/damp11113/PyserSSH
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import subprocess | |
import paramiko | |
import socket | |
import sys | |
import threading | |
from damp11113 import pyversion, osversion, cmd, list2str | |
import pyfiglet | |
CWD = os.path.dirname(os.path.realpath(__file__)) | |
HOSTKEY = paramiko.RSAKey(filename=os.path.join(CWD, 'private_key.pem')) | |
os_version = osversion(cuser=True) | |
system_banner = ( | |
f"\033[36mPyserSSH V1.5 \033[0m\n" | |
f"\033[33m!!Warning!! This is Testing Version of PyserSSH \033[0m\n" | |
#f"\033[35mUse '=' as spacer \033[0m\n" | |
f"\033[35mUse Putty for best experience \033[0m\n" | |
) | |
print(system_banner) | |
class Server(paramiko.ServerInterface): | |
def __init__(self): | |
self.event = threading.Event() | |
self.current_user = None # Initialize a variable to store the current user | |
def check_channel_request(self, kind, chanid): | |
if kind == 'session': | |
return paramiko.OPEN_SUCCEEDED | |
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED | |
def check_auth_password(self, username, password): | |
if (username == 'admin') and (password == 'test'): | |
self.current_user = username # Store the current user upon successful authentication | |
return paramiko.AUTH_SUCCESSFUL | |
def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes): | |
return True | |
def check_channel_shell_request(self, channel): | |
return True | |
server = '0.0.0.0' | |
ssh_port = 2222 | |
history = [] | |
def replace_enter_with_crlf(input_string): | |
if '\n' in input_string: | |
input_string = input_string.replace('\n', '\r\n') | |
return input_string | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.bind((server, ssh_port)) | |
while True: | |
try: | |
try: | |
sock.listen(100) | |
print("Listening for connections...") | |
client, addr = sock.accept() | |
except Exception as e: | |
print('Listen failed: ' + str(e)) | |
else: | |
print(f"Accepted connection from {addr[0]}:{addr[1]}") | |
bhSession = paramiko.Transport(client) | |
bhSession.add_server_key(HOSTKEY) | |
bhSession.default_window_size = 2147483647 | |
bhSession.packetizer.REKEY_BYTES = pow(2, 40) | |
bhSession.packetizer.REKEY_PACKETS = pow(2, 40) | |
server = Server() | |
bhSession.start_server(server=server) | |
chan = bhSession.accept(20) | |
if chan is None: | |
print('*** No channel.') | |
chan.close() | |
print('[+] Authenticated!') | |
welcome_banner = ( | |
f"Welcome {server.current_user} to {osversion(fullversion=True)} \033[0m\n\n" | |
f"{pyfiglet.figlet_format('DPCloudev', font='usaflag', width=100)} \033[0m" | |
) | |
chan.sendall(replace_enter_with_crlf(system_banner + welcome_banner)) | |
chan.send("\r\n") | |
prompt = f'{server.current_user}@{os_version}> ' # Create the shell prompt | |
chan.send(replace_enter_with_crlf(prompt).encode('utf-8')) # Send the prompt to the client | |
typlist = [] | |
currenthistory = 0 | |
try: | |
while True: | |
command = chan.recv(8192).decode('utf-8', errors='replace') | |
if not command: | |
break | |
else: | |
print(f'{typlist}') | |
#if command.strip() == "\x1b[A": | |
# try: | |
# chan.send("\n" + history[currenthistory]) | |
# currenthistory += 1 | |
# except: | |
# pass | |
if command.strip() == '\x7f' or command.strip() == '\x08': | |
if typlist == []: | |
chan.send("") | |
else: | |
typlist = typlist[:-1] | |
chan.send(command.strip()) | |
else: | |
typlist.append(command) | |
chan.send(f'{command}') | |
print(f'{command}') | |
print(f'{typlist}') | |
if command == '\r': | |
# Join the commands in typlist into a single command | |
print(typlist[:-1]) | |
full_command = ''.join(typlist[:-1]) | |
typlist = [] # Clear typlist for the next set of commands | |
# Here, you can execute the full_command and send the output back to the client. | |
# For simplicity, we're just echoing the full_command back to the client. | |
#chan.send(replace_enter_with_crlf(f'\nYou executed: {full_command}')) | |
print(full_command) | |
# system command | |
if not full_command == '': | |
if full_command == "exit": | |
chan.send(replace_enter_with_crlf("\nLogout")) | |
chan.close() | |
break | |
elif full_command == "about": | |
about_messsage = ( | |
"PyserSSH (Beta)\n" | |
"Version 1.5\n" | |
"Author: damp11113\n" | |
f"Runtime: Python {pyversion(fullversion=True)}" | |
) | |
chan.send(replace_enter_with_crlf("\n" + about_messsage)) | |
elif full_command.startswith("history"): | |
if full_command.endswith(" clear"): | |
history.clear() | |
chan.send(replace_enter_with_crlf("\nHistory Cleaned")) | |
else: | |
chan.send(replace_enter_with_crlf("\n" + list2str(history))) | |
elif full_command.startswith("cmd ") or full_command.startswith("cmd"): | |
if full_command.startswith("cmd "): | |
cmdcommand = full_command.split("cmd ")[1] | |
process = subprocess.Popen(cmdcommand, shell=True, stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, universal_newlines=True) | |
for line in process.stdout: | |
try: | |
chan.send(replace_enter_with_crlf(f"\n" + line.strip())) | |
except Exception as e: | |
print(f"Error sending message: {str(e)}") | |
else: | |
chan.send(replace_enter_with_crlf("\n for use cmd 'cmd <command args>'")) | |
process.stdout.close() | |
process.stderr.close() | |
process.wait() | |
else: # unknown command | |
chan.send(replace_enter_with_crlf(f"\nCommand '{full_command}' not found")) | |
history.append(full_command) | |
chan.send("\r\n") | |
prompt = f'{server.current_user}@{os_version}> ' # Create the shell prompt | |
chan.send(replace_enter_with_crlf(prompt).encode('utf-8')) # Send the prompt to the client | |
# Check if the user is disconnected | |
if chan.closed: | |
print('User disconnected') | |
break | |
except KeyboardInterrupt: | |
chan.close() | |
bhSession.close() | |
except Exception as e: | |
print(e) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment