Skip to content

Instantly share code, notes, and snippets.

@exarkun
Last active December 19, 2015 02:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save exarkun/5884100 to your computer and use it in GitHub Desktop.
Save exarkun/5884100 to your computer and use it in GitHub Desktop.
Run `nc -l -p 8181`.
Then run `python chunkypushproducer.py tcp:localhost:port=8181`.
Depending on your platform/hardware, you may need to tweak the
multipliers or sleep interval in `chunkyWorkload` to trigger
interesting behavior. These values seem to do a pretty good
job on my system, though. The expected output is something like:
Workload paused
Workload resumed
Workload paused
Workload resumed
Workload paused
Workload resumed
Workload paused
Workload resumed
While the nc window fills up with massive spam. This demonstrates
that producers are paused and resumed based on available send buffer
space, even if they are not continuously writing data (as this example
demonstrates, writes may be scheduled on a time-based system; the
producer will still be paused and resumed).
if __name__ == '__main__':
import sys
import chunkypushproducer
raise SystemExit(chunkypushproducer.main(sys.argv))
from zope.interface import implementer
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import react, deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet.endpoints import clientFromString, connectProtocol
def chunkyWorkload(reactor, protocol):
while True:
protocol.send(b"Hello world " * 1024 * 10)
yield deferLater(reactor, 2, lambda: None)
@implementer(IPushProducer)
class WorkloadProtocol(Protocol):
def __init__(self, workload):
self.workload = workload
def connectionMade(self):
self.transport.registerProducer(self, True)
gen = self.workload(self.transport.reactor, self)
self.task = cooperate(gen)
def send(self, data):
self.transport.write(data)
def pauseProducing(self):
print 'Workload paused'
self.task.pause()
def resumeProducing(self):
print 'Workload resumed'
self.task.resume()
def stopProducing(self):
print 'Workload stopped'
self.task.stop()
def connectionLost(self, reason):
print 'Connection lost'
self.task.stop()
def connect(reactor, endpointDescription):
endpoint = clientFromString(reactor, endpointDescription)
protocol = WorkloadProtocol(chunkyWorkload)
d = connectProtocol(endpoint, protocol)
d.addCallback(lambda proto: proto.task.whenDone())
return d
def main(argv):
return react(connect, argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment