Skip to content

Instantly share code, notes, and snippets.

@llamafilm
Created January 16, 2022 03:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save llamafilm/d9731f4f9b48b7c20c4b8eccaa0e52ef to your computer and use it in GitHub Desktop.
Save llamafilm/d9731f4f9b48b7c20c4b8eccaa0e52ef to your computer and use it in GitHub Desktop.
unfinished partial implementation of SAP (Session Announcement Protocol)
import socket
import time
import sdp_transform
# unfinished partial implementation of SAP (Session Announcement Protocol)
# https://datatracker.ietf.org/doc/html/rfc2974
MCAST_GRP = '239.255.255.255'
MCAST_PORT = 9875
MCAST_TTL = 255
SDP_FILE = '''v=0
o=- 3 0 IN IP4 10.49.51.51
s=CP950A 1-8
c=IN IP4 239.69.83.67/32
t=0 0
m=audio 6518 RTP/AVP 96
a=clock-domain:PTPv2 0
a=rtpmap:96 L24/48000/8
a=recvonly
a=framecount:48
a=sync-time:0
a=ptime:1
a=ts-refclk:ptp=IEEE1588-2008:00-60-DB-FF-FE-01-02-C8:0
a=mediaclk:direct=0
'''
def build_header(type, src_ip):
if type == 'announce':
# Flags
x = 32
header = x.to_bytes(1, 'big')
else:
pass
# Authentication Length
x = 0
header += x.to_bytes(1, 'big')
# header Identifier Hash
x = 1
header += x.to_bytes(2, 'big')
# Originating Source
# TODO: handle IPv6 source
header += int(src_ip.split('.')[0]).to_bytes(1, 'big')
header += int(src_ip.split('.')[1]).to_bytes(1, 'big')
header += int(src_ip.split('.')[2]).to_bytes(1, 'big')
header += int(src_ip.split('.')[3]).to_bytes(1, 'big')
# Payload Type
header += bytes('application/sdp', 'ascii')
x = 0
header += x.to_bytes(1, 'big')
return header
def main():
# simp
try:
sdp_dict = sdp_transform.parse(SDP_FILE)
except Exception as e:
print('Failed to parse SDP file', SDP_FILE, e)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, MCAST_TTL)
header = build_header('announce', sdp_dict['origin']['address'])
message = header
# SDP
message += SDP_FILE.encode('ascii')
while True:
print('Sending packet')
sock.sendto(message, (MCAST_GRP, MCAST_PORT))
#break # only run once for testing
time.sleep(5)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment