Asynchronous ZPublisher stream iterator
import StringIO
import threading
from zope.interface import implements
from ZPublisher.Iterators import IStreamIterator
from ZServer.PubCore.ZEvent import Wakeup
from zope.globalrequest import getRequest
class zhttp_channel_async_wrapper(object):
"""Medusa channel wrapper to defer producers until released"""
def __init__(self, channel):
# (executed within the current Zope worker thread)
self._channel = channel
self._mutex = threading.Lock()
self._deferred = []
self._released = False
self._content_length = 0
def _push(self, producer, send=1):
if (isinstance(producer, str)
and producer.startswith('HTTP/1.1 200 OK')):
# Fix Content-Length to match the real content length
# (an alternative would be to use chunked encoding)
producer = producer.replace(
'Content-Length: 0\r\n',
'Content-Length: {0:s}\r\n'.format(str(self._content_length))
self._channel.push(producer, send)
def push(self, producer, send=1):
# (executed within the current Zope worker thread)
with self._mutex:
if not self._released:
self._deferred.append((producer, send))
self._push(producer, send)
def release(self, content_length):
# (executed within the exclusive async thread)
self._content_length = content_length
with self._mutex:
for producer, send in self._deferred:
self._push(producer, send)
self._released = True
Wakeup() # wake up the asyncore loop to read our results
def __getattr__(self, key):
return getattr(self._channel, key)
class AsyncWorkerStreamIterator(StringIO.StringIO):
"""Stream iterator to publish the results of the given func"""
def __init__(self, func, response, streamsize=1 << 16):
# (executed within the current Zope worker thread)
# Init buffer
self._streamsize = streamsize
# Wrap the Medusa channel to wait for the func results
self._channel = response.stdout._channel
self._wrapped_channel = zhttp_channel_async_wrapper(self._channel)
response.stdout._channel = self._wrapped_channel
# Set content-length as required by ZPublisher
response.setHeader('content-length', '0')
# Fire the given func in a separate thread
self.thread = threading.Thread(target=func, args=(self.callback,))
def callback(self, data):
# (executed within the exclusive async thread)
def next(self):
# (executed within the main thread)
if not self.closed:
data =
if not data:
return data
raise StopIteration
def __len__(self):
return len(self.getvalue())
def slow_ok_worker(callback):
# (executed within the exclusive async thread)
import time
def slow_ok():
"""The publishable example method"""
# (executed within the current Zope worker thread)
request = getRequest()
return AsyncWorkerStreamIterator(slow_ok_worker, request.response)
