Tag Archives: websockets

A quick chat WebSockets/AMQP client

In my previous article I described how to plug WebSockets into AMQP using Tornado and pika. As a follow-up, I’ll show you how this can be used to write the simplest chat client.

First we create a web handler for Tornado that will return a web page containing the Javascript code that will connect and converse with our WebSockets endpoint following the WebSockets API.

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        username = "User%d" % random.randint(0, 100)
        self.write("""<html>
        <head>
          <script type='application/javascript' src='/static/jquery-1.4.2.min.js'> </script>
          <script type='application/javascript'>
            $(document).ready(function() {
              var ws = new WebSocket('ws://localdomain.dom:8888/ws');
              ws.onmessage = function (evt) {
                 $('#chat').val($('#chat').val() + evt.data + '\\n');                  
              };
              $('#chatform').submit(function() {
                 ws.send('%(username)s: ' + $('#message').val());
                 $('#message').val("");
                 return false;
              });
            });
          </script>
        </head>
        <body>
        <form action='/ws' id='chatform' method='post'>
          <textarea id='chat' cols='35' rows='10'></textarea>
          <br />
          <label for='message'>%(username)s: </label><input type='text' id='message' />
          <input type='submit' value='Send' />
          </form>
        </body>
        </html>
        """ % {'username': username})

Every time, the user enters a message and submits it too our WebSockets endpoint which, in return, will forward any messages back to the client. These will be appended to the textarea.

Internally, each client gets notified of any message through AMQP and the bus. Indeed the WebSockets handler are subscribed to a channel that will be notified every time the AMQP server pushes data to the consumer. A side effect of this is that the Javascript code above doesn’t update the textarea when it sends the message the user has entered, but when the server sends it back.

Let’s see how we had to change the Tornado application to support that handler as well as the serving of jQuery as a static resource (you need the jQuery toolkit in the same directory as the Python module).

 
if __name__ == '__main__':
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/ws", WebSocket2AMQP),
        ], static_path=".", bus=bus)
 
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
 
    bus.subscribe("main", poll)
    WS2AMQPPlugin(bus).subscribe()
    bus.start()
    bus.block()

The code is here.

Once the server is running, open two browser windows and access http://localhost:8888/. You should be able to type messages in one and see them appears in both windows.

Note:

This has been tested against the latest Chrome release. You will need to either set the “localdomain.dom” or provide the IP address of your network interface in the Javascript above since Chrome doesn’t allow for localhost nor 127.0.0.1.

Plugging AMQP and WebSockets

In my last article, I discussed the way the WSPBus could help your management of Python processes. This time, I’ll show how the bus can help plugging in heterogeneous frameworks and manage them properly too.

The following example will plug the WebSockets and AMQP together in order to channel data in and out of a WebSockets channel into AMQP exchanges and queues. For this, we’ll be using the Tornado web framework to handle the WebSockets side and pika for the AMQP one.

pika uses the Python built-in asyncore module to perform the non-blocking socket operations whilst Tornado comes with its own main loop on top of select or poll. Since Tornado doesn’t offer a single function call to iterate once, we’ll be directly using their main loop to block the process and therefore won’t be using the bus’ own block method.

Let’s see how the bus looks like

 class MyBus(wspbus.Bus):
    def __init__(self, name=""):
        wspbus.Bus.__init__(self)
        self.open_logger(name)
        self.subscribe("log", self._log)
 
        self.ioloop = tornado.ioloop.IOLoop.instance()
        self.ioloop.add_callback(self.call_main)
 
    def call_main(self):
        self.publish('main')
        time.sleep(0.1)
        self.ioloop.add_callback(self.call_main)
 
    def block(self):
        ioloop = tornado.ioloop.IOLoop.instance()
        try:
            ioloop.start()
        except KeyboardInterrupt:
            ioloop.stop()
            self.exit()
 
    def exit(self):
        wspbus.Bus.exit(self)
        self.close_logger()
 
    def open_logger(self, name=""):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)
 
        self.logger = logger
 
    def close_logger(self):
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
 
    def _log(self, msg="", level=logging.INFO):
        self.logger.log(level, msg)

Next we create a plugin that will subscribe to the bus and which will be in charge for the AMQP communication.

class WS2AMQPPlugin(plugins.SimplePlugin):
    def __init__(self, bus):
        plugins.SimplePlugin.__init__(self, bus)
        self.conn = pika.AsyncoreConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.conn.channel()
        self.channel.exchange_declare(exchange="X", type="direct", durable=False)
        self.channel.queue_declare(queue="Q", durable=False, exclusive=False)
        self.channel.queue_bind(queue="Q", exchange="X", routing_key="")
 
        self.channel.basic_consume(self.amqp2ws, queue="Q")
 
        self.bus.subscribe("ws2amqp", self.ws2amqp)
        self.bus.subscribe("stop", self.cleanup)
 
    def cleanup(self):
        self.bus.unsubscribe("ws2amqp", self.ws2amqp)
        self.bus.unsubscribe("stop", self.cleanup)
        self.channel.queue_delete(queue="Q")
        self.channel.exchange_delete(exchange="X")
        self.conn.close()
 
    def amqp2ws(self, ch, method, header, body):
        self.bus.publish("amqp2ws", body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
 
    def ws2amqp(self, message):
        self.bus.log("Publishing to AMQP: %s" % message)
        self.channel.basic_publish(exchange="X", routing_key="", body=message)

The interesting bits are the amqp2ws and ws2amqp methods. The former is called anytime the AMQP broker pushes data to our AMQP consumer, we then use the bus to publish the message to any interested subscribers. The latter publishes to AMQP messages that come from the WebSockets channel.

Next let’s see the Tornado WebSockets handler.

class WebSocket2AMQP(websocket.WebSocketHandler):
    def __init__(self, *args, **kwargs):
        websocket.WebSocketHandler.__init__(self, *args, **kwargs)
        self.settings['bus'].subscribe("amqp2ws", self.push_message)
 
    def open(self):
        self.receive_message(self.on_message)
 
    def on_message(self, message):
        self.settings['bus'].publish("ws2amqp", message)
        self.write_message(message)
        self.receive_message(self.on_message)
 
    def on_connection_close(self):
        self.settings['bus'].unsubscribe("amqp2ws", self.push_message)
 
    def push_message(self, message):
        self.write_message(message)

The on_message method is called whenever data is received from the client, the push_message is used to push data to the client.

Finally, we setup the plug everything together:

if __name__ == '__main__':
    application = tornado.web.Application([
        (r"/ws", WebSocket2AMQP),
        ], bus=bus)
 
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
 
    bus.subscribe("main", poll)
    WS2AMQPPlugin(bus).subscribe()
    bus.start()
    bus.block()

Notice the fact we subscribe the asyncore poll function to the main channel of the bus so that pika works properly as if we had called asyncore.loop()

The code can be found here.