Erik Gorset's Blog
Building a comet enabled http server in python using BaseHTTPServer and coroutines
In my last post I demonstrated how greenlet can be used to implement the go chaining example. Here's an implementation of a scheduler/Channel library that can be used to build servers using coroutines and asynchronous IO. It provides a SocketServer mix-in which can be combined with BaseHTTPServer to produce a comet-enabled server that supports a large number of concurrent connections.
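The library's go() is what launches each coroutine. As a rough mental model, you can picture it as "run this function concurrently". Here is a minimal thread-based sketch of that calling convention (an assumption about the interface, not the library's actual implementation, which schedules greenlets instead of threads):

```python
import threading

def go(fn, *args):
    """Hypothetical stand-in for the library's go(): run fn concurrently.
    The real library schedules coroutines; a daemon thread merely
    approximates the calling convention for illustration."""
    t = threading.Thread(target=fn, args=args, daemon=True)
    t.start()
    return t

results = []
t = go(results.append, 'pong')
t.join()
# results is now ['pong']
```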
To demonstrate the capabilities of the library, an example handler for BaseHTTPServer is shown to tackle the c10k problem and to serve as a capable comet-enabled server.
Please note that there's nothing unusual about the handler implementation. By providing a thread- or fork-compatible implementation of Channel, it should be possible to run it with the built-in forking or threading SocketServer mix-ins.
import BaseHTTPServer

class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    waiters = []  # global list we use to track all clients waiting to be notified

    def do_GET(self):
        # send headers
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()

        if self.path == '/wait':
            # wait for result
            c = Channel()
            self.waiters.append(c)
            n = c.read()
        elif self.path == '/notify':
            # notify everyone
            n = len(self.waiters)
            for i in self.waiters:
                i.write(n)
            del self.waiters[:]
        else:
            # current number of waiters
            n = len(self.waiters)
        self.wfile.write('%s' % n)

    def log_message(self, *args, **vargs):
        pass  # mute log messages
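To illustrate the point that the handler itself is ordinary, here is a self-contained sketch of the same wait/notify pattern running on Python 3's built-in threading server. The Channel class here is a hypothetical thread-backed stand-in built on queue.Queue (the real library parks coroutines instead of blocking threads), and the lock is added because handler threads share the waiters list:

```python
import queue
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

class Channel:
    # Hypothetical thread-backed stand-in for the library's Channel;
    # the real one suspends coroutines instead of blocking a thread.
    def __init__(self):
        self._q = queue.Queue()
    def read(self):
        return self._q.get()
    def write(self, value):
        self._q.put(value)

class WaitNotifyHandler(BaseHTTPRequestHandler):
    waiters = []
    lock = threading.Lock()  # protects waiters across handler threads

    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        if self.path == '/wait':
            c = Channel()
            with self.lock:
                self.waiters.append(c)
            n = c.read()  # park this thread until someone hits /notify
        elif self.path == '/notify':
            with self.lock:
                n = len(self.waiters)
                for c in self.waiters:
                    c.write(n)
                del self.waiters[:]
        else:
            with self.lock:
                n = len(self.waiters)
        self.wfile.write(str(n).encode())

    def log_message(self, *args):
        pass  # mute log messages

httpd = ThreadingHTTPServer(('127.0.0.1', 0), WaitNotifyHandler)
threading.Thread(target=httpd.serve_forever, daemon=True).start()

def get(path):
    url = 'http://127.0.0.1:%d%s' % (httpd.server_address[1], path)
    with urllib.request.urlopen(url) as resp:
        return resp.read().decode()

waiter = threading.Thread(target=lambda: get('/wait'))
waiter.start()
while get('/') != '1':  # poll until the waiter is registered
    time.sleep(0.01)
notified = get('/notify')  # notified is now '1': one waiter received the count
waiter.join()
httpd.shutdown()
```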
Then we need a mix-in to handle scheduler/Channel activities. This is where the magic lives, and it is comparable to ThreadingMixIn and ForkingMixIn.
class ScheduledMixIn:
    "Mix-in class to handle each request in a new coroutine"

    def process_request(self, request, client_address):
        # the BaseHTTPServer framework uses only the "file protocol" for
        # file descriptors, so we put the request in an object which will
        # wrap all IO calls using kqueue/epoll and schedule/Channel.
        request = ScheduledFile.fromSocket(request)
        @go
        def runner():
            self.finish_request(request, client_address)
            self.close_request(request)

    def handle_request(self):
        return self._handle_request_noblock()

    def serve_forever(self):
        while True:
            self.handle_request()

    def server_activate(self):
        self.socket.listen(self.request_queue_size)
        self.socket.setblocking(False)
        self.acceptStream = Channel()  # use a channel for new connections
        def runner(n, eof):
            for i in xrange(n):  # kqueue will provide the number of connections waiting
                try:
                    client = self.socket.accept()
                except socket.error, e:
                    if e.errno == errno.EAGAIN:
                        # either epoll, a kernel bug or a race condition
                        break
                    # FIXME: more error handling?
                    raise
                self.acceptStream.write(client)
            if not eof:
                return runner
        _goRead(self.socket.fileno(), runner)

    def get_request(self):
        return self.acceptStream.read()
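The core pattern in server_activate — a non-blocking listening socket whose accept queue is drained until EAGAIN whenever the poller reports readiness — can be shown standalone. Here is a minimal sketch using the stdlib selectors module in place of the library's kqueue/epoll wrapper (the function and variable names are mine, not the library's):

```python
import errno
import selectors
import socket
import time

def drain_accepts(listener):
    """Accept every connection currently queued on a non-blocking
    listening socket, stopping cleanly when the queue is empty."""
    accepted = []
    while True:
        try:
            accepted.append(listener.accept())
        except OSError as e:
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                break  # nothing left to accept right now
            raise
    return accepted

listener = socket.socket()
listener.bind(('127.0.0.1', 0))
listener.listen(128)
listener.setblocking(False)

sel = selectors.DefaultSelector()
sel.register(listener, selectors.EVENT_READ)

# connect two clients, then drain the accept queue as readiness fires
clients = [socket.create_connection(listener.getsockname()) for _ in range(2)]
accepted = []
deadline = time.time() + 5
while len(accepted) < 2 and time.time() < deadline:
    sel.select(timeout=1)  # wait until the listener is readable
    accepted.extend(drain_accepts(listener))

for conn, addr in accepted:
    conn.close()
for c in clients:
    c.close()
listener.close()
```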
To test this, we first start the server, create N clients that connect and wait, and finally connect with a client that notifies everyone. Meanwhile, we repeatedly connect a status client to poll progress.
def testScheduledServer(n):
    "test http server with n clients"
    class ScheduledHTTPServer(ScheduledMixIn, BaseHTTPServer.HTTPServer):
        pass

    # start web server at a random port
    httpd = ScheduledHTTPServer(('', 0), TestHandler)
    address = httpd.server_name, httpd.server_port
    go(httpd.serve_forever)

    def httpClientGet(client, path):
        "super simple http client"
        try:
            client.write('GET %s HTTP/1.0\r\n\r\n' % path)
            data = ''.join(client)
            pos = data.find('\r\n\r\n')
            return data[:pos], data[pos+4:]
        finally:
            client.close()

    # spin up a few clients
    for i in xrange(n):
        def runner(client):
            header, body = httpClientGet(client, '/wait')
            assert int(body) == n
        go(partial(runner, ScheduledFile.connectTcp(address)))

    # wait until all clients are ready
    count = 0
    while count != n:
        header, body = httpClientGet(ScheduledFile.connectTcp(address), '/')
        count = int(body)

    # notify everyone
    header, body = httpClientGet(ScheduledFile.connectTcp(address), '/notify')
    assert int(body) == n

    # wait for everyone to finish
    count = -1
    while count:
        header, body = httpClientGet(ScheduledFile.connectTcp(address), '/')
        count = int(body)
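The "super simple" client works because HTTP/1.0 closes the connection after the response, so the whole stream can be read and then cut at the first blank line. A standalone sketch of that parsing step (the function name is mine):

```python
def split_response(data):
    """Split a raw HTTP response into (header, body) at the first
    blank line, mirroring the super-simple client above."""
    pos = data.find('\r\n\r\n')
    return data[:pos], data[pos + 4:]

raw = 'HTTP/1.0 200 OK\r\nContent-type: text/plain\r\n\r\n10000'
header, body = split_response(raw)
# header -> 'HTTP/1.0 200 OK\r\nContent-type: text/plain', body -> '10000'
```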
Example run of testScheduledServer on a 13" MacBook Pro (2.53 GHz):
% python naglfar/core.py 10000
done 10000
Time spent in user mode   (CPU seconds) : 10.567s
Time spent in kernel mode (CPU seconds) : 3.344s
Total time                              : 0:15.67s
CPU utilisation (percentage)            : 88.7%
Even though it's only 10k clients, they all run in the same process and thread as the server, which brings the total to 20k sockets. The ratio of time spent in user mode versus kernel mode tells us that Python accounts for about 75% of the CPU usage. There is clearly room for improvement.
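The 75% figure follows directly from the timings above: user-mode CPU time divided by total CPU time.

```python
user, kernel = 10.567, 3.344           # CPU seconds from the run above
python_share = user / (user + kernel)  # fraction of CPU time in user mode
# about 0.76, i.e. Python accounts for roughly three quarters of the cost
```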
The next step would be to look into optimizing BaseHTTPServer. A faster language or implementation should make it possible to get closer to the theoretical limit of 3.344 seconds (the kernel time) using the same design. It's also likely that the kernel itself could be tuned further for this kind of workload. The only tuning performed here was increasing the maximum number of file descriptors.
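Raising the descriptor limit from Python might look like this — a sketch using the stdlib resource module; the exact limits and whether the hard limit can be raised are system-dependent, and the 20480 target is my illustrative choice:

```python
import resource

# 10k clients in one process means ~20k sockets (each test client and
# its server-side peer live in the same process), so a default soft
# limit of 256 or 1024 descriptors is far too low.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
wanted = 20480
if hard != resource.RLIM_INFINITY:
    wanted = min(wanted, hard)
if soft < wanted:
    # an unprivileged process may raise its soft limit up to the hard limit
    resource.setrlimit(resource.RLIMIT_NOFILE, (wanted, hard))
new_soft, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
```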
The rest of the implementation can be found on GitHub.
Posted 2009-12-03, by Erik Gorset, Colombo, Sri Lanka.