Plugging AMQP and WebSockets

In my last article, I discussed how the WSPBus could help you manage Python processes. This time, I’ll show how the bus can also help you plug in heterogeneous frameworks and manage them properly.

The following example plugs WebSockets and AMQP together so that data flowing in and out of a WebSockets channel is relayed to and from AMQP exchanges and queues. For this, we’ll use the Tornado web framework to handle the WebSockets side and pika for the AMQP side.

pika uses Python’s built-in asyncore module to perform non-blocking socket operations, whilst Tornado comes with its own main loop on top of select or poll. Since Tornado doesn’t offer a single function call to iterate once, we’ll use its main loop directly to block the process and therefore won’t use the bus’s own block method.

Let’s see what the bus looks like:
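As a rough illustration of the publish/subscribe idea the rest of the article relies on, here is a minimal stand-in for a process bus. The class name ToyBus, its methods and the channel name "amqp2ws" are assumptions made for this sketch; it is not the WSPBus implementation and leaves out state management, priorities and the block method.

```python
from collections import defaultdict


class ToyBus:
    """Hypothetical stand-in for a process bus: named channels with callbacks."""

    def __init__(self):
        self.listeners = defaultdict(list)

    def subscribe(self, channel, callback):
        # Register a callback at most once per channel.
        if callback not in self.listeners[channel]:
            self.listeners[channel].append(callback)

    def unsubscribe(self, channel, callback):
        # Silently ignore callbacks that were never registered.
        if callback in self.listeners[channel]:
            self.listeners[channel].remove(callback)

    def publish(self, channel, *args, **kwargs):
        # Call every subscriber and collect whatever each one returns.
        return [cb(*args, **kwargs) for cb in list(self.listeners[channel])]


bus = ToyBus()
received = []
bus.subscribe("amqp2ws", received.append)  # channel name is illustrative
bus.publish("amqp2ws", "hello")
```

Anything that can be expressed as a callable can sit on a channel, which is what lets two unrelated frameworks talk without knowing about each other.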

Next, we create a plugin that subscribes to the bus and is in charge of the AMQP communication.

The interesting bits are the amqp2ws and ws2amqp methods. The former is called whenever the AMQP broker pushes data to our AMQP consumer; we then use the bus to publish the message to any interested subscribers. The latter publishes messages coming from the WebSockets channel to AMQP.
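The two directions can be sketched roughly as follows. Only the method names amqp2ws and ws2amqp come from the article; the class names, the channel names, the simplified callback signature and the FakeAMQPChannel (which records messages instead of talking to a broker) are assumptions made to keep the example runnable without pika.

```python
from collections import defaultdict


class ToyBus:
    """Hypothetical minimal bus, only here to make the sketch runnable."""

    def __init__(self):
        self.listeners = defaultdict(list)

    def subscribe(self, channel, callback):
        self.listeners[channel].append(callback)

    def publish(self, channel, *args):
        for callback in list(self.listeners[channel]):
            callback(*args)


class FakeAMQPChannel:
    """Stand-in for a broker channel: records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def basic_publish(self, exchange, routing_key, body):
        self.sent.append((exchange, routing_key, body))


class AMQPBridge:
    """Hypothetical plugin relaying messages between the bus and AMQP."""

    def __init__(self, bus, channel, exchange, routing_key):
        self.bus = bus
        self.channel = channel
        self.exchange = exchange
        self.routing_key = routing_key

    def subscribe(self):
        # Listen for messages that originate on the WebSockets side.
        self.bus.subscribe("ws2amqp", self.ws2amqp)

    def amqp2ws(self, body):
        # Would be invoked by the AMQP consumer for each delivered message.
        self.bus.publish("amqp2ws", body)

    def ws2amqp(self, message):
        # Forward a WebSockets message to the broker.
        self.channel.basic_publish(self.exchange, self.routing_key, message)


bus = ToyBus()
channel = FakeAMQPChannel()
bridge = AMQPBridge(bus, channel, exchange="demo", routing_key="demo.key")
bridge.subscribe()

to_browser = []
bus.subscribe("amqp2ws", to_browser.append)

bridge.amqp2ws("from broker")           # broker -> bus -> WebSockets side
bus.publish("ws2amqp", "from browser")  # WebSockets side -> bus -> broker
```

The bridge never references the WebSockets handler directly; both sides only know the channel names.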

Next, let’s look at the Tornado WebSockets handler.

The on_message method is called whenever data is received from the client, while push_message is used to push data to the client.
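To show how those two methods relate to the bus, here is a framework-free stand-in for the handler. It mirrors the open, on_message and on_close hooks that Tornado’s WebSocketHandler provides, but FakeWebSocketHandler, ToyBus and the channel names are hypothetical, and the client is replaced by a plain list.

```python
from collections import defaultdict


class ToyBus:
    """Hypothetical minimal bus, only here to make the sketch runnable."""

    def __init__(self):
        self.listeners = defaultdict(list)

    def subscribe(self, channel, callback):
        self.listeners[channel].append(callback)

    def unsubscribe(self, channel, callback):
        if callback in self.listeners[channel]:
            self.listeners[channel].remove(callback)

    def publish(self, channel, *args):
        for callback in list(self.listeners[channel]):
            callback(*args)


class FakeWebSocketHandler:
    """Stand-in for a WebSockets handler wired to the bus."""

    def __init__(self, bus):
        self.bus = bus
        self.sent_to_client = []

    def open(self):
        # Start receiving broker messages once the client is connected.
        self.bus.subscribe("amqp2ws", self.push_message)

    def on_close(self):
        # Stop receiving them when the client goes away.
        self.bus.unsubscribe("amqp2ws", self.push_message)

    def on_message(self, message):
        # Data received from the client is handed to the AMQP side.
        self.bus.publish("ws2amqp", message)

    def push_message(self, message):
        # A real Tornado handler would call self.write_message(message) here.
        self.sent_to_client.append(message)


bus = ToyBus()
to_amqp = []
bus.subscribe("ws2amqp", to_amqp.append)

handler = FakeWebSocketHandler(bus)
handler.open()
handler.on_message("hi broker")
bus.publish("amqp2ws", "hi client")
handler.on_close()
bus.publish("amqp2ws", "dropped")  # nobody is listening any more
```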

Finally, we plug everything together:

Notice that we subscribe the asyncore poll function to the main channel of the bus, so that pika works as if we had called asyncore.loop().
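The mechanism can be sketched without either framework: one loop owns the process and periodically publishes to the main channel, and whatever is subscribed there gets a chance to do a single non-blocking step. In this sketch poll_once stands in for asyncore’s poll function (asyncore itself was removed from the standard library in Python 3.12), the for loop stands in for Tornado’s main loop, and call_main borrows its name from the discussion in the comments; its body here is a guess, not the original code.

```python
import time
from collections import defaultdict


class ToyBus:
    """Hypothetical minimal bus, only here to make the sketch runnable."""

    def __init__(self):
        self.listeners = defaultdict(list)

    def subscribe(self, channel, callback):
        self.listeners[channel].append(callback)

    def publish(self, channel, *args):
        for callback in list(self.listeners[channel]):
            callback(*args)


polls = []


def poll_once():
    # Stand-in for one non-blocking pass over the AMQP sockets.
    polls.append(time.monotonic())


def call_main(bus, pause=0.01):
    # One tick: let every "main" subscriber run, then yield the CPU briefly.
    # The sleep blocks the calling thread, so it has to stay short.
    bus.publish("main")
    time.sleep(pause)


bus = ToyBus()
bus.subscribe("main", poll_once)

# Stand-in for the framework's main loop invoking a periodic callback.
for _ in range(3):
    call_main(bus)
```

Each tick gives the subscribed poll function exactly one run, which is the same work a dedicated loop would do, just driven from someone else’s loop.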

The code can be found here.

6 thoughts on “Plugging AMQP and WebSockets”

  1. I don’t know Tornado well, but doesn’t call_main run in the main thread? If so, the time.sleep call would block any requests, wouldn’t it?

  2. Sylvain, there is nothing special about it. Cyclone is a Twisted-based web framework, so txamqp will work as shown in its included example code.

  3. @Thomas: Yes, you are right. But this is part of using an async framework: you have to be careful about long operations. In this case, it was meant mostly as an example, and I’m using time.sleep to prevent the process from consuming my CPU like crazy.
