Created
February 15, 2022 00:41
-
-
Save rgov/94a3f43081fe2ca9748d6d7ca2b70098 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
'''
This is a so-far unsuccessful script for sniffing ROS service calls.

The idea was to republish the calls as regular messages so they could be
recorded to a rosbag, but I had trouble getting it to work.
'''
import argparse
import io
import logging
import socket
import urllib.parse

import rospy
import rosservice
import scapy.layers.inet
import scapy.packet
import scapy.sendrecv
logging.basicConfig(level=logging.INFO)

# Command-line interface: the only option is the name of the service to sniff
parser = argparse.ArgumentParser()
parser.add_argument('--service', '-s', default='/jvl_motor/stop')
args = parser.parse_args()

# Get classes for the request and response messages
srvcls = rosservice.get_service_class_by_name(args.service)
reqcls = srvcls._request_class
rescls = srvcls._response_class

# Look up the rosrpc:// URI for the service and resolve it to socket addresses
uri = urllib.parse.urlparse(rosservice.get_service_uri(args.service))
info = socket.getaddrinfo(uri.hostname, uri.port, proto=socket.IPPROTO_TCP)

# Keep only (host, port) from each sockaddr. IPv6 sockaddrs are 4-tuples
# (host, port, flowinfo, scope_id); left as-is they would break the two-name
# unpacking below and could never compare equal to a (dst, dport) pair.
# dict.fromkeys() drops duplicate entries while preserving order.
addrs = list(dict.fromkeys(entry[4][:2] for entry in info))

# Generate a BPF filter to catch service calls: one parenthesised clause per
# resolved address, OR-ed together
rules = [f'host {host} and port {port}' for host, port in addrs]
rule = ' or '.join(f'({clause})' for clause in rules)

# Protocol decoders, one for each side of the service connection
client_protocol = rospy.impl.tcpros_service.TCPROSServiceClient(
    args.service, srvcls)
server_protocol = rospy.impl.tcpros_service.TCPService(
    args.service, srvcls)
# Callback function for when a packet is received. Payload bytes accumulate in
# one buffer per direction: in_buffer for traffic addressed to the service,
# out_buffer for everything else that matched the filter.
in_buffer, out_buffer = io.BytesIO(), io.BytesIO()


def process_packet(packet):
    '''
    Feed the payload of one sniffed packet to the matching protocol decoder.

    A packet whose (destination IP, destination port) is one of the resolved
    service addresses is treated as inbound (a request to the service) and
    goes to the server-side decoder; any other packet goes to the client-side
    decoder. Whatever messages the decoder produces are printed together with
    the current buffer position.

    Packets without an IPv4 layer, a TCP layer, or a payload are ignored.
    '''
    ip_layer = scapy.layers.inet.IP
    tcp_layer = scapy.layers.inet.TCP
    raw_layer = scapy.packet.Raw

    # We only care about packets carrying a payload
    if raw_layer not in packet:
        return

    # The capture filter matches on host and port only, so it can let through
    # packets that lack the layers indexed below; skip them instead of letting
    # the lookup raise and abort the sniff.
    if ip_layer not in packet or tcp_layer not in packet:
        return

    # Determine whether this is inbound (i.e., a request to the service)
    addr = (
        packet[ip_layer].dst,
        packet[tcp_layer].dport,
    )
    is_inbound = (addr in addrs)

    buffer = in_buffer if is_inbound else out_buffer
    protocol = server_protocol if is_inbound else client_protocol

    # Append the payload to the corresponding buffer
    buffer.write(packet[raw_layer].load)

    # See if we get any decoded messages out.
    # NOTE(review): the third argument is None -- presumably the decoder's
    # socket parameter, unused when the buffer is already filled; confirm.
    msgs = []
    protocol.read_messages(buffer, msgs, None)
    print('> srv' if is_inbound else '< cli', msgs, buffer.tell())
# Start sniffing. Every matching packet is handed to process_packet; since the
# return value of sniff() is not used, store=False keeps scapy from
# accumulating every captured packet in memory for the life of the capture.
scapy.sendrecv.sniff(filter=rule, prn=process_packet, store=False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment