Skip to content

Instantly share code, notes, and snippets.

@th0ma5w
Created June 10, 2018 23:54
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save th0ma5w/d7e69454a24dcadda89a226252421705 to your computer and use it in GitHub Desktop.
Save th0ma5w/d7e69454a24dcadda89a226252421705 to your computer and use it in GitHub Desktop.
Single Page Web Application and REST API for TzumiMagicTV
# th0ma5w at github
#
# requires:
# http://archive.openwrt.org/attitude_adjustment/12.09/ar71xx/generic/packages/zlib_1.2.7-1_ar71xx.ipk
# http://archive.openwrt.org/attitude_adjustment/12.09/ar71xx/generic/packages/python-mini_2.7.3-1_ar71xx.ipk
#
# python ./tzumi_server.py
#
import socket
tvs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tvs.settimeout(10)
tvs.connect(('127.0.0.1',6000))
tvs.send('<msg type="login_req"><params protocol="TCP" port="8000"/></msg>')
tv_response = tvs.recv(4096)
def tune(freq, prog):
global tv_response, tvs
tvs.send('<msg type="tune_req"><params tv_type="ATSC" freq="%s" bandwidth="8000" plp="0" programNumber="%s"/></msg>' % (freq, prog))
tv_response = tvs.recv(4096)
return tv_response
def check():
global tv_response, tvs
tvs.send('<msg type="Check_Lock_req"></msg>')
tv_response = tvs.recv(4096)
return tv_response
MAX_PACKET = 32768
server_sock = socket.socket(
socket.AF_INET,
socket.SOCK_STREAM,
socket.IPPROTO_TCP)
server_sock.bind(('0.0.0.0',80))
server_sock.listen(1)
def recv_all(sock):
prev_timeout = sock.gettimeout()
try:
sock.settimeout(0.01)
rdata=[]
while True:
try:
rdata.append(sock.recv(MAX_PACKET))
except socket.timeout:
return ''.join(rdata)
finally:
sock.settimeout(prev_timeout)
def normalize_line_endings(s):
return ''.join((line + '\n') for line in s.splitlines())
html_header = "<html>"
html_footer = "</html>"
head_header = "<head>"
head_footer = "</head>"
body_header = "<body>"
body_footer = "</body>"
xml_header = "<xml>"
xml_footer = "</xml>"
#p=new DOMParser();
#d=p.parseFromString('<xml><msg>hello</msg></xml>','application/xml');
#console.log(d.getElementsByTagName('msg')[0].textContent)
css_styles = """
<style>
</style>
"""
javascript_source = """
<script>
dom_parser = new DOMParser();
var TUNING = false;
var TUNED = false;
tune_url = function(freq,prog) { return "/tune/" + freq + "/" + prog; }
check_url = "/check"
channels = {
2:57000,
3:63000,
4:69000,
5:79000,
6:85000,
7:177000,
8:183000,
9:189000,
10:195000,
11:201000,
12:207000,
13:213000,
14:473000,
15:479000,
16:485000,
17:491000,
18:497000,
19:503000,
20:509000,
21:515000,
22:521000,
23:527000,
24:533000,
25:539000,
26:545000,
27:551000,
28:557000,
29:563000,
30:559000,
31:575000,
32:581000,
33:587000,
34:593000,
35:559000,
36:605000,
37:611000,
38:617000,
39:613000,
40:629000,
41:635000,
42:641000,
43:647000,
44:653000,
45:659000,
46:665000,
47:671000,
48:677000,
49:683000,
50:689000,
51:695000
}
var channeldiv = document.getElementById('channeldiv');
update_freq = function(f){
freq=document.getElementById('frequency').value=f;
return true;
}
var channel_block_count = 0;
for (c in channels){
existing = channeldiv.innerHTML;
newhtml = existing + '<input type="radio" id="c'+c+'" name="channel" value="'+channels[c]+'" onclick="update_freq('+channels[c]+');">' + c + ' ';
channel_block_count = channel_block_count + 1;
if (channel_block_count % 10 === 0) {
newhtml = newhtml + "<br />";
}
channeldiv.innerHTML = newhtml;
}
set_tv_tuned = function() {document.getElementById('status').innerHTML="Tuned"};
set_tv_untuned = function() {document.getElementById('status').innerHTML="Untuned"};
process_check_response = function(xml_response){
d=dom_parser.parseFromString(xml_response,'application/xml');
ack_info=d.getElementsByTagName('ack_info')[0]
ret_attrib=ack_info.attributes['ret']
ret=ret_attrib.value
if (ret==="0") {
TUNED=true;
console.log("TUNED");
set_tv_tuned();
}
if (ret==="-1") {
TUNED=false;
console.log("UNTUNED");
set_tv_untuned();
}
TUNING = false;
}
send_request = function(url){
var r = new XMLHttpRequest();
r.open('GET', url, true);
r.onreadystatechange=function(){
if (r.readyState==4 && r.status==200){
process_check_response(r.responseText);
}
};
r.send();
}
check_tv = function(){
if (! TUNING){
send_request(check_url);
}
}
tune_tv_f = function(freq,prog) {
TUNING = true;
send_request(tune_url(freq,prog));
}
tune_tv = function() {
TUNING = true;
freq=document.getElementById('frequency').value;
prog=document.getElementById('program').value;
send_request(tune_url(freq,prog));
}
let check_timer = setInterval(check_tv, 1000);
</script>
"""
def html_head():
return head_header + css_styles + head_footer
def body_wrap(s):
return body_header + s + body_footer
def html_wrap(s):
return html_header + body_wrap(s) + html_footer
def html_app_wrap(s):
return html_header + html_head() + body_wrap(s+javascript_source) + html_footer
def xml_wrap(s):
return xml_header + s + xml_footer
html_headers = {
'Content-Type': 'text/html; encoding=utf8',
}
xml_headers = {
'Content-Type': 'application/xml; encoding=utf8',
}
json_headers = {
'Content-Type': 'application/json; encoding=utf8',
}
playlist_headers = {
'Content-Type': 'application/vnd.apple.mpegurl; encoding=utf8',
}
def tune_response(r):
parts = r.uri.split('/')
freq = parts[2]
prog = parts[3]
response = tune(freq,prog)
return (xml_headers, xml_wrap(response))
def check_response(r):
response = check()
return (xml_headers, xml_wrap(response))
def make_response_debug(r):
br = "<br/>"
response_body = [
"URI: " + r.uri + br,
"Method: " + r.method + br,
"Proto: " + r.proto + br,
"Body: " + r.body + br,
br,
"<ul>"
]
for k,v in r.headers.iteritems():
response_body.append('<li><b>%s</b> : %s</li>' % (k,v))
response_body.append('</ul>')
return (html_headers, html_wrap(''.join(response_body)))
def test_xml(r):
global tv_response
tvs.send('<msg type="login_req"><params protocol="TCP" port="8000"/></msg>')
tv_response = tvs.recv(4096)
#fix response
tv_response = tv_response.replace('"<','"/><')
tv_response = tv_response.replace('"device_type','" device_type')
return (xml_headers, xml_wrap(tv_response))
def web_app(r):
return (html_headers, html_app_wrap("""
<b>TV Tuner</b><br/>
<table>
<tr>
<td>Status:</td>
<td><span id="status"></status></td>
</tr>
<tr>
<td>Frequency:</td>
<td><input id="frequency" type="text"></input></td>
</tr>
<tr>
<td>Program:</td>
<td><input id="program" type="text"></input></td>
</tr>
</table>
<button type="button" onClick="tune_tv()">Tune</button><br/>
<div id="channeldiv"></div>
"""))
routes = {
'/tune/' : tune_response,
'/check' : check_response,
'/xml' : test_xml,
"/" : web_app
}
class Request:
headers=None
body=None
method=None
uri=None
proto=None
def resolve_route(headers,body,method,uri,proto):
r=Request()
r.headers, r.body, r.method, r.uri, r.proto = headers,body,method,uri,proto
matched = False
for k,v in routes.iteritems():
if (uri[:len(k)] == k):
matched = True
return v(r)
if not matched:
return make_response_debug(r)
while True:
client_sock, clinet_addr = server_sock.accept()
try:
request = normalize_line_endings(recv_all(client_sock))
request_head, request_body = request.split('\n\n',1)
request_head = request_head.splitlines()
request_headline = request_head[0]
request_headers = dict(x.split(': ', 1) for x in request_head[1:])
request_method, request_uri, request_proto = request_headline.split(
' ', 3)
response_headers, response_body = resolve_route(
request_headers,
request_body,
request_method,
request_uri,
request_proto)
response_headers.update({
'Content-Length': len(response_body),
'Connection': 'close'
})
response_headers_raw = ''.join('%s: %s\n' % (k,v) for k,v in
response_headers.iteritems())
response_proto = 'HTTP/1.1'
response_status = '200'
response_status_text = 'OK'
client_sock.send('%s %s %s' % (
response_proto,
response_status,
response_status_text))
client_sock.send(response_headers_raw)
client_sock.send('\n')
client_sock.send(response_body)
except:
pass
client_sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment