Monthly Archives: June 2010

Using Jython as a CLI frontend to HBase

HBase, the well known non-relational distributed database, comes with a console program to perform various operations on a HBase cluster. I’ve personally found this tool to be a bit limited and I’ve toyed around the idea of writing my own. Since HBase only comes with a Java driver for direct access and the various RPC interfaces such as Thrift don’t offer the full set of functions over HBase, I decided to go for Jython and to directly use the Java API. This article will show a mock-up of such a tool.

The idea is to provide a simple Python API over the HBase one and couple it with a Python interpreter. This means, it offers the possibility to perform any Python (well Jython) operations whilst operating on HBase itself with an easier API than the Java one.

Note also that the tool uses the WSPBus already described in an earlier article to control the process itself. You will therefore need CherryPy’s latest revision.

# -*- coding: utf-8 -*-
import sys
import os
import code
import readline
import rlcompleter
 
from org.apache.hadoop.hbase import HBaseConfiguration, \
     HTableDescriptor, HColumnDescriptor
from org.apache.hadoop.hbase.client import HBaseAdmin, \
     HTable, Put, Get, Scan
 
import logging
from logging import handlers
 
from cherrypy.process import wspbus
from cherrypy.process import plugins
 
class StaveBus(wspbus.Bus):
    def __init__(self):
        wspbus.Bus.__init__(self)
        self.open_logger()
        self.subscribe("log", self._log)
 
        sig = plugins.SignalHandler(self)
        if sys.platform[:4] == 'java':
            del sig.handlers['SIGUSR1']
            sig.handlers['SIGUSR2'] = self.graceful
            self.log("SIGUSR1 cannot be set on the JVM platform. Using SIGUSR2 instead.")
 
            # See http://bugs.jython.org/issue1313
            sig.handlers['SIGINT'] = self._jython_handle_SIGINT
        sig.subscribe()
 
    def exit(self):
        wspbus.Bus.exit(self)
        self.close_logger()
 
    def open_logger(self, name=""):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)
 
        self.logger = logger
 
    def close_logger(self):
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
 
    def _log(self, msg="", level=logging.INFO):
        self.logger.log(level, msg)
 
    def _jython_handle_SIGINT(self, signum=None, frame=None):
        # See http://bugs.jython.org/issue1313
        self.log('Keyboard Interrupt: shutting down bus')
        self.exit()
 
class HbaseConsolePlugin(plugins.SimplePlugin):
    def __init__(self, bus):
        plugins.SimplePlugin.__init__(self, bus)
        self.console = HbaseConsole()
 
    def start(self):
        self.console.setup()
        self.console.run()
 
class HbaseConsole(object):
    def __init__(self):
        # we provide this instance to the underlying interpreter
        # as the interface to operate on HBase
        self.namespace = {'c': HbaseCommand()}
 
    def setup(self):
        readline.set_completer(rlcompleter.Completer(self.namespace).complete)
        readline.parse_and_bind("tab:complete")
        import user
 
    def run(self):
        code.interact(local=self.namespace)
 
class HbaseCommand(object):
    def __init__(self, conf=None, admin=None):
        self.conf = conf
        if not conf:
            self.conf = HBaseConfiguration()
        self.admin = admin
        if not admin:
            self.admin = HBaseAdmin(self.conf)
 
    def table(self, name):
        return HTableCommand(name, self.conf, self.admin)
 
    def list_tables(self):
        return self.admin.listTables().tolist()
 
class HTableCommand(object):
    def __init__(self, name, conf, admin):
        self.conf = conf
        self.admin = admin
        self.name = name
        self._table = None
 
    def row(self, name):
        if not self._table:
            self._table = HTable(self.conf, self.name)
        return HRowCommand(self._table, name)
 
    def create(self, families=None):
        desc = HTableDescriptor(self.name)
        if families:
            for family in families:
                desc.addFamily(HColumnDescriptor(family))
        self.admin.createTable(desc)
        self._table = HTable(self.conf, self.name)
        return self._table
 
    def scan(self, start_row=None, end_row=None, filter=None):
        if not self._table:
            self._table = HTable(self.conf, self.name)
 
        sc = None
        if start_row and filter:
            sc = Scan(start_row, filter)
        elif start_row and end_row:
            sc = Scan(start_row, end_row)
        elif start_row:
            sc = Scan(start_row)
        else:
            sc = Scan()
        s = self._table.getScanner(sc)
        while True:
            r = s.next()
            if r is None:
                raise StopIteration()
 
            yield r
 
    def delete(self):
        self.disable()
        self.admin.deleteTable(self.name)
 
    def disable(self):
        self.admin.disableTable(self.name)
 
    def enable(self):
        self.admin.enableTable(self.name)
 
    def exists(self):
        return self.admin.tableExists(self.name)
 
    def list_families(self):
        desc = HTableDescriptor(self.name)
        return desc.getColumnFamilies()
 
class HRowCommand(object):
    def __init__(self, table, rowname):
        self.table = table
        self.rowname = rowname
 
    def put(self, family, column, value):
        p = Put(self.rowname)
        p.add(family, column, value)
        self.table.put(p)
 
    def get(self, family, column):
        r = self.table.get(Get(self.rowname))
        v = r.getValue(family, column)
        if v is not None:
            return v.tostring()
 
 
if __name__ == '__main__':
    bus = StaveBus()
    HbaseConsolePlugin(bus).subscribe()
    bus.start()
    bus.block()

To test the tool, you can simply grab the latest copy of HBase and run:

hbase-0.20.4$ ./bin/start-hbase.sh

Then you need to configure your classpath so that it includes all the HBase dependencies. To determine them:

$ ps auwx|grep java|grep org.apache.hadoop.hbase.master.HMaster|perl -pi -e "s/.*classpath //"

Copy the full list of jars and export CLASSPATH with it. (This is from the HBase wiki on Jython and HBase).

Next you have to add an extra jar to the classpath so that Jython supports readline:

$ export CLASSPATH=$CLASSPATH:$HOME/jython2.5.1/extlibs/libreadline-java-0.8.jar

Make sure you’ll install libreadline-java as well.

Now, that your environment is setup, save the code above under a script named stave.py and run it as follow:

$ jython stave.py
Python 2.5.1 (Release_2_5_1:6813, Sep 26 2009, 13:47:54) 
[Java HotSpot(TM) Server VM (Sun Microsystems Inc.)] on java1.6.0_20
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> c.table('myTable').create(families=['aFamily:'])
>>> c.table('myTable').list_families()
array(org.apache.hadoop.hbase.HColumnDescriptor)
>>> c.table('myTable').row('aRow').put('aFamily', 'aColumn', 'hello world!')
>>> c.table('myTable').row('aRow').get('aFamily', 'aColumn')
'hello world!'
>>> list(c.table('myTable').scan())
[keyvalues={aRow/aFamily:aColumn/1277645421824/Put/vlen=12}]

You can import any Python module available to your Jython environment as well of course.

I will probably extend this tool over time but in the meantime I hope you’ll find it a useful canvas to operate HBase.

A quick chat WebSockets/AMQP client

In my previous article I described how to plug WebSockets into AMQP using Tornado and pika. As a follow-up, I’ll show you how this can be used to write the simplest chat client.

First we create a web handler for Tornado that will return a web page containing the Javascript code that will connect and converse with our WebSockets endpoint following the WebSockets API.

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        username = "User%d" % random.randint(0, 100)
        self.write("""<html>
        <head>
          <script type='application/javascript' src='/static/jquery-1.4.2.min.js'> </script>
          <script type='application/javascript'>
            $(document).ready(function() {
              var ws = new WebSocket('ws://localdomain.dom:8888/ws');
              ws.onmessage = function (evt) {
                 $('#chat').val($('#chat').val() + evt.data + '\\n');                  
              };
              $('#chatform').submit(function() {
                 ws.send('%(username)s: ' + $('#message').val());
                 $('#message').val("");
                 return false;
              });
            });
          </script>
        </head>
        <body>
        <form action='/ws' id='chatform' method='post'>
          <textarea id='chat' cols='35' rows='10'></textarea>
          <br />
          <label for='message'>%(username)s: </label><input type='text' id='message' />
          <input type='submit' value='Send' />
          </form>
        </body>
        </html>
        """ % {'username': username})

Every time, the user enters a message and submits it too our WebSockets endpoint which, in return, will forward any messages back to the client. These will be appended to the textarea.

Internally, each client gets notified of any message through AMQP and the bus. Indeed the WebSockets handler are subscribed to a channel that will be notified every time the AMQP server pushes data to the consumer. A side effect of this is that the Javascript code above doesn’t update the textarea when it sends the message the user has entered, but when the server sends it back.

Let’s see how we had to change the Tornado application to support that handler as well as the serving of jQuery as a static resource (you need the jQuery toolkit in the same directory as the Python module).

 
if __name__ == '__main__':
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/ws", WebSocket2AMQP),
        ], static_path=".", bus=bus)
 
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
 
    bus.subscribe("main", poll)
    WS2AMQPPlugin(bus).subscribe()
    bus.start()
    bus.block()

The code is here.

Once the server is running, open two browser windows and access http://localhost:8888/. You should be able to type messages in one and see them appears in both windows.

Note:

This has been tested against the latest Chrome release. You will need to either set the “localdomain.dom” or provide the IP address of your network interface in the Javascript above since Chrome doesn’t allow for localhost nor 127.0.0.1.

Plugging AMQP and WebSockets

In my last article, I discussed the way the WSPBus could help your management of Python processes. This time, I’ll show how the bus can help plugging in heterogeneous frameworks and manage them properly too.

The following example will plug the WebSockets and AMQP together in order to channel data in and out of a WebSockets channel into AMQP exchanges and queues. For this, we’ll be using the Tornado web framework to handle the WebSockets side and pika for the AMQP one.

pika uses the Python built-in asyncore module to perform the non-blocking socket operations whilst Tornado comes with its own main loop on top of select or poll. Since Tornado doesn’t offer a single function call to iterate once, we’ll be directly using their main loop to block the process and therefore won’t be using the bus’ own block method.

Let’s see how the bus looks like

 class MyBus(wspbus.Bus):
    def __init__(self, name=""):
        wspbus.Bus.__init__(self)
        self.open_logger(name)
        self.subscribe("log", self._log)
 
        self.ioloop = tornado.ioloop.IOLoop.instance()
        self.ioloop.add_callback(self.call_main)
 
    def call_main(self):
        self.publish('main')
        time.sleep(0.1)
        self.ioloop.add_callback(self.call_main)
 
    def block(self):
        ioloop = tornado.ioloop.IOLoop.instance()
        try:
            ioloop.start()
        except KeyboardInterrupt:
            ioloop.stop()
            self.exit()
 
    def exit(self):
        wspbus.Bus.exit(self)
        self.close_logger()
 
    def open_logger(self, name=""):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)
 
        self.logger = logger
 
    def close_logger(self):
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
 
    def _log(self, msg="", level=logging.INFO):
        self.logger.log(level, msg)

Next we create a plugin that will subscribe to the bus and which will be in charge for the AMQP communication.

class WS2AMQPPlugin(plugins.SimplePlugin):
    def __init__(self, bus):
        plugins.SimplePlugin.__init__(self, bus)
        self.conn = pika.AsyncoreConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.conn.channel()
        self.channel.exchange_declare(exchange="X", type="direct", durable=False)
        self.channel.queue_declare(queue="Q", durable=False, exclusive=False)
        self.channel.queue_bind(queue="Q", exchange="X", routing_key="")
 
        self.channel.basic_consume(self.amqp2ws, queue="Q")
 
        self.bus.subscribe("ws2amqp", self.ws2amqp)
        self.bus.subscribe("stop", self.cleanup)
 
    def cleanup(self):
        self.bus.unsubscribe("ws2amqp", self.ws2amqp)
        self.bus.unsubscribe("stop", self.cleanup)
        self.channel.queue_delete(queue="Q")
        self.channel.exchange_delete(exchange="X")
        self.conn.close()
 
    def amqp2ws(self, ch, method, header, body):
        self.bus.publish("amqp2ws", body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
 
    def ws2amqp(self, message):
        self.bus.log("Publishing to AMQP: %s" % message)
        self.channel.basic_publish(exchange="X", routing_key="", body=message)

The interesting bits are the amqp2ws and ws2amqp methods. The former is called anytime the AMQP broker pushes data to our AMQP consumer, we then use the bus to publish the message to any interested subscribers. The latter publishes to AMQP messages that come from the WebSockets channel.

Next let’s see the Tornado WebSockets handler.

class WebSocket2AMQP(websocket.WebSocketHandler):
    def __init__(self, *args, **kwargs):
        websocket.WebSocketHandler.__init__(self, *args, **kwargs)
        self.settings['bus'].subscribe("amqp2ws", self.push_message)
 
    def open(self):
        self.receive_message(self.on_message)
 
    def on_message(self, message):
        self.settings['bus'].publish("ws2amqp", message)
        self.write_message(message)
        self.receive_message(self.on_message)
 
    def on_connection_close(self):
        self.settings['bus'].unsubscribe("amqp2ws", self.push_message)
 
    def push_message(self, message):
        self.write_message(message)

The on_message method is called whenever data is received from the client, the push_message is used to push data to the client.

Finally, we setup the plug everything together:

if __name__ == '__main__':
    application = tornado.web.Application([
        (r"/ws", WebSocket2AMQP),
        ], bus=bus)
 
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
 
    bus.subscribe("main", poll)
    WS2AMQPPlugin(bus).subscribe()
    bus.start()
    bus.block()

Notice the fact we subscribe the asyncore poll function to the main channel of the bus so that pika works properly as if we had called asyncore.loop()

The code can be found here.

Managing your process with the CherryPy’s bus

CherryPy is a successful small web framework which over the years has built up its performances as well as its stability. To do so, Robert Brewer, the main CherryPy’s architect has introduced what is called the Web Site Process Bus (WSPBus). The idea is to manage a Python process by providing it with a bus to which one can publish or subscribe for events. CherryPy’s implementation of the bus comes with a set of pubsub handlers for very basic operations such as responding to system signals, handle thread creation and deletion, drop process privileges and handle PID files. The bus mechanism can help your handling of sub-processes so that they start, run and terminates gracefully. Let’s see how.

Create your bus

First, you need to create a bus instance. This could be as simple as this.

1
2
from cherrypy.process import wspbus
bus = wspbus.Bus()

If you want to log through the bus, you will need further work since the bus doesn’t create a logger by default. Let’s see an example.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import sys
import logging
from logging import handlers
 
from cherrypy.process import wspbus
 
class MyBus(wspbus.Bus):
    def __init__(self, name=""):
        wspbus.Bus.__init__(self)
        self.open_logger(name)
        self.subscribe("log", self._log)
 
    def exit(self):
        wspbus.Bus.exit(self)
        self.close_logger()
 
    def open_logger(self, name=""):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)
 
        self.logger = logger
 
    def close_logger(self):
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
 
    def _log(self, msg="", level=logging.INFO):
        self.logger.log(level, msg)

Not much, just creating a logger and subscribing the bus log channel to an instance method.

Associate the bus with the main process

Before we move on to the management of sub-process, let’s see how we can manage the main Python process already with our bus above.

For this, let’s imagine a bank placing stock orders, those orders will be handled by a broker running in a sub-process.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import random
import string
from multiprocessing import Process
 
class Bank(object):
    def __init__(self, queue):
        self.bus = MyBus(Bank.__name__)
        self.queue = queue
        self.bus.subscribe("main", self.randomly_place_order)
        self.bus.subscribe("exit", self.terminate)
 
    def randomly_place_order(self):
        order = random.sample(['BUY', 'SELL'], 1)[0]
        code = random.sample(string.ascii_uppercase, 4)
        amount = random.randint(0, 100)
 
        message = "%s %s %d" % (order, ''.join(code), amount)
 
        self.bus.log("Placing order: %s" % message)
 
        self.queue.put(message)
 
    def run(self):
        self.bus.start()
        self.bus.block(interval=0.01)
 
    def terminate(self):
        self.bus.unsubscribe("main", self.randomly_place_order)
        self.bus.unsubscribe("exit", self.terminate)

As you can see, not much again here, we simply associate a bus with the bank object. We also register to the exit channel of the bus so that when we terminated, we can do some cleanup. It’s good use to unregister from the bus.

We don’t actually care where those orders come from so we randomly generate them. The orders are placed every time the bus iterates its loop. This is done by attaching to the main channel of the bus.

We use a process queue to communicate with the broker’s sub-process.

Associate the bus with a sub-process

Handling the sub-process is actually similar to handling the main process. Let’s see the broker implementation for example.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from Queue import Empty
 
class Broker(Process):
    def __init__(self, queue):
        Process.__init__(self)
        self.queue = queue
        self.bus = MyBus(Broker.__name__)
        self.bus.subscribe("main", self.check)
 
    def check(self):
        try:
            message = self.queue.get_nowait()
        except Empty:
            return
 
        if message == "stop":
            self.bus.unsubscribe("main", self.check)
            self.bus.exit()
        elif message.startswith("BUY"):
            self.buy(*message.split(' ', 2)[1:])
        elif message.startswith("SELL"):
            self.sell(*message.split(' ', 2)[1:])
 
    def run(self):
        self.bus.start()
        self.bus.block(interval=0.01)
 
    def stop(self):
        self.queue.put("stop")
 
    def buy(self, code, amount):
        self.bus.log("BUY order placed for %s %s" % (amount, code))
 
    def sell(self, code, amount):
        self.bus.log("SELL order placed for %s %s" % (amount, code))

Several things are to be noticed. First we register once again to the bus’ main channel a method that checks the shared queue for incoming data. Whenever the incoming message is “stop”, we exit the bus altogether, thus leaving the sub-process, since it was blocked on the bus loop.

Note that the stop method could be called by the parent process if you needed to programatically stop the sub-process.

Put it all together

Run the code above as follow:

1
2
3
4
5
6
7
8
9
if __name__ == '__main__':
    from multiprocessing import Queue
    queue = Queue()
 
    broker = Broker(queue)
    broker.start()
 
    bank = Bank(queue)
    bank.run()

This creates the shared queue, starts the sub-process that runs the broker and finally starts the bank within the main process.

You should see a bunch of messages in the console and if you hit Ctrl-C, this will stop both processes cleanly.

And here we are, we now manage processes and sub-processes with a clean solution. The CherryPy process bus is an elegant add-on to your toolbox that I can only highly advise to consider in the future. The WSPBus implementation is part of the main CherryPy package (CherryPy 3.x), so you’ll have to install it all, even if you don’t require the HTTP framework. But don’t let that hold you back since the HTTP framework isn’t required for the bus to be used.

Happy coding!

The code is here.