Instantly share code, notes, and snippets.

@tommyskg /.README.md
Last active Sep 13, 2017

Embed
What would you like to do?
An mtgx2rexster script that migrates all commonly used attributes of Maltego GraphML files.
#!/usr/bin/env python
__author__ = "Tommy Skaug"
__license__ = "Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)"
__version__ = "0.1.1"
__status__ = "Development"
#### MALTEGO-TITAN
# A Maltego to Titan, the graph database, conversion script
# Takes a Maltego GraphML file as input, parses it with lxml and
# extracts the essential parts. Still a little rough around the edges
# but does the job.
###
from rexpro import RexProConnection
import lxml.etree as et
import sys
# Various GraphML elements, incl. those used in Maltego
# Fully qualified "{namespace}tag" names for the GraphML and Maltego elements
# that the parsing helpers look up by short alias.
_GRAPHML_NS = "{http://graphml.graphdrawing.org/xmlns}"
_MALTEGO_NS = "{http://maltego.paterva.com/xml/mtgx}"

graphml = {
    # Plain GraphML structure
    "graph": _GRAPHML_NS + "graph",
    "vertice": _GRAPHML_NS + "node",
    "edge": _GRAPHML_NS + "edge",
    "data": _GRAPHML_NS + "data",
    # Maltego-specific payload elements
    "properties": _MALTEGO_NS + "Properties",
    "property": _MALTEGO_NS + "Property",
    "value": _MALTEGO_NS + "Value",
    "properties2": _MALTEGO_NS + "Properties2",
    "notes": _MALTEGO_NS + "Notes",
    "bookmarks": _MALTEGO_NS + "Bookmarks",
    "entitylist": _MALTEGO_NS + "MaltegoEntityList",
    # A GraphML data element whose key attribute is 'label'
    "label": _GRAPHML_NS + "data[@key='label']",
}
def getVertices(elements):
    """Convert mtg:MaltegoEntity XML elements into a list of vertex dicts.

    Each returned dict contains:

    * ``vertice_id``  -- the ``id`` attribute of the element two levels
      above the entity (presumably the enclosing GraphML node).
    * ``entity_type`` / ``entity_id`` -- the entity's ``type`` and ``id``
      attributes.
    * ``notes`` -- list of the entity's Notes child elements (raw elements,
      not text).
    * ``value`` -- mapping of property name (with every ``"properties."``
      occurrence removed) to the text of that property's Value child.
    * ``displayName``, ``hidden``, ``name``, ``nullable``, ``readonly``,
      ``type`` -- attributes of the last property processed.  These keys
      are absent when the entity has no processed property.

    Properties named ``ipaddress.internal`` and ``whois-info`` are skipped.

    Raises ``KeyError`` when an expected XML attribute is missing and
    ``AttributeError`` when an element has no grandparent.
    """
    skipped_names = ("ipaddress.internal", "whois-info")
    vertice_list = []
    for e in elements:
        vertice = dict(e.getparent().getparent().items())
        entity = dict(e.items())
        attribs = {
            'value': {},
            'vertice_id': vertice['id'],
            'entity_type': entity['type'],
            'entity_id': entity['id'],
            'notes': e.findall(graphml.get('notes')),
        }
        for p in e.findall(graphml.get('properties')):
            for data in p.findall(graphml.get('property')):
                items = dict(data.items())
                if items['name'] in skipped_names:
                    continue
                name = items['name'].replace("properties.", "")
                for d in data.findall(graphml.get('value')):
                    attribs['value'][name] = d.text
                # NOTE(review): the source's indentation was lost, so it is
                # unclear whether these assignments originally sat inside the
                # value loop above.  They are placed at property level here;
                # the result is the same whenever a property has at least one
                # Value child -- confirm against the original file.
                attribs['displayName'] = items['displayName']
                attribs['hidden'] = items['hidden']
                attribs['name'] = name
                attribs['nullable'] = items['nullable']
                attribs['readonly'] = items['readonly']
                attribs['type'] = items['type']
        vertice_list.append(attribs)
    return vertice_list
def getEdges(elements):
    """Convert mtg:MaltegoLink XML elements into a list of edge dicts.

    Every dict starts as a copy of the attributes of the element two levels
    above the link.  ``link_type`` is then set to the name of each property
    in turn (so the last property wins), and ``label`` is set to the Value
    text of any property whose displayName is "Label".
    """
    properties_tag = graphml.get('properties')
    property_tag = graphml.get('property')
    value_tag = graphml.get('value')

    edges = []
    for link in elements:
        edge = dict(link.getparent().getparent().items())
        for props in link.findall(properties_tag):
            for prop in props.findall(property_tag):
                prop_attrs = dict(prop.items())
                edge['link_type'] = prop_attrs['name']
                if prop_attrs['displayName'] == "Label":
                    for value in prop.findall(value_tag):
                        edge['label'] = value.text
        edges.append(edge)
    return edges
def check_existing_edge(graph,vid1,vid2,label):
    """Return True if an edge labelled *label* connects the two vertices.

    Runs a Gremlin script through ``graph.execute`` that counts the vertices
    reachable from ``vid1`` over *label* edges (``both``) that equal ``vid2``.

    The lookup is best-effort: any ``Exception`` raised while querying or
    converting the result is treated as "no such edge" and yields False.
    ``SystemExit`` and ``KeyboardInterrupt`` are allowed to propagate.
    """
    try:
        check_existing = graph.execute("""
            def v1 = g.v(vid1)
            def v2 = g.v(vid2)
            return v1.both(label).filter{it == v2}.count()
            """, {'vid1':vid1, 'vid2':vid2, 'label':label})
        return int(check_existing) > 0
    except Exception:
        return False
def check_existing_vertice(graph,maltegoId):
    """Return True if a vertex with the given ``maltegoId`` property exists.

    Counts matching vertices with a Gremlin script run through
    ``graph.execute``.  The original author notes that this lookup is really
    slow; titanRegisterVertices only calls it when its skipCheck is False.
    Errors from the query are not caught here.
    """
    check_existing = graph.execute("""
        return g.V('maltegoId',maltegoId).count()
        """, {'maltegoId':maltegoId})
    return int(check_existing) > 0
def add_edge(graph,vid1,vid2,label):
    """Add and commit an edge with *label* between two vertices.

    Returns True when the Gremlin script ran without raising, False when
    ``graph.execute`` raised an ``Exception``.  The failure is deliberately
    swallowed (best-effort), but ``SystemExit`` and ``KeyboardInterrupt``
    are allowed to propagate.
    """
    try:
        graph.execute("""
            def v1 = g.v(vid1)
            def v2 = g.v(vid2)
            g.addEdge(v1, v2, label)
            g.commit()
            """, {'vid1':vid1, 'vid2':vid2, 'label':label})
    except Exception:
        return False
    return True
def titanRegisterEdges(graph,edges,vertice_list,skipCheck=True):
    """Create the given edges in the graph unless they already exist.

    graph        -- connection object handed to check_existing_edge/add_edge.
    edges        -- iterable of dicts with 'source', 'target' and 'label' keys.
    vertice_list -- mapping from the source/target ids used in *edges* to
                    graph vertex ids.
    skipCheck    -- accepted but not consulted; the existence check always
                    runs before an edge is added.

    Raises ``KeyError`` when an edge lacks 'source', 'target' or 'label', or
    when its source/target id is missing from *vertice_list*.
    """
    for e in edges:
        source = vertice_list[e['source']]
        target = vertice_list[e['target']]
        label = e['label']
        if not check_existing_edge(graph, source, target, label):
            add_edge(graph, source, target, label)
        # Call form of print gives the same output on Python 2 and is also
        # valid on Python 3.
        # NOTE(review): the source's indentation was lost; this print may
        # originally have been inside the `if` above -- confirm.
        print(label)
def _groovy_quote(text):
    """Return *text* as a single-quoted Groovy string literal."""
    escaped = text.replace("\\", "\\\\")   # backslash first, so later escapes survive
    escaped = escaped.replace("'", "\\'")
    escaped = escaped.replace("\n", "\\n").replace("\r", "\\r")
    return u"'%s'" % escaped


def titanRegisterVertices(graph,vertices,skipCheck=True):
    """Register vertices in Titan and return their graph ids.

    graph     -- connection object exposing ``execute(script, bindings)``.
    vertices  -- iterable of dicts with the keys 'vertice_id', 'name',
                 'displayName', 'entity_type', 'notes' and 'value' (a mapping
                 of extra property names to string values).
    skipCheck -- when false, vertices for which check_existing_vertice
                 reports a match are skipped.

    Returns a dict mapping each registered 'vertice_id' to the ``_id`` of
    the vertex returned by the graph.

    Non-empty entries of 'value' are written into the Gremlin script as a
    Groovy map literal.  Keys and values are quoted with backslashes, single
    quotes and line breaks escaped, because they come from the parsed Maltego
    file and would otherwise break the script or inject code into it.
    """
    vertice_list = {}
    for v in vertices:
        if not skipCheck and check_existing_vertice(graph, v['vertice_id']):
            continue
        vals = [
            u"%s:%s" % (_groovy_quote(key), _groovy_quote(val))
            for key, val in v['value'].items()
            if val
        ]
        script = """
            def v1 = g.addVertex([
                name:name,
                displayName:displayName,
                maltegoId:maltego_id,
                type:type,
                %s])
            return v1
            """ % (u','.join(vals),)
        vertice = graph.execute(script, {
            'name': "%s" % v['name'],
            'displayName': "%s" % v['displayName'],
            'maltego_id': "%s" % v['vertice_id'],
            # 'notes' is bound but the script above does not reference it.
            'notes': "%s" % v['notes'],
            'type': "%s" % v['entity_type'].replace("maltego.", ""),
        })
        vertice_list[v['vertice_id']] = vertice['_id']
    return vertice_list
def main(document="Graphs/Graph1.graphml", host='localhost', port=8184,
         graph_name='titan'):
    """Load a Maltego GraphML file and register its contents in Titan.

    document   -- path of the GraphML file; the default is the original
                  hard-coded path, which the original comment describes as
                  the Maltego extraction directory layout.
    host, port, graph_name -- arguments handed to RexProConnection; the
                  defaults are the original hard-coded values.
    """
    # The Maltego XML namespace
    nsmap = {'mtg': 'http://maltego.paterva.com/xml/mtgx'}
    # Get the GraphML XML tree from file and parse it
    doc = et.parse(document)
    # carve out all nodes, here named vertices
    vertices = getVertices(doc.xpath('//mtg:MaltegoEntity', namespaces=nsmap))
    # get all relations between the vertices
    edges = getEdges(doc.xpath('//mtg:MaltegoLink', namespaces=nsmap))
    graph = RexProConnection(host, port, graph_name)
    # feed the vertices to Titan, get the created IDs
    vertice_list = titanRegisterVertices(graph, vertices)
    # "draw" the edges between the vertices, using the id mapping returned above
    titanRegisterEdges(graph, edges, vertice_list)


if __name__ == "__main__":
    # An optional first command-line argument overrides the default GraphML
    # path; with no arguments the call is main() as before.
    main(*sys.argv[1:2])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment