Tag Archives: cherrypy

ws4py – WebSocket client and server library for Python

Recently I released ws4py, a package that provides client and server WebSocket support for Python 2.6 and 2.7.

Let’s first have a quick overview of what ws4py offers for now:

  • Support for draft-10 of the WebSocket specification.
  • A threaded client that doesn’t require any external dependency.
  • A Tornado client. This client is based on Tornado 2.0 which is quite a popular way of running asynchronous networking code these days. Tornado provides its own server implementation so I didn’t include mine in ws4py.
  • A CherryPy extension so that you can integrate WebSocket from within your CherryPy 3.2.1 server.
  • A gevent server based on the popular gevent library. This is courtesy of Jeff Lindsay.
  • Based on Jeff’s work, a pure WSGI middleware as well (available in the current master branch only until the next release).
  • ws4py runs on Android devices thanks to the SL4A package.

Hopefully more clients and servers will be added along the way, as well as Python 3.x support. The former should be rather simple to add thanks to the way I designed ws4py.

The main idea is to make a distinction between the bytes provider and the bytes processing. The former is essentially reading and writing bytes from the connected socket. The latter is the function of making something out of the received bytes based on the WebSocket specification. In most implementations I have seen so far, both are rather heavily intertwined, making it difficult to use a different bytes provider.

ws4py takes a different path by relying on a great feature of Python: the ability to send data back into a generator. For instance, the frame parser yields the number of bytes it needs each time it needs more, and the caller feeds those bytes back into the generator once they are received. In fact, the caller of a frame parser is a stream object which acts the same way, and the caller of that stream object is the bytes provider (a client or a server). The stream is in charge of aggregating frames into a WebSocket message. Thanks to that design, both the frame and stream objects are totally unaware of the bytes provider and can easily be adapted to various contexts (gevent, Tornado, CherryPy, etc.).
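To make the generator idea concrete, here is a minimal sketch of the pattern in Python 3. It is not ws4py’s actual code: `frame_parser()` and `feed()` are hypothetical names, and the frame layout is a simplified assumption loosely modeled on WebSocket framing (opcode in the low 4 bits of the first byte, a payload length below 126 in the second byte, no masking, no extended lengths). The parser only announces how many bytes it needs; `feed()` plays the bytes provider, here slicing an in-memory buffer.

```python
def frame_parser():
    """Parse one simplified frame (sketch, not ws4py's parser).

    Assumed layout: byte 0 holds the opcode in its low 4 bits, byte 1
    holds a payload length below 126; no masking, no extended lengths.
    Each ``yield`` announces how many bytes are needed next, and the
    caller answers with ``send(those_bytes)``.
    """
    header = yield 2
    opcode = header[0] & 0x0F
    length = header[1] & 0x7F
    if length >= 126:
        raise ValueError("extended payload lengths are not handled here")
    payload = yield length
    return opcode, payload


def feed(parser, data):
    """Act as the bytes provider, serving bytes from an in-memory buffer.

    A socket-based provider would instead loop on sock.recv() until it
    has collected `needed` bytes.
    """
    offset = 0
    needed = next(parser)  # prime the generator: how many bytes first?
    while True:
        chunk = data[offset:offset + needed]
        if len(chunk) < needed:
            raise EOFError("not enough bytes to complete the frame")
        offset += needed
        try:
            needed = parser.send(chunk)
        except StopIteration as done:
            return done.value  # the parser returned (opcode, payload)


print(feed(frame_parser(), bytes([0x81, 0x05]) + b"hello"))  # prints (1, b'hello')
```

Because the parser never touches a socket, only the provider has to change when moving from a threaded client to gevent or Tornado.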

On my TODO list for ws4py:

  • Upgrade to a more recent version of the specification
  • Python 3.x implementation
  • Better documentation (read: write documentation).
  • Better performance on very large WebSocket messages

Acceptance testing a CherryPy application with Robot Framework

I recently received the Python Testing Cookbook authored by Greg L. Turnquist and was happy to read about recipes on acceptance testing using Robot Framework. We’ve been using this tool at work for a few weeks now with great results. Greg shows how to test a web application using the Selenium Library extension for Robot Framework and I thought it’d be fun to demonstrate how to test a CherryPy application following his recipe. So here we go.

First some requirements:

$ mkvirtualenv --distribute --no-site-packages --unzip-setuptools acceptance
(acceptance)$ pip install cherrypy
(acceptance)$ pip install robotframework
(acceptance)$ pip install robotframework-seleniumlibrary

Let’s define a simple CherryPy application that displays a text input where you can type a message. When the submit button is pressed, the message is sent to the server and returned as-is. Well, it’s really just an echo service.

import cherrypy
 
__all__ = ['Echo']
 
class Echo(object):
    @cherrypy.expose
    def index(self):
        return """<html>
<head><title>Robot Framework Test for CherryPy</title></head>
<body>
<form method="post" action="/echo">
<input type="text" name="message" />
<input type="submit" />
</form>
</body>
</html>"""
 
    @cherrypy.expose
    def echo(self, message):
        return message
 
if __name__ == '__main__':
    cherrypy.quickstart(Echo())

Save the code above in a module named myapp.py.

Next, we create a Robot Framework extension library that manages CherryPy. Save the following in a module named CherryPyLib.py. It’s important to respect that name, since Robot Framework expects the module and its class to have the same name.

import imp
import os, os.path
 
import cherrypy
 
class CherryPyLib(object):
    def setup_cherrypy(self, conf_file=None):
        """
        Configures the CherryPy engine and server using
        the built-in 'embedded' environment mode.
 
        If provided, `conf_file` is a path to a CherryPy
        configuration file used in addition.
        """
        cherrypy.config.update({"environment": "embedded"})
        if conf_file:
            cherrypy.config.update(conf_file)            
 
    def start_cherrypy(self):
        """
        Starts a CherryPy engine.
        """
        cherrypy.engine.start()
 
    def exit_cherrypy(self):
        """
        Terminates a CherryPy engine.
        """
        cherrypy.engine.exit()
 
    def mount_application(self, appmod, appcls, directory=None):
        """
        Mounts an application to be tested. `appmod` is the name
        of a Python module containing `appcls`. The module is
        looked for in the given directory. If not provided, we use
        the current one instead.
        """
        directory = directory or os.getcwd()
        file, filename, description = imp.find_module(appmod, [directory])
        try:
            mod = imp.load_module(appmod, file, filename, description)
        finally:
            # find_module() hands us an open file that we must close
            if file:
                file.close()
        if hasattr(mod, appcls):
            cls = getattr(mod, appcls)
            app = cls()
            cherrypy.tree.mount(app)
        else:
            raise ImportError("cannot import name %s from %s" % (appcls, appmod))

Note that we start and stop the CherryPy server during the test itself, meaning you don’t need to start it separately. Pure awesomeness.

Finally let’s write a straightforward acceptance test to validate the overall workflow of echoing a message using our little application.

***Settings***
Library	SeleniumLibrary
Library	CherryPyLib
Suite Setup	Start Dependencies
Suite Teardown	Shutdown Dependencies
Test Setup	Mount Application	myapp	Echo

***Variables***
${MSG}	Hello World
${HOST}	http://localhost:8080/

***Test Cases***
Echo ${MSG}
     Open Browser	${HOST}
     Input text		message		${MSG}
     Submit form
     Page Should Contain		${MSG}
     Close All Browsers

***Keywords***
Start Dependencies
    Setup Cherrypy
    Start CherryPy
    Start Selenium Server
    Sleep 	3s

Shutdown Dependencies
    Stop Selenium Server
    Exit CherryPy

Save the test above into a file named testmyapp.txt. Finally, you can run the test as follows:

(acceptance)$ pybot --pythonpath . testmyapp.txt

This will start CherryPy, Selenium’s proxy server and Firefox within which the test case will be run. Easy, elegant and powerful.
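The test file never spells out a Python method name. Robot Framework documents that keyword names are matched case-insensitively while ignoring spaces and underscores, which is why `Setup Cherrypy` reaches `setup_cherrypy` and `Exit CherryPy` reaches `exit_cherrypy`. The sketch below only illustrates that matching rule; it is not Robot Framework’s own lookup code, and `FakeCherryPyLib`, `normalize()` and `resolve_keyword()` are hypothetical.

```python
def normalize(name):
    """Lower-case a keyword or method name and drop spaces and underscores."""
    return name.lower().replace(" ", "").replace("_", "")


def resolve_keyword(library, keyword):
    """Return the public method of `library` whose normalized name matches."""
    wanted = normalize(keyword)
    for attr in dir(library):
        if attr.startswith("_"):
            continue  # private helpers are not exposed as keywords
        if normalize(attr) == wanted and callable(getattr(library, attr)):
            return getattr(library, attr)
    raise LookupError("No keyword with name %r" % keyword)


class FakeCherryPyLib(object):
    """Hypothetical stand-in mirroring two of CherryPyLib's method names."""

    def setup_cherrypy(self, conf_file=None):
        return "setup"

    def exit_cherrypy(self):
        return "exit"


lib = FakeCherryPyLib()
print(resolve_keyword(lib, "Setup Cherrypy")())  # prints setup
print(resolve_keyword(lib, "Exit CherryPy")())   # prints exit
```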

Hosting a Django application on a CherryPy server

Recently at work I needed to host a Django application in a CherryPy server. I first looked at various projects I knew were doing just that. Unfortunately, after trying them I was rather disappointed. Their approach is to provide a command similar to Django’s famous runserver, but I found it more complex than necessary. So I wrote my own module that performs those operations while staying much closer to how CherryPy works, most specifically by using the process bus that comes with CherryPy.

I’m sharing a stripped down version of the module I wrote which shows how one could host a Django application in a CherryPy server. Hopefully this might help some of you.

# Python stdlib imports
import sys
import logging
import os, os.path
 
# Third-party imports
import cherrypy
from cherrypy.process import wspbus, plugins
from cherrypy import _cplogging, _cperror
from django.conf import settings
from django.core.handlers.wsgi import WSGIHandler
from django.http import HttpResponseServerError
 
class Server(object):
    def __init__(self):
        self.base_dir = os.path.join(os.path.abspath(os.getcwd()), "cpdjango")
 
        conf_path = os.path.join(self.base_dir, "..", "server.cfg")
        cherrypy.config.update(conf_path)
 
        # This registers a plugin to handle the Django app
        # with the CherryPy engine, meaning the app will
        # play nicely with the process bus that is the engine.
        DjangoAppPlugin(cherrypy.engine, self.base_dir).subscribe()
 
    def run(self):
        engine = cherrypy.engine
        engine.signal_handler.subscribe()
 
        if hasattr(engine, "console_control_handler"):
            engine.console_control_handler.subscribe()
 
        engine.start()
        engine.block()
 
class DjangoAppPlugin(plugins.SimplePlugin):
    def __init__(self, bus, base_dir):
        """
        CherryPy engine plugin to configure and mount
        the Django application onto the CherryPy server.
        """
        plugins.SimplePlugin.__init__(self, bus)
        self.base_dir = base_dir
 
    def start(self):
        self.bus.log("Configuring the Django application")
 
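        # Collect the upper-case names defined in the project's settings
        # module, which settings.configure() expects as keyword arguments.
        # (A function-level "import *" is a SyntaxError in Python 3.)
        import cpdjango.settings as project_settings
        app_settings = dict((name, getattr(project_settings, name))
                            for name in dir(project_settings)
                            if name.isupper())
        settings.configure(**app_settings)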
 
        self.bus.log("Mounting the Django application")
        cherrypy.tree.graft(HTTPLogger(WSGIHandler()))
 
        self.bus.log("Setting up the static directory to be served")
        # We serve static files through CherryPy directly,
        # bypassing Django entirely
        static_handler = cherrypy.tools.staticdir.handler(section="/", dir="static",
                                                          root=self.base_dir)
        cherrypy.tree.mount(static_handler, '/static')
 
class HTTPLogger(_cplogging.LogManager):
    def __init__(self, app):
        _cplogging.LogManager.__init__(self, id(self), cherrypy.log.logger_root)
        self.app = app
 
    def __call__(self, environ, start_response):
        """
        Called as part of the WSGI stack to log the incoming request
        and its response using the common log format. If an error bubbles up
        to this middleware, we log it as such.
        """
        try:
            response = self.app(environ, start_response)
            self.access(environ, response)
            return response
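        except Exception:
            self.error(traceback=True)
            # A WSGI app must call start_response() before returning a body.
            # sys.exc_info() is passed because the wrapped app may already
            # have called start_response() (see PEP 3333).
            start_response('500 Internal Server Error',
                           [('Content-Type', 'text/plain')], sys.exc_info())
            return [_cperror.format_exc()]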
 
    def access(self, environ, response):
        """
        Special method that logs a request following the common
        log format. This is mostly taken from CherryPy and adapted
        to the WSGI's style of passing information.
        """
        atoms = {'h': environ.get('REMOTE_ADDR', ''),
                 'l': '-',
                 'u': "-",
                 't': self.time(),
                 'r': "%s %s %s" % (environ['REQUEST_METHOD'], environ['REQUEST_URI'], environ['SERVER_PROTOCOL']),
                 's': response.status_code,
                 'b': str(len(response.content)),
                 'f': environ.get('HTTP_REFERER', ''),
                 'a': environ.get('HTTP_USER_AGENT', ''),
                 }
        for k, v in atoms.items():
            if isinstance(v, unicode):
                v = v.encode('utf8')
            elif not isinstance(v, str):
                v = str(v)
            # Fortunately, repr(str) escapes unprintable chars, \n, \t, etc
            # and backslash for us. All we have to do is strip the quotes.
            v = repr(v)[1:-1]
            # Escape double-quote.
            atoms[k] = v.replace('"', '\\"')
 
        try:
            self.access_log.log(logging.INFO, self.access_log_format % atoms)
        except:
            self.error(traceback=True)
 
if __name__ == '__main__':
    Server().run()

You can find the code alongside a minimal Django application showing how this works here (BSD licence). I used Django 1.3 to generate a default project, but the code above works well with older versions of Django.

Edit 16/03/2012: Thanks to Damien Tougas, I’ve wrapped up a better recipe for hosting a Django application into a CherryPy application server.
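The HTTPLogger above reads `status_code` and `content` from Django’s response object, so it only works with Django. If you want similar access logging around any WSGI application, one option is to capture the status by wrapping `start_response`. This is a minimal sketch under that assumption: `AccessLogMiddleware` is a hypothetical name, and its log line is a simplified Common Log Format without the timestamp, user, referrer and user-agent fields.

```python
import sys


class AccessLogMiddleware(object):
    """Hypothetical WSGI middleware that logs one line per request."""

    def __init__(self, app, log=None):
        self.app = app
        # Default to stdout; pass e.g. a logger's info method instead.
        self.log = log or (lambda line: sys.stdout.write(line + "\n"))

    def __call__(self, environ, start_response):
        captured = {}

        def capturing_start_response(status, headers, exc_info=None):
            # Keep only the numeric part of e.g. "200 OK" for the log line.
            captured["status"] = status.split(" ", 1)[0]
            return start_response(status, headers, exc_info)

        body = self.app(environ, capturing_start_response)
        try:
            # Materialize the body so its size is known. This also runs
            # generator-based apps that call start_response lazily, but it
            # buffers the whole response, so it is unsuitable for streaming.
            chunks = list(body)
        finally:
            if hasattr(body, "close"):
                body.close()  # the WSGI spec requires calling close()
        self.log('%s "%s %s %s" %s %d' % (
            environ.get("REMOTE_ADDR", "-"),
            environ.get("REQUEST_METHOD", "-"),
            environ.get("PATH_INFO", "/"),
            environ.get("SERVER_PROTOCOL", "-"),
            captured.get("status", "-"),
            sum(len(chunk) for chunk in chunks)))
        return chunks
```

With Django, it would be grafted the same way as HTTPLogger: `cherrypy.tree.graft(AccessLogMiddleware(WSGIHandler()))`.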

WebSocket for CherryPy 3.2

Just a quick note about the first draft of support for WebSocket in CherryPy. You can find the code here.

Note that this is still a work in progress, but it does work against Chrome and the pywebsocket echo client. It only supports draft-76 of the specification, and I’m waiting for the working group to settle a bit more before making any further modifications.

The updated code has started integrating draft-06 as well but this is a work in progress.
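For reference, the part of draft-76 that makes it awkward to implement is its handshake: the server proves it read the request by combining the `Sec-WebSocket-Key1` and `Sec-WebSocket-Key2` header values with the 8 bytes following the request headers into an MD5 digest. Here is a sketch of that computation written from my reading of the draft, not taken from the CherryPy code linked above; the function names are hypothetical.

```python
import hashlib
import struct


def key_number(key):
    """Digits of the key read as one integer, divided by the number of spaces."""
    digits = int("".join(ch for ch in key if ch in "0123456789"))
    spaces = key.count(" ")
    if spaces == 0 or digits % spaces:
        raise ValueError("invalid draft-76 key: %r" % key)
    return digits // spaces


def draft76_challenge_response(key1, key2, key3):
    """16-byte MD5 digest the server sends after its handshake headers.

    key1/key2 are the Sec-WebSocket-Key1/Key2 header values and key3 is
    the 8-byte body that follows the request headers.
    """
    if len(key3) != 8:
        raise ValueError("key3 must be exactly 8 bytes")
    # Each key number is packed as a 32-bit big-endian integer.
    challenge = struct.pack(">II", key_number(key1), key_number(key2)) + key3
    return hashlib.md5(challenge).digest()
```

Later drafts dropped this scheme in favour of a single `Sec-WebSocket-Key` header hashed with SHA-1, which is one reason supporting several drafts at once gets messy.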

Running CherryPy on Android with SL4A

CherryPy runs on Android thanks to the SL4A project. So if you feel like running Python and your own web server on your Android device, well, you can do just that. You’ve probably not heard anything that awesome since the pizza delivery guy rang the doorbell.

How do you go about it? Well, that’s the surprise: CherryPy itself doesn’t need to be patched. Granted, I haven’t tried all the various tools provided by CherryPy, but the server and the dispatching work just fine.

First, you need to get the CherryPy source code, build it and copy the resulting cherrypy package into the SL4A scripts directory.

Once you’ve plugged your phone into your machine through USB, run the following commands:

$ svn co http://svn.cherrypy.org/trunk cp3-trunk
$ cd cp3-trunk
$ python setup.py build
$ cp -r build/lib.linux-i686-2.6/cherrypy/ /media/usb0/sl4a/scripts/

Just change the path to match your environment. That’s it.

Now you can copy your own script. Let’s assume you use something like the one below:

# -*- coding: utf-8 -*-
import logging
# The multiprocessing package isn't
# part of the ASE installation so
# we must disable multiprocessing logging
logging.logMultiprocessing = 0
 
import android
import cherrypy
 
class Root(object):
    def __init__(self):
        self.droid = android.Android()
 
    @cherrypy.expose
    def index(self):
        self.droid.vibrate()
        return "Hello from my phone"
 
    @cherrypy.expose
    def location(self):
        location = self.droid.getLastKnownLocation().result
        # Fall back to GPS when the network provider has no fix
        location = location.get('network') or location.get('gps')
        return "LAT: %s, LON: %s" % (location['latitude'],
                                     location['longitude'])
 
def run():
    cherrypy.config.update({'server.socket_host': '0.0.0.0'})
    cherrypy.quickstart(Root(), '/')
 
if __name__ == '__main__':
    run()

As you can see we must disable the multiprocessing logging since the multiprocessing package isn’t included with SL4A.

Save that script on your computer as cpdroid.py for example. Copy that file into the scripts directory of SL4A.

$ cp cpdroid.py /media/usb0/sl4a/scripts/

Unplug your phone and go to the SL4A application. Click on the cpdroid.py script; it should start fine. Then from your browser, go to http://phone_IP:8080/ and tada! You can also go to the /location path to get your phone’s geolocation.
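The `location` handler picks a provider from the dict returned by `getLastKnownLocation().result`. To test that selection logic off the phone, it can be pulled into a plain function. This is a sketch assuming, as the handler does, a dict keyed by provider name (`network`, `gps`), and additionally assuming that a provider without a fix maps to `None`; `pick_location` and the sample coordinates are hypothetical.

```python
def pick_location(result, providers=("network", "gps")):
    """Return (latitude, longitude) from the first provider with a fix.

    `result` is assumed to look like SL4A's getLastKnownLocation().result:
    a dict mapping provider names to dicts, or to None when there is no fix.
    """
    for provider in providers:
        fix = result.get(provider)
        if fix:
            return fix["latitude"], fix["longitude"]
    return None  # no provider had a fix


# Made-up sample data: the network provider has no fix, GPS does.
sample = {"network": None, "gps": {"latitude": 48.85, "longitude": 2.35}}
print(pick_location(sample))  # prints (48.85, 2.35)
```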

Integrating SQLAlchemy into a CherryPy application

Quite often, people come to the CherryPy IRC channel asking how to use SQLAlchemy with CherryPy. There are a couple of good recipes on the tools wiki, but I find them a little complex to begin with. That’s not the recipes’ fault: many people simply don’t know about CherryPy tools and plugins at that stage.

The following recipe tries to provide a complete example while keeping it as simple as possible, so that folks can get started with SQLAlchemy and CherryPy.

# -*- coding: utf-8 -*-
import os, os.path
 
import cherrypy
from cherrypy.process import wspbus, plugins
 
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy.types import String, Integer
 
# Helper to map and register a Python class as a db table
Base = declarative_base()
 
class Message(Base):
    __tablename__ = 'message'
    id = Column(Integer, primary_key=True)
    value =  Column(String)
 
    def __init__(self, message):
        Base.__init__(self)
        self.value = message
 
    def __str__(self):
        return self.value.encode('utf-8')
 
    def __unicode__(self):
        return self.value
 
    @staticmethod
    def list(session):
        return session.query(Message).all()
 
 
class SAEnginePlugin(plugins.SimplePlugin):
    def __init__(self, bus):
        """
        The plugin is registered to the CherryPy engine and therefore
        is part of the bus (the engine *is* a bus) registry.
 
        We use this plugin to create the SA engine. At the same time,
        when the plugin starts we create the tables into the database
        using the mapped class of the global metadata.
 
        Finally we create a new 'bind' channel that the SA tool
        will use to map a session to the SA engine at request time.
        """
        plugins.SimplePlugin.__init__(self, bus)
        self.sa_engine = None
        self.bus.subscribe("bind", self.bind)
 
    def start(self):
        db_path = os.path.abspath(os.path.join(os.curdir, 'my.db'))
        self.sa_engine = create_engine('sqlite:///%s' % db_path, echo=True)
        Base.metadata.create_all(self.sa_engine)
 
    def stop(self):
        if self.sa_engine:
            self.sa_engine.dispose()
            self.sa_engine = None
 
    def bind(self, session):
        session.configure(bind=self.sa_engine)
 
class SATool(cherrypy.Tool):
    def __init__(self):
        """
        The SA tool is responsible for associating a SA session
        to the SA engine and attaching it to the current request.
        Since we are running in a multithreaded application,
        we use the scoped_session that will create a session
        on a per thread basis so that you don't worry about
        concurrency on the session object itself.
 
        This tool binds a session to the engine each time
        a request starts, and commits or rolls back whenever
        the request terminates.
        """
        cherrypy.Tool.__init__(self, 'on_start_resource',
                               self.bind_session,
                               priority=20)
 
        self.session = scoped_session(sessionmaker(autoflush=True,
                                                  autocommit=False))
 
    def _setup(self):
        cherrypy.Tool._setup(self)
        cherrypy.request.hooks.attach('on_end_resource',
                                      self.commit_transaction,
                                      priority=80)
 
    def bind_session(self):
        cherrypy.engine.publish('bind', self.session)
        cherrypy.request.db = self.session
 
    def commit_transaction(self):
        cherrypy.request.db = None
        try:
            self.session.commit()
        except:
            self.session.rollback()  
            raise
        finally:
            self.session.remove()
 
 
 
 
class Root(object):
    @cherrypy.expose
    def index(self):
        # print all the recorded messages so far
        msgs = [str(msg) for msg in Message.list(cherrypy.request.db)]
        cherrypy.response.headers['content-type'] = 'text/plain'
        return "Here is your list of messages: %s" % '\n'.join(msgs)
 
    @cherrypy.expose
    def record(self, msg):
        # go to /record?msg=hello world to record a "hello world" message
        m = Message(msg)
        cherrypy.request.db.add(m)
        cherrypy.response.headers['content-type'] = 'text/plain'
        return "Recorded: %s" % m
 
if __name__ == '__main__':
    SAEnginePlugin(cherrypy.engine).subscribe()
    cherrypy.tools.db = SATool()
    cherrypy.tree.mount(Root(), '/', {'/': {'tools.db.on': True}})
    cherrypy.engine.start()
    cherrypy.engine.block()

The general idea is to use the plugin mechanism to register functions on a per-engine basis, and to enable a tool that provides access to the SQLAlchemy session at request time.
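Stripped of CherryPy, what SATool does around each request is a unit of work: commit when the handler succeeds, roll back and re-raise when it fails. The sketch below shows that pattern with the standard library’s sqlite3 module instead of SQLAlchemy, so it runs without extra dependencies; `request_transaction` is a hypothetical name, not part of either library.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def request_transaction(conn):
    """Commit if the block succeeds; roll back and re-raise otherwise.

    Plays the role of SATool's on_start_resource/on_end_resource hooks.
    """
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY, value TEXT)")

# A "request" that succeeds: its insert is committed.
with request_transaction(conn) as db:
    db.execute("INSERT INTO message (value) VALUES (?)", ("hello",))

# A "request" whose handler fails: its insert is rolled back.
try:
    with request_transaction(conn) as db:
        db.execute("INSERT INTO message (value) VALUES (?)", ("lost",))
        raise RuntimeError("handler failed")
except RuntimeError:
    pass

print(conn.execute("SELECT value FROM message").fetchall())  # prints [('hello',)]
```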

Using Jython as a CLI frontend to HBase

HBase, the well-known non-relational distributed database, comes with a console program to perform various operations on an HBase cluster. I’ve personally found this tool a bit limited, and I’ve toyed with the idea of writing my own. Since HBase only comes with a Java driver for direct access, and the various RPC interfaces such as Thrift don’t expose the full set of HBase functions, I decided to go for Jython and use the Java API directly. This article shows a mock-up of such a tool.

The idea is to provide a simple Python API over the HBase one and couple it with a Python interpreter. This means you can perform any Python (well, Jython) operation while working on HBase itself through an easier API than the Java one.
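Coupling an API with an interpreter boils down to handing the interpreter a namespace that already contains your command object, which is what HbaseConsole does with `code.interact(local=...)`. The sketch below shows the same idea using `code.InteractiveInterpreter` so it can be driven from a script on plain CPython; `FakeCommand` is a hypothetical stand-in for HbaseCommand that keeps rows in a dict instead of talking to HBase.

```python
import code


class FakeCommand(object):
    """Hypothetical stand-in for HbaseCommand; rows live in a dict."""

    def __init__(self):
        self.rows = {}

    def put(self, row, value):
        self.rows[row] = value

    def get(self, row):
        return self.rows.get(row)


# The namespace plays the role of HbaseConsole.namespace: whatever is in
# it is available to the interpreter under the same names.
namespace = {"c": FakeCommand()}
interp = code.InteractiveInterpreter(locals=namespace)

# Each call behaves like one line typed at the ">>>" prompt.
interp.runsource("c.put('aRow', 'hello world!')")
interp.runsource("value = c.get('aRow')")
print(namespace["value"])  # prints hello world!
```

Calling `code.interact(local=namespace)` instead of the scripted `runsource()` calls gives you an interactive prompt, as in the tool below.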

Note also that the tool uses the WSPBus already described in an earlier article to control the process itself. You will therefore need CherryPy’s latest revision.

# -*- coding: utf-8 -*-
import sys
import os
import code
import readline
import rlcompleter
 
from org.apache.hadoop.hbase import HBaseConfiguration, \
     HTableDescriptor, HColumnDescriptor
from org.apache.hadoop.hbase.client import HBaseAdmin, \
     HTable, Put, Get, Scan
 
import logging
from logging import handlers
 
from cherrypy.process import wspbus
from cherrypy.process import plugins
 
class StaveBus(wspbus.Bus):
    def __init__(self):
        wspbus.Bus.__init__(self)
        self.open_logger()
        self.subscribe("log", self._log)
 
        sig = plugins.SignalHandler(self)
        if sys.platform[:4] == 'java':
            del sig.handlers['SIGUSR1']
            sig.handlers['SIGUSR2'] = self.graceful
            self.log("SIGUSR1 cannot be set on the JVM platform. Using SIGUSR2 instead.")
 
            # See http://bugs.jython.org/issue1313
            sig.handlers['SIGINT'] = self._jython_handle_SIGINT
        sig.subscribe()
 
    def exit(self):
        wspbus.Bus.exit(self)
        self.close_logger()
 
    def open_logger(self, name=""):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)
 
        self.logger = logger
 
    def close_logger(self):
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
 
    def _log(self, msg="", level=logging.INFO):
        self.logger.log(level, msg)
 
    def _jython_handle_SIGINT(self, signum=None, frame=None):
        # See http://bugs.jython.org/issue1313
        self.log('Keyboard Interrupt: shutting down bus')
        self.exit()
 
class HbaseConsolePlugin(plugins.SimplePlugin):
    def __init__(self, bus):
        plugins.SimplePlugin.__init__(self, bus)
        self.console = HbaseConsole()
 
    def start(self):
        self.console.setup()
        self.console.run()
 
class HbaseConsole(object):
    def __init__(self):
        # we provide this instance to the underlying interpreter
        # as the interface to operate on HBase
        self.namespace = {'c': HbaseCommand()}
 
    def setup(self):
        readline.set_completer(rlcompleter.Completer(self.namespace).complete)
        readline.parse_and_bind("tab:complete")
        import user
 
    def run(self):
        code.interact(local=self.namespace)
 
class HbaseCommand(object):
    def __init__(self, conf=None, admin=None):
        self.conf = conf
        if not conf:
            self.conf = HBaseConfiguration()
        self.admin = admin
        if not admin:
            self.admin = HBaseAdmin(self.conf)
 
    def table(self, name):
        return HTableCommand(name, self.conf, self.admin)
 
    def list_tables(self):
        return self.admin.listTables().tolist()
 
class HTableCommand(object):
    def __init__(self, name, conf, admin):
        self.conf = conf
        self.admin = admin
        self.name = name
        self._table = None
 
    def row(self, name):
        if not self._table:
            self._table = HTable(self.conf, self.name)
        return HRowCommand(self._table, name)
 
    def create(self, families=None):
        desc = HTableDescriptor(self.name)
        if families:
            for family in families:
                desc.addFamily(HColumnDescriptor(family))
        self.admin.createTable(desc)
        self._table = HTable(self.conf, self.name)
        return self._table
 
    def scan(self, start_row=None, end_row=None, filter=None):
        if not self._table:
            self._table = HTable(self.conf, self.name)
 
        sc = None
        if start_row and filter:
            sc = Scan(start_row, filter)
        elif start_row and end_row:
            sc = Scan(start_row, end_row)
        elif start_row:
            sc = Scan(start_row)
        else:
            sc = Scan()
        s = self._table.getScanner(sc)
        while True:
            r = s.next()
            if r is None:
                return  # scanner exhausted: end the generator
 
            yield r
 
    def delete(self):
        self.disable()
        self.admin.deleteTable(self.name)
 
    def disable(self):
        self.admin.disableTable(self.name)
 
    def enable(self):
        self.admin.enableTable(self.name)
 
    def exists(self):
        return self.admin.tableExists(self.name)
 
    def list_families(self):
        desc = HTableDescriptor(self.name)
        return desc.getColumnFamilies()
 
class HRowCommand(object):
    def __init__(self, table, rowname):
        self.table = table
        self.rowname = rowname
 
    def put(self, family, column, value):
        p = Put(self.rowname)
        p.add(family, column, value)
        self.table.put(p)
 
    def get(self, family, column):
        r = self.table.get(Get(self.rowname))
        v = r.getValue(family, column)
        if v is not None:
            return v.tostring()
 
 
if __name__ == '__main__':
    bus = StaveBus()
    HbaseConsolePlugin(bus).subscribe()
    bus.start()
    bus.block()

To test the tool, you can simply grab the latest copy of HBase and run:

hbase-0.20.4$ ./bin/start-hbase.sh

Then you need to configure your classpath so that it includes all the HBase dependencies. To determine them:

$ ps auwx|grep java|grep org.apache.hadoop.hbase.master.HMaster|perl -pi -e "s/.*classpath //"

Copy the full list of jars and export CLASSPATH with it. (This is from the HBase wiki on Jython and HBase).

Next you have to add an extra jar to the classpath so that Jython supports readline:

$ export CLASSPATH=$CLASSPATH:$HOME/jython2.5.1/extlibs/libreadline-java-0.8.jar

Make sure you install libreadline-java as well.

Now that your environment is set up, save the code above in a script named stave.py and run it as follows:

$ jython stave.py
Python 2.5.1 (Release_2_5_1:6813, Sep 26 2009, 13:47:54) 
[Java HotSpot(TM) Server VM (Sun Microsystems Inc.)] on java1.6.0_20
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> c.table('myTable').create(families=['aFamily:'])
>>> c.table('myTable').list_families()
array(org.apache.hadoop.hbase.HColumnDescriptor)
>>> c.table('myTable').row('aRow').put('aFamily', 'aColumn', 'hello world!')
>>> c.table('myTable').row('aRow').get('aFamily', 'aColumn')
'hello world!'
>>> list(c.table('myTable').scan())
[keyvalues={aRow/aFamily:aColumn/1277645421824/Put/vlen=12}]

You can import any Python module available to your Jython environment as well of course.

I will probably extend this tool over time but in the meantime I hope you’ll find it a useful canvas to operate HBase.
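HTableCommand.scan() turns a Java-style scanner, whose `next()` returns null (`None` in Jython) once exhausted, into a Python generator. Python’s two-argument `iter(callable, sentinel)` expresses the same loop in a single call, so the body of scan() could plausibly shrink to `return iter(s.next, None)`. A sketch with a hypothetical `FakeScanner` standing in for the HBase scanner so it runs anywhere:

```python
class FakeScanner(object):
    """Hypothetical stand-in for an HBase scanner: next() returns None at the end."""

    def __init__(self, results):
        self._results = list(results)

    def next(self):
        return self._results.pop(0) if self._results else None


def scan_results(scanner):
    # iter(callable, sentinel) calls scanner.next() repeatedly and stops
    # as soon as it returns the sentinel (None here).
    return iter(scanner.next, None)


print(list(scan_results(FakeScanner(["row1", "row2"]))))  # prints ['row1', 'row2']
```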

Managing your process with the CherryPy’s bus

CherryPy is a successful small web framework which, over the years, has steadily improved its performance as well as its stability. To do so, Robert Brewer, CherryPy’s main architect, introduced what is called the Web Site Process Bus (WSPBus). The idea is to manage a Python process by providing it with a bus to which one can publish, or subscribe for, events. CherryPy’s implementation of the bus comes with a set of pubsub handlers for very basic operations such as responding to system signals, handling thread creation and deletion, dropping process privileges and handling PID files. The bus mechanism can help you handle sub-processes so that they start, run and terminate gracefully. Let’s see how.

Create your bus

First, you need to create a bus instance. This could be as simple as this.

from cherrypy.process import wspbus
bus = wspbus.Bus()
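Conceptually, a bus is a registry of named channels: subscribers attach callbacks to a channel, and publishing on that channel calls them all. The toy class below only illustrates that idea; it is not CherryPy’s `wspbus.Bus`, which also tracks states, subscriber priorities and more.

```python
from collections import defaultdict


class ToyBus(object):
    """Toy channel-based publish/subscribe bus (illustration only)."""

    def __init__(self):
        self.listeners = defaultdict(list)  # channel name -> callbacks

    def subscribe(self, channel, callback):
        self.listeners[channel].append(callback)

    def unsubscribe(self, channel, callback):
        if callback in self.listeners[channel]:
            self.listeners[channel].remove(callback)

    def publish(self, channel, *args, **kwargs):
        # Call every subscriber of the channel and collect what they return.
        return [callback(*args, **kwargs)
                for callback in list(self.listeners[channel])]

    def log(self, msg):
        # Logging is just another channel: whoever subscribed to "log"
        # decides where the message goes.
        self.publish("log", msg)


bus = ToyBus()
received = []
bus.subscribe("log", received.append)
bus.log("hello bus")
bus.unsubscribe("log", received.append)
bus.log("nobody is listening")
print(received)  # prints ['hello bus']
```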

If you want to log through the bus, you will need a bit more work, since the bus doesn’t create a logger by default. Let’s see an example.

import sys
import logging
from logging import handlers
 
from cherrypy.process import wspbus
 
class MyBus(wspbus.Bus):
    def __init__(self, name=""):
        wspbus.Bus.__init__(self)
        self.open_logger(name)
        self.subscribe("log", self._log)
 
    def exit(self):
        wspbus.Bus.exit(self)
        self.close_logger()
 
    def open_logger(self, name=""):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)
 
        self.logger = logger
 
    def close_logger(self):
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
 
    def _log(self, msg="", level=logging.INFO):
        self.logger.log(level, msg)

Not much here: we just create a logger and subscribe an instance method to the bus’s log channel.

Associate the bus with the main process

Before we move on to managing sub-processes, let’s see how we can already manage the main Python process with the bus above.

For this, let’s imagine a bank placing stock orders; those orders will be handled by a broker running in a sub-process.

import random
import string
from multiprocessing import Process
 
class Bank(object):
    def __init__(self, queue):
        self.bus = MyBus(Bank.__name__)
        self.queue = queue
        self.bus.subscribe("main", self.randomly_place_order)
        self.bus.subscribe("exit", self.terminate)
 
    def randomly_place_order(self):
        order = random.sample(['BUY', 'SELL'], 1)[0]
        code = random.sample(string.ascii_uppercase, 4)
        amount = random.randint(0, 100)
 
        message = "%s %s %d" % (order, ''.join(code), amount)
 
        self.bus.log("Placing order: %s" % message)
 
        self.queue.put(message)
 
    def run(self):
        self.bus.start()
        self.bus.block(interval=0.01)
 
    def terminate(self):
        self.bus.unsubscribe("main", self.randomly_place_order)
        self.bus.unsubscribe("exit", self.terminate)

Again, not much here: we simply associate a bus with the bank object. We also subscribe to the bus’s exit channel so that we can do some cleanup when the bus terminates. It’s good practice to unsubscribe from the bus.

We don’t actually care where those orders come from so we randomly generate them. The orders are placed every time the bus iterates its loop. This is done by attaching to the main channel of the bus.

We use a process queue to communicate with the broker’s sub-process.
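The "main" channel is what drives the bank: while blocked, the bus keeps publishing to it at the given interval until it exits. A toy loop showing that control flow follows; it is not wspbus’s actual `block()` implementation, and `run_main_loop` and the exit condition are hypothetical.

```python
import time


def run_main_loop(main_subscribers, should_exit, interval=0.01):
    """Call every 'main' subscriber once per tick until should_exit() is true."""
    ticks = 0
    while not should_exit():
        for callback in list(main_subscribers):
            callback()
        ticks += 1
        time.sleep(interval)
    return ticks


placed = []
subscribers = [lambda: placed.append("order")]
ticks = run_main_loop(subscribers, lambda: len(placed) >= 3, interval=0.001)
print(ticks, placed)  # prints 3 ['order', 'order', 'order']
```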

Associate the bus with a sub-process

Handling the sub-process is actually similar to handling the main process. Let’s see the broker implementation for example.

try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2
 
class Broker(Process):
    def __init__(self, queue):
        Process.__init__(self)
        self.queue = queue
        self.bus = MyBus(Broker.__name__)
        self.bus.subscribe("main", self.check)
 
    def check(self):
        try:
            message = self.queue.get_nowait()
        except Empty:
            return
 
        if message == "stop":
            self.bus.unsubscribe("main", self.check)
            self.bus.exit()
        elif message.startswith("BUY"):
            self.buy(*message.split(' ', 2)[1:])
        elif message.startswith("SELL"):
            self.sell(*message.split(' ', 2)[1:])
 
    def run(self):
        self.bus.start()
        self.bus.block(interval=0.01)
 
    def stop(self):
        self.queue.put("stop")
 
    def buy(self, code, amount):
        self.bus.log("BUY order placed for %s %s" % (amount, code))
 
    def sell(self, code, amount):
        self.bus.log("SELL order placed for %s %s" % (amount, code))

A few things are worth noting. First, we once again subscribe a method to the bus’s main channel; this method checks the shared queue for incoming data. Whenever the incoming message is “stop”, we exit the bus altogether, thus leaving the sub-process, since it was blocked on the bus loop.

Note that the parent process could call the stop method if you needed to stop the sub-process programmatically.
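The broker’s check() method relies on `get_nowait()` so that an empty queue never blocks the bus loop. The same dispatch logic can be exercised in a single process with `queue.Queue`, whose `get_nowait()` raises the same `queue.Empty` exception as `multiprocessing.Queue`. A sketch; `dispatch_one` and the handler dict are hypothetical:

```python
import queue


def dispatch_one(q, handlers):
    """Process at most one message without blocking.

    Returns False when a "stop" message arrives, True otherwise.
    """
    try:
        message = q.get_nowait()
    except queue.Empty:
        return True  # nothing queued during this tick
    if message == "stop":
        return False
    order, code, amount = message.split(" ", 2)  # e.g. "BUY ABCD 42"
    handler = handlers.get(order)
    if handler is not None:
        handler(code, int(amount))
    return True


trades = []
handlers = {"BUY": lambda code, amount: trades.append(("BUY", code, amount)),
            "SELL": lambda code, amount: trades.append(("SELL", code, amount))}
q = queue.Queue()
for msg in ["BUY ABCD 10", "SELL WXYZ 5", "stop"]:
    q.put(msg)
while dispatch_one(q, handlers):
    pass
print(trades)  # prints [('BUY', 'ABCD', 10), ('SELL', 'WXYZ', 5)]
```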

Put it all together

Run the code above as follows:

if __name__ == '__main__':
    from multiprocessing import Queue
    queue = Queue()
 
    broker = Broker(queue)
    broker.start()
 
    bank = Bank(queue)
    bank.run()

This creates the shared queue, starts the sub-process that runs the broker and finally starts the bank within the main process.

You should see a bunch of messages in the console and if you hit Ctrl-C, this will stop both processes cleanly.

And here we are: we now manage processes and sub-processes with a clean solution. The CherryPy process bus is an elegant addition to your toolbox, and I highly recommend considering it in the future. The WSPBus implementation is part of the main CherryPy package (CherryPy 3.x), so you’ll have to install the whole package even if you don’t need the HTTP framework. Don’t let that hold you back, though: the bus can be used without the HTTP framework.

Happy coding!

The code is here.

CherryPy 3.1 has been released

Yesterday Robert Brewer released version 3.1 of CherryPy, and I think this calls for a hurray! Those who’ve been using 3.0 will be happy to know that the upgrade will be rather smooth and straightforward. The main API changes have taken place in the engine, thanks to the new process bus, but the rest of the API is pretty much identical. Overall, Robert and many contributors have done fantastic work fixing remaining and new bugs, making this release the most stable and best-performing CherryPy release so far. Get it while it’s hot.

CherryPy in the field

Michael Schurter just posted a message on the main CherryPy users mailing list asking developers using CherryPy to let the project team know about it. I want to support the idea, as I would love to see our dusty success stories page updated with new entries. I have a feeling that CherryPy is used quite a lot, but mainly as a light HTTP framework in administration tools, and those projects are quite likely internal and don’t have much visibility outside their scope. Nonetheless, we’d be interested in knowing where CherryPy is used, so please let us know.