Instantly share code, notes, and snippets.

Embed
What would you like to do?
Three classes demonstrating the interchangeability between the OpenIOC 1.1 library and the Rexster Graph API.
#!/usr/bin/env python
__author__ = "Tommy Skaug"
__license__ = "Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)"
__version__ = "1.0"
__status__ = "Complete"
from ioc_writer import ioc_api
from lxml import etree as et
from rexpro import RexProConnection
class Graph:
    """Small helper for adding vertices and edges over a RexPro connection.

    Each method sends a Gremlin script, together with its bound parameters,
    through ``self.graph.execute``.
    """

    # Gremlin scripts sent to the server; the names used inside them are
    # bound through the parameter dict handed to ``execute``.
    _ADD_VERTEX_SCRIPT = (
        "\n"
        "def v1 = g.addVertex([content:content,content_type:content_type,condition:condition])\n"
        "return v1"
    )
    _ADD_EDGE_SCRIPT = (
        "\n"
        "def v1 = g.v(vid1)\n"
        "def v2 = g.v(vid2)\n"
        "g.addEdge(v1, v2, label)\n"
        "g.commit()"
    )

    def __init__(self, host='localhost', port=8184, graph_name='titan'):
        """Open the RexPro connection used by every other method.

        The three arguments are forwarded positionally to
        ``RexProConnection``; the defaults reproduce the previously
        hard-coded ``('localhost', 8184, 'titan')``.
        """
        self.graph = RexProConnection(host, port, graph_name)

    def addVertice(self, content, content_type, condition):
        """Create a vertex with the three given properties.

        Returns whatever ``execute`` yields for the script's ``return v1``.
        ``addEdge`` expects that value to support ``['_id']`` lookup.
        """
        vertice_id = self.graph.execute(
            self._ADD_VERTEX_SCRIPT,
            {
                'content': content,
                'content_type': content_type,
                'condition': condition,
            },
        )
        return vertice_id

    def addEdge(self, vid1, vid2, label):
        """Add an edge labelled ``label`` from ``vid1`` to ``vid2`` and commit.

        ``vid1`` and ``vid2`` are vertex results as returned by
        ``addVertice``; only their ``'_id'`` entries are sent to the server.
        Returns ``None``.
        """
        self.graph.execute(
            self._ADD_EDGE_SCRIPT,
            {'vid1': vid1['_id'], 'vid2': vid2['_id'], 'label': label},
        )
#graph=Graph()
#v1=graph.addVertice('vg.no','domain','is')
#v2=graph.addVertice('195.88.55.16','ip','is')
#graph.addEdge(v1,v2,'and')
class IOC:
    """Build an OpenIOC document through ``ioc_api`` one IndicatorItem at a time."""

    def __init__(self):
        """Create the underlying ``ioc_api.IOC`` and stamp its dates."""
        self.IOC = ioc_api.IOC(name='Test', description='A test IOC generated from a Python script', author='Someone')
        self.IOC.set_created_date()
        self.IOC.set_published_date()
        self.IOC.set_lastmodified_date()
        self.IOC.update_name('test')
        self.IOC.update_description('A Test')
        self.id = self.IOC.iocid
        # Kept only for backward compatibility; nothing in this class reads it.
        self.lastId = None
        # Most recently added IndicatorItem node. It must exist before the
        # first addNode() call, otherwise addNode(..., addToLast=True) on a
        # fresh instance fails with AttributeError.
        self.last = None

    def addNode(self, label, text, type, indicator, condition='is', addToLast=False):
        """Create an IndicatorItem node and attach it to the document.

        ``condition``, ``label``, ``text``, ``type`` and ``indicator`` are
        passed, in that order, to ``ioc_api.make_IndicatorItem_node``.

        When ``addToLast`` is true and a node was added before, the new node
        is appended to that previous node; otherwise it is appended to the
        IOC's top-level indicator. The new node's ``id`` attribute is
        printed, and the node becomes the new "last" node.
        """
        IndicatorItem_node = ioc_api.make_IndicatorItem_node(
            condition, label, text, type, indicator)
        # Compare against None explicitly: the truth value of an lxml element
        # depends on whether it has children, not on whether it exists.
        if addToLast and self.last is not None:
            self.last.append(IndicatorItem_node)
        else:
            self.IOC.top_level_indicator.append(IndicatorItem_node)
        current_guid = IndicatorItem_node.attrib['id']
        # Parenthesised form prints the same text under Python 2 and 3.
        print(current_guid)
        self.last = IndicatorItem_node

    def __str__(self):
        """Return the pretty-printed XML of the IOC, with XML declaration."""
        xml = et.tostring(self.IOC.root, encoding='utf-8', xml_declaration=True, pretty_print=True)
        # Under Python 3 tostring() yields bytes, which __str__ may not
        # return; under Python 2 the result is already a str.
        if not isinstance(xml, str):
            xml = xml.decode('utf-8')
        self.xml = xml
        return self.xml
#ioc = IOC()
#ioc.addNode('test','Just a test','domain','vg.no')
#ioc.addNode('test','Just a test','ip','195.88.55.16',addToLast=True)
#print ioc
class RexsterIOC:
    """Build an OpenIOC document from vertices read over a RexPro connection."""

    def __init__(self, host='localhost', port=8184, graph_name='titan'):
        """Open the graph connection and create an empty IOC.

        The three arguments are forwarded positionally to
        ``RexProConnection``; the defaults reproduce the previously
        hard-coded ``('localhost', 8184, 'titan')``.
        """
        self.graph = RexProConnection(host, port, graph_name)
        self.IOC = ioc_api.IOC(name='Test', description='A test IOC generated from a Python script', author='Someone')
        self.IOC.set_created_date()
        self.IOC.set_published_date()
        self.IOC.set_lastmodified_date()
        self.IOC.update_name('test')
        self.IOC.update_description('A Test')
        self.id = self.IOC.iocid
        # Kept only for backward compatibility; nothing in this class reads it.
        self.lastId = None
        # Most recently added IndicatorItem node. It must exist before the
        # first addNode() call, otherwise addNode(..., addToLast=True) on a
        # fresh instance fails with AttributeError.
        self.last = None

    def addNode(self, label, text, type, indicator, condition='is', addToLast=False):
        """Create an IndicatorItem node and attach it to the document.

        ``condition``, ``label``, ``text``, ``type`` and ``indicator`` are
        passed, in that order, to ``ioc_api.make_IndicatorItem_node``.

        When ``addToLast`` is true and a node was added before, the new node
        is appended to that previous node; otherwise it is appended to the
        IOC's top-level indicator. The new node then becomes the "last" node.
        """
        IndicatorItem_node = ioc_api.make_IndicatorItem_node(
            condition, label, text, type, indicator)
        # Compare against None explicitly: the truth value of an lxml element
        # depends on whether it has children, not on whether it exists.
        if addToLast and self.last is not None:
            self.last.append(IndicatorItem_node)
        else:
            self.IOC.top_level_indicator.append(IndicatorItem_node)
        self.last = IndicatorItem_node

    def traverse(self, rootNodeId):
        """Add the vertex ``rootNodeId`` and its outgoing neighbours to the IOC.

        The root vertex is added at the top level. Every vertex returned by
        ``g.v(vid).out`` is then added with ``addToLast=True``. Each vertex
        result is expected to carry ``content_type``, ``content`` and
        ``condition`` under its ``'_properties'`` key.
        """
        vid = str(rootNodeId)
        root = self.graph.execute("""return g.v(vid)""", {'vid': vid})
        root_props = root['_properties']
        self.addNode('test', 'Just a test',
                     root_props['content_type'],
                     root_props['content'],
                     root_props['condition'])
        one_level_out = self.graph.execute("""return g.v(vid).out""", {'vid': vid})
        # NOTE(review): addNode() moves the "last" node on every call, so the
        # second neighbour is nested under the first one, not under the root.
        # Confirm whether a chain (rather than siblings) is intended.
        for vertex in one_level_out:
            props = vertex['_properties']
            self.addNode('test', 'Just a test',
                         props['content_type'],
                         props['content'],
                         props['condition'], addToLast=True)

    def __str__(self):
        """Return the pretty-printed XML of the IOC, with XML declaration."""
        xml = et.tostring(self.IOC.root, encoding='utf-8', xml_declaration=True, pretty_print=True)
        # Under Python 3 tostring() yields bytes, which __str__ may not
        # return; under Python 2 the result is already a str.
        if not isinstance(xml, str):
            xml = xml.decode('utf-8')
        self.xml = xml
        return self.xml
#ioc = RexsterIOC()
#ioc.traverse(80284) # the root node
#print ioc
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment