Skip to content

Instantly share code, notes, and snippets.

@LiberiFatali
Forked from crayfishapps/rtsp_authentication.py
Last active June 23, 2023 01:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LiberiFatali/1d1c19141f5762b916d5466174e398c1 to your computer and use it in GitHub Desktop.
Save LiberiFatali/1d1c19141f5762b916d5466174e398c1 to your computer and use it in GitHub Desktop.
RTSP client with basic and digest authentication - converted to Python3
#! /usr/bin/python
import socket,time,string,base64,hashlib,xml.etree.ElementTree as ET
import sys
RTSP_VERSION = "RTSP/1.0"
LINE_SPLIT_STR = "\r\n"
HEADER_END_STR = LINE_SPLIT_STR*2
#--------------------------------------------------------------------------
# Colored Output in Console
#--------------------------------------------------------------------------
BLACK,RED,GREEN,YELLOW,BLUE,MAGENTA,CYAN,WHITE = range(90,98)
def COLOR_STR(msg,color=WHITE):
return '\033[%dm%s\033[0m'%(color,msg)
def PRINT(msg,color=WHITE):
sys.stdout.write(COLOR_STR(msg,color) + '\n')
#--------------------------------------------------------------------------
m_Vars = {
"bufLen" : 1024 * 10,
"defaultServerIp" : "1.2.3.4",
"defaultServerPort" : 554,
"defaultTestUri" : "/abc",
"defaultUserAgent" : "RTSP Client",
"defaultUsername" : "user",
"defaultPassword" : "pass"
}
def genmsg_DESCRIBE(url,seq,userAgent,authSeq):
msgRet = "DESCRIBE " + url + " RTSP/1.0\r\n"
msgRet += "CSeq: " + str(seq) + "\r\n"
msgRet += "Authorization: " + authSeq + "\r\n"
msgRet += "User-Agent: " + userAgent + "\r\n"
msgRet += "Accept: application/sdp\r\n"
msgRet += "\r\n"
return msgRet
def genmsg_SETUP(url,seq,userAgent,authSeq):
msgRet = f"SETUP {url} RTSP/1.0\r\n"
msgRet += "CSeq: " + str(seq) + "\r\n"
msgRet += "Authorization: " + authSeq + "\r\n"
msgRet += "User-Agent: " + userAgent + "\r\n"
msgRet += "Blocksize: 65535\r\n"
msgRet += "Transport: RTP/AVP/TCP;unicast\r\n"
msgRet += "\r\n"
return msgRet
def genmsg_OPTIONS(url, seq, userAgent):
msgRet = "OPTIONS " + url + " RTSP/1.0\r\n"
msgRet += "CSeq: " + str(seq) + "\r\n"
msgRet += "User-Agent: " + userAgent + "\r\n"
# msgRet += "Session: " + sessionId + "\r\n"
msgRet += "\r\n"
return msgRet
def genmsg_PLAY(url,seq,userAgent,sessionId,authSeq):
msgRet = "PLAY " + url + " RTSP/1.0\r\n"
msgRet += "CSeq: " + str(seq) + "\r\n"
msgRet += "Authorization: " + authSeq + "\r\n"
msgRet += "User-Agent: " + userAgent + "\r\n"
msgRet += "Session: " + sessionId + "\r\n"
msgRet += "Range: npt=0.000-\r\n"
msgRet += "\r\n"
return msgRet
def genmsg_TEARDOWN(url,seq,userAgent,sessionId,authSeq):
msgRet = "TEARDOWN " + url + " RTSP/1.0\r\n"
msgRet += "CSeq: " + str(seq) + "\r\n"
msgRet += "User-Agent: " + userAgent + "\r\n"
msgRet += "Session: " + sessionId + "\r\n"
msgRet += "\r\n"
return msgRet
def gen_msg(opt, url, seq, user_agent, session_id, auth_seq):
msgRet = " ".join((opt, url, RTSP_VERSION + LINE_SPLIT_STR))
msgRet += "CSeq: " + str(seq) + LINE_SPLIT_STR
msgRet += "Authorization: " + auth_seq + LINE_SPLIT_STR
msgRet += "User-Agent: " + user_agent + LINE_SPLIT_STR
msgRet += "Session: " + session_id + LINE_SPLIT_STR
msgRet += f"Range: npt=0.000-{LINE_SPLIT_STR}"
msgRet += LINE_SPLIT_STR
return msgRet
def decode_describe_response(strContent):
dict_info = {
"base_url": None,
"tracks": []
}
messageStrings = strContent.split("\n")
for element in messageStrings:
a = element.find("rtsp")
if a >= 0:
dict_info["base_url"] = element[a:-1]
b = element.find("control:track")
if b >= 0:
dict_info["tracks"].append(element[b+8:-1])
return dict_info
def decodeSession(strContent):
mapRetInf = {}
messageStrings = strContent.split("\n")
for element in messageStrings:
if "Session" in element:
a = element.find(":")
b = element.find(";")
mapRetInf = element[a+2:b]
return mapRetInf
def generateAuthString(username,password,realm,method,uri,nonce):
mapRetInf = {}
m1 = hashlib.md5((username + ":" + realm + ":" + password).encode()).hexdigest()
m2 = hashlib.md5((method + ":" + uri).encode()).hexdigest()
response = hashlib.md5((m1 + ":" + nonce + ":" + m2).encode()).hexdigest()
mapRetInf = "Digest "
mapRetInf += "username=\"" + m_Vars["defaultUsername"] + "\", "
mapRetInf += "password=\"" + m_Vars["defaultPassword"] + "\", "
mapRetInf += "realm=\"" + realm + "\", "
mapRetInf += "algorithm=\"MD5\", "
mapRetInf += "nonce=\"" + nonce + "\", "
mapRetInf += "uri=\"" + uri + "\", "
mapRetInf += "response=\"" + response + "\""
return mapRetInf
if __name__ == "__main__":
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((m_Vars["defaultServerIp"],m_Vars["defaultServerPort"]))
seq = 1
url = "rtsp://" + m_Vars["defaultServerIp"] + m_Vars["defaultTestUri"]
isDigest = False
authSeq = base64.b64encode((m_Vars["defaultUsername"] + ":" + m_Vars["defaultPassword"]).encode())
authSeq = "Basic " + str(authSeq)
msg_options = genmsg_OPTIONS(url, seq, m_Vars["defaultUserAgent"])
print(msg_options)
s.send(msg_options.encode())
msg3 = s.recv(m_Vars["bufLen"])
msg3 = msg3.decode()
print(msg3)
seq = seq + 1
msg_describe = genmsg_DESCRIBE(url,seq,m_Vars["defaultUserAgent"],authSeq)
print(msg_describe)
s.send(msg_describe.encode())
msg1 = s.recv(m_Vars["bufLen"])
msg1 = msg1.decode()
print(msg1)
seq = seq + 1
if msg1.find("Unauthorized") > 1:
isDigest = True
# New DESCRIBE with digest authentication
start = msg1.find("realm")
begin = msg1.find("\"", start)
end = msg1.find("\"", begin + 1)
realm = msg1[begin+1:end]
start = msg1.find("nonce")
begin = msg1.find("\"", start)
end = msg1.find("\"", begin + 1)
nonce = msg1[begin+1:end]
seq_auth_describe = generateAuthString(m_Vars["defaultUsername"],m_Vars["defaultPassword"],realm,"DESCRIBE",m_Vars["defaultTestUri"],nonce)
msg_describe = genmsg_DESCRIBE(url,seq,m_Vars["defaultUserAgent"], seq_auth_describe)
print(msg_describe)
s.send(msg_describe.encode())
#
msg1 = s.recv(m_Vars["bufLen"])
msg1 = msg1.decode()
print(msg1)
seq = seq + 1
dict_info = decode_describe_response(msg1)
print(f"dict_info: {dict_info}")
if isDigest == True:
seq_auth_setup = generateAuthString(m_Vars["defaultUsername"], m_Vars["defaultPassword"], realm, "SETUP", m_Vars["defaultTestUri"], nonce)
msg_setup = genmsg_SETUP(dict_info["base_url"] + dict_info["tracks"][0], seq, m_Vars["defaultUserAgent"], seq_auth_setup)
print(msg_setup)
s.send(msg_setup.encode())
msg2 = s.recv(m_Vars["bufLen"])
msg2 = msg2.decode()
print(msg2)
seq = seq + 1
sessionId = decodeSession(msg2)
print(f"sessionId: {sessionId}")
msg_play = genmsg_PLAY(url + "/",seq, m_Vars["defaultUserAgent"], sessionId, seq_auth_setup)
print(msg_play)
s.send(msg_play.encode())
# msg4 = s.recv(m_Vars["bufLen"])
# # msg4 = msg4.decode()
# print(msg4)
recv_buf = b""
try:
while True:
if HEADER_END_STR in str(recv_buf):
break
more = s.recv(2048)
# print(more)
if not more:
break
recv_buf += more
if "OK" in str(more):
break
except socket.error as e:
PRINT("Receive data error: %s" % e, RED)
sys.exit(-1)
print("recv_buf: ", recv_buf.decode("utf-8", "ignore"))
# seq = seq + 1
# starttime = time.time()
# while True :
# # Send a new RTSP OPTION command to keep the stream alive
# now = time.time()
# if (now - starttime) > 10:
# msg_options = genmsg_OPTIONS(url,seq,m_Vars["defaultUserAgent"],sessionId,authSeq)
# print(msg_options)
# s.send(msg_options.encode())
# msg5 = s.recv(m_Vars["bufLen"])
# # msg5 = msg5.decode()
# print(msg5)
# starttime = time.time()
# msgRcv = s.recv(m_Vars["bufLen"])
seq = seq + 1
msg_teardown = genmsg_TEARDOWN(url,seq,m_Vars["defaultUserAgent"],sessionId,authSeq)
print(msg_teardown)
s.send(msg_teardown.encode())
msg6 = s.recv(m_Vars["bufLen"])
msg6 = msg6.decode("utf-8", "ignore")
print(msg6)
s.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment