Skip to content

Instantly share code, notes, and snippets.

Last active August 29, 2015 14:06
Show Gist options
  • Save val314159/067edcf3039ad202c5d8 to your computer and use it in GitHub Desktop.
Save val314159/067edcf3039ad202c5d8 to your computer and use it in GitHub Desktop.
# Load the virtualenv activation script from ./.v into the current shell.
. .v/bin/activate
# Run python in the foreground with unbuffered stdio (-u).
# NOTE(review): no script path is given here, so python reads its program
# from stdin; confirm whether a script name is meant to follow -u.
run_qsvr() {
    python -u
}
# Run python in the foreground with unbuffered stdio (-u).
# NOTE(review): no script path is given here, so python reads its program
# from stdin; confirm whether a script name is meant to follow -u.
run() {
    python -u
}
# Start python (unbuffered, -u) as a background job; stdout and stderr are
# both written to the file LOG in the current directory (truncated on start).
launch_qsvr() {
    python -u >LOG 2>&1 &
}
# Start python (unbuffered, -u) as a background job; stdout and stderr are
# both written to the file LOG in the current directory (truncated on start).
launch() {
    python -u >LOG 2>&1 &
}
# Send SIGKILL to every process named "python" (not only ones started here).
unlaunch() {
    killall -KILL python
}
# Clean the current directory: force-remove, recursively, every
# two-character dot entry (.?), compiled Python files (*.pyc) and
# editor backups (*~).  Errors from rm are discarded.
# Note that ".?" also matches the .v and .q entries used elsewhere in this
# file, so they are removed as well.
cln() {
    rm -rf .? *.pyc *~ 2>/dev/null
}
# Create the virtualenv in ./.v unless it already exists, then install the
# Python dependencies with pip.
# The existence test must look at ".v" (the path virtualenv creates); testing
# "v" never matched, so the environment was re-created on every run.
if [ -e .v ]; then
    echo skipping
else
    virtualenv .v
fi
pip install gevent
pip install leveldb
#pip install bottle leveldb gevent
Queue Server
from gevent import sleep,monkey; monkey.patch_all()
from gevent.queue import Queue,Empty
from os import environ as E
import os,sys,bottle,leveldb,json,time
from cors import add_headers
def remote_port():
    """Return the ``REMOTE_PORT`` entry of the current bottle request's environ."""
    environ = bottle.request.environ
    return environ['REMOTE_PORT']
def close_connect():
    """Forget the queue registered for the current request's remote port.

    Removes the port's entry from ``RemotePortToQ`` and the matching entry
    in ``QM`` (keyed by ``str(id(queue))``).  Missing entries are ignored
    instead of raising ``KeyError``, so calling this for a connection that
    has no registered queue, or calling it twice, is harmless.

    Returns:
        True, always.
    """
    port = remote_port()
    print("XX CLOSE CONNECT %s" % (port,))
    q = RemotePortToQ.pop(port, None)
    if q is None:
        # Nothing was registered for this port; there is nothing to clean up.
        return True
    print("QAZ close X zzzzz XXX %s %s" % (port, id(q)))
    # clean up
    QM.pop(str(id(q)), None)
    return True
# monkey patch it!
import gevent.pywsgi
_old_sendall = gevent.pywsgi.WSGIHandler._sendall


def _new_sendall(*a, **kw):
    """Call the original ``WSGIHandler._sendall``; on failure run ``close_connect``.

    Only ``Exception`` subclasses are intercepted.  Control-flow exceptions
    that derive directly from ``BaseException`` (``KeyboardInterrupt``,
    ``SystemExit``, ``GreenletExit``) propagate instead of being swallowed.

    Returns:
        Whatever the original ``_sendall`` returns, or the result of
        ``close_connect()`` when the send raised.
    """
    try:
        return _old_sendall(*a, **kw)
    except Exception:
        return close_connect()


gevent.pywsgi.WSGIHandler._sendall = _new_sendall
def pushseq(_, x):
    """Append *x* to the sequence *_* and hand *x* back to the caller."""
    _.append(x)
    return x
def DB(_=[]):
    """Return the LevelDB handle for the ``.q`` store, opening it on first use.

    The mutable default list ``_`` is deliberate: it acts as a one-slot
    cache, so the handle created on the first call is returned by every
    later call.  Passing a non-empty list returns its first element.
    """
    if _:
        return _[0]
    handle = leveldb.LevelDB('.q')
    _.append(handle)
    return handle
def MsgIter(ch, k0, kn):
    """Return ``DB().RangeIter`` over the keys from ``ch.k0`` to ``ch.kn``.

    Both bounds are built by joining the channel name and the key suffix
    with a dot.
    """
    db = DB()
    first_key = ch + '.' + k0
    last_key = ch + '.' + kn
    return db.RangeIter(first_key, last_key)
def _put_msg(channel, msg):
    """Store *msg* under the key ``<channel>.<current time>`` and return *msg*.

    The timestamp is written in fixed-point form with microsecond
    precision.  The previous ``%g`` conversion kept only six significant
    digits (e.g. ``1.41123e+09``), so messages stored within the same
    multi-hour window got the same key and overwrote each other.
    """
    key = '%s.%.6f' % (channel, time.time())
    DB().Put(key, msg)
    return msg
def since(channel, when):
    """Return the stored entries of *channel* in the key range ``''`` .. *when*.

    Fixes two name errors in the range lookup: it used an undefined ``ch``
    instead of ``channel`` and passed this function object (``since``)
    instead of ``when`` as the upper bound.

    Args:
        channel: channel name used as the key prefix.
        when: key suffix (string) used as the upper bound of the range.

    Returns:
        ``dict(results=[(key, value), ...])``.
    """
    # NOTE(review): the range runs from the start of the channel up to
    # `when`; confirm this is the intended direction for "since".
    # RangeIter already yields the value, so no second Get() is needed.
    return dict(results=[(k, v) for k, v in MsgIter(channel, '', when)])
def zap(channel, until):
    """Delete the entries of *channel* in the key range *until* .. ``'~'``.

    Fixes a name error: the range lookup used an undefined ``ch`` instead
    of ``channel``.

    Args:
        channel: channel name used as the key prefix.
        until: key suffix (string) used as the lower bound of the range.

    Returns:
        ``dict(len=<number of entries deleted>)``.
    """
    # NOTE(review): the range runs from `until` to the end of the channel;
    # confirm this is the intended direction for "until".
    db = DB()
    deleted = 0
    for key, _value in MsgIter(channel, until, '~'):
        db.Delete(key)
        deleted += 1
    return dict(len=deleted)
def watch():
    """Open ``watch.html`` (relative to the working directory) and return the file object."""
    page = open('watch.html')
    return page
def send(**kw):
    """Put the keyword arguments on one recipient's queue.

    ``to_whom`` is removed from *kw* and used as the ``QM`` key; the
    remaining keywords are queued as a single dict.  Returns the string
    ``'{}'``.
    """
    recipient = kw.pop('to_whom')
    target = QM[recipient]
    target.put(kw)
    return '{}'
def send_all(**kw):
    """Put the keyword-argument dict on every queue in ``QM``.

    Returns a dict whose ``len`` entry is the number of queues in ``QM``.
    """
    for target in QM.itervalues():
        target.put(kw)
    return {'len': len(QM)}
def who():
    """Return the keys of ``QM`` wrapped as ``{'results': [...]}``."""
    names = QM.keys()
    return {'results': names}
def stream2():
    """Generate a server-sent-events stream for the current request.

    A queue is registered in ``QM`` (under ``str(id(queue))``) and in
    ``RemotePortToQ`` (under the request's remote port).  After a greeting
    carrying that id, every dict taken from the queue is turned into one
    event: optional ``id:`` and ``event:`` lines followed by one ``data:``
    line per payload item, JSON-encoded.

    Fixes over the previous version:
      * the queue ``q`` was used without ever being created;
      * a non-list payload was emitted and then *also* iterated item by item;
      * events were not terminated by a blank line, which the event-stream
        format requires before a client dispatches them;
      * ``Cache-Control`` is now set as a real response header.

    Yields:
        str chunks in ``text/event-stream`` format.

    Raises:
        KeyError: if a queued dict has no ``'data'`` entry.
    """
    # NOTE(review): one fresh queue per connection is assumed here, matching
    # how close_connect() removes entries by id(queue); confirm.
    q = Queue()
    myid = str(id(q))
    QM[myid] = q
    RemotePortToQ[remote_port()] = q
    print("QAZ open X b XXX %s %s" % (remote_port(), id(q)))
    bottle.response.content_type = 'text/event-stream'
    bottle.response.set_header('Cache-Control', 'no-cache')
    # Set client-side auto-reconnect timeout, ms.
    yield 'retry: 100\n\ndata: hello ' + myid + '\n\n'
    while 1:
        arr = []
        data = q.get()
        if 'id' in data:
            arr.append('id: %s' % data['id'])
        if 'event' in data:
            arr.append('event: %s' % data['event'])
        dat = data['data']
        if isinstance(dat, list):
            # A list payload becomes one data line per element.
            for x in dat:
                arr.append('data: ' + json.dumps(x))
        else:
            arr.append('data: ' + json.dumps(dat))
        # The trailing blank line marks the end of the event.
        yield '\n'.join(arr) + '\n\n'
# NOTE(review): this line is not valid Python as shown; the text between
# '__main__' and 'gevent' appears to be missing.  Presumably it was a server
# start-up call taking server/port/debug keyword arguments (e.g. bottle.run)
# with the port read from the PORT environment variable; confirm against
# the original file before relying on it.
if __name__=='__main__''gevent',port=E['PORT'],debug=True)
<!DOCTYPE html>
<!--
  NOTE(review): this page looks truncated as shown and will not run as is:
    * the script element has a blank src;
    * the JavaScript that starts at $(document).ready is not wrapped in a
      script element, and its closing braces are absent;
    * the two lines beginning with + "<p>Event: ..." are continuation lines
      with no preceding expression, and nothing sits between the quotes
      after "data: ";
    * the closing head tag, the body tags and the closing html tag are
      absent.
  Presumably the script opens an EventSource on "stream" and appends one
  paragraph per received event to the #log element, both for default
  messages and for events named "xmessage"; confirm against the original.
-->
<html><head><meta charset="UTF-8" />
<script src=" "></script>
$(document).ready(function() {
var es = new EventSource("stream");
es.onmessage = function (e) {
+ "<p>Event: "+e.lastEventId+"::"+e.type+", data: ""</p>");
es.addEventListener('xmessage', function(e){
+ "<p>Event: "+e.lastEventId+"::"+e.type+", data: ""</p>");
}, false);
<div id="log" style="font-family: courier; font-size: 0.75em;"></div>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment