Examples¶
Simple server and client communication¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | from pylm.servers import Server
import logging
class MyServer(Server):
    """Minimal standalone server that defines a single ``foo`` function."""

    def foo(self, message):
        """Log the call and echo the payload back with a prefix.

        :param message: Payload of the request; it is concatenated to a
            bytes literal, so it must be ``bytes``.
        :return: ``b'you sent me '`` followed by the payload.
        """
        self.logger.warning('Got a message')
        return b'you sent me ' + message
if __name__ == '__main__':
server = MyServer('my_server',
db_address='tcp://127.0.0.1:5555',
pull_address='tcp://127.0.0.1:5556',
pub_address='tcp://127.0.0.1:5557',
log_level=logging.DEBUG
)
server.start()
|
1 2 3 4 5 6 7 | from pylm.clients import Client
client = Client('my_server', 'tcp://127.0.0.1:5555')
if __name__ == '__main__':
result = client.eval('my_server.foo', b'a message')
print('Client got: ', result)
|
Simple parallel server and client communication¶

1 2 3 4 5 6 7 8 9 10 11 12 | from pylm.servers import Master
server = Master(name='server',
pull_address='tcp://127.0.0.1:5555',
pub_address='tcp://127.0.0.1:5556',
worker_pull_address='tcp://127.0.0.1:5557',
worker_push_address='tcp://127.0.0.1:5558',
db_address='tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from pylm.servers import Worker
from uuid import uuid4
import sys
class MyWorker(Worker):
    """Worker that tags every payload with its own name."""

    def foo(self, message):
        """Return the worker name, ``b' processed '`` and the payload.

        :param message: Payload to process (bytes).
        :return: The UTF-8 encoded worker name, then ``b' processed '``,
            then the payload.
        """
        return self.name.encode('utf-8') + b' processed ' + message
server = MyWorker(str(uuid4()), 'tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 | from pylm.clients import Client
from itertools import repeat
client = Client('server', 'tcp://127.0.0.1:5559')
if __name__ == '__main__':
for response in client.job('server.foo',
repeat(b'a message', 10),
messages=10):
print(response)
|
Cache operation for the standalone parallel version¶
1 2 3 4 5 6 7 8 9 10 11 | from pylm.servers import Master
server = Master(name='server',
pull_address='tcp://127.0.0.1:5555',
pub_address='tcp://127.0.0.1:5556',
worker_pull_address='tcp://127.0.0.1:5557',
worker_push_address='tcp://127.0.0.1:5558',
db_address='tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from pylm.servers import Worker
import sys
class MyWorker(Worker):
    """Worker that combines a cached value with every payload."""

    def foo(self, message):
        """Prefix the payload with the worker name and the cached data.

        The value is looked up under the key ``'cached'`` through
        ``self.get`` and is used as bytes in the concatenation below.

        :param message: Payload to process (bytes).
        :return: UTF-8 encoded worker name + cached data + payload.
        """
        data = self.get('cached')
        return self.name.encode('utf-8') + data + message
server = MyWorker(sys.argv[1],
db_address='tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 | from pylm.clients import Client
from itertools import repeat
client = Client('server', 'tcp://127.0.0.1:5559')
if __name__ == '__main__':
client.set(b' cached data ', 'cached')
print(client.get('cached'))
for response in client.job('server.foo', repeat(b'a message', 10), messages=10):
print(response)
|
Usage of the scatter function¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | from pylm.servers import Master
class MyMaster(Master):
    """Master whose ``scatter`` emits every incoming message three times."""

    def scatter(self, message):
        """Yield the same message three times.

        :param message: Incoming message.
        :return: Generator producing ``message`` three times.
        """
        for _ in range(3):
            yield message
server = MyMaster(name='server',
pull_address='tcp://127.0.0.1:5555',
pub_address='tcp://127.0.0.1:5556',
worker_pull_address='tcp://127.0.0.1:5557',
worker_push_address='tcp://127.0.0.1:5558',
db_address='tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from pylm.servers import Worker
import sys
class MyWorker(Worker):
    """Worker that combines a cached value with every payload."""

    def foo(self, message):
        """Prefix the payload with the worker name and the cached data.

        The value is looked up under the key ``'cached'`` through
        ``self.get`` and is used as bytes in the concatenation below.

        :param message: Payload to process (bytes).
        :return: UTF-8 encoded worker name + cached data + payload.
        """
        data = self.get('cached')
        return self.name.encode('utf-8') + data + message
server = MyWorker(sys.argv[1],
db_address='tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 | from pylm.clients import Client
from itertools import repeat
client = Client('server', 'tcp://127.0.0.1:5559')
if __name__ == '__main__':
client.set(b' cached data ', 'cached')
print(client.get('cached'))
for response in client.job('server.foo', repeat(b'a message', 10), messages=30):
print(response)
|
Usage of the gather function¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | from pylm.servers import Master
class MyMaster(Master):
    """Master that triplicates messages and rewrites the 30th gathered one."""

    def __init__(self, *args, **kwargs):
        # Number of messages seen by ``gather`` so far. It is assigned
        # before the parent initializer runs, as in the original listing.
        self.counter = 0
        super(MyMaster, self).__init__(*args, **kwargs)

    def scatter(self, message):
        """Yield the same message three times."""
        for _ in range(3):
            yield message

    def gather(self, message):
        """Count gathered messages and replace the payload of the 30th.

        Every message is yielded unchanged, except the one that brings the
        counter to 30: that one is first passed through
        ``self.change_payload`` together with ``b'final message'``.

        :param message: Message coming back from a worker.
        :return: Generator producing exactly one message per call.
        """
        self.counter += 1
        if self.counter == 30:
            yield self.change_payload(message, b'final message')
        else:
            yield message
server = MyMaster(name='server',
pull_address='tcp://127.0.0.1:5555',
pub_address='tcp://127.0.0.1:5556',
worker_pull_address='tcp://127.0.0.1:5557',
worker_push_address='tcp://127.0.0.1:5558',
db_address='tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 | from pylm.servers import Worker
import sys
class MyWorker(Worker):
    """Worker that combines a cached value with every payload."""

    def foo(self, message):
        """Prefix the payload with the worker name and the cached data.

        The value is looked up under the key ``'cached'`` through
        ``self.get`` and is used as bytes in the concatenation below.

        :param message: Payload to process (bytes).
        :return: UTF-8 encoded worker name + cached data + payload.
        """
        data = self.get('cached')
        return self.name.encode('utf-8') + data + message
server = MyWorker(sys.argv[1], db_address='tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 | from pylm.clients import Client
from itertools import repeat
client = Client('server', 'tcp://127.0.0.1:5559')
if __name__ == '__main__':
client.set(b' cached data ', 'cached')
print(client.get('cached'))
for response in client.job('server.foo', repeat(b'a message', 10), messages=30):
print(response)
|
A pipelined message stream¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | from pylm.servers import Server
import logging
class MyServer(Server):
    """First step of the pipeline; defines a single ``foo`` function."""

    def foo(self, message):
        """Log the call and echo the payload back with a prefix.

        :param message: Payload of the request (bytes).
        :return: ``b'you sent me '`` followed by the payload.
        """
        self.logger.warning('Got a message')
        return b'you sent me ' + message
if __name__ == '__main__':
server = MyServer('my_server',
db_address='tcp://127.0.0.1:5555',
pull_address='tcp://127.0.0.1:5556',
pub_address='tcp://127.0.0.1:5557',
pipelined=True,
log_level=logging.DEBUG
)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | from pylm.servers import Pipeline
import logging
class MyPipeline(Pipeline):
    """Pipeline step that prefixes every payload it receives."""

    def foo(self, message):
        """Log the call and prefix the payload.

        :param message: Payload received by this step (bytes).
        :return: ``b'and I pipelined '`` followed by the payload.
        """
        self.logger.warning('Got a message')
        return b'and I pipelined ' + message
if __name__ == '__main__':
server = MyPipeline('my_pipeline',
db_address='tcp://127.0.0.1:5560',
sub_address='tcp://127.0.0.1:5557',
pub_address='tcp://127.0.0.1:5561',
previous='my_server',
to_client=True,
log_level=logging.DEBUG
)
server.start()
|
1 2 3 4 5 6 7 8 | from pylm.clients import Client
client = Client('my_server', 'tcp://127.0.0.1:5555',
sub_address='tcp://127.0.0.1:5561')
if __name__ == '__main__':
result = client.eval(['my_server.foo', 'my_pipeline.foo'], b'a message')
print('Client got: ', result)
|
A pipelined message stream forming a tee¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | from pylm.servers import Server
import logging
class MyServer(Server):
    """Server at the root of the tee; defines a single ``foo`` function."""

    def foo(self, message):
        """Log the call and echo the payload back with a prefix.

        :param message: Payload of the request (bytes).
        :return: ``b'you sent me '`` followed by the payload.
        """
        self.logger.warning('Got a message')
        return b'you sent me ' + message
if __name__ == '__main__':
server = MyServer('my_server',
db_address='tcp://127.0.0.1:5555',
pull_address='tcp://127.0.0.1:5556',
pub_address='tcp://127.0.0.1:5557',
pipelined=True,
log_level=logging.DEBUG
)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | from pylm.servers import Pipeline
import logging
class MyPipeline(Pipeline):
    """Pipeline step that prefixes every payload it receives."""

    def foo(self, message):
        """Log the call and prefix the payload.

        :param message: Payload received by this step (bytes).
        :return: ``b'and I pipelined '`` followed by the payload.
        """
        self.logger.warning('Got a message')
        return b'and I pipelined ' + message
if __name__ == '__main__':
server = MyPipeline('my_pipeline',
db_address='tcp://127.0.0.1:5560',
sub_address='tcp://127.0.0.1:5557',
pub_address='tcp://127.0.0.1:5561',
previous='my_server',
to_client=True,
log_level=logging.DEBUG
)
server.start()
|
Important
If a pipeline method does not return a value, pylm assumes that no message has to be delivered.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | from pylm.servers import Pipeline
import logging
class MyPipeline(Pipeline):
    """Pipeline step that only logs and produces no output message."""

    def foo(self, message):
        """Log a notice and deliberately return no value.

        As the note preceding this listing states, a pipeline method
        without a return value makes pylm deliver no message.

        :param message: Payload received by this step (unused).
        """
        self.logger.warning('Just echo, nothing else')
if __name__ == '__main__':
server = MyPipeline('my_pipeline',
db_address='tcp://127.0.0.1:5570',
sub_address='tcp://127.0.0.1:5557',
pub_address='tcp://127.0.0.1:5571',
previous='my_server',
to_client=True,
log_level=logging.DEBUG
)
server.start()
|
1 2 3 4 5 6 7 8 | from pylm.clients import Client
client = Client('my_server', 'tcp://127.0.0.1:5555',
sub_address='tcp://127.0.0.1:5561')
if __name__ == '__main__':
result = client.eval(['my_server.foo', 'my_pipeline.foo'], b'a message')
print('Client got: ', result)
|
A pipelined message stream that forms a tee and controls the stream of messages¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | from pylm.servers import Server
class MyServer(Server):
    """Server that tags successive messages alternately 'even' and 'odd'."""

    def __init__(self, *args, **kwargs):
        super(MyServer, self).__init__(*args, **kwargs)
        # Number of messages handled so far; its parity selects the topic.
        self.counter = 0

    def foo(self, message):
        """Log the call and echo the payload back with a prefix.

        :param message: Payload of the request (bytes).
        :return: ``b'you sent me '`` followed by the payload.
        """
        self.logger.info('Got a message')
        return b'you sent me ' + message

    def handle_stream(self, message):
        """Choose the topic for a message and advance its stage.

        :param message: Message object; its ``stage`` attribute is
            incremented in place.
        :return: ``(topic, message)`` where ``topic`` is ``'even'`` or
            ``'odd'``.
        """
        # The topic depends on how many messages were handled before this
        # one (the parity of the counter), not on the message contents.
        if self.counter % 2 == 0:
            self.logger.info('Even')
            topic = 'even'
        else:
            self.logger.info('Odd')
            topic = 'odd'

        # Remember to increment the stage
        message.stage += 1

        # Increment the message counter
        self.counter += 1

        return topic, message
if __name__ == '__main__':
server = MyServer('my_server',
db_address='tcp://127.0.0.1:5555',
pull_address='tcp://127.0.0.1:5556',
pub_address='tcp://127.0.0.1:5557',
pipelined=True)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | from pylm.servers import Pipeline
class MyPipeline(Pipeline):
    """Pipeline step that prefixes every payload it receives."""

    def foo(self, message):
        """Log the call and prefix the payload.

        :param message: Payload received by this step (bytes).
        :return: ``b'and I pipelined '`` followed by the payload.
        """
        self.logger.info('Got a message')
        return b'and I pipelined ' + message
if __name__ == '__main__':
server = MyPipeline('my_pipeline',
db_address='tcp://127.0.0.1:5560',
sub_address='tcp://127.0.0.1:5557',
pub_address='tcp://127.0.0.1:5561',
previous='even',
to_client=True)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | from pylm.servers import Pipeline
class MyPipeline(Pipeline):
    """Pipeline step that logs each payload and produces no output."""

    def foo(self, message):
        """Log the UTF-8 decoded payload; no value is returned.

        :param message: Payload received by this step (bytes, decoded as
            UTF-8 for the log line).
        """
        self.logger.info('Echo: {}'.format(message.decode('utf-8')))
if __name__ == '__main__':
server = MyPipeline('my_pipeline',
db_address='tcp://127.0.0.1:5570',
sub_address='tcp://127.0.0.1:5557',
pub_address='tcp://127.0.0.1:5571',
previous='odd',
to_client=True)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 | from pylm.clients import Client
from itertools import repeat
client = Client('my_server', 'tcp://127.0.0.1:5555',
sub_address='tcp://127.0.0.1:5561')
if __name__ == '__main__':
for response in client.job(['my_server.foo', 'my_pipeline.foo'],
repeat(b'a message', 10),
messages=5):
print('Client got: ', response)
|
A pipelined message stream that forms a tee and controls the stream of messages with a sink¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | from pylm.servers import Server
class MyServer(Server):
    """Server that tags successive messages alternately 'even' and 'odd'."""

    def __init__(self, *args, **kwargs):
        super(MyServer, self).__init__(*args, **kwargs)
        # Number of messages handled so far; its parity selects the topic.
        self.counter = 0

    def foo(self, message):
        """Log the call and echo the payload back with a prefix.

        :param message: Payload of the request (bytes).
        :return: ``b'you sent me '`` followed by the payload.
        """
        self.logger.info('Got a message')
        return b'you sent me ' + message

    def handle_stream(self, message):
        """Choose the topic for a message and advance its stage.

        :param message: Message object; its ``stage`` attribute is
            incremented in place.
        :return: ``(topic, message)`` where ``topic`` is ``'even'`` or
            ``'odd'``.
        """
        # The topic depends on how many messages were handled before this
        # one (the parity of the counter), not on the message contents.
        if self.counter % 2 == 0:
            self.logger.info('Even')
            topic = 'even'
        else:
            self.logger.info('Odd')
            topic = 'odd'

        # Remember to increment the stage
        message.stage += 1

        # Increment the message counter
        self.counter += 1

        return topic, message
if __name__ == '__main__':
server = MyServer('my_server',
db_address='tcp://127.0.0.1:5555',
pull_address='tcp://127.0.0.1:5556',
pub_address='tcp://127.0.0.1:5557',
pipelined=True)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | from pylm.servers import Pipeline
class MyPipeline(Pipeline):
    """Pipeline step that prefixes every payload it receives."""

    def foo(self, message):
        """Log the call and prefix the payload.

        :param message: Payload received by this step (bytes).
        :return: ``b'and I pipelined '`` followed by the payload.
        """
        self.logger.info('Got a message')
        return b'and I pipelined ' + message
if __name__ == '__main__':
server = MyPipeline('my_pipeline',
db_address='tcp://127.0.0.1:5570',
sub_address='tcp://127.0.0.1:5557',
pub_address='tcp://127.0.0.1:5571',
previous='odd',
to_client=False)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | from pylm.servers import Pipeline
class MyPipeline(Pipeline):
    """Pipeline step that prefixes every payload it receives."""

    def foo(self, message):
        """Log the call and prefix the payload.

        :param message: Payload received by this step (bytes).
        :return: ``b'and I pipelined '`` followed by the payload.
        """
        self.logger.info('Got a message')
        return b'and I pipelined ' + message
if __name__ == '__main__':
server = MyPipeline('my_pipeline',
db_address='tcp://127.0.0.1:5560',
sub_address='tcp://127.0.0.1:5557',
pub_address='tcp://127.0.0.1:5561',
previous='even',
to_client=False)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | from pylm.servers import Sink
import logging
class MySink(Sink):
    """Sink that prefixes every payload it receives."""

    def foo(self, message):
        """Log the call and prefix the payload.

        :param message: Payload received by the sink (bytes).
        :return: ``b'and gathered '`` followed by the payload.
        """
        self.logger.warning('Got a message')
        return b'and gathered ' + message
if __name__ == '__main__':
server = MySink('my_sink',
db_address='tcp://127.0.0.1:5580',
sub_addresses=['tcp://127.0.0.1:5561', 'tcp://127.0.0.1:5571'],
pub_address='tcp://127.0.0.1:5581',
previous=['my_pipeline', 'my_pipeline'],
to_client=True,
log_level=logging.DEBUG
)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 | from pylm.clients import Client
from itertools import repeat
client = Client('my_server', 'tcp://127.0.0.1:5555',
sub_address='tcp://127.0.0.1:5581')
if __name__ == '__main__':
for response in client.job(['my_server.foo', 'my_pipeline.foo', 'my_sink.foo'],
repeat(b'a message', 10),
messages=10):
print('Client got: ', response)
|
Connecting a pipeline to a master¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from pylm.servers import Master
import logging
server = Master(name='server',
pull_address='tcp://127.0.0.1:5555',
pub_address='tcp://127.0.0.1:5556',
worker_pull_address='tcp://127.0.0.1:5557',
worker_push_address='tcp://127.0.0.1:5558',
db_address='tcp://127.0.0.1:5559',
pipelined=True)
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | from pylm.servers import Pipeline
import logging
class MyPipeline(Pipeline):
    """Pipeline step that prefixes every payload it receives."""

    def foo(self, message):
        """Log the call and prefix the payload.

        :param message: Payload received by this step (bytes).
        :return: ``b'and I pipelined '`` followed by the payload.
        """
        self.logger.info('Got a message')
        return b'and I pipelined ' + message
if __name__ == '__main__':
server = MyPipeline('my_pipeline',
db_address='tcp://127.0.0.1:5560',
sub_address='tcp://127.0.0.1:5556',
pub_address='tcp://127.0.0.1:5561',
previous='server',
to_client=True)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 | from pylm.servers import Worker
import sys
class MyWorker(Worker):
    """Worker that logs each call and tags the payload with its name."""

    def foo(self, message):
        """Log the call and return the tagged payload.

        :param message: Payload to process (bytes).
        :return: The UTF-8 encoded worker name, then ``b' processed '``,
            then the payload.
        """
        self.logger.info('Processed')
        return self.name.encode('utf-8') + b' processed ' + message
server = MyWorker(sys.argv[1], 'tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 | from pylm.clients import Client
from itertools import repeat
client = Client('server', 'tcp://127.0.0.1:5559',
sub_address='tcp://127.0.0.1:5561')
if __name__ == '__main__':
for response in client.job(['server.foo', 'my_pipeline.foo'],
repeat(b'a message', 10),
messages=10):
print(response)
|
Connecting a hub to a server¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | from pylm.servers import Server
import logging
class MyServer(Server):
    """Server placed in front of the hub; defines a single ``foo`` function."""

    def foo(self, message):
        """Log the call and echo the payload back with a prefix.

        :param message: Payload of the request (bytes).
        :return: ``b'you sent me '`` followed by the payload.
        """
        self.logger.warning('Got a message')
        return b'you sent me ' + message
if __name__ == '__main__':
server = MyServer('server',
db_address='tcp://127.0.0.1:5555',
pull_address='tcp://127.0.0.1:5556',
pub_address='tcp://127.0.0.1:5557',
pipelined=True,
log_level=logging.DEBUG
)
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | from pylm.servers import Hub
import logging
server = Hub(name='hub',
sub_address='tcp://127.0.0.1:5557',
pub_address='tcp://127.0.0.1:5558',
worker_pull_address='tcp://127.0.0.1:5559',
worker_push_address='tcp://127.0.0.1:5560',
db_address='tcp://127.0.0.1:5561',
previous='server',
pipelined=False)
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 | from pylm.servers import Worker
import sys
class MyWorker(Worker):
    """Worker that logs each call and tags the payload with its name."""

    def foo(self, message):
        """Log the call and return the tagged payload.

        :param message: Payload to process (bytes).
        :return: The UTF-8 encoded worker name, then ``b' processed '``,
            then the payload.
        """
        self.logger.info('Processed')
        return self.name.encode('utf-8') + b' processed ' + message
server = MyWorker(sys.argv[1], 'tcp://127.0.0.1:5561')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 | from pylm.clients import Client
client = Client('server', 'tcp://127.0.0.1:5555',
sub_address='tcp://127.0.0.1:5558')
if __name__ == '__main__':
result = client.eval(['server.foo', 'hub.foo'], b'a message')
print('Client got: ', result)
|
Building a master server from its components¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | from pylm.parts.servers import ServerTemplate
from pylm.parts.services import PullService, PubService, WorkerPullService, WorkerPushService, \
CacheService
server = ServerTemplate()
db_address = 'tcp://127.0.0.1:5559'
pull_address = 'tcp://127.0.0.1:5555'
pub_address = 'tcp://127.0.0.1:5556'
worker_pull_address = 'tcp://127.0.0.1:5557'
worker_push_address = 'tcp://127.0.0.1:5558'
server.register_inbound(PullService, 'Pull', pull_address, route='WorkerPush')
server.register_inbound(WorkerPullService, 'WorkerPull', worker_pull_address, route='Pub')
server.register_outbound(WorkerPushService, 'WorkerPush', worker_push_address)
server.register_outbound(PubService, 'Pub', pub_address)
server.register_bypass(CacheService, 'Cache', db_address)
server.preset_cache(name='server',
db_address=db_address,
pull_address=pull_address,
pub_address=pub_address,
worker_pull_address=worker_pull_address,
worker_push_address=worker_push_address)
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 | import sys
from pylm.servers import Worker
class MyWorker(Worker):
    """Worker that tags every payload with its own name."""

    def foo(self, message):
        """Return the worker name, ``b' processed '`` and the payload.

        :param message: Payload to process (bytes).
        :return: The UTF-8 encoded worker name, then ``b' processed '``,
            then the payload.
        """
        return self.name.encode('utf-8') + b' processed ' + message
server = MyWorker(sys.argv[1], 'tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 | from pylm.clients import Client
from itertools import repeat
client = Client('server', 'tcp://127.0.0.1:5559')
if __name__ == '__main__':
for response in client.job('server.foo', repeat(b'a message', 10), messages=10):
print(response)
|
Turning a master into a web server with the HTTP gateway¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | from pylm.parts.servers import ServerTemplate
from pylm.parts.services import WorkerPullService, WorkerPushService, \
CacheService
from pylm.parts.gateways import GatewayDealer, GatewayRouter, HttpGateway
server = ServerTemplate()
worker_pull_address = 'tcp://127.0.0.1:5557'
worker_push_address = 'tcp://127.0.0.1:5558'
db_address = 'tcp://127.0.0.1:5559'
server.register_inbound(GatewayRouter,
'gateway_router',
'inproc://gateway_router',
route='WorkerPush')
server.register_outbound(GatewayDealer,
'gateway_dealer',
listen_address='inproc://gateway_router')
server.register_bypass(HttpGateway,
name='HttpGateway',
listen_address='inproc://gateway_router',
hostname='localhost',
port=8888)
server.register_inbound(WorkerPullService, 'WorkerPull', worker_pull_address,
route='gateway_dealer')
server.register_outbound(WorkerPushService, 'WorkerPush', worker_push_address)
server.register_bypass(CacheService, 'Cache', db_address)
server.preset_cache(name='server',
db_address=db_address,
worker_pull_address=worker_pull_address,
worker_push_address=worker_push_address)
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 | import sys
from pylm.servers import Worker
class MyWorker(Worker):
    """Worker that answers every call with a fixed acknowledgement."""

    def function(self, message):
        """Return ``b'acknowledged'`` regardless of the payload.

        :param message: Payload of the request (ignored).
        :return: The constant ``b'acknowledged'``.
        """
        return b'acknowledged'
server = MyWorker(sys.argv[1], db_address='tcp://127.0.0.1:5559')
if __name__ == '__main__':
server.start()
|
1 2 3 | import requests
print(requests.get('http://localhost:8888/function').content)
|
Using server-less infrastructure as workers via the HTTP protocol¶

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | from pylm.parts.servers import ServerTemplate
from pylm.parts.services import PullService, PubService, CacheService
from pylm.parts.connections import HttpConnection
server = ServerTemplate()
db_address = 'tcp://127.0.0.1:5559'
pull_address = 'tcp://127.0.0.1:5555'
pub_address = 'tcp://127.0.0.1:5556'
server.register_inbound(PullService, 'Pull', pull_address,
route='HttpConnection')
server.register_outbound(HttpConnection, 'HttpConnection',
'http://localhost:8888', route='Pub', max_workers=1)
server.register_outbound(PubService, 'Pub', pub_address)
server.register_bypass(CacheService, 'Cache', db_address)
server.preset_cache(name='server',
db_address=db_address,
pull_address=pull_address,
pub_address=pub_address)
if __name__ == '__main__':
server.start()
|
1 2 3 4 5 6 7 8 9 10 11 12 | from pylm.remote.server import RequestHandler, DebugServer, WSGIApplication
class MyHandler(RequestHandler):
    """Request handler that defines the ``foo`` function."""

    def foo(self, payload):
        """Append a marker to the payload.

        :param payload: Payload of the request (bytes).
        :return: The payload followed by ``b' processed online'``.
        """
        return payload + b' processed online'
app = WSGIApplication(MyHandler)
if __name__ == '__main__':
server = DebugServer('localhost', 8888, MyHandler)
server.serve_forever()
|
1 2 3 4 5 6 7 8 | from pylm.clients import Client
from itertools import repeat
client = Client('server', 'tcp://127.0.0.1:5559')
if __name__ == '__main__':
for response in client.job('server.foo', repeat(b'a message', 10), messages=10):
print(response)
|