Skip to content

Instantly share code, notes, and snippets.

@LoadLow
Last active October 21, 2019 21:13
Show Gist options
  • Save LoadLow/32911addf2ac5e5f6a9a723cebd1d0fe to your computer and use it in GitHub Desktop.
Save LoadLow/32911addf2ac5e5f6a9a723cebd1d0fe to your computer and use it in GitHub Desktop.
ECW Prequals - Challenge "Puzzle"
#!/usr/bin/env python
"""
Modbus Message Parser
--------------------------------------------------------------------------
"""
from __future__ import print_function
import codecs as c
import types
from pymodbus.compat import IS_PYTHON3
from pymodbus.factory import ClientDecoder, ServerDecoder
from pymodbus.transaction import ModbusSocketFramer
class Decoder(object):
def __init__(self, framer, encode=False):
""" Initialize a new instance of the decoder
:param framer: The framer to use
:param encode: If the message needs to be encoded
"""
self.framer = framer
self.encode = encode
self.registers = bytearray(0xffff)
def decode(self, message):
""" Attempt to decode the supplied message
:param message: The messge to decode
"""
if IS_PYTHON3:
value = message if self.encode else c.encode(message, 'hex_codec')
else:
value = message if self.encode else message.encode('hex')
decoders = [
self.framer(ServerDecoder(), client=None),
self.framer(ClientDecoder(), client=None)
]
for decoder in decoders:
try:
decoder.addToFrame(message)
if decoder.checkFrame():
unit = decoder._header.get("uid", 0x00)
decoder.advanceFrame()
decoder.processIncomingPacket(message, self.report, unit)
except:
pass
def check_errors(self, decoder, message):
""" Attempt to find message errors
:param message: The message to find errors in
"""
def report(self, message):
""" The callback to print the message information
:param message: The message to print
"""
if message.__class__.__name__ == "WriteSingleRegisterRequest":
self.registers[message.address] = message.value
def get_messages(option):
""" A helper method to generate the messages to parse
:param options: The option manager
:returns: The message iterator to parse
"""
if option.message:
if option.transaction:
msg = ""
for segment in option.message.split():
segment = segment.replace("0x", "")
segment = "0" + segment if len(segment) == 1 else segment
msg = msg + segment
option.message = msg
if not option.ascii:
if not IS_PYTHON3:
option.message = option.message.decode('hex')
else:
option.message = c.decode(option.message.encode(), 'hex_codec')
yield option.message
elif option.file:
with open(option.file, "r") as handle:
for line in handle:
if line.startswith('#'): continue
if not option.ascii:
line = line.strip()
line = c.decode(line.encode(), 'hex_codec')
yield line
def main():
""" The main runner function
"""
infile = "samples/modbus_messages"
outfile = "samples/modbus_memdump"
option = types.SimpleNamespace()
option.ascii = False
option.debug = False
option.message = ""
option.file = infile
decoder = Decoder(ModbusSocketFramer, False)
for message in get_messages(option):
decoder.decode(message)
newfile = open(outfile, 'wb')
newfile.write(decoder.registers)
newfile.close()
print("Done ! Go check mem dump now : < " + outfile+" >")
if __name__ == "__main__":
main()
$ strings samples/result_modbus
<FLAG>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment