Servers

The Introduction presented the simplest possible server. A single server running in a single process may be useful, but there are plenty of alternatives to pylm for that. pylm becomes really useful when the workload is too large for a single server to handle. Here we introduce parallelism for the first time with the parallel standalone server.

A simple picture of the architecture of a parallel server is presented in the next figure. The client connects to a master process that manages an arbitrary number of workers. The workers act as slaves, and connect only to the master.

Figure (_images/parallel.png): Example of a client-master pair with workers for load balancing.

The following is a simple example of how to configure and run a parallel server. Since the parallel server is designed to handle a large workload, the job method of the client expects a generator that creates a series of binary messages.

The master, saved as master.py:

from pylm.servers import Master


server = Master(name='server',
                pull_address='tcp://127.0.0.1:5555',
                pub_address='tcp://127.0.0.1:5556',
                worker_pull_address='tcp://127.0.0.1:5557',
                worker_push_address='tcp://127.0.0.1:5558',
                db_address='tcp://127.0.0.1:5559')

if __name__ == '__main__':
    server.start()

The worker, saved as worker.py:

from pylm.servers import Worker
import sys


class MyWorker(Worker):
    def foo(self, message):
        # Prefix the payload with the name of the worker that handled it
        return self.name.encode('utf-8') + b' processed ' + message

# The worker name comes from the command line, e.g. "python worker.py worker1"
server = MyWorker(sys.argv[1],
                  db_address='tcp://127.0.0.1:5559')

if __name__ == '__main__':
    server.start()
    
The client, saved as client.py:

from pylm.clients import Client
from itertools import repeat

client = Client('server', 'tcp://127.0.0.1:5559')

if __name__ == '__main__':
    for response in client.job('server.foo',
                               repeat(b'a message', 10),
                               messages=10):
        print(response)

The master can be run as follows:

$> python master.py

And we can launch two workers as follows:

$> python worker.py worker1
$> python worker.py worker2

Finally, here’s how to run the client and its output:

$> python client.py
b'worker2 processed a message'
b'worker1 processed a message'
b'worker2 processed a message'
b'worker1 processed a message'
b'worker2 processed a message'
b'worker1 processed a message'
b'worker2 processed a message'
b'worker1 processed a message'
b'worker2 processed a message'
b'worker1 processed a message'
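
The client above feeds the job with itertools.repeat, but any iterable of bytes objects should serve the same purpose. As a minimal sketch, assuming the payloads start out as Python dictionaries and that JSON is an acceptable encoding (both are assumptions, and binary_messages is a hypothetical helper that is not part of pylm), a generator could encode them lazily:

```python
import json


def binary_messages(records):
    """Lazily encode each record as a UTF-8 JSON byte string.

    Hypothetical helper for illustration; it is not part of pylm.
    """
    for record in records:
        yield json.dumps(record, sort_keys=True).encode('utf-8')


if __name__ == '__main__':
    records = [{'id': i, 'text': 'a message'} for i in range(3)]
    for payload in binary_messages(records):
        print(payload)
```

A generator like this would take the place of repeat(b'a message', 10) in the call to client.job, with messages set to the number of responses expected, as in the client above.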

The master and the workers communicate through a ZeroMQ PUSH-PULL queue. This means that messages are most likely distributed among the workers in round-robin order, as the alternating output above suggests.
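
To make the round-robin idea concrete, the sketch below imitates that scheduling with the standard library only. It uses neither ZeroMQ nor pylm, and the distribute helper and worker names are illustrative assumptions. Real sockets may deviate from a strict alternation, for instance when a worker connects late, which is why the distribution is only the most likely one.

```python
from itertools import cycle


def distribute(messages, workers):
    """Assign each message to the next worker in turn (round-robin)."""
    turn = cycle(workers)
    return [(next(turn), message) for message in messages]


if __name__ == '__main__':
    # Illustrative names only; no sockets are involved in this sketch.
    pairs = distribute([b'a message'] * 4, ['worker1', 'worker2'])
    for worker, message in pairs:
        print(worker.encode('utf-8') + b' processed ' + message)
```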

Again, this simple example shows very little of the capabilities of this pattern in pylm. We’ll introduce features step by step, creating a manager with more and more capabilities.

Cache

One of the services that the master offers is a small key-value database that can be seen by all the workers. You can use that database RPC-style through the pylm.clients.Client.set(), pylm.clients.Client.get(), and pylm.clients.Client.delete() methods. Like the messages, the data stored in the database must be binary.

Note

The calling convention of pylm.clients.Client.set() is somewhat unconventional: pass the value first, and then the key if you want to use your own.

Important

The master stores the data in memory. Keep that in mind if you plan to send lots of data to the master.

The following example slightly modifies the parallel server shown at the start of this section. Before sending the job, the client sets a value in the temporary cache of the master server. The workers get that value, whose key is hardcoded within the function that is executed, and use it to build the response. The changes with respect to the first example are the self.get() call in the worker and the set() and get() calls in the client.

The master is unchanged:

from pylm.servers import Master

server = Master(name='server',
                pull_address='tcp://127.0.0.1:5555',
                pub_address='tcp://127.0.0.1:5556',
                worker_pull_address='tcp://127.0.0.1:5557',
                worker_push_address='tcp://127.0.0.1:5558',
                db_address='tcp://127.0.0.1:5559')

if __name__ == '__main__':
    server.start()

The worker reads the cached value:

from pylm.servers import Worker
import sys


class MyWorker(Worker):
    def foo(self, message):
        data = self.get('cached')
        return self.name.encode('utf-8') + data + message

server = MyWorker(sys.argv[1],
                  db_address='tcp://127.0.0.1:5559')

if __name__ == '__main__':
    server.start()

The client sets the value before sending the job:

from pylm.clients import Client
from itertools import repeat

client = Client('server', 'tcp://127.0.0.1:5559')

if __name__ == '__main__':
    client.set(b' cached data ', 'cached')
    print(client.get('cached'))
    
    for response in client.job('server.foo', repeat(b'a message', 10), messages=10):
        print(response)

And the output is the following:

$> python client.py
b' cached data '
b'worker1 cached data a message'
b'worker2 cached data a message'
b'worker1 cached data a message'
b'worker2 cached data a message'
b'worker1 cached data a message'
b'worker2 cached data a message'
b'worker1 cached data a message'
b'worker2 cached data a message'
b'worker1 cached data a message'
b'worker2 cached data a message'
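
The value-first calling convention mentioned in the note can be illustrated with a small in-memory store. This is only a sketch of the idea and not how pylm implements its cache: TinyCache is a hypothetical class, and generating a key when none is given is an assumption made for the example.

```python
from uuid import uuid4


class TinyCache:
    """In-memory key-value store for binary values (illustrative only)."""

    def __init__(self):
        self._data = {}

    def set(self, value, key=None):
        # Value first, then an optional key, mirroring the note above.
        if not isinstance(value, bytes):
            raise TypeError('values must be bytes')
        if key is None:
            # Assumption: a key is generated when the caller omits it.
            key = str(uuid4())
        self._data[key] = value
        return key

    def get(self, key):
        return self._data[key]

    def delete(self, key):
        del self._data[key]


if __name__ == '__main__':
    cache = TinyCache()
    cache.set(b' cached data ', 'cached')
    print(cache.get('cached'))
```

Since everything lives in a dictionary, the memory used grows with every value stored, which is the concern raised in the note about sending lots of data to the master.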

Scatter messages from the master to the workers

The master server has a useful method called pylm.servers.Master.scatter(), which is in fact a generator. This generator is executed for each message that the master gets from the inbound socket, so it can modify the message stream in any conceivable way. In the following example, a new master server overrides scatter with a generator that sends each message it gets three times.

from pylm.servers import Master


class MyMaster(Master):
    def scatter(self, message):
        for i in range(3):
            yield message

server = MyMaster(name='server',
                  pull_address='tcp://127.0.0.1:5555',
                  pub_address='tcp://127.0.0.1:5556',
                  worker_pull_address='tcp://127.0.0.1:5557',
                  worker_push_address='tcp://127.0.0.1:5558',
                  db_address='tcp://127.0.0.1:5559')

if __name__ == '__main__':
    server.start()

The workers are identical to those in the cache example. Since each message that the client sends to the master is repeated three times, the client expects 30 messages instead of 10.

from pylm.clients import Client
from itertools import repeat

client = Client('server', 'tcp://127.0.0.1:5559')

if __name__ == '__main__':
    client.set(b' cached data ', 'cached')
    print(client.get('cached'))
    
    for response in client.job('server.foo', repeat(b'a message', 10), messages=30):
        print(response)

This is the (partially omitted) output of the client:

$> python client.py
b' cached data '
b'worker1 cached data a message'
b'worker1 cached data a message'
b'worker2 cached data a message'

...

b'worker1 cached data a message'
b'worker2 cached data a message'
b'worker1 cached data a message'
b'worker2 cached data a message'
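
The arithmetic behind the 30 expected messages can be checked without running any server. The sketch below applies a scatter generator like the one above to a stream of 10 messages. The fan_out loop is a hypothetical stand-in for whatever the master does internally, not pylm code.

```python
from itertools import repeat


def scatter(message):
    """Yield the incoming message three times, like MyMaster.scatter."""
    for _ in range(3):
        yield message


def fan_out(inbound, scatter_fn):
    """Apply the scatter generator to every inbound message.

    Hypothetical loop for illustration; it is not taken from pylm.
    """
    for message in inbound:
        yield from scatter_fn(message)


if __name__ == '__main__':
    outbound = list(fan_out(repeat(b'a message', 10), scatter))
    print(len(outbound))  # 10 inbound messages, 3 copies each
```

A scatter generator that yields nothing for some messages would shrink the stream in the same way, so the messages argument of the client has to match whatever the generator produces.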

Gather messages from the workers

You can also alter the message stream after the workers have done their job. The master server includes a pylm.servers.Master.gather() method, also a generator, which is executed for each message coming back from the workers. Being a generator, gather has to yield, and it can produce either no message or an arbitrary number of messages. To make this example a little more interesting, we will also disassemble one of these messages, which so far looked like just a bunch of bytes.

We will define a gather generator that counts the messages and, when message number 30 (the final one) arrives, replaces its payload with a different binary string. This means that we need to add a counter attribute to the server, and that we have to modify the message with pylm.servers.Master.change_payload().

Note that this example builds on the scatter example, and consequently it uses the cache service as well as the scatter and gather generators.

from pylm.servers import Master


class MyMaster(Master):
    def __init__(self, *args, **kwargs):
        self.counter = 0
        super(MyMaster, self).__init__(*args, **kwargs)
    
    def scatter(self, message):
        for i in range(3):
            yield message

    def gather(self, message):
        self.counter += 1

        if self.counter == 30:
            yield self.change_payload(message, b'final message')
        else:
            yield message

server = MyMaster(name='server',
                  pull_address='tcp://127.0.0.1:5555',
                  pub_address='tcp://127.0.0.1:5556',
                  worker_pull_address='tcp://127.0.0.1:5557',
                  worker_push_address='tcp://127.0.0.1:5558',
                  db_address='tcp://127.0.0.1:5559')

if __name__ == '__main__':
    server.start()

The end of the client output now shows the replaced payload:

$> python client.py
b' cached data '
b'worker1 cached data a message'
b'worker2 cached data a message'
b'worker2 cached data a message'

...

b'worker2 cached data a message'
b'worker1 cached data a message'
b'worker2 cached data a message'
b'worker1 cached data a message'
b'final message'
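
The counting logic of the gather generator can be exercised in isolation. The sketch below is a simplified stand-in that needs no pylm: GatherCounter is a hypothetical class, and it swaps plain byte strings instead of calling change_payload, whose handling of the message envelope is not reproduced here.

```python
class GatherCounter:
    """Stand-in for the counter and gather logic of MyMaster (no pylm)."""

    def __init__(self, last):
        self.counter = 0
        self.last = last  # position of the message whose payload is replaced

    def gather(self, message):
        self.counter += 1
        if self.counter == self.last:
            # Plain bytes here; the real master calls change_payload instead.
            yield b'final message'
        else:
            yield message


if __name__ == '__main__':
    master = GatherCounter(last=30)
    responses = []
    for _ in range(30):
        responses.extend(master.gather(b'worker1 cached data a message'))
    print(len(responses), responses[-1])
```

Because the counter lives on the instance, it keeps growing across jobs. A master meant to serve more than one job would probably need to reset it, which this sketch does not attempt.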