Skip to content

Instantly share code, notes, and snippets.

@niccokunzmann
Created April 6, 2018 15:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save niccokunzmann/c9c47b4d6b6f3762ca87aeacf17ba8f6 to your computer and use it in GitHub Desktop.
Save niccokunzmann/c9c47b4d6b6f3762ca87aeacf17ba8f6 to your computer and use it in GitHub Desktop.
Send olsr name service announcements with Python.
"""
This module allows sending OLSR v1 messages to announce host names.
You can find more information here:
https://github.com/servalproject/olsr/blob/master/lib/nameservice/README_NAMESERVICE
"""
import struct
import socket
OLSR_PORT = 698
def get_olsr_name_announcement_message(originator_ip, hostname, host_ip, sequence_number):
"""Return a message which announces a given name on the network.
- originator_ip the IP from which this message is supposed to be sent.
Since this service might be running in a docker container, this should be the client IP
in the community network where this service is reachable.
- host_name is the name of the host we want to announce.
- host_ip is the IP of the host we want to announce.
This should be the same as the originator_ip if the service wants to announce its hostname.
- sequence_number is the number of packets sent before.
"""
# assumptions
# -----------
# 1. Each packet only transports one message.
# Therefore, the message sequence number and the packet sequence number are identical.
packet_sequence_number = message_sequence_number = sequence_number
padding = ((-len(hostname)) % 4)
# Wireshark field identifier
message = struct.pack(">H", 40 + len(hostname) + padding) # Packet Length
message += struct.pack(">H", packet_sequence_number & 65535) # Packet Sequence Number
message += b"\x82" # Message Type: Nameservice
message += b"\xce" # Validity Time: 1792,000 seconds
message += struct.pack(">H", 36 + len(hostname) + padding) # Message : length of the one contained message
message += socket.inet_aton(originator_ip) # Originator Address
message += b"\xff" # TTL : 255
message += b"\x00" # Hop Count : 0
message += struct.pack(">H", message_sequence_number & 65535) # : Message Sequence Number
message += b"\x00\x01" # Version : 1
message += b"\x00\x01" # Count : 1
# Nameservice Host embedded message
message += b"\x00\x00" # Message Type : Host (0)
message += struct.pack(">H", len(hostname)) # Length
message += socket.inet_aton(host_ip) # Address
message += b"\x00" * 12
message += hostname.encode() # Content
message += b"\x00" * padding
return message
class OLSR_NameService:
"""This service announces the given names for the ip addresses."""
def __init__(self, originator_ip, announce_to_ips):
"""Create a new OLSR v1 name service announcer with ips to announce it to."""
self.originator_ip = originator_ip
self.announce_to_ips = announce_to_ips
self.packets_sent = 0
# from https://stackoverflow.com/questions/12607516/python-udp-broadcast-not-sending#12607646
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.announcement_to_message_id = set()
def add_entry(self, ip, hostname):
"""Add the ip and its hostname to the announcements."""
self.announcement_to_message_id.add((ip, hostname))
def remove_entry(self, ip, hostname):
"""Remove a specific announcement."""
self.announcement_to_message_id.pop((ip, hostname), None)
def announce(self):
"""Announce the entries."""
for ip, hostname in self.announcement_to_message_id:
packet = get_olsr_name_announcement_message(self.originator_ip, hostname, ip, self.packets_sent)
self.packets_sent += 1
for destination in self.announce_to_ips:
self.socket.sendto(packet, ((destination, OLSR_PORT) if isinstance(destination, str) else destination))
if __name__ == "__main__":
service = OLSR_NameService("10.22.71.211", ["10.22.71.202", "10.22.254.111", "10.22.71.211"])
service.add_entry("10.22.71.211", "test.quelltext.eu")
while not input("Press ENTER to send, space and ENTER to quit: "):
service.announce()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment