Benjamin Rutt | 15 Dec 02:54 2009

complete producer/consumer example

How does the below example look as a complete producer/consumer example?  If it's well received, perhaps we can add it to the online documentation at in the "Further Reading" section?  I always felt that the producer/consumer (a.k.a. high volume streaming) docs lacked a real example that users could download and run.
"""Serve as a sample implementation of a twisted producer/consumer
system, with a simple TCP server which asks the user how many random
integers they want, and it sends the result set back to the user, one
result per line."""
import random
from zope.interface import implements
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
class Producer:
    """Send back the requested number of random integers to the client."""
    def __init__(self, proto, cnt):
        self._proto = proto
        self._goal = cnt
        self._produced = 0
        self._paused = False
    def pauseProducing(self):
        """When we've produced data too fast, pauseProducing() will be
called (reentrantly from within resumeProducing's transport.write
method, most likely), so set a flag that causes production to pause
        self._paused = True
        print('pausing connection from %s' % (self._proto.transport.getPeer()))
    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            next_int = random.randint(0, 10000)
            self._proto.transport.write('%d\r\n' % (next_int))
            self._produced += 1
        if self._produced == self._goal:
    def stopProducing(self):
class ServeRandom(LineReceiver):
    """Serve up random data."""
    def connectionMade(self):
        print('connection made from %s' % (self.transport.getPeer()))
        self.transport.write('how many random integers do you want?\r\n')
    def lineReceived(self, line):
        cnt = int(line.strip())
        producer = Producer(self, cnt)
        self.transport.registerProducer(producer, True)
    def connectionLost(self, reason):
        print('connection lost from %s' % (self.transport.getPeer()))
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
Use on the client:
$ telnet localhost 1234
Connected to localhost.
Escape character is '^]'.
how many random integers do you want?
Connection closed by foreign host.
Use on the server (observe how the server pauses production sometimes - this happens when a large data set is requested by the client):
$ ./
listening on 1234...
connection made from IPv4Address(TCP, '', 54859)
connection lost from IPv4Address(TCP, '', 54859)
connection made from IPv4Address(TCP, '', 54864)
pausing connection from IPv4Address(TCP, '', 54864)
pausing connection from IPv4Address(TCP, '', 54864)
pausing connection from IPv4Address(TCP, '', 54864)
pausing connection from IPv4Address(TCP, '', 54864)
connection lost from IPv4Address(TCP, '', 54864)

Benjamin Rutt
Twisted-Python mailing list
Twisted-Python <at>