Managing your process with the CherryPy’s bus

CherryPy is a successful small web framework which over the years has built up its performances as well as its stability. To do so, Robert Brewer, the main CherryPy’s architect has introduced what is called the Web Site Process Bus (WSPBus). The idea is to manage a Python process by providing it with a bus to which one can publish or subscribe for events. CherryPy’s implementation of the bus comes with a set of pubsub handlers for very basic operations such as responding to system signals, handle thread creation and deletion, drop process privileges and handle PID files. The bus mechanism can help your handling of sub-processes so that they start, run and terminates gracefully. Let’s see how.

Create your bus

First, you need to create a bus instance. This could be as simple as this.

1
2
from cherrypy.process import wspbus
bus = wspbus.Bus()

If you want to log through the bus, you will need further work since the bus doesn’t create a logger by default. Let’s see an example.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import sys
import logging
from logging import handlers
 
from cherrypy.process import wspbus
 
class MyBus(wspbus.Bus):
    def __init__(self, name=""):
        wspbus.Bus.__init__(self)
        self.open_logger(name)
        self.subscribe("log", self._log)
 
    def exit(self):
        wspbus.Bus.exit(self)
        self.close_logger()
 
    def open_logger(self, name=""):
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.INFO)
        h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(h)
 
        self.logger = logger
 
    def close_logger(self):
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()
 
    def _log(self, msg="", level=logging.INFO):
        self.logger.log(level, msg)

Not much, just creating a logger and subscribing the bus log channel to an instance method.

Associate the bus with the main process

Before we move on to the management of sub-process, let’s see how we can manage the main Python process already with our bus above.

For this, let’s imagine a bank placing stock orders, those orders will be handled by a broker running in a sub-process.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import random
import string
from multiprocessing import Process
 
class Bank(object):
    def __init__(self, queue):
        self.bus = MyBus(Bank.__name__)
        self.queue = queue
        self.bus.subscribe("main", self.randomly_place_order)
        self.bus.subscribe("exit", self.terminate)
 
    def randomly_place_order(self):
        order = random.sample(['BUY', 'SELL'], 1)[0]
        code = random.sample(string.ascii_uppercase, 4)
        amount = random.randint(0, 100)
 
        message = "%s %s %d" % (order, ''.join(code), amount)
 
        self.bus.log("Placing order: %s" % message)
 
        self.queue.put(message)
 
    def run(self):
        self.bus.start()
        self.bus.block(interval=0.01)
 
    def terminate(self):
        self.bus.unsubscribe("main", self.randomly_place_order)
        self.bus.unsubscribe("exit", self.terminate)

As you can see, not much again here, we simply associate a bus with the bank object. We also register to the exit channel of the bus so that when we terminated, we can do some cleanup. It’s good use to unregister from the bus.

We don’t actually care where those orders come from so we randomly generate them. The orders are placed every time the bus iterates its loop. This is done by attaching to the main channel of the bus.

We use a process queue to communicate with the broker’s sub-process.

Associate the bus with a sub-process

Handling the sub-process is actually similar to handling the main process. Let’s see the broker implementation for example.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from Queue import Empty
 
class Broker(Process):
    def __init__(self, queue):
        Process.__init__(self)
        self.queue = queue
        self.bus = MyBus(Broker.__name__)
        self.bus.subscribe("main", self.check)
 
    def check(self):
        try:
            message = self.queue.get_nowait()
        except Empty:
            return
 
        if message == "stop":
            self.bus.unsubscribe("main", self.check)
            self.bus.exit()
        elif message.startswith("BUY"):
            self.buy(*message.split(' ', 2)[1:])
        elif message.startswith("SELL"):
            self.sell(*message.split(' ', 2)[1:])
 
    def run(self):
        self.bus.start()
        self.bus.block(interval=0.01)
 
    def stop(self):
        self.queue.put("stop")
 
    def buy(self, code, amount):
        self.bus.log("BUY order placed for %s %s" % (amount, code))
 
    def sell(self, code, amount):
        self.bus.log("SELL order placed for %s %s" % (amount, code))

Several things are to be noticed. First we register once again to the bus’ main channel a method that checks the shared queue for incoming data. Whenever the incoming message is “stop”, we exit the bus altogether, thus leaving the sub-process, since it was blocked on the bus loop.

Note that the stop method could be called by the parent process if you needed to programatically stop the sub-process.

Put it all together

Run the code above as follow:

1
2
3
4
5
6
7
8
9
if __name__ == '__main__':
    from multiprocessing import Queue
    queue = Queue()
 
    broker = Broker(queue)
    broker.start()
 
    bank = Bank(queue)
    bank.run()

This creates the shared queue, starts the sub-process that runs the broker and finally starts the bank within the main process.

You should see a bunch of messages in the console and if you hit Ctrl-C, this will stop both processes cleanly.

And here we are, we now manage processes and sub-processes with a clean solution. The CherryPy process bus is an elegant add-on to your toolbox that I can only highly advise to consider in the future. The WSPBus implementation is part of the main CherryPy package (CherryPy 3.x), so you’ll have to install it all, even if you don’t require the HTTP framework. But don’t let that hold you back since the HTTP framework isn’t required for the bus to be used.

Happy coding!

The code is here.

11 thoughts on “Managing your process with the CherryPy’s bus

  1. Robert Brewer

    You don’t need to install CherryPy; I’d actually prefer it if people copy-and-pasted the Bus implementation from CherryPy into other projects, and then modified it with new plugins as needed. The grate thing about the Bus design is it’s a protocol, not a class–so reimplement it to your heart’s content :)

  2. Roman

    What advantages does it give comparing to existing message bus implementations, e.g. rabbitmq or dbus, except the fact that it doesn’t requite to configure and start additional daemons?

  3. Sylvain Hellegouarch Post author

    @Roman, they don’t aim at the same goal. The original idea behind the WSPBus was to provide a unified mechanism to manage a Python process’s states. Robert’s proposal was to design an event bus. It could have been designed in a different fashion, not requiring a bus.

    Now, because Robert’s specification defines a bus, an implementation could be using AMQP or dbus. The current implementation was not trying to simplify inter-process communication. I just thought my example would more interesting that way.

  4. David

    Sylvain,
    I am just starting to dig into Cherrypy’s bus implementation using your example as a template to learn from. One of the things I am missing or don’t understand is the “main” channel/event. When running your code, the console prints out that both bank & broker were starting then started but then the whole stack hangs/halts.

    Was the Bank class supposed to be a subclass of Process as well or am I missing something?

  5. Sylvain Hellegouarch Post author

    David,

    The process shouldn’t hang nor halt. Instead you should see messages between the main process and the sub-process such as:

    [2010-08-22 11:15:07,349] Bank – INFO – Placing order: BUY DXBZ 20
    [2010-08-22 11:15:07,358] Broker – INFO – BUY order placed for 20 DXBZ

    The main channel is called each time the bus performs one iteration. The Bank class doesn’t to subclass Process since it runs in the main Python process.

    Which version of CherryPy are you running? My code was tested against the last trunk version but should work with 3.2rc1.

  6. David

    Alright, now it all makes sense. I am using 3.1.2 which has a slightly different implementation of Bus.block & Bus.wait that doesn’t public to “main” on a set interval.

  7. bob

    random.sample([‘BUY’, ‘SELL’], 1)[0]
    should be written as
    random.choice([‘BUY’, ‘SELL’])

  8. Fer

    Hello,

    What is the difference with the plugins.SimplePlugin class, and the engin bus to communicate?

  9. paul

    i would also like to know this:-

    Hello,

    What is the difference with the plugins.SimplePlugin class, and the engin bus to communicate?

Comments are closed.