Skip to content

Instantly share code, notes, and snippets.

@itsecurityco
Last active November 12, 2021 05:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save itsecurityco/9b0ed66dded62ec8b184ff6fe17d53ff to your computer and use it in GitHub Desktop.
Save itsecurityco/9b0ed66dded62ec8b184ff6fe17d53ff to your computer and use it in GitHub Desktop.
Main CTF Ekoparty 2021 - PLC
# import socket programming library
# @itsecurityco (Juan)
import socket
# import thread module
from _thread import *
door_closed = b"""
______________
|\ ___________ /|
| | _ _ _ _ | |
| | | | | | | | |
| | |-+-+-+-| | |
| | |-+-+=+%| | |
| | |_|_|_|_| | |
| | ___ | |
| | [___] ()| |
| | ||| |
| | ()| |
| | | |
| | | |
| | | |
|_|___________|_| The door is closed
"""
door_has_been_closed = b"""
______________
|\ ___________ /|
| | _ _ _ _ | |
| | | | | | | | |
| | |-+-+-+-| | |
| | |-+-+=+%| | |
| | |_|_|_|_| | |
| | ___ | |
| | [___] ()| |
| | ||| |
| | ()| |
| | | |
| | | |
| | | |
|_|___________|_| The door has been closed
"""
function_code_five_hint = b"You are close! read the protocol :)"
door_open = b"""
______________
|\ ___________ /|
| | /|,| | | |
| | |,x,| | | |
| | |,x,' | | |
| | |,x | | |
| | |/ | | |
| | /] | | |
| | [/ ()\ | |
| | | \,^%---
| | | <\/ \ The door has been open
| | |< | |
| | ,'< | |
| | ,' ^^|\ | |
|_|,'_____/__\|_| EKO{Write_Single_Coil_ON}
"""
# thread function
def threaded(c):
try:
c.settimeout(5)
# data received from client
data = c.recv(255)
if not data:
print('Bye')
c.close()
uid = data[6:7] # Unit Identifier
fn_code = data[7:8] # 05 (0x05) Write Single Coil
out_addr = data[8:10] # Output Address
out_val = data[10:12] # Output Value
mei = data[8:9] # MEI type: Read Device Identification (14)
print("""
uid: %s
fn_code: %s
out_addr: %s
out_val: %s
mei: %s
""" % (uid, fn_code, out_addr, out_val, mei))
if fn_code == b"\x01" and uid == b"\xff" and out_addr == b"\x00\x00" and out_val == b"\x00\x01":
# Function code 1 (Read coils)
# Se verifica si utiliza la funcion 1
# Se verifica que el uid (Unit Identifier/SlaveID) sea 255
# Se verifica si el out_addr (Starting Address a leer) es 0x0000
# Se verifica si el out_val (Quantity of coils a leer) es 1
#
# Los valores de UID, Starting Address y Quantity of coils los va sacar de la Funcion 43 (read device identificacion)... Ver el ultimo elif...
data = door_closed
elif fn_code == b"\x05" and out_val == b"\xff\x00":
# AQUI SE LE DA LA FLAG AL PARTICIPANTE
# Function code 5 (Write single coil)
# Se verifica si utiliza la funcion 5 y si envia el output value con valor ON (\xff\x00)
data = door_open
elif fn_code == b"\x05" and out_val == b"\x00\x00":
# Function code 5 (Write single coil)
# Aqui se verifica si utiliza la funcion 5 pero envia el output value con valor OFF (\x00\x00)
# Se le da un ascii cerrando (OFF) la puerta para que sepa que va por bien camino
data = door_has_been_closed
elif fn_code == b"\x05" and out_val == b"\x00\x01":
# Function code 5 (Write single coil)
# Aqui se verifica si utiliza la funcion 5 pero envia mal el output value
# Se le da una pista para que sepa que va por bien camino
data = function_code_five_hint
elif fn_code == b"\x05" and out_val == b"\xff\xff":
# Function code 5 (Write single coil)
# Aqui se verifica si utiliza la funcion 5 pero envia mal el output value
# Se le da una pista para que sepa que va por bien camino
data = function_code_five_hint
elif fn_code == b"\x2b" and mei == b"\x0e":
# Function code 43 (Read Device Identification) Hints via banner grabbing
# Hint sale con msf -> scanner/scada/modbus_banner_grabbing
# Object 1
obj_one_s = "NULL Life PLC"
obj_one_l = (len(obj_one_s)).to_bytes(1, byteorder='big')
obj_one = bytearray([ord(x) for x in obj_one_s])
# Object 2
obj_two_s = "Start with function code 1"
obj_two_l = (len(obj_two_s)).to_bytes(1, byteorder='big')
obj_two = bytearray([ord(x) for x in obj_two_s])
# Object 3
obj_three_s = "\x00\x01\x00\x00\x00\x06\xff\x01\x00\x00\x00\x01"
obj_three_l = (48).to_bytes(1, byteorder='big')
obj_three = bytearray('\\x'.join(['%02x' % i for i in [ord(x) for x in obj_three_s]]).encode('utf-8'))
data = b"\x00\x01" # Transaction Identifier
data += b"\x00\x00" # Protocol Identifier
data += (
10 +
len(obj_one_s) +
len(obj_two_s) +
int.from_bytes(obj_three_l, byteorder='big')
).to_bytes(2, byteorder='big') # Length
data += b"\xff" # Unit Identifier
data += b"\x2b" # 0010 1011 = Function Code: Encapsulated Interface Transport (43)
data += b"\x0e" # MEI Type: Read Device Identification (14)
data += b"\x01" # Read Device ID code: Basic Device Identification (1)
data += b"\x81" # Conformity level
data += b"\x00" # More Follows
data += b"\x00" # Next Object Id
data += b"\x03" # Number of Objets (3)
# List Of
data += b"\x00" # Object ID
data += obj_one_l # Object length
data += obj_one # Object Value
data += b"\x01" # Object ID
data += obj_two_l # Object length
data += obj_two # Object Value
data += b"\x02" # Object ID
data += obj_three_l # Object length
data += obj_three # Object Value
else:
data = b"I don't know that function code"
# send back to client
c.send(data)
# connection closed
c.close()
except:
print('Bye')
c.close()
def Main():
host = ""
# reverse a port on your computer
# in our case it is 12345 but it
# can be anything
port = 65001
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
print("socket binded to port", port)
# put the socket into listening mode
s.listen(20)
print("socket is listening")
# a forever loop until client wants to exit
while True:
try:
# establish connection with client
c, addr = s.accept()
# lock acquired by client
#print_lock.acquire()
print('Connected to :', addr[0], ':', addr[1])
# Start a new thread and return its identifier
start_new_thread(threaded, (c,))
except KeyboardInterrupt:
s.close()
s.close()
if __name__ == '__main__':
Main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment