Created
December 28, 2012 04:53
-
-
Save nnabeyang/4394635 to your computer and use it in GitHub Desktop.
xhr2でバイナリデータを送信するテストです。ボタンをクリックするとクライアントから3バイトの情報がArrayBufferで送られます。それをサーバーがバイナリとして受け取り、それをそのままクライアントに送り返します。クライアント側で受け取った3バイトの情報をtextareaに表示します。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<html> | |
<head> | |
<meta http-equiv="Content-Type" content="text/html;charset=utf-8" > | |
<title>xhr2 test</title> | |
<body> | |
<textarea id="response"></textarea> | |
<button id="send">send</button> | |
<script> | |
var txtArea = document.getElementById('response'); | |
document.getElementById('send').addEventListener('click', function(e) { | |
var xhr = new XMLHttpRequest(); | |
xhr.open('POST', '/send', true); | |
xhr.responseType = 'arraybuffer'; | |
xhr.onload = function(e) { | |
var dv; | |
if(this.status == 200) { | |
var dv = new DataView(this.response); | |
txtArea.value = '('+ | |
dv.getUint8(0) + ','+ | |
dv.getUint8(1) + ','+ | |
dv.getUint8(2) + ')'; | |
} else { | |
console.log(this.status); | |
} | |
}; | |
buf = new ArrayBuffer(3); | |
dv = new DataView(buf); | |
dv.setUint8(0, 0x01); | |
dv.setUint8(1, 0x02); | |
dv.setUint8(2, 0x03); | |
xhr.send(buf); | |
}, false); | |
</script> | |
</body> | |
</html> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- encoding: utf-8 -*- | |
import os | |
from BaseHTTPServer import BaseHTTPRequestHandler | |
import binascii | |
ERROR_HTML = \ | |
"<head>\n"\ | |
"<title>Error response</title>\n"\ | |
"</head>\n"\ | |
"<body>\n"\ | |
"<h1>Error response</h1>\n"\ | |
"<p>Error code %(code)d.\n"\ | |
"<p>Message: %(message)s.\n"\ | |
"<p>Error code explanation: %(code)d = %(explain)s.\n"\ | |
"</body>\n" | |
def date_time_string(): | |
return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( | |
"Sun", 28, "Nov", 2011, 3, 18, 14) | |
def SimpleApplication(environ, start_response): | |
req = Request(environ, start_response) | |
Method = req.environ['REQUEST_METHOD'] | |
if Method == 'HEAD': | |
req.send_file(environ['PATH_INFO'], isget=False) | |
elif Method == 'GET': | |
req.send_file(environ['PATH_INFO']) | |
elif Method == 'POST': | |
buf = [] | |
rfile = environ['wsgi.input'] | |
data = rfile.read(int(environ['CONTENT_LENGTH'])) | |
req.app_run(data) | |
else: | |
req.send_error(501, "Unsupported method (%r)" % Method) | |
return '' | |
class Request(object): | |
def __init__(self, environ, start_response): | |
self.environ = environ | |
self._start_response = start_response | |
self._outheaders = [] | |
self._write = None | |
self.base_path = environ.get('app.base_path', '/static') | |
def app_run(self, data,content_type="application/octet-stream", status=200): | |
if self.environ['PATH_INFO'] != '/send': | |
self.send_error(501, "Unsupported path (%r)" % self.environ['PATH_INFO']) | |
return | |
print(binascii.hexlify(data)) | |
self.send_response(status) | |
self.send_header("Content-type", content_type) | |
path_info = self.environ['PATH_INFO'] | |
self.send_header("Content-Length", str(len(data))) | |
self.send_header("Last-Modified", date_time_string()) | |
self.end_headers() | |
self._write(data) | |
def send_file(self, path, content_type="text/html", status=200, isget=True): | |
if path == '/': | |
path = '.' + self.base_path + '/index.html' | |
elif path.startswith(self.base_path): | |
if path.endswith('index.html'): | |
self.send_error(404, "File not found") | |
return | |
path = '.' + path | |
else: | |
path = '.' + self.base_path + path | |
if not os.path.exists(path): | |
self.send_error(404, "File not found") | |
return | |
self.send_response(status) | |
self.send_header("Content-type", content_type) | |
size = os.stat(path).st_size | |
self.send_header("Content-Length", str(size)) | |
self.send_header("Last-Modified", date_time_string()) | |
self.end_headers() | |
if isget and size > 0: | |
f = open(path, "rb") | |
self._write(f.read()) | |
else: | |
self._write('') | |
def send_error(self, status, message): | |
try: | |
short, explain = BaseHTTPRequestHandler.responses[status] | |
except: | |
explain = '???' | |
self.send_response(status) | |
self.send_header("Content-type", "text/html") | |
self.send_header("Connection", "close") | |
self.end_headers() | |
self._write(ERROR_HTML % {'code': status, 'message': message, 'explain': explain}) | |
def send_response(self, status): | |
self._status = status | |
def send_header(self, key, value): | |
self._outheaders.append((key, value)) | |
def end_headers(self): | |
status = "%d OK" % self._status | |
self._write = self._start_response(status, self._outheaders) | |
application = SimpleApplication |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment