Skip to content

Instantly share code, notes, and snippets.

@rgov
Created February 15, 2022 00:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rgov/94a3f43081fe2ca9748d6d7ca2b70098 to your computer and use it in GitHub Desktop.
Save rgov/94a3f43081fe2ca9748d6d7ca2b70098 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
'''
This is a so-far unsuccessful script for sniffing ROS service calls.
The idea was to republish them as regular messages so they could be recorded to a rosbag.
But I had trouble getting it to work...
'''
import argparse
import io
import logging
import socket
import urllib.parse
import rospy
import rosservice
import scapy.layers.inet
import scapy.sendrecv
# Log at INFO level and above
logging.basicConfig(level=logging.INFO)

# Command-line options: the name of the ROS service whose calls are sniffed
parser = argparse.ArgumentParser()
parser.add_argument('--service', '-s', default='/jvl_motor/stop')
args = parser.parse_args()

# Get the service class, and from it the classes for the request and
# response messages
srvcls = rosservice.get_service_class_by_name(args.service)
reqcls, rescls = srvcls._request_class, srvcls._response_class
# Look up the rosrpc:// URI for the service and resolve its host and port
uri = urllib.parse.urlparse(rosservice.get_service_uri(args.service))
info = socket.getaddrinfo(uri.hostname, uri.port, proto=socket.IPPROTO_TCP)

# Each getaddrinfo() entry is (family, type, proto, canonname, sockaddr).
# sockaddr is (host, port) for IPv4 but (host, port, flowinfo, scope_id) for
# IPv6, so keep only the first two fields: everything that consumes `addrs`
# unpacks or compares plain (host, port) pairs.
addrs = [entry[4][:2] for entry in info]

# Generate a BPF filter to catch service calls: one "host H and port P"
# clause per resolved address, parenthesized and OR-ed together
rules = [f'host {host} and port {port}' for host, port in addrs]
rule = ' or '.join(f'({clause})' for clause in rules)
# Protocol decoders, both built for the sniffed service and its service class.
# process_packet() runs server_protocol over traffic addressed to the service
# and client_protocol over traffic in the other direction.
_tcpros_service = rospy.impl.tcpros_service
client_protocol = _tcpros_service.TCPROSServiceClient(args.service, srvcls)
server_protocol = _tcpros_service.TCPService(args.service, srvcls)
# Per-direction byte buffers: payloads are appended here until the protocol
# decoders can extract messages from them
in_buffer, out_buffer = io.BytesIO(), io.BytesIO()


def process_packet(packet):
    '''
    Callback function for when a packet is received.

    Appends the packet's payload to the buffer for its direction (inbound to
    the service, or outbound from it), runs the matching protocol decoder over
    that buffer, and prints whatever messages were decoded along with the
    buffer position.

    Packets that do not carry an IPv4 header, a TCP header and a payload are
    ignored.
    '''
    # The capture filter only matches on host and port, so non-TCP or
    # non-IPv4 traffic can reach this callback too. Indexing a packet by a
    # layer it does not contain raises, so skip anything we cannot classify.
    required = (scapy.layers.inet.IP, scapy.layers.inet.TCP, scapy.packet.Raw)
    if not all(layer in packet for layer in required):
        return

    # Determine whether this is inbound (i.e., a request to the service)
    addr = (
        packet[scapy.layers.inet.IP].dst,
        packet[scapy.layers.inet.TCP].dport,
    )
    is_inbound = (addr in addrs)
    buffer = in_buffer if is_inbound else out_buffer
    protocol = server_protocol if is_inbound else client_protocol

    # Append the payload to the corresponding buffer
    # NOTE(review): payloads are appended in capture order with no TCP
    # reassembly, and all connections to the service share these two buffers
    # -- retransmitted, reordered or interleaved segments would presumably
    # corrupt the stream. Confirm whether that matters for the target setup.
    buffer.write(packet[scapy.packet.Raw].load)

    # See if we get any decoded messages out
    # NOTE(review): TCPROS connections start with a connection-header
    # exchange, which is not stripped before decoding here; that may be why
    # no messages come out. No socket is available, hence the None argument.
    msgs = []
    protocol.read_messages(buffer, msgs, None)
    print('> srv' if is_inbound else '< cli', msgs, buffer.tell())
# Start sniffing. Every packet matching the filter is handed to
# process_packet(); store=False stops scapy from also keeping each captured
# packet in memory, which would grow without bound on a long-running capture.
scapy.sendrecv.sniff(filter=rule, prn=process_packet, store=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment