Skip to content

Instantly share code, notes, and snippets.

@Humoud
Created June 25, 2022 08:43
Show Gist options
  • Save Humoud/47ba78def91dd66c8eb2bbfd02b0b31b to your computer and use it in GitHub Desktop.
Save Humoud/47ba78def91dd66c8eb2bbfd02b0b31b to your computer and use it in GitHub Desktop.
think it works
import json
from bson import json_util
from bson.objectid import ObjectId
import pymongo
class MongoDBHandler():
    """
    Binlex MongoDB Handler

    Thin wrapper around a ``pymongo.MongoClient`` bound to the ``binlex``
    database. Note that, despite its name, ``self.cursor`` holds the
    database handle; collections are looked up from it by name.
    """

    def __init__(self):
        # No arguments: pymongo's default connection settings are used.
        self.mongodb = pymongo.MongoClient()
        self.cursor = self.mongodb['binlex']

    @staticmethod
    def jsonify(data):
        """Return *data* as plain JSON-compatible Python values.

        The value is serialised with ``json.dumps`` (falling back to
        ``bson.json_util.default`` for types ``json`` cannot encode) and
        parsed straight back with ``json.loads``.
        """
        return json.loads(json.dumps(data, default=json_util.default))

    def query(self, query, collection='default'):
        """Return the jsonified documents of *collection* matching *query*.

        Args:
            query: Filter handed to ``Collection.find``.
                NOTE(review): assumed to be a pymongo filter mapping (or
                ``None`` for "all documents") -- confirm against callers.
            collection: Name of the collection to search. Added with a
                default so existing one-argument calls keep working.

        Returns:
            A list with one jsonified entry per matching document.
        """
        # ``find`` must be called on a collection: calling it on the database
        # handle resolves to a collection named "find", which is not callable.
        docs = self.cursor[collection].find(query)
        return [self.jsonify(doc) for doc in docs]

    def query_doc_id(self, collection, id):
        """Return the jsonified document of *collection* whose ``_id`` is *id*.

        *id* is converted with ``ObjectId(id)``. When no document matches,
        ``find_one`` yields ``None`` and ``None`` is returned.
        """
        # ``id`` shadows the builtin; the name is kept because it is part of
        # the method's signature.
        document = self.cursor[collection].find_one({'_id': ObjectId(id)})
        return self.jsonify(document)

    def stats_collection_count(self, collection):
        """Return the number of documents stored in *collection*."""
        return self.cursor[collection].count_documents({})

    def upsert_trait(self, data, collection):
        """Insert or update the trait *data* in *collection*; return its ``_id``.

        Traits are keyed on ``data['bytes_sha256']``. All fields of *data*
        are written with ``$set``.

        Raises:
            KeyError: if *data* has no ``'bytes_sha256'`` entry.
        """
        traits = self.cursor[collection]
        trait_filter = {'bytes_sha256': data['bytes_sha256']}
        trait_id = traits.update_one(
            filter=trait_filter,
            update={"$set": data},
            upsert=True,
        ).upserted_id
        if trait_id is None:
            # ``upserted_id`` is None when an existing document was updated,
            # so fetch that document's id (and nothing else) separately.
            trait_id = traits.find_one(
                trait_filter,
                projection={'_id': True},
            )['_id']
        return trait_id

    def upsert_file_trait(self, data, trait_id):
        """Insert or update a ``files`` entry linking a file to a trait.

        The entry is keyed on ``data['collection']``, ``data['mode']``,
        ``data['sha256']`` and *trait_id*; all fields of *data* are written
        with ``$set``.

        Returns:
            The ``upserted_id`` of the update result: the new document's id
            when one was inserted, ``None`` when an existing entry matched.
        """
        files = self.cursor['files']
        outcome = files.update_one(
            filter={
                'collection': data['collection'],
                'mode': data['mode'],
                'sha256': data['sha256'],
                'trait_id': trait_id,
            },
            update={"$set": data},
            upsert=True,
        )
        return outcome.upserted_id
class MongoQuery():
    """Store individual binlex trait messages through a ``MongoDBHandler``."""

    # Recognised corpus prefixes, checked in this order. Each one is also the
    # name of the collection the trait is written to.
    _COLLECTIONS = ('default', 'malware', 'goodware')

    def __init__(self):
        self.mongodb = MongoDBHandler()

    def process(self, body, sha256):
        """Store one JSON-encoded trait and link it to the file *sha256*.

        The ``corpus``, ``offset`` and ``mode`` fields are moved out of the
        trait into a per-file record. The remaining trait fields are upserted
        into the collection selected by the corpus prefix, and the per-file
        record (with the trait id and collection name added) is upserted via
        ``upsert_file_trait``.

        Args:
            body: JSON text of a trait; must contain ``corpus``, ``offset``
                and ``mode`` keys.
            sha256: Hash of the file the trait was extracted from, stored in
                the per-file record as ``'sha256'``.

        Returns:
            The id returned by ``upsert_trait``, or ``None`` when the corpus
            matches none of the known prefixes (nothing is written then).

        Raises:
            KeyError: if *body* lacks ``corpus``, ``offset`` or ``mode``.
        """
        data = json.loads(body)
        corpus = data.pop('corpus')
        file_data = {
            'corpus': corpus,
            'offset': data.pop('offset'),
            'sha256': sha256,
            'mode': data.pop('mode'),
        }

        collection = next(
            (name for name in self._COLLECTIONS if corpus.startswith(name)),
            None,
        )
        if collection is None:
            # Unknown corpus: skip the message instead of failing.
            return None

        trait_id = self.mongodb.upsert_trait(data, collection)
        file_data['trait_id'] = trait_id
        file_data['collection'] = collection
        self.mongodb.upsert_file_trait(file_data, trait_id)
        return trait_id
from MongoDBHandler import MongoQuery
import multiprocessing
from hashlib import sha256
import pybinlex
import json
PROCESSES = 3
def send_to_binlex(sample):
    """Extract binlex traits from the PE file *sample* and store them.

    The file is loaded with ``pybinlex.PE``, hashed with SHA-256 and
    disassembled with the corpus set to ``'malware'`` and the tag list
    ``['family']``. Each resulting trait is passed to ``MongoQuery.process``
    together with the file hash.

    Args:
        sample: Path of the file to process.

    Returns:
        ``True`` when ``read_file`` reported success and the traits were
        submitted, ``False`` when ``read_file`` returned a falsy result.
    """
    pe = pybinlex.PE()
    print(f'Processing: {sample}')
    if not pe.read_file(sample):
        return False

    # Hash the sample in chunks so large files are never held in memory whole;
    # the context manager closes the file even if reading fails.
    digest = sha256()
    with open(sample, 'rb') as handle:
        for chunk in iter(lambda: handle.read(1 << 20), b''):
            digest.update(chunk)
    file_hash = digest.hexdigest()

    # Generate traits.
    disassembler = pybinlex.Disassembler(pe)
    disassembler.set_threads(4)
    disassembler.set_corpus('malware')
    disassembler.set_tags(['family'])
    disassembler.disassemble()
    traits = disassembler.get_traits()

    # Save traits to the DB. The handler is created only once there is
    # something to store; process() expects JSON text, hence the dumps().
    mongodb = MongoQuery()
    for trait in traits:
        mongodb.process(json.dumps(trait), file_hash)
    return True
def main(samples=None):
    """Process *samples* in a pool of ``PROCESSES`` workers and print results.

    Args:
        samples: Iterable of sample paths handed to ``send_to_binlex``.
            Defaults to the placeholder list ``['paths', 'here']``, which is
            meant to be replaced with real paths.
    """
    if samples is None:
        samples = ['paths', 'here']
    with multiprocessing.Pool(PROCESSES) as pool:
        # imap_unordered is lazy, so the results must be consumed while the
        # pool is still open.
        results = pool.imap_unordered(send_to_binlex, samples)
        print('Results:')
        for outcome in results:
            print(outcome)
        print()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment