Friday, March 03, 2006

Line Filtering With Twisted

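Unlike Perl's POE, where a filter such as POE::Filter::Line can be plugged into a socket or a child process's output alike, the Twisted framework does not provide generic stream filters.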

This means that to pre- or post-process a data stream, one cannot simply reuse processing code written for other classes. I stumbled across this while trying to parse the output of a running process.

Take the following code, for example. It listens on a TCP socket, and when a client connects, it spawns a process and forwards the process's output to that connection.
# test1.py
from twisted.internet import protocol, reactor


class TestProcessProtocol(protocol.ProcessProtocol):
    def outReceived(self, data):
        # Naive: hands each chunk of stdout to lineReceived as if it were one line.
        self.lineReceived(data)

    def lineReceived(self, line):
        """Override this."""


class AdminServerProtocol(protocol.Protocol):
    def connectionMade(self):
        p = TestProcessProtocol()
        # Forward the child's output to this connection, and hang up
        # when the child closes its stdout.
        p.lineReceived = self.lineReceived
        p.outConnectionLost = self.transport.loseConnection
        reactor.spawnProcess(p, "./sample_module.py", ["sample_module.py"])

    def lineReceived(self, line):
        # Process output arrives as bytes, so prefix it with bytes too.
        self.transport.write(b"GOT LINE: " + line)


f = protocol.Factory()
f.protocol = AdminServerProtocol

reactor.listenTCP(1079, f)
reactor.run()

Here's the spawned program.
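#!/usr/bin/env python3
# sample_module.py
# test1.py runs this file directly, so it needs the shebang line above
# and must be executable (chmod +x sample_module.py).

import sys

for _ in range(5):
    print("line 1")
    print("line 2")
    print("line 3")
    print("line 4")

    # Flush STDOUT. Added for clarity.
    sys.stdout.flush()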

Let's see what happens when I run test1.py and connect to it.
$ telnet localhost 1079
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
GOT LINE: line 1
line 2
line 3
line 4
GOT LINE: line 1
line 2
line 3
line 4
GOT LINE: line 1
line 2
line 3
line 4
GOT LINE: line 1
line 2
line 3
line 4
GOT LINE: line 1
line 2
line 3
line 4
Connection closed by foreign host.

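Hmm... not quite what we expected. Each call to outReceived delivers whatever chunk the pipe happened to hold (here, everything written between two flushes), so lineReceived gets four lines at once. It would be nice to receive the data line by line instead of the whole buffer each time it's flushed. Now, we could modify the spawned process to flush after every line, but sometimes we don't have that luxury. Also, if the process writes data rapidly, or writes very long lines, a chunk can end in the middle of a line, and we could end up with partial (incomplete) lines.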
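To see why per-chunk processing breaks down, here is a small stand-alone simulation. The chunk boundaries are invented for illustration (a real pipe may cut the stream anywhere), and split_each_chunk is a hypothetical helper, not part of Twisted: it splits every chunk on its own, with no memory of the previous one.

```python
def split_each_chunk(chunks):
    """Naive approach (hypothetical helper): split each chunk
    independently, remembering nothing between chunks."""
    lines = []
    for chunk in chunks:
        lines.extend(chunk.splitlines())
    return lines


# Two reads from the child's stdout; the first one stops mid-line.
chunks = [b"line 1\nline 2\nli", b"ne 3\nline 4\n"]
print(split_each_chunk(chunks))
# [b'line 1', b'line 2', b'li', b'ne 3', b'line 4']
```

"line 3" comes out as two bogus lines, b'li' and b'ne 3'. Splitting alone isn't enough; the unfinished tail of each chunk has to be kept until the next chunk completes it.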

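Unlike with POE, we can't just apply a filter to the stream and be done with it. And unfortunately, Twisted's twisted.protocols.basic.LineReceiver can't help us here: it is a Protocol, fed through dataReceived, whereas a process's output arrives through ProcessProtocol's outReceived.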

The solution is to subclass ProcessProtocol with a line-buffering class, and use that as the parent class for our process protocols.
from twisted.internet import protocol, reactor
import os


class LineProcessProtocol(protocol.ProcessProtocol):
    # Line separator as bytes: used to split the child's output and
    # appended by writeLine().
    newline = os.linesep.encode("ascii")

    # Class-level default; the first += below creates a per-instance copy.
    __out_line_buffer = b""

    def outReceived(self, data):
        # Fill buffer
        self.__out_line_buffer += data

        # Split lines. The last piece is whatever follows the final
        # separator: an incomplete line, or b"" if the data ended cleanly.
        lines = self.__out_line_buffer.split(self.newline)
        self.__out_line_buffer = lines.pop()

        for line in lines:
            self.lineReceived(line)

    def lineReceived(self, line):
        """Override this."""

    def writeLine(self, data):
        # Send one line to the child's stdin.
        self.transport.write(data + self.newline)


class TestProcessProtocol(LineProcessProtocol):
    pass


class AdminServerProtocol(protocol.Protocol):
    def connectionMade(self):
        p = TestProcessProtocol()
        p.lineReceived = self.lineReceived
        p.outConnectionLost = self.transport.loseConnection
        reactor.spawnProcess(p, "./sample_module.py", ["sample_module.py"])

    def lineReceived(self, line):
        self.transport.write(b"GOT LINE: " + line + b"\n")


f = protocol.Factory()
f.protocol = AdminServerProtocol

reactor.listenTCP(1079, f)
reactor.run()

Above, LineProcessProtocol inherits from ProcessProtocol and adds a buffer. Every time data is received, we append it to the buffer and split the buffer into lines. We then feed every complete line to lineReceived, leaving only the trailing incomplete line (if any) in the buffer for the next call.
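The buffering logic itself doesn't depend on Twisted at all. Below is a minimal, framework-free sketch of the same idea; LineBuffer, feed() and flush() are made-up names, not Twisted APIs. It also adds flush() for a stream that closes with an unterminated last line, which LineProcessProtocol as written silently drops.

```python
class LineBuffer:
    """Hypothetical helper: accumulate bytes, hand back complete lines."""

    def __init__(self, delimiter=b"\n"):
        self.delimiter = delimiter
        self._buffer = b""

    def feed(self, data):
        """Add a chunk; return the list of lines it completed."""
        self._buffer += data
        # The last piece after splitting is whatever follows the final
        # delimiter: an incomplete line, or b"" if the chunk ended cleanly.
        *lines, self._buffer = self._buffer.split(self.delimiter)
        return lines

    def flush(self):
        """Return the leftover partial line, if any (e.g. at end of stream)."""
        rest, self._buffer = self._buffer, b""
        return [rest] if rest else []


buf = LineBuffer()
for chunk in [b"line 1\nline 2\nli", b"ne 3\nline 4\n", b"tail"]:
    for line in buf.feed(chunk):
        print(line)
for line in buf.flush():
    print(line)
# b'line 1'
# b'line 2'
# b'line 3'
# b'line 4'
# b'tail'
```

In a ProcessProtocol, outReceived would call feed(data) and outConnectionLost would call flush(). With the default b"\n" delimiter, a "\r\n" stream leaves a trailing "\r" on each line; pass delimiter=b"\r\n" in that case.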

Let's see what we get.
$ telnet localhost 1079
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
GOT LINE: line 1
GOT LINE: line 2
GOT LINE: line 3
GOT LINE: line 4
GOT LINE: line 1
GOT LINE: line 2
GOT LINE: line 3
GOT LINE: line 4
GOT LINE: line 1
GOT LINE: line 2
GOT LINE: line 3
GOT LINE: line 4
GOT LINE: line 1
GOT LINE: line 2
GOT LINE: line 3
GOT LINE: line 4
GOT LINE: line 1
GOT LINE: line 2
GOT LINE: line 3
GOT LINE: line 4
Connection closed by foreign host.

Perfect! We can now use our LineProcessProtocol instead of the stock ProcessProtocol to parse process output.

Now the question is, how easy is it to use Twisted's component architecture to create an interface (ILineReceiver), and adapters for the various classes?

Or better still, how easy would it be to implement a generic stream framework within Twisted, making all this unnecessary, while still enjoying some POE goodness?
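One way such a generic stream layer could look, loosely in the spirit of POE::Filter::Stackable: each filter turns a list of inputs into a list of outputs, and a stack chains them. This is a plain-Python sketch under that assumption; LineFilter, MapFilter and FilterStack are hypothetical names, not Twisted or POE APIs.

```python
class LineFilter:
    """Bytes chunks in, complete lines out; keeps the partial tail."""

    def __init__(self, delimiter=b"\n"):
        self.delimiter = delimiter
        self._buffer = b""

    def get(self, chunks):
        lines = []
        for chunk in chunks:
            self._buffer += chunk
            *complete, self._buffer = self._buffer.split(self.delimiter)
            lines.extend(complete)
        return lines


class MapFilter:
    """Records in, transformed records out (stateless)."""

    def __init__(self, func):
        self.func = func

    def get(self, records):
        return [self.func(record) for record in records]


class FilterStack:
    """Pass data through several filters, in order."""

    def __init__(self, *filters):
        self.filters = filters

    def get(self, chunks):
        records = list(chunks)
        for f in self.filters:
            records = f.get(records)
        return records


stack = FilterStack(LineFilter(), MapFilter(lambda line: b"GOT LINE: " + line))
for data in [b"line 1\nli", b"ne 2\nline 3\n"]:
    for record in stack.get([data]):
        print(record)
# b'GOT LINE: line 1'
# b'GOT LINE: line 2'
# b'GOT LINE: line 3'
```

A Protocol's dataReceived and a ProcessProtocol's outReceived could then both just call stack.get([data]) and act on the resulting records, so the same processing code serves sockets and processes alike.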

3 comments:

  1. good post!

    I too have lately moved to python-twisted from POE. While I really miss some POE features, the elegance of Python outweighs it.

  2. Thanks for this. I will definitely be using it.

  3. Useful code. Thanks.
