Skip to content

Instantly share code, notes, and snippets.

@meeuw
Created October 2, 2020 19:05
Show Gist options
  • Save meeuw/d25665c2ab8d68ecee7bd65a71ed365b to your computer and use it in GitHub Desktop.
Save meeuw/d25665c2ab8d68ecee7bd65a71ed365b to your computer and use it in GitHub Desktop.
Simple STUN implementation in Python, usage: python pystun.py stun.l.google.com 19302
#!/usr/bin/python
import socket
import uuid
import struct
import io
import sys
def unpack(stream, fmt):
size = struct.calcsize(fmt)
buf = stream.read(size)
return struct.unpack(fmt, buf)
def stun(host, port=3478):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
transaction_id = uuid.uuid4().bytes
message_type = b"\x00\x01" # binding-request
message_length = b"\x00\x00"
request = message_type + message_length + transaction_id
sock.sendto(request, (host, port))
response, address = sock.recvfrom(2048)
response_len = len(response)
response = io.BytesIO(response)
message_type, message_length = unpack(response, "!HH")
assert transaction_id == response.read(16)
assert message_length + 20 == response_len
response_attributes = []
while response.tell() < response_len:
attribute_type, attribute_length = unpack(response, "!HH")
if attribute_type == 0x0001: # mapped-address
zero, family, port = unpack(response, "!BBH")
assert zero == 0
assert family == 0x01 # IPv4
address = socket.inet_ntoa(response.read(4))
response_attributes.append({
"address": address,
"port": port
})
else:
# print("attribute_type", attribute_type)
response.read(attribute_length)
return response_attributes
if __name__ == '__main__':
print(stun(sys.argv[1], int(sys.argv[2])))
@pradt2
Copy link

pradt2 commented Feb 6, 2022

Here's what I used to check all hosts from the always-online-stun project.

#!/usr/bin/env python3

import socket
import uuid
import struct
import io
import sys
import urllib.request


def unpack(stream, fmt):
    size = struct.calcsize(fmt)
    buf = stream.read(size)
    return struct.unpack(fmt, buf)


def stun(host, port=3478):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    transaction_id = uuid.uuid4().bytes
    message_type = b"\x00\x01"  # binding-request
    message_length = b"\x00\x00"
    request = message_type + message_length + transaction_id
    sock.sendto(request, (host, port))
    sock.settimeout(5.0)
    response, address = sock.recvfrom(2048)
    response_len = len(response)
    response = io.BytesIO(response)

    message_type, message_length = unpack(response, "!HH")
    assert transaction_id == response.read(16)
    assert message_length + 20 == response_len

    response_attributes = []
    while response.tell() < response_len:
        attribute_type, attribute_length = unpack(response, "!HH")
        if attribute_type == 0x0001:  # mapped-address
            zero, family, port = unpack(response, "!BBH")
            assert zero == 0
            assert family == 0x01  # IPv4
            address = socket.inet_ntoa(response.read(4))
            response_attributes.append({
                "address": address,
                "port": port
            })
        else:
            # print("attribute_type", attribute_type)
            response.read(attribute_length)
    return response_attributes

if __name__ == '__main__':
    file = urllib.request.urlopen("https://raw.githubusercontent.com/pradt2/always-online-stun/master/valid_ipv4s.txt")
    for line in file:
        host, port = line.decode("utf-8").split(":")
        host = host.strip()
        port = port.strip()
        try:
            print("Response from " + host + ":" + port + " -> " + str(stun(host, int(port))))
        except:
            print("Response from " + host + ":" + port + " -> " + "Timeout or other error")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment