Skip to content
Create a gist now

Instantly share code, notes, and snippets.

Embed URL


Subversion checkout URL

You can clone with
Download ZIP
WebSocket protocol for Diesel

WebSocket protocol for Diesel

Not tested, but a potential echo + timeout example:

from diesel import Application, Service, until_eol, sleep, bytes
from websocket import WebSocket
import time

def skip_one():
    '''Request a single byte, then pass an until-end-of-line read upward.

    Yields ``bytes(1)`` first and ``up(until_eol())`` second; both are
    diesel primitives and are handed to the framework unchanged.
    '''
    # `up` is used below but is not among the names this example imports
    # from diesel at the top of the file, so bring it into scope here.
    # NOTE(review): assumes diesel exports `up` alongside `until_eol` --
    # confirm against the installed diesel version.
    from diesel import up

    yield bytes(1)
    yield up(until_eol())

def hi_server():
    '''Echo what the client sent, or report a timeout, forever.

    Each pass yields the pair ``(skip_one, sleep(3))`` and unpacks the
    reply into ``inp`` and ``to``.  When ``to`` is truthy a timestamped
    timeout notice is yielded; otherwise ``inp`` is echoed back.
    '''
    while True:
        # NOTE(review): presumably the framework resumes us with a value for
        # whichever of the two finished first and a falsy value for the
        # other -- confirm.  `skip_one` is passed uncalled, exactly as in
        # the original example; verify that is what diesel expects.
        inp, to = yield (skip_one, sleep(3))
        if to:
            yield '%s timeout!\n' % time.asctime()
        else:
            # The echo belongs to the non-timeout case; without this `else`
            # a timeout produced both messages and real input produced none.
            yield 'you said %s' % inp

# Wrap hi_server in the WebSocket handler and register it on port 8000.
# NOTE(review): nothing visible in this example starts the application
# after the service is registered -- confirm whether a run call is needed.
app = Application()
app.add_service(
    Service(
        WebSocket(hi_server),
        8000,
    )
)
WebSocket for Diesel.
from diesel import bytes, until
# Bytes that open and close a single WebSocket frame.
BEGINFRAME, ENDFRAME = '\x00', '\xff'

# Fixed head of the server's handshake response.  It ends right after the
# Connection header, with no blank line: the handler appends the
# per-request WebSocket-Origin / WebSocket-Location headers and terminates
# the header block itself.
# NOTE(review): the line that opened this literal was lost from the damaged
# source; the name HANDSHAKE is a reconstruction.
HANDSHAKE = (
    'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
    'Upgrade: WebSocket\r\n'
    'Connection: Upgrade\r\n'
)


class WebSocketError(Exception):
    '''Raised by the handler when a frame does not start with BEGINFRAME.'''


class Request(object):
    '''WebSocket request.

    Stores the three fields of the request line and, after
    parse_headers() has been called, the request headers as a dict.
    '''

    def __init__(self, method, path, version):
        self.method, self.path, self.version = method, path, version
        # Present from construction so that reading it before
        # parse_headers() is a plain missing-key lookup.
        self.headers = {}

    def parse_headers(self, headers):
        '''Parse a CRLF-separated header block into ``self.headers``.

        Each line is split at its first colon only, so values that contain
        a colon (``Host: example.com:8000``, ``Origin: http://...``) stay
        intact.  Lines without a colon -- including the empty strings left
        by the trailing blank line -- are skipped.
        '''
        parsed = {}
        for line in headers.split('\r\n'):
            name, colon, value = line.partition(':')
            if colon:
                parsed[name.strip()] = value.strip()
        self.headers = parsed


class WebSocket(object):
    '''WebSocket service.

    Calls ``app`` once at construction and keeps the result; calling the
    instance yields the handshake and then the frame loop.
    '''

    def __init__(self, app):
        # NOTE(review): the damaged source read
        # "def __init__(self, app): = app()"; the assignment target was
        # lost and `self.app` is a reconstruction.
        self.app = app()

    def __call__(self, addr):
        '''WebSocket handler.

        Reads the request line and headers, answers with the handshake
        response, then loops over incoming frames and application output.

        Raises WebSocketError when a frame starts with anything other
        than BEGINFRAME.
        '''
        request_line = yield until('\r\n')
        headers = yield until('\r\n\r\n')
        req = Request(*request_line.strip().split(' '))
        # The Origin and Host lookups below need the parsed headers.
        req.parse_headers(headers)

        # Status line and fixed headers first, then the two per-request
        # headers followed by the blank line that ends the response head.
        yield HANDSHAKE
        origin = 'WebSocket-Origin: %s' % req.headers['Origin']
        loc = 'WebSocket-Location: ws://%s%s' % (req.headers['Host'], req.path)
        yield '%s\r\n%s\r\n\r\n' % (origin, loc)

        while True:
            # NOTE(review): the second element of this tuple was lost from
            # the damaged source; taking the application's next value is a
            # reconstruction -- confirm against the original gist.
            start, out = yield (bytes(1), next(self.app))
            if start:
                if start != BEGINFRAME:
                    raise WebSocketError()
                # NOTE(review): the surviving source shows no use of
                # `incoming`; whatever forwarded it to the application
                # appears to have been lost -- restore from the original.
                incoming = yield until(ENDFRAME)
            if out is not None:
                yield out
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.