Skip to content

Instantly share code, notes, and snippets.

@zhangchunlin
Created July 31, 2015 13:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zhangchunlin/c73bcee92b86e0e0f853 to your computer and use it in GitHub Desktop.
Save zhangchunlin/c73bcee92b86e0e0f853 to your computer and use it in GitHub Desktop.
uliweb gevent long polling test
#coding=utf-8
from uliweb import expose, functions
import gevent
from gevent.queue import Queue, Empty
import json as json_
data_source = Queue()
def producer():
while True:
data_source.put_nowait('Hello World')
gevent.sleep(1)
#print "producer output"
g_producer = None
@expose('/')
def index():
global g_producer
if not g_producer:
g_producer = gevent.spawn(producer)
response.content_type = 'application/json'
while True:
try:
datum = data_source.get(timeout=5)
#print "consumer got input"
yield json_.dumps(datum) + '\n'
except Empty:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment