Last active
October 24, 2020 04:16
-
-
Save r00tten/ad77fc7da06455f6684c54b6d54365fa to your computer and use it in GitHub Desktop.
Extracts the PowerShell command and C&C (command-and-control) details from Emotet Microsoft Office Word documents. It uses oledump.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import re | |
import sys | |
import subprocess | |
import glob | |
import base64 | |
import yaml | |
def dumpYaml(data):
    """Write the extraction results to ``powExtract.yaml``.

    Every entry of *data* is wrapped as ``{'sample': entry}`` and the
    resulting list is serialised with ``yaml.safe_dump`` into a file named
    ``powExtract.yaml`` in the current working directory.

    Args:
        data: List of result entries. Any falsy value (``[]``, ``None``,
            ``False``) means there is nothing to write.

    Returns:
        ``True`` when the file was written, ``False`` when there was nothing
        to write or the file could not be written/serialised.
    """
    # ``not data`` also covers False/None, for which len() would raise.
    if not data:
        return False
    res = [{'sample': entry} for entry in data]
    try:
        with open('powExtract.yaml', 'w') as file:
            yaml.safe_dump(res, stream=file)
    except (OSError, yaml.YAMLError):
        return False
    return True
def findBigger(arr):
    """Return the index of the entry with the largest ``'size'`` value.

    Args:
        arr: Sequence of dicts, each with a ``'size'`` item accepted by
            ``int()``.

    Returns:
        The index of the first entry holding the maximum size, or ``-1``
        when *arr* is empty. A negative sentinel is used because ``False``
        compares equal to the valid index ``0``.
    """
    if not arr:
        return -1
    big = 0
    for idx in range(1, len(arr)):
        # Strict ">" keeps the earliest entry when sizes tie.
        if int(arr[idx]['size']) > int(arr[big]['size']):
            big = idx
    return big
def commandEx(cmdStr):
    """Run an external command and return what it wrote to stdout.

    Args:
        cmdStr: Command to execute, passed unchanged to ``subprocess.run``
            (an argument list, or a program name string).

    Returns:
        ``False`` when *cmdStr* is empty; otherwise the captured stdout as
        ``str`` when it is valid UTF-8, or as the raw ``bytes`` when it is
        not. Callers must be prepared for either type.

    Raises:
        SystemExit: with status 1, after printing the error, when the
            command cannot be started.
    """
    if not cmdStr:
        return False
    try:
        # run() waits for the child and closes the pipe, unlike a bare
        # Popen(...).stdout.read().
        completed = subprocess.run(cmdStr, stdout=subprocess.PIPE)
    except (OSError, ValueError) as e:
        print(e)
        sys.exit(1)
    raw = completed.stdout
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        # Binary output is handed back undecoded.
        return raw
def psSearch(fStr):
    """Scan a file's text for ``powershell -e`` commands hidden by a delimiter.

    The file is read as text (undecodable bytes ignored, NUL characters
    removed). The text between two consecutive characters of the marker
    ``powershell -e`` is taken as a candidate delimiter; a candidate is
    accepted when removing it from the text around the hit reveals the
    marker. The region starting shortly before the hit is then cut where the
    delimiter stops recurring, the delimiter is stripped, runs of spaces are
    collapsed, and the result is split at every ``powershell`` occurrence.

    Args:
        fStr: Path of the file to scan.

    Returns:
        A list of text chunks, each starting at a ``powershell`` occurrence
        (case-insensitive). Empty when the path is empty, the file cannot be
        read, or no marker is found.
    """
    if not fStr:
        return []
    try:
        with open(fStr, 'r', errors='ignore') as file:
            data = file.read().replace('\x00', '')
    except (OSError, ValueError):
        return []

    payload = "powershell -e"
    marker = re.compile(r'powershell -e', flags=re.IGNORECASE)
    compPower = re.compile(r'powershell', flags=re.IGNORECASE)
    start = -1
    delimiter = ""

    # Try every adjacent character pair of the marker as anchor points.
    for first, second in zip(payload, payload[1:]):
        firstRe = re.compile(re.escape(first), flags=re.IGNORECASE)
        secondRe = re.compile(re.escape(second), flags=re.IGNORECASE)
        for j in firstRe.finditer(data):
            pos = j.start()
            # Only look for the second character within the next 499 chars.
            for k in secondRe.finditer(data[pos + 1:pos + 500]):
                # NOTE(review): compares an absolute offset with a
                # slice-relative one; kept unchanged - confirm the intent.
                if pos == k.start() - 1:
                    continue
                candidate = data[pos + 1:pos + 1 + k.start()]
                # Clamp at 0: a negative start index would wrap to the end
                # of the text and produce an empty/wrong window.
                window = data[max(0, pos - 100):pos + 500]
                if marker.search(window.replace(candidate, '')):
                    delimiter = candidate
                    start = max(0, pos - 25)
                    break
            if start != -1:
                break
        if start != -1:
            break

    if start == -1:
        return []

    data = data[start:]
    if delimiter:
        # Follow the delimiter while consecutive occurrences are less than
        # 50 characters apart; keep 50 characters past the last one.
        pre = 0
        for hit in re.finditer(re.escape(delimiter), data):
            if hit.start() < pre + 50:
                pre = hit.start()
            else:
                break
        data = data[:pre + 50].replace(delimiter, '')
    data = re.sub(r'( )+', ' ', data)

    arrLoc = [hit.start() for hit in compPower.finditer(data)]
    # Each chunk runs from one "powershell" up to (not including) the next.
    return [data[begin:end]
            for begin, end in zip(arrLoc, arrLoc[1:] + [len(data)])]
def streamCleaner(data):
    """Strip the delimiter that hides ``powershell -e`` inside *data*.

    For every adjacent character pair of the marker ``powershell -e``, the
    text between an occurrence of the first character and the next
    occurrence of the second one is tried as a delimiter. The first
    delimiter whose removal from the whole text reveals the marker wins.

    Args:
        data: Text of the dumped stream.

    Returns:
        ``False`` when *data* is empty, the text with the delimiter removed
        when one is found, otherwise the empty string.
    """
    if not data:
        return False
    payload = "powershell -e"
    marker = re.compile(r'powershell -e', flags=re.IGNORECASE)
    # The outcome of a candidate depends only on its text, so each distinct
    # delimiter is tested against the full stream at most once.
    tried = set()
    for first, second in zip(payload, payload[1:]):
        firstRe = re.compile(re.escape(first), flags=re.IGNORECASE)
        secondRe = re.compile(re.escape(second), flags=re.IGNORECASE)
        for j in firstRe.finditer(data):
            pos = j.start()
            # Search in place from pos + 1 instead of copying the tail.
            nextP = secondRe.search(data, pos + 1)
            if nextP is None:
                continue
            gap = nextP.start() - (pos + 1)
            # NOTE(review): compares an absolute offset with a relative
            # distance; kept unchanged - confirm the intent.
            if pos == gap - 1:
                continue
            delimiter = data[pos + 1:nextP.start()]
            if delimiter in tried:
                continue
            tried.add(delimiter)
            cleaned = data.replace(delimiter, '')
            if marker.search(cleaned):
                return cleaned
    return ""
def psCleaner(data):
    """Decode an encoded PowerShell command and pull out its URL list.

    The first ``<10 letters> -e|-E|-encod <base64>`` command found in *data*
    is taken, its Base64 argument is decoded (dropping trailing characters
    until it decodes), and the decoded script is split on ``;``. The first
    statement containing ``[char]42`` (ignoring quotes, backticks and
    ``+``) is expected to assign a ``*``-separated URL string that is then
    split; ``http`` is rewritten to ``hxxp`` in the result.

    Args:
        data: Text containing the PowerShell command line.

    Returns:
        ``{'cmd': <command text>, 'power': <base64 argument>, 'cc': [<urls>]}``
        on success, ``False`` when any step finds nothing to work with.
    """
    if not data:
        return False
    cmdMatch = re.search(r'([a-zA-Z]{10} -(e|E|encod) )([a-zA-Z0-9- =/+]+)', data)
    if cmdMatch is None:
        return False
    data = re.sub(r' +', ' ', cmdMatch.group())
    r_base64 = data.split(' ')[2]

    # Trailing text may have been captured with the argument; shorten it one
    # character at a time until it decodes.
    r_baseDecoded = None
    while r_base64:
        try:
            r_baseDecoded = base64.b64decode(r_base64)
            break
        except ValueError:  # binascii.Error is a ValueError subclass
            r_base64 = r_base64[:-1]
    if r_baseDecoded is None:
        return False
    psBase64 = r_base64

    script = r_baseDecoded.decode('ascii', 'ignore').replace('\x00', '')
    str_CC = ""
    for statement in script.split(';'):
        if re.search(r'\[char\]42', re.sub(r'[\'\"\`\+]', '', statement)):
            str_CC = statement
            break
    if str_CC == "":
        return False

    ccMatch = re.search(r'(?<=[=]{1}).+?(?=(\.[splitSPLIT\"\'\`]{5,10}))', str_CC)
    if ccMatch is None:
        return False
    str_CC = ccMatch.group()
    if re.search(r'\'\+\'|\'\+\|\+\'', str_CC):
        # SECURITY: eval() runs text taken from the analysed document, i.e.
        # attacker-controlled input. It is used to join 'a'+'b' style string
        # literals; only run this tool in an isolated analysis environment
        # and consider replacing it with a dedicated literal concatenator.
        try:
            str_CC = eval(str_CC)
        except Exception:
            return False
        if not isinstance(str_CC, str):
            return False
    str_CC = re.sub(r'[\'\"]', '', str_CC)
    str_CC = re.sub(r'http', 'hxxp', str_CC)
    arr_CC = str_CC.split('*')
    return {'cmd': data, 'power': psBase64, 'cc': arr_CC}
def _oledumpText(output):
    """Return command output as ``str``, decoding ``bytes`` leniently."""
    if isinstance(output, bytes):
        return output.decode('utf-8', 'ignore')
    return output


def _parseStreamLine(line):
    """Extract ``{'id', 'size'}`` from one oledump listing line, or ``None``."""
    tokens = re.sub(r'[ ]+', ' ', line).split(' ')
    # The stream index is the first token containing a colon.
    pos = next((n for n, tok in enumerate(tokens) if ':' in tok), None)
    if pos is None or pos + 1 >= len(tokens):
        return None
    # An indicator token containing "m"/"M" may sit between index and size.
    sizePos = pos + 2 if re.search(r'm|M', tokens[pos + 1]) else pos + 1
    if sizePos >= len(tokens) or not tokens[sizePos].isdigit():
        return None
    return {'id': tokens[pos].split(':')[0], 'size': tokens[sizePos]}


def viaOledump(fStr):
    """Extract the PowerShell command of a document with ``oledump.py``.

    Runs ``python3 oledump.py <file>`` to list the streams, keeps the lines
    matching ``Macros/.../f`` or ``Macros/.../o``, dumps the largest of
    those streams with ``-d -s <id>``, removes the hiding delimiter with
    ``streamCleaner`` and parses the result with ``psCleaner``.

    Args:
        fStr: Path of the document to analyse.

    Returns:
        A one-element list with the ``psCleaner`` result, or ``[]`` when any
        step yields nothing.
    """
    listing = commandEx(["python3", "oledump.py", r"{}".format(fStr)])
    if not listing:
        return []

    mStreams = []
    # commandEx may return str or bytes; normalise before splitting.
    for line in _oledumpText(listing).split('\n'):
        if not re.search(r"((Macros/).+?(/i[0-9]{2})?/(f|o))", line):
            continue
        entry = _parseStreamLine(line)
        if entry is not None:
            mStreams.append(entry)
    if not mStreams:
        return []

    bigger = findBigger(mStreams)
    if bigger < 0:
        return []

    cmd = ["python3", "oledump.py", r"{}".format(fStr), "-d", "-s", mStreams[bigger]['id']]
    streamData = commandEx(cmd)
    if not streamData:
        return []

    streamDataCleaned = streamCleaner(_oledumpText(streamData))
    # streamCleaner signals "nothing" with either "" or False.
    if not streamDataCleaned:
        return []

    tmp = psCleaner(streamDataCleaned)
    return [tmp] if tmp else []
def viaHexdump(fStr):
    """Extract PowerShell commands by scanning the raw file text.

    Every chunk reported by ``psSearch`` is handed to ``psCleaner``; chunks
    for which ``psCleaner`` reports ``False`` are dropped.

    Args:
        fStr: Path of the file to scan.

    Returns:
        List of ``psCleaner`` results, possibly empty.
    """
    candidates = psSearch(fStr)
    if candidates == []:
        return []
    parsed = (psCleaner(chunk) for chunk in candidates)
    return [entry for entry in parsed if entry != False]
def processer(arr):
    """Analyse every file in *arr* and collect what was extracted.

    Each path is printed, analysed with ``viaOledump`` and, when that
    yields nothing, with ``viaHexdump``.

    Args:
        arr: List of file paths. Any falsy value (``[]``, ``None``,
            ``False``) means there is nothing to process.

    Returns:
        ``False`` when there is nothing to process; otherwise a list of
        ``{'file': path, 'detail': [...]}`` dicts, one per file for which at
        least one command was extracted.
    """
    # ``not arr`` also covers False/None, for which len() would raise.
    if not arr:
        return False
    res = []
    for path in arr:
        print(path)
        ps = viaOledump(path)
        if ps == []:
            ps = viaHexdump(path)
        # The 'Powerhell_b64' spelling is left untouched: it is a key of the
        # emitted result structure.
        preRes = [{'Powerhell_b64': k['power'], 'C&C': k['cc']} for k in ps]
        if preRes:
            res.append({'file': path, 'detail': preRes})
    return res
def listDir(dirStr, pattern="*.img*"):
    """List the files directly inside *dirStr* whose names match *pattern*.

    Args:
        dirStr: Directory to look in.
        pattern: Glob pattern applied to the file names. Defaults to
            ``"*.img*"``; pass e.g. ``"*.doc*"`` to select Word documents.

    Returns:
        ``False`` when *dirStr* is empty, otherwise the list of matching
        paths as returned by ``glob.glob`` (possibly empty).
    """
    if not dirStr:
        return False
    return glob.glob(r"{}/{}".format(dirStr, pattern))
def main():
    """Command-line entry point: analyse a folder and write the YAML report.

    Takes the folder path from ``sys.argv[1]``, lists the candidate files
    with ``listDir``, analyses them with ``processer`` and writes the
    results with ``dumpYaml``.

    Raises:
        SystemExit: with status 1 when no folder path was given.
    """
    if len(sys.argv) <= 1:
        print('[-] Please provide a folder path.')
        # Stop here; continuing would fail on sys.argv[1].
        sys.exit(1)
    path = sys.argv[1]
    dFiles = listDir(path)
    if not dFiles:
        print('[-] No matching files found in {}'.format(path))
        return
    res = processer(dFiles)
    # Nothing extracted means nothing to write.
    if res:
        dumpYaml(res)
# Run main() only when this file is executed as a script; a Ctrl-C
# (KeyboardInterrupt) ends the program via sys.exit() instead of a traceback.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment