2009-12-03

Building a comet enabled http server in python using BaseHTTPServer and coroutines

In my last post I demonstrated how greenlet can be used to implement the Go chaining example. Here's an implementation of a scheduler/Channel library which can be used to implement servers using coroutines and asynchronous IO. It provides a SocketServer mix-in which can be combined with BaseHTTPServer to implement a comet-enabled server that can support a large number of concurrent connections.

To demonstrate the capabilities of the library, here is an example handler for BaseHTTPServer that tackles the c10k problem and works well as a comet-enabled server.

Please note that there's nothing unusual about the handler implementation. By providing a thread or fork compatible implementation of Channel, it should be possible to run it with the built-in forking or threading SocketServer mix-ins.
import BaseHTTPServer

class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    waiters = [] # global list we use to track all clients waiting to be notified
    def do_GET(self):
        # send headers
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()

        if self.path == '/wait': # wait for result
            c = Channel()
            self.waiters.append(c)
            n = c.read()

        elif self.path == '/notify': # notify everyone
            n = len(self.waiters)
            for i in self.waiters:
                i.write(n)
            del self.waiters[:]

        else: # current number of waiters
            n = len(self.waiters)

        self.wfile.write('%s' % n)

    def log_message(self, *args, **vargs):
        pass # mute log messages
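The handler touches the channel through only three calls: `Channel()`, `read()` and `write()`. As a rough illustration of the thread-compatible variant mentioned above, here is a minimal sketch built on the standard library queue. The name `ThreadChannel` is made up for this example and is not part of the library, and the code is written for Python 3, where the module is called `queue` (it is `Queue` in Python 2).

```python
import queue
import threading

class ThreadChannel(object):
    "Hypothetical thread-safe stand-in for Channel (read/write only)"

    def __init__(self):
        self._q = queue.Queue()  # unbounded, so write() never blocks

    def write(self, msg):
        "write to the channel"
        self._q.put(msg)

    def read(self):
        "read from the channel, blocking the calling thread while it's empty"
        return self._q.get()

if __name__ == "__main__":
    c = ThreadChannel()
    results = []
    # the reader blocks in read() until the main thread writes
    reader = threading.Thread(target=lambda: results.append(c.read()))
    reader.start()
    c.write(42)
    reader.join()
    print(results)
```

With ThreadingMixIn the shared `waiters` list would also be touched from several threads at once, so the `/notify` branch would probably want a lock around the loop and the `del`.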
Then we need a mix-in to handle scheduler/channel activities. This is where the magic lives; it is comparable to ThreadingMixIn and ForkingMixIn.
class ScheduledMixIn:
    "Mix-in class to handle each request in a new coroutine"

    def process_request(self, request, client_address):
        # the BaseHTTPServer framework uses only the "file protocol" for file
        # descriptors, so we wrap the request in an object that routes all
        # IO calls through kqueue/epoll and schedule/Channel.
        request = DummySocket(ScheduledFile.fromSocket(request))
        @go
        def runner():
            self.finish_request(request, client_address)
            self.close_request(request)

    def handle_request(self):
        return self._handle_request_noblock()

    def serve_forever(self):
        while True:
            self.handle_request()

    def server_activate(self):
        self.socket.listen(self.request_queue_size)
        self.socket.setblocking(False)
        self.acceptStream = Channel() # use a channel for new connections
        def runner(n, eof):
            for i in xrange(n): # kqueue will provide the number of connections waiting
                try:
                    client = self.socket.accept()
                except socket.error, e:
                    if e.errno == errno.EAGAIN: # either epoll, a kernel bug or a race condition
                        break
                    raise # any other error is real; don't fall through with a stale client
                self.acceptStream.write(client)
            if not eof:
                return runner
        _goRead(self.socket.fileno(), runner)

    def get_request(self):
        return self.acceptStream.read()
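The accept loop in `server_activate` relies on a non-blocking listening socket: keep calling `accept()` until the kernel reports that nothing is left. The sketch below shows that pattern on its own, without the scheduler. `drain_accept` is a hypothetical helper, the code targets Python 3 (where EAGAIN surfaces as `BlockingIOError`), and `select.select` stands in for the kqueue/epoll notification the library uses.

```python
import select
import socket

def drain_accept(listener, limit):
    "accept up to limit pending connections from a non-blocking listener"
    accepted = []
    for _ in range(limit):
        try:
            accepted.append(listener.accept())  # (socket, address) pairs
        except BlockingIOError:
            break  # EAGAIN/EWOULDBLOCK: the backlog is empty for now
    return accepted

if __name__ == "__main__":
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(5)
    listener.setblocking(False)

    clients = [socket.create_connection(listener.getsockname()) for _ in range(3)]
    pending = []
    while len(pending) < 3:
        # wait until the listener is readable, i.e. a connection is queued
        if not select.select([listener], [], [], 1.0)[0]:
            break
        pending.extend(drain_accept(listener, 3 - len(pending)))
    print(len(pending))

    for conn, _ in pending:
        conn.close()
    for c in clients:
        c.close()
    listener.close()
```

Passing a limit mirrors the `n` that kqueue hands to the callback in the mix-in; with epoll, which gives no count, breaking on the first EAGAIN is what ends the loop.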
To test this we will first start the server, create N clients that will connect and wait, then finally connect with a client that notifies everyone. At the same time we will continuously connect a client to get the status.
def testScheduledServer(n):
    "test http server with n clients"
    class ScheduledHTTPServer(ScheduledMixIn, BaseHTTPServer.HTTPServer):
        pass
    # start web server at a random port
    httpd = ScheduledHTTPServer(('', 0), TestHandler)
    address = httpd.server_name, httpd.server_port
    go(httpd.serve_forever)

    def httpClientGet(client, path):
        "super simple http client"
        try:
            client.write('GET %s HTTP/1.0\r\n\r\n' % path)

            data = ''.join(client)
            pos = data.find('\r\n\r\n')
            return data[:pos], data[pos+4:]
        finally:
            client.close()

    # spin up a few clients
    for i in xrange(n):
        def runner(client):
            header, body = httpClientGet(client, '/wait')
            assert int(body) == n
        go(partial(runner, ScheduledFile.connectTcp(address)))

    # wait until all clients are ready
    count = 0
    while count != n:
        header, body = httpClientGet(ScheduledFile.connectTcp(address), '/')
        count = int(body)

    # notify everyone
    header, body = httpClientGet(ScheduledFile.connectTcp(address), '/notify')
    assert int(body) == n

    # wait for everyone to finish
    count = -1
    while count:
        header, body = httpClientGet(ScheduledFile.connectTcp(address), '/')
        count = int(body)
Example run of testScheduledServer on a 13" MacBook Pro (2.53 GHz):
    % python naglfar/core.py 10000
    done 10000

    Time spent in user mode   (CPU seconds) : 10.567s
    Time spent in kernel mode (CPU seconds) : 3.344s
    Total time                              : 0:15.67s
    CPU utilisation (percentage)            : 88.7%
Even though it's only 10k clients, they are all running in the same process/thread as the server, which makes it 20k sockets. The amount of time spent in user mode vs kernel mode tells us that Python accounts for about 75% of the CPU usage. There is clearly room for improvement.
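
The rough three-quarters figure can be checked directly from the timings in the run above. This is only a back-of-the-envelope calculation; the variable names are made up for the example, and treating all user-mode time as interpreter time is an assumption.

```python
user_s = 10.567    # time spent in user mode, from the run above
kernel_s = 3.344   # time spent in kernel mode
total_s = 15.67    # wall-clock time

cpu_s = user_s + kernel_s
user_share = user_s / cpu_s      # fraction of CPU time spent in user mode
utilisation = cpu_s / total_s    # compare with the reported CPU utilisation

print("user share: %.1f%%" % (user_share * 100))
print("utilisation: %.1f%%" % (utilisation * 100))
```

The user share comes out at roughly three quarters, and the utilisation lands close to the 88.7% reported by the shell.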

The next step would perhaps be to look into optimizing BaseHTTPServer. A faster language or implementation should make it possible to get closer to the 3.344s theoretical limit using the same design. It's also likely that the kernel itself could be better optimized for this kind of work. The only tuning performed was increasing the maximum number of descriptors.

The rest of the implementation can be found at github

2009-11-16

The Go Chaining example written in python for the stock cpython interpreter

"""The Go Chaining example written in Python for the stock cpython interpreter

There's a lot of talk these days about the new programming language Go from the plan9 guys at Google. While I think it's great that a "system" language will get good concurrency support, I find it strange that they seem to ignore prior art. The concurrency model looks a lot like what you find in Erlang, but with some differences in how they do channels and error handling.

So what about Python? Stackless Python is shown to outperform the chaining example from the tech talk at slide 34 by using tasklets and simple channels. Let's try to see what we can do for the standard CPython implementation.
"""
def main(n=100000):
    def runner(left, right):
        left.write(right.read() + 1)

    leftmost = left = Channel()
    for i in xrange(n):
        right = Channel()
        go(runner, left, right)
        left = right

    right.write(0)
    print leftmost.read()
"""
Then we need an implementation of something that can act like goroutines and channels. For a long time I've personally been using greenlet from the pypy project in various hobby projects. I'll provide a cut down version of a Channel implementation which should do the job for this example.

Greenlet is a coroutine implementation, meaning threads/routines don't automatically yield to others; switching context must be done explicitly. While many may see this as a disadvantage, I see it as an advantage that forces you to program in a reactive way. In my experience this fits perfectly with channels and the Actor model.
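
To make the explicit switching concrete without pulling in greenlet, here is a small sketch that uses plain Python generators as a stand-in. Generators are a different mechanism from greenlets (they can only yield from the generator function itself, not from nested calls), and `worker` and `run_all` are names invented for this illustration. Each job runs until it reaches a yield, and only then does the next job get a turn.

```python
from collections import deque

def worker(name, steps, log):
    for i in range(steps):
        log.append((name, i))
        yield  # explicit switch point: nothing else runs until we get here

def run_all(jobs):
    # round-robin over the jobs until every generator is exhausted
    ready = deque(jobs)
    while ready:
        job = ready.popleft()
        try:
            next(job)
        except StopIteration:
            continue  # job finished, drop it
        ready.append(job)

if __name__ == '__main__':
    log = []
    run_all([worker('a', 2, log), worker('b', 3, log)])
    print(log)
```

A job that never yields would starve all the others, which is the "each job needs to behave" caveat in practice.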
"""
from greenlet import greenlet, getcurrent
from functools import partial
from collections import deque

# This is just a job queue which we routinely pop to do more work. There's no
# "switch thread after N time" mechanism, so each job needs to behave.
queue = deque()
def go(callable, *args, **vargs):
    "create a new coroutine for callable(*args, **vargs)"
    g = greenlet(partial(callable, *args, **vargs), scheduler) # scheduler must be parent
    queue.append(g.switch)

def scheduler():
    while queue:
        queue.popleft()()
scheduler = greenlet(scheduler)

class Channel(object):
    "An asynchronous channel"
    def __init__(self):
        self.q = deque()
        self.waiting = []

    def write(self, msg):
        "write to the channel"
        self.q.append(msg)
        # notify everyone
        queue.extend(self.waiting)
        self.waiting = []

    def read(self):
        "read from the channel, blocking if it's empty"
        while not self.q:
            # block until we have data
            self.waiting.append(getcurrent().switch)
            scheduler.switch()
        return self.q.popleft()

if __name__ == "__main__":
    main()
"""
Example run on my computer (mbp 13" 2.53GHz)
chaining.go:
% time sh -c "6g chaining.go && 6l chaining.6 && ./6.out"
100000

Time spent in user mode   (CPU seconds) : 0.760s
Time spent in kernel mode (CPU seconds) : 0.534s
Total time                              : 0:01.31s
CPU utilisation (percentage)            : 98.4%
Times the process was swapped           : 0
Times of major page faults              : 0
Times of minor page faults              : 0

chaining.py:
% time python chaining.py
100000

Time spent in user mode   (CPU seconds) : 3.761s
Time spent in kernel mode (CPU seconds) : 0.830s
Total time                              : 0:04.61s
CPU utilisation (percentage)            : 99.5%
Times the process was swapped           : 0
Times of major page faults              : 0
Times of minor page faults              : 0
So this is in no way a real benchmark (I even include the compilation time for Go), but it shows that the almost naively simple and old Python implementation is not completely off target. I kind of expected the difference to be much larger. Furthermore, I've added no special syntax, made no customizations to the standard distribution, added only one C extension module, and created a fully asynchronous Channel implementation on top of it all.

CPython is still in the game!
"""