Skip to content

Instantly share code, notes, and snippets.

@pmarkun
Created November 16, 2023 19:21
Show Gist options
  • Save pmarkun/48a790df41ea91fd8ed4f0e515700624 to your computer and use it in GitHub Desktop.
Save pmarkun/48a790df41ea91fd8ed4f0e515700624 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
#-*- coding: UTF-8 -*-
# Date: 2015-04-09
import sys, re, socket, threading, time, datetime, traceback
from optparse import OptionParser
import base64
import hashlib
DEFAULT_SERVER_PORT = 1554
TRANSPORT_TYPE_LIST = []
DEST_IP = ''
CLIENT_PORT_RANGE = '10014-10015'
NAT_IP_PORT = ''
ENABLE_ARQ = False
ENABLE_FEC = False
TRANSPORT_TYPE_MAP = {
'ts_over_tcp' : 'MP2T/TCP;%s;interleaved=0-1,',
'rtp_over_tcp' : 'MP2T/RTP/TCP;%s;interleaved=0-1,',
'ts_over_udp' : 'MP2T/UDP;%s;destination=%s;client_port=%s,',
'rtp_over_udp' : 'MP2T/RTP/UDP;%s;destination=%s;client_port=%s,'
}
RTSP_VERSION = 'RTSP/1.0'
DEFAULT_USERAGENT = 'Python Rtsp Client 1.0'
HEARTBEAT_INTERVAL = 10 # 10s
LINE_SPLIT_STR = b'\r\n'
HEADER_END_STR = LINE_SPLIT_STR*2
CUR_RANGE = 'npt=end-'
CUR_SCALE = 1
#x-notice in ANNOUNCE, BOS-Begin of Stream, EOS-End of Stream
X_NOTICE_EOS,X_NOTICE_BOS,X_NOTICE_CLOSE = 2101,2102,2103
#--------------------------------------------------------------------------
# Colored Output in Console
#--------------------------------------------------------------------------
BLACK,RED,GREEN,YELLOW,BLUE,MAGENTA,CYAN,WHITE = range(90,98)
def COLOR_STR(msg,color=WHITE):
return '\033[%dm%s\033[0m'%(color,msg)
def PRINT(msg,color=WHITE):
sys.stdout.write(COLOR_STR(msg,color) + '\n')
#--------------------------------------------------------------------------
class RTSPClient(threading.Thread):
def __init__(self,url):
global CUR_RANGE
threading.Thread.__init__(self)
self.setDaemon(True)
self._recv_buf = b''
self._sock = None
self._orig_url = url
self._cseq = 0
self._session_id= ''
self._cseq_map = {} # {CSeq:Method}映射
self._server_ip,self._server_port,self._target, self._username, self._password = self._parse_url(url)
self.credential_header = self.create_basic_auth_header()
if not self._server_ip or not self._target:
print('Invalid url: %s'%url,RED); sys.exit(1)
if '.sdp' not in self._target.lower():
CUR_RANGE = 'npt=0.00000-' # 点播从头开始
self._connect_server()
self._update_dest_ip()
self.running = True
self.playing = False
self.location = ''
self.start()
def run(self):
try:
while self.running:
msg = self.recv_msg()
if msg.startswith(b'RTSP'):
self._process_response(msg)
elif msg.startswith(b'ANNOUNCE'):
self._process_announce(msg)
except Exception as e:
PRINT('Error: %s'%e,RED)
traceback.print_exc()
self.running = False
self.playing = False
self._sock.close()
def create_basic_auth_header(self):
credentials = f"{self._username}:{self._password}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
return f'Basic {encoded_credentials}'
def create_digest_response(self, www_authenticate, method, uri, username, password):
realm = re.search(r'realm="([^"]+)"', www_authenticate).group(1)
nonce = re.search(r'nonce="([^"]+)"', www_authenticate).group(1)
ha1 = hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest()
ha2 = hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
response = hashlib.md5(f"{ha1}:{nonce}:{ha2}".encode()).hexdigest()
return f'Digest username="{username}", realm="{realm}", nonce="{nonce}", uri="{uri}", response="{response}"'
def _parse_url(self, url):
'''解析url,返回(ip,port,target,user,password)五元组'''
pattern = r'rtsp://(?P<user>[^:]+):(?P<password>[^@]+)@(?P<ip>(\d{1,3}\.){3}\d{1,3})(:(?P<port>\d+))?(?P<target>.*)'
m = re.match(pattern, url)
if m is not None:
user = m.group('user')
password = m.group('password')
ip = m.group('ip')
port = m.group('port') if m.group('port') else DEFAULT_SERVER_PORT
target = m.group('target')
return ip, int(port), target, user, password
else:
print('Invalid URL: %s' % url, RED)
sys.exit(1)
def _connect_server(self):
'''连接服务器,建立socket'''
try:
self._sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self._sock.connect((self._server_ip,self._server_port))
#PRINT('Connect [%s:%d] success!'%(self._server_ip,self._server_port), GREEN)
except socket.error as e:
sys.stderr.write('ERROR: %s[%s:%d]'%(e,self._server_ip,self._server_port))
traceback.print_exc()
sys.exit(1)
def _update_dest_ip(self):
'''如果未指定DEST_IP,默认与RTSP使用相同IP'''
global DEST_IP
if not DEST_IP:
DEST_IP = self._sock.getsockname()[0]
PRINT('DEST_IP: %s\n'%DEST_IP, CYAN)
def recv_msg(self):
'''Receber uma mensagem de resposta completa ou mensagem de notificação ANNOUNCE'''
try:
while True:
if HEADER_END_STR in self._recv_buf:
header_end_index = self._recv_buf.find(HEADER_END_STR) + len(HEADER_END_STR)
msg_header = self._recv_buf[:header_end_index]
self._recv_buf = self._recv_buf[header_end_index:]
content_length = self._get_content_length(msg_header.decode())
while len(self._recv_buf) < content_length:
more = self._sock.recv(2048)
if not more:
break
self._recv_buf += more
msg_body = self._recv_buf[:content_length]
self._recv_buf = self._recv_buf[content_length:]
return msg_header + msg_body
else:
more = self._sock.recv(2048)
if not more:
break
self._recv_buf += more
except socket.error as e:
PRINT('Erro ao receber dados: %s' % e, RED)
sys.exit(-1)
return b''
def _get_content_length(self, msg):
'''Extrair o comprimento do conteúdo da mensagem'''
# msg é uma string
m = re.search(r'[Cc]ontent-[Ll]ength:\s?(?P<len>\d+)', msg, re.S)
return (m and int(m.group('len'))) or 0
def _get_time_str(self):
# python 2.6以上才支持%f参数,为兼容低版本采用以下写法
dt = datetime.datetime.now()
return dt.strftime('%Y-%m-%d %H:%M:%S.') + str(dt.microsecond)
def _process_response(self,msg):
import logging
'''处理响应消息'''
'''Processar mensagem de resposta'''
status, headers, body = self._parse_response(msg)
print(msg)
rsp_cseq = int(headers[b'cseq'])
method = self._cseq_map[rsp_cseq]
if status == 401 and b'www-authenticate' in headers:
www_authenticate = headers[b'www-authenticate'].decode()
digest_response = self.create_digest_response(www_authenticate, method, self._target, self._username, self._password)
self.credential_header = digest_response
self._sendmsg(method, self._orig_url, {}) # Reenviar a requisição original com a resposta Digest
return
if self._cseq_map[rsp_cseq] != b'GET_PARAMETER':
PRINT(self._get_time_str() + '\n' + msg.decode())
if status == 302:
self.location = headers[b'location']
if status != 200:
self.do_teardown()
if self._cseq_map[rsp_cseq] == b'DESCRIBE':
track_id_str = self._parse_track_id(body)
self.do_setup(track_id_str)
elif self._cseq_map[rsp_cseq] == 'SETUP':
self._session_id = headers[b'session']
self.do_play(CUR_RANGE,CUR_SCALE)
self.send_heart_beat_msg()
elif self._cseq_map[rsp_cseq] == b'PLAY':
self.playing = True
def _process_announce(self,msg):
'''处理ANNOUNCE通知消息'''
global CUR_RANGE,CUR_SCALE
PRINT(msg)
headers = self._parse_header_params(msg.splitlines()[1:])
x_notice_val = int(headers['x-notice'])
if x_notice_val in (X_NOTICE_EOS,X_NOTICE_BOS):
CUR_SCALE = 1
self.do_play(CUR_RANGE,CUR_SCALE)
elif x_notice_val == X_NOTICE_CLOSE:
self.do_teardown()
def _parse_response(self,msg):
'''解析响应消息'''
header,body = msg.split(HEADER_END_STR)[:2]
header_lines = header.splitlines()
version,status = header_lines[0].split(None,2)[:2]
headers = self._parse_header_params(header_lines[1:])
return int(status),headers,body
def _parse_header_params(self,header_param_lines):
'''解析头部参数'''
headers = {}
for line in header_param_lines:
if line.strip(): # 跳过空行
key,val = line.split(b':', 1)
headers[key.lower()] = val.strip()
return headers
def _parse_track_id(self,sdp):
'''从sdp中解析trackID=2形式的字符串'''
m = re.search(r'a=control:(?P<trackid>[\w=\d]+)',sdp,re.S)
return (m and m.group('trackid')) or ''
def _next_seq(self):
self._cseq += 1
return self._cseq
def _sendmsg(self, method, url, headers):
'''Enviar mensagem'''
msg = '%s %s %s' % (method, url, RTSP_VERSION)
headers['User-Agent'] = DEFAULT_USERAGENT
headers['Authorization'] = self.credential_header
cseq = self._next_seq()
self._cseq_map[cseq] = method
headers['CSeq'] = str(cseq)
if self._session_id: headers['Session'] = self._session_id
for (k, v) in headers.items():
msg += LINE_SPLIT_STR.decode() + '%s: %s' % (k, str(v))
msg += HEADER_END_STR.decode() # Fim dos cabeçalhos
if method != 'GET_PARAMETER' or 'x-RetransSeq' in headers:
print(self._get_time_str() + LINE_SPLIT_STR.decode() + msg)
try:
self._sock.send(msg.encode()) # Codifica a mensagem em bytes
except socket.error as e:
print('Erro ao enviar mensagem: %s' % e, RED)
def _get_transport_type(self):
'''获取SETUP时需要的Transport字符串参数'''
transport_str = ''
ip_type = 'unicast' #if IPAddress(DEST_IP).is_unicast() else 'multicast'
for t in TRANSPORT_TYPE_LIST:
if t not in TRANSPORT_TYPE_MAP:
PRINT('Error param: %s'%t,RED)
sys.exit(1)
if t.endswith('tcp'):
transport_str += TRANSPORT_TYPE_MAP[t]%ip_type
else:
transport_str += TRANSPORT_TYPE_MAP[t]%(ip_type,DEST_IP,CLIENT_PORT_RANGE)
return transport_str
def do_describe(self, additional_headers={}):
headers = {**additional_headers}
headers['Accept'] = 'application/sdp'
if ENABLE_ARQ:
headers['x-Retrans'] = 'yes'
headers['x-Burst'] = 'yes'
if ENABLE_FEC: headers['x-zmssFecCDN'] = 'yes'
if NAT_IP_PORT: headers['x-NAT'] = NAT_IP_PORT
self._sendmsg('DESCRIBE',self._orig_url,headers)
def do_setup(self,track_id_str='', transport="RTP/AVP;unicast;client_port=6296-6297"):
headers = {}
headers['Transport'] = transport or self._get_transport_type()
self._sendmsg('SETUP',self._orig_url+'/'+track_id_str,headers)
def do_play(self,range='npt=end-',scale=1):
headers = {}
headers['Range'] = range
headers['Scale'] = scale
self._sendmsg('PLAY',self._orig_url,headers)
def do_pause(self):
self._sendmsg('PAUSE',self._orig_url,{})
def do_teardown(self):
self._sendmsg('TEARDOWN',self._orig_url,{})
self.running = False
def do_options(self):
self._sendmsg('OPTIONS',self._orig_url,{})
def do_get_parameter(self):
self._sendmsg('GET_PARAMETER',self._orig_url,{})
def send_heart_beat_msg(self):
'''定时发送GET_PARAMETER消息保活'''
if self.running:
self.do_get_parameter()
threading.Timer(HEARTBEAT_INTERVAL, self.send_heart_beat_msg).start()
#-----------------------------------------------------------------------
# Input with autocompletion
#-----------------------------------------------------------------------
import readline
COMMANDS = ['play','range:','scale:','pause','forward','backward','begin','live','teardown','exit','help']
def complete(text,state):
options = [i for i in COMMANDS if i.startswith(text)]
return (state < len(options) and options[state]) or None
def input_cmd():
readline.set_completer_delims(' \t\n')
readline.parse_and_bind("tab: complete")
readline.set_completer(complete)
cmd = raw_input(COLOR_STR('Input Command # ',CYAN))
PRINT('') # add one line
return cmd
#-----------------------------------------------------------------------
def exec_cmd(rtsp,cmd):
'''根据命令执行操作'''
global CUR_RANGE,CUR_SCALE
if cmd in ('exit','teardown'):
rtsp.do_teardown()
elif cmd == 'pause':
CUR_SCALE = 1; CUR_RANGE = 'npt=now-'
rtsp.do_pause()
elif cmd == 'help':
PRINT(play_ctrl_help())
elif cmd == 'forward':
if CUR_SCALE < 0: CUR_SCALE = 1
CUR_SCALE *= 2; CUR_RANGE = 'npt=now-'
elif cmd == 'backward':
if CUR_SCALE > 0: CUR_SCALE = -1
CUR_SCALE *= 2; CUR_RANGE = 'npt=now-'
elif cmd == 'begin':
CUR_SCALE = 1; CUR_RANGE = 'npt=beginning-'
elif cmd == 'live':
CUR_SCALE = 1; CUR_RANGE = 'npt=end-'
elif cmd.startswith(b'play'):
m = re.search(r'range[:\s]+(?P<range>[^\s]+)',cmd)
if m: CUR_RANGE = m.group('range')
m = re.search(r'scale[:\s]+(?P<scale>[\d\.]+)',cmd)
if m: CUR_SCALE = int(m.group('scale'))
if cmd not in ('pause','exit','teardown','help'):
rtsp.do_play(CUR_RANGE,CUR_SCALE)
def main(url):
rtsp = RTSPClient(url)
rtsp.do_describe({"Require" : "www.onvif.org/ver20/backchannel"})
time.sleep(0.5)
print("############")
#rtsp.do_setup()
rtsp._sendmsg('SETUP','rtsp://192.168.0.101/track2',{})
try:
while rtsp.running or rtsp.location:
if rtsp.playing:
cmd = input_cmd()
exec_cmd(rtsp,cmd)
# 302重定向重新建链
if not rtsp.running and rtsp.location:
rtsp = RTSPClient(rtsp.location)
rtsp.do_describe()
time.sleep(0.5)
except KeyboardInterrupt:
rtsp.do_teardown()
print('\n^C received, Exit.')
def play_ctrl_help():
help = COLOR_STR('In running, you can control play by input "forward","backward","begin","live","pause"\n',MAGENTA)
help += COLOR_STR('or "play" with "range" and "scale" parameter, such as "play range:npt=beginning- scale:2"\n',MAGENTA)
help += COLOR_STR('You can input "exit","teardown" or ctrl+c to quit\n',MAGENTA)
return help
if __name__ == '__main__':
usage = COLOR_STR('%prog [options] url\n\n',GREEN) + play_ctrl_help()
parser = OptionParser(usage=usage)
parser.add_option('-t','--transport',dest='transport',default='tcp_over_udp',help='Set transport type when SETUP: ts_over_tcp, ts_over_udp, rtp_over_tcp, rtp_over_udp[default]')
parser.add_option('-d','--dest_ip',dest='dest_ip',help='Set dest ip of udp data transmission, default use same ip with rtsp')
parser.add_option('-p','--client_port',dest='client_port',help='Set client port range when SETUP of udp, default is "10014-10015"')
parser.add_option('-n','--nat',dest='nat',help='Add "x-NAT" when DESCRIBE, arg format "192.168.1.100:20008"')
parser.add_option('-r','--arq',dest='arq',action="store_true",help='Add "x-Retrans:yes" when DESCRIBE')
parser.add_option('-f','--fec',dest='fec',action="store_true",help='Add "x-zmssFecCDN:yes" when DESCRIBE')
(options,args) = parser.parse_args()
if len(args) < 1:
parser.print_help()
sys.exit()
if options.transport: TRANSPORT_TYPE_LIST = options.transport.split(',')
if options.dest_ip: DEST_IP = options.dest_ip; print(DEST_IP)
if options.client_port: CLIENT_PORT_RANGE = options.client_port
if options.nat: NAT_IP_PORT = options.nat
if options.arq: ENABLE_ARQ = options.arq
if options.fec: ENABLE_FEC = options.fec
url = args[0]
main(url)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment