Using TCP sockets in Python

2009-11-16

It seems like a lot of people are confused about how to use the BSD sockets interface correctly without having data disappear in the middle of a session. For building servers, you can make use of the SocketServer module to hide some of the annoyances, but for clients you've either got to deal with sockets or use a third-party library, which never ends up being as fun as it sounds.

We'll start with a simple example... Open a socket, send a string, then close it.

from socket import socket

sock = socket()
sock.connect(('1.2.3.4', 1234))
sock.send('Hello world!\n')
sock.close()

This looks like a pretty simple example, but it exemplifies the pitfalls of sockets programming fairly well. The first problem that becomes apparent is that while the socket() constructor currently defaults to AF_INET, SOCK_STREAM (an IPv4 TCP socket), this may not be true in the future. The most likely change would be AF_INET6 for IPv6 support. While this call probably won't fail on an IPv6 native system, the IP address passed to the connect method would end up being in the wrong format. The simple solution is to pass DNS names rather than IP addresses and ensure that you catch any exceptions that might arise from nonexistent names.

This leads to the next problem: exception handling. The socket constructor could throw an exception if the system encounters an "out of FDs" condition. While this is fairly uncommon, you'll want to catch it to make your code more robust. The connect method could throw an exception for a number of reasons: the IP or hostname you pass in cannot be parsed or resolved, or all available source ports have been exhausted (extremely unlikely, requires at least 65536 established connections). The send method could fail if the connection has been closed or interrupted in some manner, although it's more likely that it simply wouldn't send data. Finally, the close method rarely fails; Python tolerates calling it on a socket that has already been closed, so we really don't need to worry much about that. Additionally, this line may even be extraneous because Python closes sockets upon garbage collection. But it's best to include it for correctness.
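To make the last two paragraphs concrete, here's a rough sketch of the same "connect, send, close" example with the error handling filled in. It's written for Python 3 (where socket data is bytes), and the function name, default payload, timeout, and returned error strings are all my own inventions for illustration. It leans on socket.create_connection(), which resolves the name and tries each address it gets back, so the code doesn't care whether it ends up on IPv4 or IPv6.

```python
import socket


def send_greeting(host, port, payload=b"Hello world!\n", timeout=5.0):
    """Connect to host:port, send payload, and report what happened.

    Hypothetical helper: returns None on success or a short error string.
    """
    try:
        # create_connection() resolves the name with getaddrinfo() and tries
        # each returned address (IPv4 or IPv6) until one connects.
        sock = socket.create_connection((host, port), timeout=timeout)
    except socket.gaierror as exc:
        # The hostname could not be parsed or resolved.
        return "name resolution failed: %s" % exc
    except OSError as exc:
        # Refused connections, timeouts, running out of FDs, and so on.
        return "connect failed: %s" % exc

    try:
        sock.sendall(payload)
    except OSError as exc:
        return "send failed: %s" % exc
    finally:
        # Always release the descriptor, whether or not the send worked.
        sock.close()
    return None
```

Note that socket.gaierror has to be caught before OSError, since it's a subclass of it. Whether you return a string, log, or re-raise is up to your application; the point is just that each call has its own failure modes.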

Sending data can be a bit tricky, especially if you're trying to send a lot of it. The socket's send() method is not guaranteed to send all of the data you pass it. Instead, it returns the number of bytes that were actually sent and expects your application to handle retransmission of the unsent portion. However, Python provides a convenience method called sendall() that makes sure all of your data is sent before returning. The right way:

sock.sendall('Hello world\n')

If, for some reason, you need to know exactly what bytes were sent, even under an error condition, you can also do it the old-fashioned way:

buffer = 'Hello world\n'
while buffer:
	bytes = sock.send(buffer)
	buffer = buffer[bytes:]

This approach can be a bit inefficient memory-wise, but will allow you to access the unsent portion of buffer if sock.send throws an exception.
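If the copying bothers you, one possible variation (Python 3 here, and the function name and return convention are just assumptions of mine) is to wrap the data in a memoryview and keep a running count of bytes sent. Slicing a memoryview doesn't copy the underlying bytes, and the count tells you exactly where the unsent portion starts if send() throws.

```python
import socket


def send_tracking(sock, data):
    """Send data, returning (bytes_sent, error).

    error is None when everything was sent, otherwise the OSError that
    interrupted the transfer; data[bytes_sent:] is the unsent remainder.
    """
    view = memoryview(data)  # slicing a memoryview does not copy the bytes
    sent = 0
    try:
        while sent < len(view):
            # send() may accept only part of what we offer it
            sent += sock.send(view[sent:])
    except OSError as exc:
        return sent, exc
    return sent, None
```

Something like `sent, err = send_tracking(sock, b'Hello world\n')` then gives you both pieces of information in one go.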

Time to try receiving some data! There are many ways to handle incoming data, depending on your application's needs. For now, I'll illustrate how you might handle a common line-based protocol.

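def readlines(sock, recv_buffer=4096, delim='\n'):
	buffer = ''
	data = True
	while data:
		# recv() returns an empty string once the peer closes the connection
		data = sock.recv(recv_buffer)
		buffer += data

		# Split on the caller's delimiter, not a hard-coded newline
		while delim in buffer:
			line, buffer = buffer.split(delim, 1)
			yield line
	# Hand back any trailing data that arrived without a final delimiter
	if buffer:
		yield buffer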

This method will continuously call recv() until the connection is closed, and supply a generator interface to the caller so that implementing your protocol becomes as simple as:

for line in readlines(sock):
	# Do something with a line of data

The socket object also provides a makefile() method that returns a file object based on the socket's file descriptor, allowing implementations like this:

for line in sock.makefile('r'):
	# Do something with a line of data

The makefile approach is likely more efficient and will perform better, but may not be as compatible on multiple platforms and does not give you as much control over the recv buffer size and delimiter.

Unfortunately, recv() is a blocking call. If you'd like your program to continue doing other things while waiting for data, you have several choices: call setblocking(False) on your socket, causing recv to throw an exception if there's no data available, or use the select module to occasionally poll the socket for data, or make use of platform-specific calls like epoll and kqueue, or just handle sockets in a separate thread or process. It all depends on what your application needs. Let's take a look at a simple select() call, as most of the other methods are variants of this.

from select import select
from socket import socket

sock = socket()
sock.connect(('1.2.3.4', 1234))

while True:
	# select() takes its timeout positionally; 0 means poll without blocking
	readable, writable, exceptional = select([sock], [], [], 0)
	if readable:
		data = sock.recv(4096)
		# Handle the incoming data
	# Continue other tasks here

In this case, we're only using select to check the status of a single socket, but as you can see, it supports polling lists of sockets for readable, writable, and exception states. The return value is a three-item tuple containing any sockets that were found to be in the given states. Once the data is cleared from a socket, you can continue to perform other tasks in your application, eventually looping around to the select() call to poll for more data. This is an extremely efficient way to handle a large number of connections and is often used in implementing servers in order to avoid spawning a new process or thread for each connection, which would consume more system resources.
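For the multiple-socket case, a minimal sketch might look like the following. It's Python 3, and poll_sockets is a name I made up; the 4096-byte read size is just carried over from the example above.

```python
import select
import socket


def poll_sockets(socks, timeout=0):
    """Return {socket: data} for every socket that has data waiting.

    An empty bytes value means the peer closed that connection.
    """
    received = {}
    if not socks:
        # select() with three empty lists is an error on some platforms
        return received
    readable, _, _ = select.select(socks, [], [], timeout)
    for sock in readable:
        # select() said this socket is readable, so recv() won't block
        received[sock] = sock.recv(4096)
    return received
```

You'd call this once per trip around your main loop, handle whatever came back, drop any socket that returned empty data, and carry on with your other tasks.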

I hope this post has cleared up some of the confusion around sockets programming. If you have questions or comments, don't be afraid to let me know.

Using scapy without root privileges

2009-11-03

I've been working on some scripts that interact with SNMP devices and, after much research and frustration, settled on the fact that none of the SNMP libraries available in Python can do everything. The NetSNMP bindings are incomplete, the PySNMP API changes every few months, and both are poorly documented. For these reasons, I started using Scapy to create and parse SNMP packets.

It's not terribly difficult to get some useful code with scapy and if you take a close look, the scapy source often provides concrete examples of what you're trying to do. For the purposes of this article, let's try writing an SNMP walk script.

from scapy import *

def walk(ip, community, oid_prefix):
	nextoid = oid_prefix
	while nextoid.startswith(oid_prefix):
		p = IP(dst=ip)/UDP(sport=RandShort())/SNMP(community=community, PDU=SNMPnext(varbindlist=[SNMPvarbind(oid=nextoid)]))
		r = sr1(p)
		oid = r[SNMPvarbind].oid.val
		if oid.startswith(oid_prefix):
			yield (oid, r[SNMPvarbind].value)
		else:
			break
		nextoid = oid

This is mostly just a re-implementation of scapy's builtin snmpwalk() method, except that it doesn't traverse the tree infinitely. For the most part, this code works. However, it requires root privileges to run because scapy uses raw sockets to send packets and pcap to receive them. Clearly this is not ideal.

The simplest workaround I've found (with help from Jordan) is to create a regular socket object in python, then use scapy to generate and parse the payloads.

from scapy import *
from socket import socket, AF_INET, SOCK_DGRAM

def walk(ip, community, oid_prefix):
	sock = socket(AF_INET, SOCK_DGRAM)
	sock.connect((ip, 161))

	nextoid = oid_prefix
	while nextoid.startswith(oid_prefix):
		p = SNMP(community=community, PDU=SNMPnext(varbindlist=[SNMPvarbind(oid=nextoid)]))
		buf = str(p)
		while buf:
			bytes = sock.send(buf)
			buf = buf[bytes:]

		r = SNMP(sock.recv(4096))
		oid = r[SNMPvarbind].oid.val
		if oid.startswith(oid_prefix):
			yield (oid, r[SNMPvarbind].value)
		else:
			break
		# Advance to the OID we just received, or the loop repeats the same request
		nextoid = oid

By using the standard BSD sockets approach, you can avoid the requirement to have root privileges by giving up control of the IP and transport layers to the OS. This should work equally well for SOCK_STREAM (TCP) and SOCK_DGRAM (UDP) socket types. This approach appears to be much faster than scapy's native pcap based method, because pcap isn't parsing every packet on the interface to find the responses.
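One thing to watch out for once the OS owns the transport: UDP makes no delivery promises, so a lost request or reply would leave recv() blocked forever. A possible way around that, sketched here in Python 3 with a made-up function name and arbitrary timeout and retry defaults, is to set a socket timeout and resend a few times before giving up.

```python
import socket


def udp_request(sock, payload, timeout=2.0, retries=3):
    """Send payload on a connected UDP socket and wait for one reply.

    Hypothetical helper: returns the reply bytes, or None if no reply
    arrived after all retries.
    """
    sock.settimeout(timeout)
    for _ in range(retries):
        sock.send(payload)  # a UDP datagram is sent whole or not at all
        try:
            return sock.recv(4096)
        except socket.timeout:
            continue  # request or reply was lost; try again
    return None
```

If this were dropped into walk(), the send loop and recv call could presumably be replaced with a single udp_request call, treating a None result as the end of the walk.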

Implementing HTTP Live Streaming

2009-07-13

Apple's recently released HTTP Live Streaming provides a clear, concise method for streaming audio and video content to browsers and mobile devices without the need for proprietary formats or browser plugins (I'm looking at you, Adobe). Instead, Apple's spec relies upon already common and relatively open formats.

The reference for implementing this functionality is Apple's HTTP Live Streaming Overview. Please read this document before continuing. I'll wait.

Right, so the gist is that you need to serve an m3u8 playlist containing metadata and a pointer to a URL containing a valid MPEG2 transport stream. As my preferred language is Python and the general convention for creating web services in Python is to use WSGI (PEP 333), we'll start with a basic WSGI application.

from flup.server.fcgi import WSGIServer
from threading import Thread
from socket import socket
from select import select
from Queue import Queue
import re

class LiveHTTPServer(object):
	def __init__(self):
		urls = [
			(r'^/stream\.m3u8$', self.playlist),
			(r'^/stream\.ts$', self.stream),
		]
		# Compile each pattern once, up front
		self.urls = [(re.compile(pattern), func) for pattern, func in urls]
		self.queues = []

	def __call__(self, environ, start_response):
		for pattern, func in self.urls:
			match = pattern.match(environ['PATH_INFO'])
			if match:
				return func(start_response, match)
		start_response('404 Not Found', [('Content-type', 'text/plain')])
		return ['404 Not Found']

So, there's not too much magic here. When the server object is instantiated, it compiles a list of regular expressions and maps them to instance methods. The WSGI server will call __call__ which attempts to match each of the regexes on the path of the request, calling the associated method if matched or sending a 404 response if not. Note that Ian Bicking's WebOb library is a much simpler way to perform a lot of these tasks, but doesn't provide an easy way to send chunked responses, which are necessary for the MPEG2 transport stream. Just ignore all the extra imports for now, we'll get to them in a minute.
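A nice property of WSGI is that you can poke at this kind of dispatcher without any server at all, since the application is just a callable. Here's a small sketch of that idea in Python 3. Router, hello, and call_app are hypothetical names of mine; Router only mirrors the dispatch logic described above and isn't the streaming server itself.

```python
import re


class Router(object):
    """Hypothetical stand-in for the regex dispatch described above."""

    def __init__(self, routes):
        # Compile each (pattern, handler) pair once
        self.routes = [(re.compile(p), f) for p, f in routes]

    def __call__(self, environ, start_response):
        for pattern, func in self.routes:
            match = pattern.match(environ['PATH_INFO'])
            if match:
                return func(start_response, match)
        start_response('404 Not Found', [('Content-type', 'text/plain')])
        return [b'404 Not Found']


def hello(start_response, match):
    start_response('200 OK', [('Content-type', 'text/plain')])
    return [b'hello']


def call_app(app, path):
    """Invoke a WSGI app directly and return (status, body)."""
    seen = {}

    def start_response(status, headers):
        # Record the status line instead of writing it to a client
        seen['status'] = status

    body = b''.join(app({'PATH_INFO': path}, start_response))
    return seen['status'], body
```

With that in place, `call_app(Router([(r'^/hello$', hello)]), '/hello')` hands back the status and body, and any other path falls through to the 404 branch. A real WSGI environ carries many more keys than PATH_INFO, so treat this as a quick smoke test rather than a proper harness.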

	def playlist(self, start_response, match):
		start_response('200 OK', [('Content-type', 'application/x-mpegURL')])
		return ['''#EXTM3U
#EXTINF:10,
http://video.example.org/stream.ts
#EXT-X-ENDLIST''']

This method implements the /stream.m3u8 response as required by Apple's spec. The M3U standard says that the EXTINF attribute should have a value of -1 for ongoing streams or those of unknown length, but the iPhone rejected the playlist given anything other than a positive integer in this field.

	def stream(self, start_response, match):
		start_response('200 OK', [('Content-type', 'video/MP2T')])
		q = Queue()
		self.queues.append(q)
		while True:
			try:
				yield q.get()
			except:
				if q in self.queues:
					self.queues.remove(q)
				return

This is where the tricky part actually happens. We create a Queue that will be filled with the MPEG2 data from another thread and start blocking on it, passing the data as a chunked response as soon as it's available. If anything goes wrong (e.g. client disconnect), we remove this stream's queue from the list and return. If this server were to handle a large number of clients, we might want to set a max queue size to avoid filling up memory with data destined for a slow or unresponsive client. It might also be useful to perform some locking on the queues list, to avoid contention between threads. I'll leave that as an exercise for the reader.
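For anyone who'd rather not do the exercise, here's one way the bounded queues and locking might be approached. This is a sketch in Python 3 (the module is named Queue in Python 2), ClientQueues and its methods are names I made up, and the default of 64 chunks is arbitrary.

```python
import threading
import queue  # named Queue in Python 2


class ClientQueues(object):
    """Hypothetical helper: a locked list of bounded per-client queues."""

    def __init__(self, maxsize=64):
        self.maxsize = maxsize
        self.lock = threading.Lock()
        self.queues = []

    def add(self):
        """Create and register a queue for a new client."""
        q = queue.Queue(self.maxsize)
        with self.lock:
            self.queues.append(q)
        return q

    def remove(self, q):
        """Unregister a client's queue; harmless if already removed."""
        with self.lock:
            if q in self.queues:
                self.queues.remove(q)

    def broadcast(self, data):
        """Offer data to every client; return how many queues were full."""
        with self.lock:
            # Copy the list so we don't hold the lock while putting
            targets = list(self.queues)
        dropped = 0
        for q in targets:
            try:
                q.put_nowait(data)
            except queue.Full:
                dropped += 1  # slow client: skip this chunk rather than block
        return dropped
```

Dropping chunks will glitch playback for the slow client, so disconnecting it outright may be the kinder option. Either way the feeder thread never blocks, and memory use stays bounded.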

def input_loop(app):
	sock = socket()
	sock.bind(('', 9999))
	sock.listen(1)
	while True:
		print 'Waiting for input stream'
		sd, addr = sock.accept()
		print 'Accepted input stream from', addr
		data = True
		while data:
			readable = select([sd], [], [], 0.1)[0]
			for s in readable:
				data = s.recv(1024)
				if not data:
					break
				for q in app.queues:
					q.put(data)
		print 'Lost input stream from', addr

This method serves as the feeder for all of the client queues. It listens for a single connection on port 9999 and puts any received data into all available client queues. If the feeder stream is lost, it will go back to waiting for a new connection.

if __name__ == '__main__':
	app = LiveHTTPServer()
	server = WSGIServer(app, bindAddress=('', 9998))

	t1 = Thread(target=input_loop, args=[app])
	t1.setDaemon(True)
	t1.start()

	server.run()

Finally we tie it all together by instantiating the WSGI application and server and starting a separate thread for the input loop. The flup.server.fcgi.WSGIServer class will act as a FastCGI server that can act as a backend for any number of web servers. If you'd rather not use FastCGI, you should be able to drop in any other WSGI server as long as it supports multiple concurrent requests, otherwise any client after the first will just block waiting for the transport stream.

For my application, I used gstreamer to connect to the input socket and provide an MPEG2 transport stream. This is trivial to do using gst-launch assuming you've got the proper plugins installed.

gst-launch alsasrc device=hw:0,4 ! ffenc_libmp3lame ! ffmux_mpegts ! tcpclientsink host=video.example.org port=9999

HTTP live streaming a radio scanner

2009-07-13

As my obsession with everything radio related continues, I found myself wanting a way to listen to arbitrary radio frequencies from my iPhone. I looked at several solutions along these lines, but settled for using an off-the-shelf radio scanner as a frontend due to the low cost, relative ease of use, and wideband reception. My portable scanner, an ICOM IC-RX7, certainly has all of the features necessary to accomplish this task, but is limited in that its small package makes it difficult to set up a discriminator tap. A discriminator tap is a radio modification that allows you to capture the baseband radio signal before any real processing is done by the audio frontend. This is useful if you want to decode FSK encoded signals, for instance.

So, rather than putting holes in my pretty portable radio, I opted to buy a new one. I figured this would be a good time to fill in for a few of the features the RX7 is lacking, like trunk tracking. After a bit of research and deciding where my price points were, I ended up with the Uniden BCT8 scanner. It supports a few common analog voice trunking systems (EDACS, LTR, Motorola) and has a serial port for controlling and programming.

As my platform of choice is Linux, I started searching for software to interface with the radio over the serial port and came up short. However, I did manage to find Uniden's serial protocol reference for the BCT8 and proceeded to implement a few Python modules to serve my purposes. As a side note: Uniden apparently cannot design a sane protocol and their engineers should be shot on sight.

With the eventual goal of getting scanner audio and control on my iPhone, I began to tackle the task of taking audio input from ALSA and pushing it over the network in some format that the iPhone can comprehend. My initial solution was to use the quite convenient (if somewhat undocumented) pyAudio library to capture raw PCM data from the sound card and multicast it out over my network. This worked fairly well for listening using my laptop, but would require some serious development work to get working on the iPhone (a custom Audio Unit Generator using multicast sockets to receive the stream and somehow transforming it into something Core Audio can comprehend). Even if I did get that working, there would be limitations... It wouldn't work on a network without multicast, audio would be transmitted uncompressed, and the possibility of high latency or dropped packets was high.

As I was working on the multicast server, Apple announced a new HTTP Live Streaming feature in the iPhone 3.0 firmware. A quick look at the documentation made it look fairly simple, as long as I could get my PCM audio stream into AAC or MP3 in an MPEG2 Transport Stream. The encoding part proved more difficult than I thought and wouldn't work with straight UNIX pipes. I was also unable to find a good library for muxing MPEG2 Transport Streams... I figured that in the worst case I could use a Python module I wrote a few months ago for demuxing MPEG2-TS, but that would require quite a bit of work and may be filled with bugs.

Eventually I found gstreamer, which solves a lot of the above problems. I built a gstreamer pipeline that took input from alsasrc, compressed it using ffenc_libmp3lame, muxed using ffmux_mpegts, and output using tcpclientsink. The tcpclientsink is then connected to a Python app that reads input from a source socket and outputs the data as a chunked HTTP response to any open connections. I used WSGI to implement a simple server that handles the protocol portion of Apple's HTTP live streaming spec. In order to handle multiple clients without too much hassle, the streaming server uses flup to implement a FastCGI server that's frontended by nginx.

After bringing everything up and connecting the iPhone, I get a nice audio stream on the iPhone. There are still a few drawbacks to this approach, the most obvious being that I can't control the scanner from the phone. I'm hoping that Apple provides a way to integrate a live streaming connection directly into an app without popping up the QuickTime view, but I may run out of luck there. Worst case, I can provide some status information about the scanner (currently tuned frequency, talkgroup, etc) as a video stream in the transport stream and a separate web app for performing control actions.

I'll go into more detail about the implementation of the various pieces here in another post, but in the meantime the scanner's stream is available here: http://neohippie.net/scanner/ and is accessible using an iPhone with 3.0 firmware or VLC. Theoretically it should also work with QuickTime X, but that hasn't been released yet so I haven't been able to test it.