Server features

All servers have built-in features that help you build a manageable cluster. This section explains how to use and configure them. It builds upon the examples in Servers.

Errors

You are probably wondering what happens if there is a bug in one of your functions. Your server will not crash: you would have to try really hard to make an exception in a user function bring a server down completely. The user part of the server runs within an exception handler that outputs the full exception traceback without side effects.

For instance, take the simplest of the examples, the one in the introduction, and add an obvious bug to the foo function.

from pylm.servers import Server


class MyServer(Server):
    def foo(self, message):
        self.logger.warning('Got a message')
        print(x)
        return b'you sent me ' + message


if __name__ == '__main__':
    server = MyServer('my_server', 'tcp://127.0.0.1:5555',
                      'tcp://127.0.0.1:5556', 'tcp://127.0.0.1:5557')
    server.start()

This triggers a NameError, because the variable x is not defined within the user function. As a result, the server logs the error:

$> python server.py
2016-08-26 09:41:59,053 - my_server - WARNING - Got a message
2016-08-26 09:41:59,053 - my_server - ERROR - User function gave an error
2016-08-26 09:41:59,054 - my_server - ERROR - Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.5/site-packages/pylm/servers.py", line 117, in start
    result = user_function(message.payload)
  File "server.py", line 7, in foo
    print(x)
NameError: name 'x' is not defined

...

Once the error has been logged, the server keeps running and waits for more input.
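
The behavior described above can be approximated outside of pylm with a plain try/except around the user function. The following is only a minimal sketch of the idea, not pylm's actual implementation: the helper run_guarded and the function echo are hypothetical names introduced for illustration, and the log format is merely chosen to resemble the output shown above.

```python
import logging
import traceback


def run_guarded(user_function, payload, logger):
    """Call user_function(payload); log any exception instead of raising it.

    Hypothetical helper, not part of pylm.
    """
    try:
        return user_function(payload)
    except Exception:
        # Log a short notice first, then the full traceback as text.
        logger.error('User function gave an error')
        logger.error(traceback.format_exc())
        return None


def foo(message):
    print(x)  # Deliberate bug: x is not defined, so this raises NameError
    return b'you sent me ' + message


def echo(message):
    # Same function without the bug.
    return b'you sent me ' + message


logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('my_server')

# The failing call is logged and yields None; the next call still runs.
results = [run_guarded(function, b'hi', logger) for function in (foo, echo)]
# results == [None, b'you sent me hi']
```

The point of the sketch is that the loop survives the failing call: the NameError ends up in the log, and the second function is still executed afterwards.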

Logging

Each server, regardless of its variant, has a built-in logger with the usual Python logging levels, which are defined in the logging module of the standard library. The following example, which builds upon the previous one, illustrates how to use the logging capabilities.

from pylm.servers import Master
import logging


class MyMaster(Master):
    def __init__(self, *args, **kwargs):
        self.counter = 0
        super(MyMaster, self).__init__(*args, **kwargs)
    
    def scatter(self, message):
        for i in range(3):
            yield message

    def gather(self, message):
        self.counter += 1

        if self.counter == 30:
            self.logger.critical('Changing the payload of the message')
            yield self.change_payload(message, b'final message')
        else:
            yield message

server = MyMaster(name='server',
                  db_address='tcp://127.0.0.1:5559',
                  pull_address='tcp://127.0.0.1:5555',
                  pub_address='tcp://127.0.0.1:5556',
                  worker_pull_address='tcp://127.0.0.1:5557',
                  worker_push_address='tcp://127.0.0.1:5558',
                  log_level=logging.WARNING)

if __name__ == '__main__':
    server.start()

The server sets the logging level to WARNING, and then logs a critical message when it changes the payload of the last message.

from pylm.servers import Worker
import sys


class MyWorker(Worker):
    def __init__(self, *args, **kwargs):
        self.ncalls = 0
        super(MyWorker, self).__init__(*args, **kwargs)
    
    def foo(self, message):
        self.ncalls += 1
        data = self.get('cached')

        if self.ncalls % 10 == 0:
            self.logger.info('Processed {} messages'.format(self.ncalls))
            
        return self.name.encode('utf-8') + data + message

server = MyWorker(sys.argv[1], db_address='tcp://127.0.0.1:5559')

if __name__ == '__main__':
    server.start()

The worker implementation just adds a counter and, every ten messages it processes, logs the number of messages processed at the INFO level. The output of the master is then:

$> python master.py
2016-08-26 08:08:38,679 - server - CRITICAL - Changing the payload of the message

The output of each worker (the two workers are probably doing the same amount of work) is:

$> python worker.py worker1
2016-08-26 08:08:38,672 - worker1 - INFO - Processed 10 messages
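
The difference between the two outputs comes from the logging level: a logger set to WARNING discards INFO records, while one set to INFO lets them through. The sketch below reproduces this with the standard logging module alone. It does not use pylm, and it assumes that the log_level argument maps directly onto the standard levels. ListHandler, make_logger and the logger names are hypothetical, introduced only to capture the records in memory.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that stores records in a list instead of printing."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append((record.levelname, record.getMessage()))


def make_logger(name, level):
    """Create a logger at the given level with its own in-memory handler."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.propagate = False  # Keep the records out of the root logger
    handler = ListHandler()
    logger.addHandler(handler)
    return logger, handler


master_logger, master_seen = make_logger('sketch.master', logging.WARNING)
worker_logger, worker_seen = make_logger('sketch.worker', logging.INFO)

# Send the same two records to both loggers.
for logger in (master_logger, worker_logger):
    logger.info('Processed 10 messages')
    logger.critical('Changing the payload of the message')

# The WARNING-level logger keeps only the CRITICAL record.
print(master_seen.records)
# The INFO-level logger keeps both records.
print(worker_seen.records)
```

Under these assumptions, the master in the example above would stay silent for INFO messages and report only the CRITICAL one, which matches the output shown.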

Playing with the stream of messages

Servers have a feature that lets you tune how the message stream moves through the cluster. In a couple of sections you will learn about pipelines (The Pipeline component) and hubs (The Hub server), and how several steps can be connected to form a complete streaming pipeline of messages.