Erik Gorset's Blog
Building a comet-enabled HTTP server in Python using BaseHTTPServer and coroutines
In my last post I demonstrated how greenlet can be used to implement the go chaining example. Here's an implementation of a scheduler/Channel library which can be used to build servers using coroutines and asynchronous IO. It provides a SocketServer mix-in which can be combined with BaseHTTPServer to implement a comet-enabled server that supports a large number of concurrent connections.
To demonstrate the capabilities of the library, an example BaseHTTPServer handler is shown that tackles the c10k problem and works well as a comet-enabled server.
Please note that there's nothing unusual about the handler implementation. By providing a thread- or fork-compatible implementation of Channel, it should be possible to run it with the built-in forking or threading SocketServer mix-ins.
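As a rough illustration of that point, here is a hypothetical thread-compatible Channel (not part of the library; a minimal sketch using only the standard library) that could let the same handler run under the threading mix-in:

```python
import collections
import threading

class ThreadChannel(object):
    """Hypothetical thread-safe stand-in for the coroutine Channel.

    read() blocks the calling thread until some other thread calls
    write(), mirroring the blocking semantics the handler relies on.
    """
    def __init__(self):
        self._items = collections.deque()
        self._cond = threading.Condition()

    def write(self, value):
        with self._cond:
            self._items.append(value)
            self._cond.notify()  # wake one blocked reader, if any

    def read(self):
        with self._cond:
            while not self._items:
                self._cond.wait()  # releases the lock while waiting
            return self._items.popleft()
```

With threads, each blocked `read()` pins a whole OS thread, which is exactly the cost the coroutine version avoids.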
import BaseHTTPServer

class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    waiters = []  # global list we use to track all clients waiting to be notified

    def do_GET(self):
        # send headers
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()

        if self.path == '/wait':  # wait for result
            c = Channel()
            self.waiters.append(c)
            n = c.read()
        elif self.path == '/notify':  # notify everyone
            n = len(self.waiters)
            for i in self.waiters:
                i.write(n)
            del self.waiters[:]
        else:  # current number of waiters
            n = len(self.waiters)

        self.wfile.write('%s' % n)

    def log_message(self, *args, **vargs):
        pass  # mute log messages
Then we need a mix-in to handle scheduler/channel activities. This is where we put the magic, and it is comparable to ThreadingMixIn and ForkingMixIn.
import errno
import socket

class ScheduledMixIn:
    "Mix-in class to handle each request in a new coroutine"

    def process_request(self, request, client_address):
        # the BaseHTTPServer framework only uses the "file protocol" on file
        # descriptors, so we wrap the request in an object which routes all
        # IO calls through kqueue/epoll and schedule/Channel.
        request = ScheduledFile.fromSocket(request)
        @go
        def runner():
            self.finish_request(request, client_address)
            self.close_request(request)

    def handle_request(self):
        return self._handle_request_noblock()

    def serve_forever(self):
        while True:
            self.handle_request()

    def server_activate(self):
        self.socket.listen(self.request_queue_size)
        self.socket.setblocking(False)
        self.acceptStream = Channel()  # use a channel for new connections
        def runner(n, eof):
            for i in xrange(n):  # kqueue will provide the number of connections waiting
                try:
                    client = self.socket.accept()
                except socket.error, e:
                    if e.errno == errno.EAGAIN:  # either epoll, a kernel bug or a race condition
                        break
                    # FIXME: more error handling?
                    raise
                self.acceptStream.write(client)
            if not eof:
                return runner
        _goRead(self.socket.fileno(), runner)

    def get_request(self):
        return self.acceptStream.read()
To test this we will first start the server, create N clients that connect and wait, and finally connect a client that notifies everyone. Along the way we repeatedly connect a client to check the status.
from functools import partial

def testScheduledServer(n):
    "test http server with n clients"
    class ScheduledHTTPServer(ScheduledMixIn, BaseHTTPServer.HTTPServer):
        pass

    # start web server at a random port
    httpd = ScheduledHTTPServer(('', 0), TestHandler)
    address = httpd.server_name, httpd.server_port
    go(httpd.serve_forever)

    def httpClientGet(client, path):
        "super simple http client"
        try:
            client.write('GET %s HTTP/1.0\r\n\r\n' % path)
            data = ''.join(client)
            pos = data.find('\r\n\r\n')
            return data[:pos], data[pos+4:]
        finally:
            client.close()

    # spin up a few clients
    for i in xrange(n):
        def runner(client):
            header, body = httpClientGet(client, '/wait')
            assert int(body) == n
        go(partial(runner, ScheduledFile.connectTcp(address)))

    # wait until all clients are ready
    count = 0
    while count != n:
        header, body = httpClientGet(ScheduledFile.connectTcp(address), '/')
        count = int(body)

    # notify everyone
    header, body = httpClientGet(ScheduledFile.connectTcp(address), '/notify')
    assert int(body) == n

    # wait for everyone to finish
    count = -1
    while count:
        header, body = httpClientGet(ScheduledFile.connectTcp(address), '/')
        count = int(body)
Example run of testScheduledServer on a 13" MacBook Pro (2.53 GHz):
% python naglfar/core.py 10000
done 10000
Time spent in user mode (CPU seconds) : 10.567s
Time spent in kernel mode (CPU seconds) : 3.344s
Total time : 0:15.67s
CPU utilisation (percentage) : 88.7%
Even though it's only 10k clients, they are all running in the same process/thread as the server, which makes it 20k sockets. The split between time spent in user mode and kernel mode tells us that Python accounts for about 75% of the CPU usage. Clearly there is room for improvement.
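That figure falls straight out of the timings above:

```python
# CPU seconds reported by the run above
user, kernel = 10.567, 3.344

total_cpu = user + kernel
share = user / total_cpu  # fraction of CPU time spent in user mode (Python)
print('%.0f%%' % (share * 100))  # prints 76%, i.e. roughly three quarters
```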
The next step would perhaps be to look into optimizing BaseHTTPServer. A faster language or implementation should make it possible to get closer to the 3.344s theoretical limit using the same design. It's also likely that the kernel itself could be better optimized for this kind of work. The only tuning performed was increasing the maximum number of descriptors.
The rest of the implementation can be found on GitHub.
Posted 2009-12-03, by Erik Gorset, Colombo, Sri Lanka.