-
-
Save LiberiFatali/1d1c19141f5762b916d5466174e398c1 to your computer and use it in GitHub Desktop.
RTSP client with basic and digest authentication - converted to Python3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python | |
import socket,time,string,base64,hashlib,xml.etree.ElementTree as ET | |
import sys | |
RTSP_VERSION = "RTSP/1.0" | |
LINE_SPLIT_STR = "\r\n" | |
HEADER_END_STR = LINE_SPLIT_STR*2 | |
#-------------------------------------------------------------------------- | |
# Colored Output in Console | |
#-------------------------------------------------------------------------- | |
BLACK,RED,GREEN,YELLOW,BLUE,MAGENTA,CYAN,WHITE = range(90,98) | |
def COLOR_STR(msg,color=WHITE): | |
return '\033[%dm%s\033[0m'%(color,msg) | |
def PRINT(msg,color=WHITE): | |
sys.stdout.write(COLOR_STR(msg,color) + '\n') | |
#-------------------------------------------------------------------------- | |
m_Vars = { | |
"bufLen" : 1024 * 10, | |
"defaultServerIp" : "1.2.3.4", | |
"defaultServerPort" : 554, | |
"defaultTestUri" : "/abc", | |
"defaultUserAgent" : "RTSP Client", | |
"defaultUsername" : "user", | |
"defaultPassword" : "pass" | |
} | |
def genmsg_DESCRIBE(url,seq,userAgent,authSeq): | |
msgRet = "DESCRIBE " + url + " RTSP/1.0\r\n" | |
msgRet += "CSeq: " + str(seq) + "\r\n" | |
msgRet += "Authorization: " + authSeq + "\r\n" | |
msgRet += "User-Agent: " + userAgent + "\r\n" | |
msgRet += "Accept: application/sdp\r\n" | |
msgRet += "\r\n" | |
return msgRet | |
def genmsg_SETUP(url,seq,userAgent,authSeq): | |
msgRet = f"SETUP {url} RTSP/1.0\r\n" | |
msgRet += "CSeq: " + str(seq) + "\r\n" | |
msgRet += "Authorization: " + authSeq + "\r\n" | |
msgRet += "User-Agent: " + userAgent + "\r\n" | |
msgRet += "Blocksize: 65535\r\n" | |
msgRet += "Transport: RTP/AVP/TCP;unicast\r\n" | |
msgRet += "\r\n" | |
return msgRet | |
def genmsg_OPTIONS(url, seq, userAgent): | |
msgRet = "OPTIONS " + url + " RTSP/1.0\r\n" | |
msgRet += "CSeq: " + str(seq) + "\r\n" | |
msgRet += "User-Agent: " + userAgent + "\r\n" | |
# msgRet += "Session: " + sessionId + "\r\n" | |
msgRet += "\r\n" | |
return msgRet | |
def genmsg_PLAY(url,seq,userAgent,sessionId,authSeq): | |
msgRet = "PLAY " + url + " RTSP/1.0\r\n" | |
msgRet += "CSeq: " + str(seq) + "\r\n" | |
msgRet += "Authorization: " + authSeq + "\r\n" | |
msgRet += "User-Agent: " + userAgent + "\r\n" | |
msgRet += "Session: " + sessionId + "\r\n" | |
msgRet += "Range: npt=0.000-\r\n" | |
msgRet += "\r\n" | |
return msgRet | |
def genmsg_TEARDOWN(url,seq,userAgent,sessionId,authSeq): | |
msgRet = "TEARDOWN " + url + " RTSP/1.0\r\n" | |
msgRet += "CSeq: " + str(seq) + "\r\n" | |
msgRet += "User-Agent: " + userAgent + "\r\n" | |
msgRet += "Session: " + sessionId + "\r\n" | |
msgRet += "\r\n" | |
return msgRet | |
def gen_msg(opt, url, seq, user_agent, session_id, auth_seq): | |
msgRet = " ".join((opt, url, RTSP_VERSION + LINE_SPLIT_STR)) | |
msgRet += "CSeq: " + str(seq) + LINE_SPLIT_STR | |
msgRet += "Authorization: " + auth_seq + LINE_SPLIT_STR | |
msgRet += "User-Agent: " + user_agent + LINE_SPLIT_STR | |
msgRet += "Session: " + session_id + LINE_SPLIT_STR | |
msgRet += f"Range: npt=0.000-{LINE_SPLIT_STR}" | |
msgRet += LINE_SPLIT_STR | |
return msgRet | |
def decode_describe_response(strContent): | |
dict_info = { | |
"base_url": None, | |
"tracks": [] | |
} | |
messageStrings = strContent.split("\n") | |
for element in messageStrings: | |
a = element.find("rtsp") | |
if a >= 0: | |
dict_info["base_url"] = element[a:-1] | |
b = element.find("control:track") | |
if b >= 0: | |
dict_info["tracks"].append(element[b+8:-1]) | |
return dict_info | |
def decodeSession(strContent): | |
mapRetInf = {} | |
messageStrings = strContent.split("\n") | |
for element in messageStrings: | |
if "Session" in element: | |
a = element.find(":") | |
b = element.find(";") | |
mapRetInf = element[a+2:b] | |
return mapRetInf | |
def generateAuthString(username,password,realm,method,uri,nonce): | |
mapRetInf = {} | |
m1 = hashlib.md5((username + ":" + realm + ":" + password).encode()).hexdigest() | |
m2 = hashlib.md5((method + ":" + uri).encode()).hexdigest() | |
response = hashlib.md5((m1 + ":" + nonce + ":" + m2).encode()).hexdigest() | |
mapRetInf = "Digest " | |
mapRetInf += "username=\"" + m_Vars["defaultUsername"] + "\", " | |
mapRetInf += "password=\"" + m_Vars["defaultPassword"] + "\", " | |
mapRetInf += "realm=\"" + realm + "\", " | |
mapRetInf += "algorithm=\"MD5\", " | |
mapRetInf += "nonce=\"" + nonce + "\", " | |
mapRetInf += "uri=\"" + uri + "\", " | |
mapRetInf += "response=\"" + response + "\"" | |
return mapRetInf | |
if __name__ == "__main__": | |
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | |
s.connect((m_Vars["defaultServerIp"],m_Vars["defaultServerPort"])) | |
seq = 1 | |
url = "rtsp://" + m_Vars["defaultServerIp"] + m_Vars["defaultTestUri"] | |
isDigest = False | |
authSeq = base64.b64encode((m_Vars["defaultUsername"] + ":" + m_Vars["defaultPassword"]).encode()) | |
authSeq = "Basic " + str(authSeq) | |
msg_options = genmsg_OPTIONS(url, seq, m_Vars["defaultUserAgent"]) | |
print(msg_options) | |
s.send(msg_options.encode()) | |
msg3 = s.recv(m_Vars["bufLen"]) | |
msg3 = msg3.decode() | |
print(msg3) | |
seq = seq + 1 | |
msg_describe = genmsg_DESCRIBE(url,seq,m_Vars["defaultUserAgent"],authSeq) | |
print(msg_describe) | |
s.send(msg_describe.encode()) | |
msg1 = s.recv(m_Vars["bufLen"]) | |
msg1 = msg1.decode() | |
print(msg1) | |
seq = seq + 1 | |
if msg1.find("Unauthorized") > 1: | |
isDigest = True | |
# New DESCRIBE with digest authentication | |
start = msg1.find("realm") | |
begin = msg1.find("\"", start) | |
end = msg1.find("\"", begin + 1) | |
realm = msg1[begin+1:end] | |
start = msg1.find("nonce") | |
begin = msg1.find("\"", start) | |
end = msg1.find("\"", begin + 1) | |
nonce = msg1[begin+1:end] | |
seq_auth_describe = generateAuthString(m_Vars["defaultUsername"],m_Vars["defaultPassword"],realm,"DESCRIBE",m_Vars["defaultTestUri"],nonce) | |
msg_describe = genmsg_DESCRIBE(url,seq,m_Vars["defaultUserAgent"], seq_auth_describe) | |
print(msg_describe) | |
s.send(msg_describe.encode()) | |
# | |
msg1 = s.recv(m_Vars["bufLen"]) | |
msg1 = msg1.decode() | |
print(msg1) | |
seq = seq + 1 | |
dict_info = decode_describe_response(msg1) | |
print(f"dict_info: {dict_info}") | |
if isDigest == True: | |
seq_auth_setup = generateAuthString(m_Vars["defaultUsername"], m_Vars["defaultPassword"], realm, "SETUP", m_Vars["defaultTestUri"], nonce) | |
msg_setup = genmsg_SETUP(dict_info["base_url"] + dict_info["tracks"][0], seq, m_Vars["defaultUserAgent"], seq_auth_setup) | |
print(msg_setup) | |
s.send(msg_setup.encode()) | |
msg2 = s.recv(m_Vars["bufLen"]) | |
msg2 = msg2.decode() | |
print(msg2) | |
seq = seq + 1 | |
sessionId = decodeSession(msg2) | |
print(f"sessionId: {sessionId}") | |
msg_play = genmsg_PLAY(url + "/",seq, m_Vars["defaultUserAgent"], sessionId, seq_auth_setup) | |
print(msg_play) | |
s.send(msg_play.encode()) | |
# msg4 = s.recv(m_Vars["bufLen"]) | |
# # msg4 = msg4.decode() | |
# print(msg4) | |
recv_buf = b"" | |
try: | |
while True: | |
if HEADER_END_STR in str(recv_buf): | |
break | |
more = s.recv(2048) | |
# print(more) | |
if not more: | |
break | |
recv_buf += more | |
if "OK" in str(more): | |
break | |
except socket.error as e: | |
PRINT("Receive data error: %s" % e, RED) | |
sys.exit(-1) | |
print("recv_buf: ", recv_buf.decode("utf-8", "ignore")) | |
# seq = seq + 1 | |
# starttime = time.time() | |
# while True : | |
# # Send a new RTSP OPTION command to keep the stream alive | |
# now = time.time() | |
# if (now - starttime) > 10: | |
# msg_options = genmsg_OPTIONS(url,seq,m_Vars["defaultUserAgent"],sessionId,authSeq) | |
# print(msg_options) | |
# s.send(msg_options.encode()) | |
# msg5 = s.recv(m_Vars["bufLen"]) | |
# # msg5 = msg5.decode() | |
# print(msg5) | |
# starttime = time.time() | |
# msgRcv = s.recv(m_Vars["bufLen"]) | |
seq = seq + 1 | |
msg_teardown = genmsg_TEARDOWN(url,seq,m_Vars["defaultUserAgent"],sessionId,authSeq) | |
print(msg_teardown) | |
s.send(msg_teardown.encode()) | |
msg6 = s.recv(m_Vars["bufLen"]) | |
msg6 = msg6.decode("utf-8", "ignore") | |
print(msg6) | |
s.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment