Implementing HTTP Live Streaming

2009-07-13

Apple's recently released HTTP Live Streaming provides a clear, concise method for streaming audio and video content to browsers and mobile devices without the need for proprietary formats or browser plugins (I'm looking at you, Adobe). Instead, Apple's spec relies upon already common and relatively open formats.

The reference for implementing this functionality is Apple's HTTP Live Streaming Overview. Please read this document before continuing. I'll wait.

Right, so the gist is that you need to serve an m3u8 playlist containing metadata and a URL that points to a valid MPEG2 transport stream. My preferred language is Python, and the general convention for creating web services in Python is WSGI (PEP 333), so we'll start with a basic WSGI application.

from flup.server.fcgi import WSGIServer
from threading import Thread
from socket import socket
from select import select
from Queue import Queue
import re

class LiveHTTPServer(object):
	def __init__(self):
		self.urls = [
			(r'^/stream\.m3u8$', self.playlist),
			(r'^/stream\.ts$', self.stream),
		]
		self.urls = [(re.compile(pattern), func) for pattern, func in self.urls]
		self.queues = []

	def __call__(self, environ, start_response):
		for pattern, func in self.urls:
			match = pattern.match(environ['PATH_INFO'])
			if match:
				return func(start_response, match)
		start_response('404 Not Found', [('Content-type', 'text/plain')])
		return ['404 Not Found']

So, there's not too much magic here. When the server object is instantiated, it compiles a list of regular expressions and maps each one to an instance method. The WSGI server calls __call__, which tries each regex against the request path in order, calling the associated method on the first match or sending a 404 response if nothing matches. Note that Ian Bicking's WebOb library is a much simpler way to perform a lot of these tasks, but it doesn't provide an easy way to send chunked responses, which are necessary for the MPEG2 transport stream. Just ignore all the extra imports for now; we'll get to them in a minute.
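To make the dispatch step concrete, here is a minimal sketch of the same idea that can be exercised without any server at all. The names make_dispatcher, hello, and call are hypothetical helpers invented for this illustration, and the code is written to run on Python 3 (the listing above is Python 2). It also shows one detail worth knowing when writing such patterns: an unescaped "." matches any character.

```python
import re

def make_dispatcher(routes):
    """Compile (pattern, handler) pairs once and return a WSGI callable."""
    compiled = [(re.compile(pattern), handler) for pattern, handler in routes]

    def app(environ, start_response):
        path = environ.get('PATH_INFO', '')
        for pattern, handler in compiled:
            match = pattern.match(path)
            if match:
                return handler(start_response, match)
        start_response('404 Not Found', [('Content-type', 'text/plain')])
        return [b'404 Not Found']
    return app

def hello(start_response, match):
    # Hypothetical handler used only for this demonstration.
    start_response('200 OK', [('Content-type', 'text/plain')])
    return [b'hello']

def call(app, path):
    """Invoke a WSGI app directly and capture its status line and body."""
    captured = {}
    def start_response(status, headers):
        captured['status'] = status
    body = b''.join(app({'PATH_INFO': path}, start_response))
    return captured['status'], body

app = make_dispatcher([(r'^/stream\.m3u8$', hello)])
print(call(app, '/stream.m3u8'))   # matched route
print(call(app, '/streamXm3u8'))   # falls through to the 404 branch

# Without the backslash, "." matches any character, so this also matches:
print(bool(re.match('^/stream.m3u8$', '/streamXm3u8')))
```

Calling the application object directly like this is a handy way to check the routing before putting a real WSGI server in front of it.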

	def playlist(self, start_response, match):
		start_response('200 OK', [('Content-type', 'application/x-mpegURL')])
		return ['''#EXTM3U
#EXTINF:10,
http://video.example.org/stream.ts
#EXT-X-ENDLIST''']

This method implements the /stream.m3u8 response as required by Apple's spec. The M3U standard says that the EXTINF tag should have a value of -1 for ongoing streams or those of unknown length, but the iPhone rejected the playlist given anything other than a positive integer in this field.
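If you want to generate the playlist rather than hard-code it, something like the following sketch would do. The build_playlist helper is hypothetical; it just reproduces the four lines from the listing above and refuses non-positive durations, in line with the iPhone behavior described here. It is not a complete implementation of Apple's playlist format.

```python
def build_playlist(stream_url, duration=10):
    """Return M3U8 text pointing at a single transport stream URL.

    Hypothetical helper: it mirrors the playlist in the listing above
    and nothing more.
    """
    duration = int(duration)
    if duration <= 0:
        # Only positive integers were accepted in the EXTINF field.
        raise ValueError('duration must be a positive integer')
    lines = [
        '#EXTM3U',
        '#EXTINF:%d,' % duration,
        stream_url,
        '#EXT-X-ENDLIST',
    ]
    return '\n'.join(lines)

print(build_playlist('http://video.example.org/stream.ts'))
```

The playlist method could then return [build_playlist(...)] in place of the literal string (encoded to bytes first if you are on Python 3).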

	def stream(self, start_response, match):
		start_response('200 OK', [('Content-type', 'video/MP2T')])
		q = Queue()
		self.queues.append(q)
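		try:
			# Block until the input thread delivers data, then pass it on.
			while True:
				yield q.get()
		finally:
			# Runs on any error and when the server closes the generator
			# (e.g. after a client disconnect).
			if q in self.queues:
				self.queues.remove(q)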

This is where the tricky part actually happens. We create a Queue that will be filled with MPEG2 data by another thread and block on it, passing each piece of data along as part of a chunked response as soon as it's available. If anything goes wrong (e.g. a client disconnect), we remove this stream's queue from the list and return. If this server were to handle a large number of clients, we might want to set a maximum queue size to avoid filling up memory with data destined for a slow or unresponsive client. It might also be useful to perform some locking on the queues list, since it is modified by request threads while the input thread iterates over it. I'll leave that as an exercise for the reader.
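For what it's worth, one possible answer to that exercise might look like the sketch below. The ClientQueues class and its method names are hypothetical, and dropping chunks for a slow client is just one policy (it will cause glitches in that client's playback); disconnecting the client would be another. The import fallback lets it run on Python 2 or 3.

```python
import threading
try:
    from queue import Queue, Full   # Python 3
except ImportError:
    from Queue import Queue, Full   # Python 2, as in the listing above

class ClientQueues(object):
    """Hypothetical registry of per-client queues, guarded by a lock."""

    def __init__(self, maxsize=256):
        self._lock = threading.Lock()
        self._queues = []
        self._maxsize = maxsize

    def register(self):
        """Create a bounded queue for a new client and track it."""
        q = Queue(self._maxsize)
        with self._lock:
            self._queues.append(q)
        return q

    def unregister(self, q):
        with self._lock:
            if q in self._queues:
                self._queues.remove(q)

    def broadcast(self, data):
        """Offer data to every client; return how many clients dropped it."""
        with self._lock:
            queues = list(self._queues)  # snapshot, so puts happen unlocked
        dropped = 0
        for q in queues:
            try:
                q.put_nowait(data)
            except Full:
                dropped += 1  # slow client: discard this chunk for it
        return dropped
```

With this in place, stream would call register and unregister instead of touching a bare list, and the input thread would call broadcast(data) rather than looping over the queues itself.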

def input_loop(app):
	sock = socket()
	sock.bind(('', 9999))
	sock.listen(1)
	while True:
		print 'Waiting for input stream'
		sd, addr = sock.accept()
		print 'Accepted input stream from', addr
		data = True
		while data:
			readable = select([sd], [], [], 0.1)[0]
			for s in readable:
				data = s.recv(1024)
				if not data:
					break
				for q in app.queues:
					q.put(data)
		print 'Lost input stream from', addr
		sd.close()

This function serves as the feeder for all of the client queues. It accepts one connection at a time on port 9999 and puts any received data into every currently registered client queue. If the feeder stream is lost, it goes back to waiting for a new connection.
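To try the input side without a live encoder, a small client along these lines could push data at the input socket. Both feed and read_chunks are hypothetical helpers, and the file name in the usage comment is a placeholder; the only hard fact baked in is that an MPEG transport stream packet is 188 bytes long, which is why the default block size is a multiple of 188. Note that this sends as fast as the socket allows rather than pacing the data in real time.

```python
import socket

def read_chunks(path, size=188 * 7):
    """Yield fixed-size blocks from a file (188 bytes is one TS packet)."""
    with open(path, 'rb') as f:
        while True:
            block = f.read(size)
            if not block:
                break
            yield block

def feed(host, port, chunks):
    """Connect to the input socket, send every chunk, return bytes sent."""
    sent = 0
    sock = socket.create_connection((host, port))
    try:
        for chunk in chunks:
            sock.sendall(chunk)
            sent += len(chunk)
    finally:
        sock.close()  # closing tells input_loop the stream has ended
    return sent

# Usage (assumes the server is running and 'sample.ts' exists):
#   feed('localhost', 9999, read_chunks('sample.ts'))
```

When feed returns and the socket closes, the server side sees an empty recv and drops back to waiting for the next input connection.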

if __name__ == '__main__':
	app = LiveHTTPServer()
	server = WSGIServer(app, bindAddress=('', 9998))

	t1 = Thread(target=input_loop, args=[app])
	t1.setDaemon(True)
	t1.start()

	server.run()

Finally, we tie it all together by instantiating the WSGI application and server and starting a separate thread for the input loop. The flup.server.fcgi.WSGIServer class runs a FastCGI server that can sit behind any web server that speaks FastCGI. If you'd rather not use FastCGI, you should be able to drop in any other WSGI server, as long as it supports multiple concurrent requests. Otherwise, every client after the first will just block waiting for the transport stream.
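As an illustration of the "multiple concurrent requests" requirement, here is a sketch using only the standard library. The stock wsgiref server handles one request at a time, so a never-ending stream response would tie it up; mixing in ThreadingMixIn gives each request its own thread. ThreadingWSGIServer and make_threaded_server are names made up for this example, it serves plain HTTP rather than FastCGI, and it has not been verified against a real player.

```python
try:
    from socketserver import ThreadingMixIn   # Python 3
except ImportError:
    from SocketServer import ThreadingMixIn   # Python 2
from wsgiref.simple_server import WSGIServer, make_server

class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
    """wsgiref's reference server, but with one thread per request."""
    daemon_threads = True  # don't let open streams block interpreter exit

def make_threaded_server(app, host='', port=9998):
    """Build (but don't start) a threaded HTTP server for a WSGI app."""
    return make_server(host, port, app, server_class=ThreadingWSGIServer)

# Usage, in place of the flup server in the listing above:
#   server = make_threaded_server(app)
#   server.serve_forever()
```

The trade-off is one thread per connected client, which is fine for a handful of listeners but not something to scale very far.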

For my application, I used GStreamer to connect to the input socket and provide an MPEG2 transport stream. This is trivial to do using gst-launch, assuming you've got the proper plugins installed.

gst-launch alsasrc device=hw:0,4 ! ffenc_libmp3lame ! ffmux_mpegts ! tcpclientsink host=video.example.org port=9999
