Skip to content

Instantly share code, notes, and snippets.

@PMaynard
Last active August 29, 2015 14:19
Show Gist options
  • Save PMaynard/102a80bc0905241a0c75 to your computer and use it in GitHub Desktop.
Save PMaynard/102a80bc0905241a0c75 to your computer and use it in GitHub Desktop.
PDML to CSV
import argparse
import csv
import sys

from lxml import etree
def readData(data_file, listop=False):
    """Load a CSV file either as a list of rows or as a set of first columns.

    Args:
        data_file: Path of the CSV file to read.
        listop: When true, return every row (a list of lists of strings),
            blank lines included as empty lists. When false, return a set
            holding the first column of every non-blank row.

    Returns:
        A list of rows if ``listop`` is true, otherwise a set of strings.

    Raises:
        SystemExit: If the csv module reports a parse error; the message
            names the file and the offending line number.
    """
    data = [] if listop else set()
    # The csv module wants a binary file on Python 2 but a text file opened
    # with newline='' on Python 3.
    if sys.version_info[0] >= 3:
        f = open(data_file, 'r', newline='')
    else:
        f = open(data_file, 'rb')
    with f:
        reader = csv.reader(f)
        try:
            for row in reader:
                if listop:
                    data.append(row)
                elif row:
                    # Blank lines come back as [] and have no first column.
                    data.add(row[0])
        except csv.Error as e:
            sys.exit('file %s, line %d: %s' % (data_file, reader.line_num, e))
    return data
def writeData(data_file, data):
    """Write every item of ``data`` to a CSV file, one single-column row each.

    Args:
        data_file: Path of the CSV file to create or overwrite.
        data: Iterable of values; rows are written in iteration order.
    """
    # The csv module wants a binary file on Python 2 but a text file opened
    # with newline='' on Python 3.
    if sys.version_info[0] >= 3:
        f = open(data_file, 'w', newline='')
    else:
        f = open(data_file, 'wb')
    with f:
        csv.writer(f).writerows([item] for item in data)
def main(pdml_file, output, features):
    """Write one CSV row per 104apci packet found in a PDML file.

    Args:
        pdml_file: Path of the PDML (XML) file to parse.
        output: Path of the CSV file to create.
        features: Path of a CSV file with one feature per row. Column 0 is
            the value to look for (and the output column header), column 1
            the XML attribute compared against it, column 2 the XML
            attribute whose value is emitted. An optional column 3 names an
            ID store ('macaddresses', 'ipaddress' or 'oneOfour'); values of
            such features are replaced by a numeric ID via ``convert``.

    The ID stores are loaded from tmp/mac.tmp, tmp/ip.tmp and tmp/104.tmp
    and written back when new values were added during the run. Every row
    ends with the constant class label "in_db"; a feature with no matching
    field is written as "?".
    """
    # Store name as used in the features file -> file persisting that store.
    store_files = (
        ('macaddresses', "tmp/mac.tmp"),
        ('ipaddress', "tmp/ip.tmp"),
        ('oneOfour', "tmp/104.tmp"),
    )

    # Convert pcap to pdml:
    # -> tshark -r 2015-03-16-MITM.pcap -T pdml "104apci or 104asdu" > 2015-03-16-MITM.pdml
    tree = etree.parse(pdml_file)
    # Find all packets which contain a 104APCI field.
    packets = tree.xpath('/pdml/packet[proto[@name="104apci"]]')

    stores = {name: readData(path) for name, path in store_files}

    # Blank lines in the features file carry no feature; skip them.
    parameters = [p for p in readData(features, True) if p]
    for p in parameters:
        # Swap the store name for the store itself so it can be handed
        # straight to convert().
        if len(p) == 4 and p[3] in stores:
            p[3] = stores[p[3]]

    # The csv module wants a binary file on Python 2 but a text file opened
    # with newline='' on Python 3.
    if sys.version_info[0] >= 3:
        f = open(output, 'w', newline='')
    else:
        f = open(output, 'wb')
    with f:
        writer = csv.writer(f)
        header = [p[0] for p in parameters]
        header.append("class")
        writer.writerow(header)

        for layers in packets:
            line = []
            for p in parameters:
                # Only features bound to a known store get ID conversion;
                # everything else is emitted verbatim.
                store = p[3] if len(p) == 4 and isinstance(p[3], set) else None
                found = False
                # NOTE(review): every match appends a cell, so a packet that
                # repeats a field yields more cells than header columns.
                for layer in layers:
                    for field in layer:
                        if field.get(p[1]) == p[0]:
                            if store is not None:
                                convert(field, store, line, p)
                            else:
                                line.append(field.get(p[2]))
                            found = True
                        # NOTE(review): only elements with more than one
                        # child are descended into - confirm that skipping
                        # single-child elements is intended.
                        elif len(field) > 1:
                            for subfield in field:
                                if subfield.get(p[1]) == p[0]:
                                    line.append(subfield.get(p[2]))
                                    found = True
                                elif len(subfield) > 1:
                                    for subsubfield in subfield:
                                        if subsubfield.get(p[1]) == p[0]:
                                            line.append(subsubfield.get(p[2]))
                                            found = True
                if not found:
                    line.append("?")
            line.append("in_db")
            writer.writerow(line)

    # Persist only the stores that gained values during this run.
    for name, path in store_files:
        if stores[name].difference(readData(path)):
            writeData(path, stores[name])
# def findField(field, line, p):
# if field.get(p[1]) == p[0]:
# # print field.get(p[1]), p[0]
# # try:
# # convert(field, p[3], line, p)
# # except:
# line.append(field.get(p[2]))
# return True
# # elif len(field) > 1:
# # for f in field:
# # if findField(f, line, p):
# # return True
# return False
def convert(field, hashstore, line, p):
    """Append a numeric ID for a field's value to ``line``.

    The value is read from ``field`` via the attribute named by ``p[2]``.
    If it is not yet in ``hashstore`` it is added first; the ID appended is
    then the value's position when iterating ``hashstore``.

    Args:
        field: Object exposing ``get(name)`` (e.g. an XML element).
        hashstore: Mutable collection of known values supporting ``in``,
            ``add`` and iteration.
        line: List receiving the ID.
        p: Feature row; only ``p[2]`` (attribute to read) is used here.

    NOTE(review): the ID is an iteration position, so with an unordered
    store such as a set it can change as values are added - confirm whether
    stable IDs are required.
    """
    value = field.get(p[2])
    if value not in hashstore:
        hashstore.add(value)
    # New and known values share this lookup, so a value gets the same ID on
    # insertion as on an immediately following lookup.
    for i, known in enumerate(hashstore):
        if known == value:
            line.append(i)
            return
if __name__ == "__main__":
    # Command-line entry point: all three paths are required and are passed
    # straight through to main().
    parser = argparse.ArgumentParser(description='PDML to CSV')
    parser.add_argument('-p', '--pdml', dest='pdml_file', action='store',
                        help='PDML File to convert.', required=True)
    # The original help text here was a copy of the --pdml one.
    parser.add_argument('-w', '--output', dest='output', action='store',
                        help='CSV file to write.', required=True)
    parser.add_argument('-f', '--features', dest='features', action='store',
                        help='CSV list of features.', required=True)
    args = parser.parse_args()
    main(args.pdml_file, args.output, args.features)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment