View chunkypushproducer.py
if __name__ == '__main__':
    # Re-import this file as a module so the classes below live in
    # "chunkypushproducer" rather than "__main__".
    import sys
    import chunkypushproducer
    raise SystemExit(chunkypushproducer.main(sys.argv))

from zope.interface import implementer

from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import react, deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet.endpoints import clientFromString, connectProtocol

def chunkyWorkload(reactor, protocol):
    # Write one large chunk, then wait two seconds before the next one.
    while True:
        protocol.send(b"Hello world " * 1024 * 10)
        yield deferLater(reactor, 2, lambda: None)



@implementer(IPushProducer)
class WorkloadProtocol(Protocol):
    def __init__(self, workload):
        self.workload = workload


    def connectionMade(self):
        # True registers this protocol as a streaming (push) producer.
        self.transport.registerProducer(self, True)
        gen = self.workload(self.transport.reactor, self)
        self.task = cooperate(gen)


    def send(self, data):
        self.transport.write(data)



    def pauseProducing(self):
        print('Workload paused')
        self.task.pause()


    def resumeProducing(self):
        print('Workload resumed')
        self.task.resume()


    def stopProducing(self):
        print('Workload stopped')
        self.task.stop()

    def connectionLost(self, reason):
        print('Connection lost')
        self.task.stop()


def connect(reactor, endpointDescription):
    endpoint = clientFromString(reactor, endpointDescription)
    protocol = WorkloadProtocol(chunkyWorkload)
    d = connectProtocol(endpoint, protocol)
    # Keep the reactor running until the workload task finishes.
    d.addCallback(lambda proto: proto.task.whenDone())
    return d



def main(argv):
    return react(connect, argv[1:])
View chunkypushproducer.py
Run `nc -l -p 8181`.
 
Then run `python chunkypushproducer.py tcp:localhost:port=8181`.
 
Depending on your platform/hardware, you may need to tweak the
multipliers or sleep interval in `chunkyWorkload` to trigger
interesting behavior. These values seem to do a pretty good
job on my system, though. The expected output is something like:
 
    Workload paused
    Workload resumed
    Workload paused
    Workload resumed
    Workload paused
    Workload resumed
    Workload paused
    Workload resumed
 
Meanwhile, the nc window fills up with repeated "Hello world" text.
This demonstrates that producers are paused and resumed based on
available send buffer space even when they do not write continuously:
in this example the writes are scheduled on a timer, and the producer
is still paused and resumed.
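
The pause/resume cycle described above can be approximated without a network. The following is only a toy sketch in plain Python, not Twisted's implementation: `ToyTransport`, `RecordingProducer`, `drain`, and `simulate` are hypothetical names, and the 64 KiB threshold is an assumed value chosen for illustration. It shows why a single timed write of the `chunkyWorkload` payload can be enough to pause the producer, and why the producer resumes once the peer has read the buffered data.

```python
class ToyTransport:
    """Hypothetical stand-in for a transport with a bounded send buffer."""

    def __init__(self, high_water_mark):
        self.high_water_mark = high_water_mark  # assumed threshold, in bytes
        self.buffered = 0
        self.producer = None
        self.producer_paused = False

    def registerProducer(self, producer):
        self.producer = producer

    def write(self, data):
        # Data is queued; the producer is paused once too much is pending.
        self.buffered += len(data)
        if self.buffered > self.high_water_mark and not self.producer_paused:
            self.producer_paused = True
            self.producer.pauseProducing()

    def drain(self, nbytes):
        # Simulate the peer reading nbytes; resume once the buffer is empty.
        self.buffered = max(0, self.buffered - nbytes)
        if self.buffered == 0 and self.producer_paused:
            self.producer_paused = False
            self.producer.resumeProducing()


class RecordingProducer:
    """Records pause/resume notifications instead of printing them."""

    def __init__(self):
        self.events = []

    def pauseProducing(self):
        self.events.append("paused")

    def resumeProducing(self):
        self.events.append("resumed")


def simulate(rounds=2):
    chunk = b"Hello world " * 1024 * 10  # same payload as chunkyWorkload
    transport = ToyTransport(high_water_mark=2 ** 16)  # assumed 64 KiB
    producer = RecordingProducer()
    transport.registerProducer(producer)
    for _ in range(rounds):
        transport.write(chunk)       # one timed write overfills the buffer
        transport.drain(len(chunk))  # the peer catches up before the next one
    return producer.events


if __name__ == "__main__":
    print(simulate())
```

In this model each chunk is 122880 bytes, which exceeds the assumed threshold on its own, so every write produces one pause and every drain produces one resume. On a real connection the timing depends on the platform's socket buffers and on how fast the peer reads, which is why the multipliers and interval may need tweaking.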