High-level API documentation

Servers

class pylm.servers.Hub(name: str, sub_address: str, pub_address: str, worker_pull_address: str, worker_push_address: str, db_address: str, previous: str, pipelined: bool = False, cache: object = <pylm.persistence.kv.DictDB object>, log_level: int = 20)[source]

A Hub is a pipelined Master.

Parameters:
  • name – Name of the server
  • sub_address – Valid address for the sub service
  • pub_address – Valid address for the pub service
  • worker_pull_address – Valid address for the pull-from-workers service
  • worker_push_address – Valid address for the push-to-workers service
  • db_address – Valid address to bind the Cache service
  • previous – Name of the previous server to subscribe to the queue.
  • pipelined – The stream is pipelined to another server.
  • cache – Key-value embeddable database. Pick from one of the supported ones
  • log_level – Logging level
change_payload(message: messages_pb2.PalmMessage, new_payload: bytes) → messages_pb2.PalmMessage

Change the payload of the message

Parameters:
  • message – The binary message to be processed
  • new_payload – The new binary payload
Returns:

Serialized message with the new payload

gather(message: messages_pb2.PalmMessage)

Gather function for outbound messages

Parameters: message – Binary message
Returns: Yields none, one or multiple binary messages
handle_stream(message: messages_pb2.PalmMessage)

Handle the stream of messages.

Parameters: message – The message about to be sent to the next step in the cluster
Returns: topic (str) and message (PalmMessage)

The default behaviour is as follows. If you leave this function unchanged and pipelined is set to False, the topic is the ID of the client, which makes the message return to the client. If pipelined is set to True, the topic is set to the name of the server and the step of the message is incremented by one.

You can alter this default behaviour by overriding this function. Take into account that the message is also available in this function, and you can change other parameters like the stage or the function.
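
The default routing rule described above can be sketched in plain Python. This is only an illustration, not the library's implementation: FakeMessage is a hypothetical stand-in for messages_pb2.PalmMessage, and the field names client and stage are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for messages_pb2.PalmMessage."""
    client: str          # assumed field holding the client ID
    stage: int = 0       # assumed field holding the current step
    payload: bytes = b""


def default_handle_stream(message, server_name, pipelined):
    """Return (topic, message) following the default rule described above."""
    if pipelined:
        # Send to the next server: topic is this server's name, step + 1.
        message.stage += 1
        return server_name, message
    # Send back to the client: topic is the client ID.
    return message.client, message


topic, msg = default_handle_stream(FakeMessage(client="c-42"), "hub", pipelined=False)
print(topic)  # c-42
topic, msg = default_handle_stream(FakeMessage(client="c-42"), "hub", pipelined=True)
print(topic, msg.stage)  # hub 1
```

An override of handle_stream would return the same (topic, message) pair after applying its own routing decision.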

preset_cache(**kwargs)

Send the following keyword arguments as cache variables. Useful for configuration variables that the workers or the clients fetch straight from the cache.

Parameters: kwargs
register_bypass(part, name='', listen_address='', **kwargs)

Register a bypass part to this server

Parameters:
  • part – part class
  • name – part name
  • listen_address – Valid ZeroMQ address listening to the exterior
  • kwargs – Additional keyword arguments to pass to the part
register_inbound(part, name='', listen_address='', route='', block=False, log='', **kwargs)

Register inbound part to this server.

Parameters:
  • part – part class
  • name – Name of the part
  • listen_address – Valid ZeroMQ address listening to the exterior
  • route – Outbound part it routes to
  • block – True if the part blocks waiting for a response
  • log – Log message in DEBUG level for each message processed.
  • kwargs – Additional keyword arguments to pass to the part
register_outbound(part, name='', listen_address='', route='', log='', **kwargs)

Register outbound part to this server

Parameters:
  • part – part class
  • name – Name of the part
  • listen_address – Valid ZeroMQ address listening to the exterior
  • route – Outbound part to which it routes the response, if there is one
  • log – Log message in DEBUG level for each message processed
  • kwargs – Additional keyword arguments to pass to the part
scatter(message: messages_pb2.PalmMessage)

Scatter function for inbound messages

Parameters: message – Binary message
Returns: Yields none, one or multiple binary messages
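
Since scatter and gather are documented as yielding none, one or multiple messages, they are naturally written as generators. The sketch below shows that shape with plain functions operating on bytes. In the library these are methods that receive a PalmMessage, so the bytes arguments and the splitting logic here are assumptions made for illustration.

```python
def scatter(message: bytes):
    """Split one inbound message into one message per non-empty line."""
    for line in message.splitlines():
        if line:  # empty lines yield nothing
            yield line


def gather(message: bytes):
    """Drop blank results and pass the rest through in upper case."""
    if message.strip():
        yield message.upper()


inbound = b"alpha\n\nbeta"
outbound = [out for part in scatter(inbound) for out in gather(part)]
print(outbound)  # [b'ALPHA', b'BETA']
```
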
start()

Start the server with all its parts.

class pylm.servers.Master(name: str, pull_address: str, pub_address: str, worker_pull_address: str, worker_push_address: str, db_address: str, pipelined: bool = False, cache: object = <pylm.persistence.kv.DictDB object>, log_level: int = 20)[source]

Standalone master server, intended to send workload to workers.

Parameters:
  • name – Name of the server
  • pull_address – Valid address for the pull service
  • pub_address – Valid address for the pub service
  • worker_pull_address – Valid address for the pull-from-workers service
  • worker_push_address – Valid address for the push-to-workers service
  • db_address – Valid address to bind the Cache service
  • pipelined – The output connects to a Pipeline or a Hub.
  • cache – Key-value embeddable database. Pick from one of the supported ones
  • log_level – Logging level
change_payload(message: messages_pb2.PalmMessage, new_payload: bytes) → messages_pb2.PalmMessage

Change the payload of the message

Parameters:
  • message – The binary message to be processed
  • new_payload – The new binary payload
Returns:

Serialized message with the new payload

gather(message: messages_pb2.PalmMessage)

Gather function for outbound messages

Parameters: message – Binary message
Returns: Yields none, one or multiple binary messages
handle_stream(message: messages_pb2.PalmMessage)

Handle the stream of messages.

Parameters: message – The message about to be sent to the next step in the cluster
Returns: topic (str) and message (PalmMessage)

The default behaviour is as follows. If you leave this function unchanged and pipelined is set to False, the topic is the ID of the client, which makes the message return to the client. If pipelined is set to True, the topic is set to the name of the server and the step of the message is incremented by one.

You can alter this default behaviour by overriding this function. Take into account that the message is also available in this function, and you can change other parameters like the stage or the function.

preset_cache(**kwargs)

Send the following keyword arguments as cache variables. Useful for configuration variables that the workers or the clients fetch straight from the cache.

Parameters: kwargs
register_bypass(part, name='', listen_address='', **kwargs)

Register a bypass part to this server

Parameters:
  • part – part class
  • name – part name
  • listen_address – Valid ZeroMQ address listening to the exterior
  • kwargs – Additional keyword arguments to pass to the part
register_inbound(part, name='', listen_address='', route='', block=False, log='', **kwargs)

Register inbound part to this server.

Parameters:
  • part – part class
  • name – Name of the part
  • listen_address – Valid ZeroMQ address listening to the exterior
  • route – Outbound part it routes to
  • block – True if the part blocks waiting for a response
  • log – Log message in DEBUG level for each message processed.
  • kwargs – Additional keyword arguments to pass to the part
register_outbound(part, name='', listen_address='', route='', log='', **kwargs)

Register outbound part to this server

Parameters:
  • part – part class
  • name – Name of the part
  • listen_address – Valid ZeroMQ address listening to the exterior
  • route – Outbound part to which it routes the response, if there is one
  • log – Log message in DEBUG level for each message processed
  • kwargs – Additional keyword arguments to pass to the part
scatter(message: messages_pb2.PalmMessage)

Scatter function for inbound messages

Parameters: message – Binary message
Returns: Yields none, one or multiple binary messages
start()

Start the server with all its parts.
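
A minimal launch sketch for a Master, using only the constructor arguments documented above. The server name and the TCP addresses are placeholders chosen for the example. The import sits inside the function so that the snippet loads without pylm installed.

```python
def master_kwargs():
    """Keyword arguments matching the documented Master signature."""
    return dict(
        name="master",                               # placeholder name
        pull_address="tcp://127.0.0.1:5555",         # placeholder addresses
        pub_address="tcp://127.0.0.1:5556",
        worker_pull_address="tcp://127.0.0.1:5557",
        worker_push_address="tcp://127.0.0.1:5558",
        db_address="tcp://127.0.0.1:5559",
    )


def run_master():
    # Imported here so the sketch loads without pylm installed.
    from pylm.servers import Master

    server = Master(**master_kwargs())
    server.start()  # starts the server with all its parts
```

Calling run_master() requires pylm and free ports at the chosen addresses.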

class pylm.servers.MuxWorker(name='', db_address='', push_address=None, pull_address=None, log_level=20, messages=9223372036854775807)[source]

Standalone worker for the standalone master that allows the user function to return an iterator, so that the gather function of the Master receives more messages.

Parameters:
  • name – Name assigned to this worker server
  • db_address – Address of the db service of the master
  • push_address – Address the workers push to. If left blank, fetches it from the master
  • pull_address – Address the workers pull from. If left blank, fetches it from the master
  • log_level – Log level for this server.
  • messages – Number of messages before it is shut down.
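
The sketch below illustrates a user function that returns an iterator, which is what distinguishes MuxWorker from Worker. It assumes that user functions are defined as methods of a subclass and receive the binary payload. That convention is not stated in this section, so treat it as an assumption. The names SplitWorker, split and split_words, and the address, are hypothetical.

```python
def split_words(payload: bytes):
    """Yield several results for a single input message."""
    for word in payload.split():
        yield word


def build_worker(db_address="tcp://127.0.0.1:5559"):
    # Imported here so the sketch loads without pylm installed.
    from pylm.servers import MuxWorker

    class SplitWorker(MuxWorker):
        # Assumption: user functions are methods that take the payload.
        def split(self, message):
            return split_words(message)

    return SplitWorker("split_worker", db_address)


print(list(split_words(b"one two three")))  # [b'one', b'two', b'three']
```
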
delete(key)

Deletes data in the server’s internal cache.

Parameters: key – Key of the data to be deleted
Returns:
get(key)

Gets a value from server’s internal cache

Parameters: key – Key for the data to be selected.
Returns:
set(value, key=None)

Sets a key-value pair in the remote database.

Parameters:
  • key
  • value
Returns:

start()[source]

Starts the server

class pylm.servers.Pipeline(name, db_address, sub_address, pub_address, previous, to_client=True, log_level=20, messages=9223372036854775807)[source]

Minimal server that acts as a pipeline.

Parameters:
  • name (str) – Name of the server
  • db_address (str) – ZeroMQ address of the cache service.
  • sub_address (str) – Address of the pub socket of the previous server
  • pub_address (str) – Address of the pub socket
  • previous – Name of the previous server.
  • to_client – True if the message is sent back to the client. Defaults to True
  • log_level – Minimum output log level.
  • messages (int) – Total number of messages that the server processes. Useful for debugging.
echo(payload)

Echo utility function that returns the unchanged payload. This function is useful when the server is there just to modify the stream of messages.

Returns: payload (bytes)
handle_stream(message)

Handle the stream of messages.

Parameters: message – The message about to be sent to the next step in the cluster
Returns: topic (str) and message (PalmMessage)

The default behaviour is the following. If you leave this function unchanged and pipeline is set to False, the topic is the ID of the client, which makes the message return to the client. If the pipeline parameter is set to True, the topic is set as the name of the server and the step of the message is incremented by one.

You can alter this default behaviour by overriding this function. Take into account that the message is also available in this function, and you can change other parameters like the stage or the function.

start(cache_messages=9223372036854775807)

Start the server

Parameters: cache_messages – Number of messages the cache service handles before it shuts down. Useful for debugging
class pylm.servers.Server(name, db_address, pull_address, pub_address, pipelined=False, log_level=20, messages=9223372036854775807)[source]

Standalone and minimal server that replies single requests.

Parameters:
  • name (str) – Name of the server
  • db_address (str) – ZeroMQ address of the cache service.
  • pull_address (str) – Address of the pull socket
  • pub_address (str) – Address of the pub socket
  • pipelined – True if the server is chained to another server.
  • log_level – Minimum output log level.
  • messages (int) – Total number of messages that the server processes. Useful for debugging.
echo(payload)[source]

Echo utility function that returns the unchanged payload. This function is useful when the server is there just to modify the stream of messages.

Returns: payload (bytes)
handle_stream(message)[source]

Handle the stream of messages.

Parameters: message – The message about to be sent to the next step in the cluster
Returns: topic (str) and message (PalmMessage)

The default behaviour is as follows. If you leave this function unchanged and pipelined is set to False, the topic is the ID of the client, which makes the message return to the client. If pipelined is set to True, the topic is set to the name of the server and the step of the message is incremented by one.

You can alter this default behaviour by overriding this function. Take into account that the message is also available in this function, and you can change other parameters like the stage or the function.

start(cache_messages=9223372036854775807)[source]

Start the server

Parameters: cache_messages – Number of messages the cache service handles before it shuts down. Useful for debugging
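
Both the messages constructor argument and the cache_messages argument of start() are described as useful for debugging. A bounded run might look like the sketch below. The server name, the addresses and the two limits are placeholders, and the values appropriate for a real test depend on how many requests the clients make.

```python
def server_kwargs(messages):
    """Keyword arguments matching the documented Server signature."""
    return dict(
        name="my_server",                      # placeholder name
        db_address="tcp://127.0.0.1:5555",     # placeholder addresses
        pull_address="tcp://127.0.0.1:5556",
        pub_address="tcp://127.0.0.1:5557",
        messages=messages,                     # stop after this many messages
    )


def run_bounded_server(messages=10, cache_messages=10):
    # Imported here so the sketch loads without pylm installed.
    from pylm.servers import Server

    server = Server(**server_kwargs(messages))
    # The cache service stops after handling cache_messages requests.
    server.start(cache_messages=cache_messages)
```
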
class pylm.servers.Sink(name, db_address, sub_addresses, pub_address, previous, to_client=True, log_level=20, messages=9223372036854775807)[source]

Minimal server that acts as a sink of multiple streams.

Parameters:
  • name (str) – Name of the server
  • db_address (str) – ZeroMQ address of the cache service.
  • sub_addresses (list) – List of addresses of the pub sockets of the previous servers
  • pub_address (str) – Address of the pub socket
  • previous – List of names of the previous servers.
  • to_client – True if the message is sent back to the client. Defaults to True
  • log_level – Minimum output log level. Defaults to INFO
  • messages (int) – Total number of messages that the server processes. Defaults to infinity (sys.maxsize). Useful for debugging.
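
Because a Sink takes one address and one name per upstream server, it can help to build both lists from a single table. The sketch below does that. It assumes that sub_addresses and previous are matched by position, which this section does not state. All names and addresses are placeholders.

```python
# Hypothetical upstream servers: (name, address of its pub socket).
UPSTREAM = [
    ("worker_a", "tcp://127.0.0.1:5560"),
    ("worker_b", "tcp://127.0.0.1:5570"),
]


def sink_kwargs(upstream):
    """Keyword arguments matching the documented Sink signature."""
    return dict(
        name="sink",
        db_address="tcp://127.0.0.1:5559",
        sub_addresses=[address for _, address in upstream],
        pub_address="tcp://127.0.0.1:5580",
        previous=[name for name, _ in upstream],
    )


def run_sink():
    # Imported here so the sketch loads without pylm installed.
    from pylm.servers import Sink

    Sink(**sink_kwargs(UPSTREAM)).start()
```
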
echo(payload)

Echo utility function that returns the unchanged payload. This function is useful when the server is there just to modify the stream of messages.

Returns: payload (bytes)
handle_stream(message)

Handle the stream of messages.

Parameters: message – The message about to be sent to the next step in the cluster
Returns: topic (str) and message (PalmMessage)

The default behaviour is the following. If you leave this function unchanged and pipeline is set to False, the topic is the ID of the client, which makes the message return to the client. If the pipeline parameter is set to True, the topic is set as the name of the server and the step of the message is incremented by one.

You can alter this default behaviour by overriding this function. Take into account that the message is also available in this function, and you can change other parameters like the stage or the function.

start(cache_messages=9223372036854775807)

Start the server

Parameters: cache_messages – Number of messages the cache service handles before it shuts down. Useful for debugging
class pylm.servers.Worker(name='', db_address='', push_address=None, pull_address=None, log_level=20, messages=9223372036854775807)[source]

Standalone worker for the standalone master.

Parameters:
  • name – Name assigned to this worker server
  • db_address – Address of the db service of the master
  • push_address – Address the workers push to. If left blank, fetches it from the master
  • pull_address – Address the workers pull from. If left blank, fetches it from the master
  • log_level – Log level for this server.
  • messages – Number of messages before it is shut down.
delete(key)[source]

Deletes data in the server’s internal cache.

Parameters: key – Key of the data to be deleted
Returns:
get(key)[source]

Gets a value from server’s internal cache

Parameters: key – Key for the data to be selected.
Returns:
set(value, key=None)[source]

Sets a key-value pair in the remote database.

Parameters:
  • key
  • value
Returns:

start()[source]

Starts the server

Clients

class pylm.clients.Client(server_name: str, db_address: str, push_address: str = None, sub_address: str = None, session: str = None, logging_level: int = 20, this_config=False)[source]

Client to connect to parallel servers

Parameters:
  • server_name – Server you are connecting to
  • db_address – Address for the cache service, for first connection or configuration.
  • push_address – Address of the push service of the server to pull from
  • sub_address – Address of the pub service of the server to subscribe to
  • session – Name of the pipeline if the session has to be reused
  • logging_level – Specify the logging level.
  • this_config – Do not fetch configuration from the server
delete(key)[source]

Deletes data in the server’s internal cache.

Parameters: key – Key of the data to be deleted
Returns:
eval(function, payload: bytes, messages: int = 1, cache: str = '')[source]

Execute single job.

Parameters:
  • function – String or list of strings following the format server.function.
  • payload – Binary message to be sent
  • messages – Number of messages expected to be sent back to the client
  • cache – Cache data included in the message
Returns:

If messages=1, the result data. If messages > 1, a list with the results
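
A sketch of a single-job call using the documented eval signature. The server name my_server, the function name foo and the address are hypothetical, and the helper qualified only builds the server.function string described above.

```python
def qualified(server: str, function: str) -> str:
    """Build the 'server.function' string that eval expects."""
    return f"{server}.{function}"


def run_eval():
    # Imported here so the sketch loads without pylm installed.
    from pylm.clients import Client

    client = Client("my_server", "tcp://127.0.0.1:5555")
    target = qualified("my_server", "foo")
    single = client.eval(target, b"a message")               # the result data
    several = client.eval(target, b"a message", messages=2)  # a list of results
    return single, several


print(qualified("my_server", "foo"))  # my_server.foo
```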

get(key)[source]

Gets a value from server’s internal cache

Parameters: key – Key for the data to be selected.
Returns: Value
job(function, generator, messages: int = 9223372036854775807, cache: str = '')[source]

Submit a job with multiple messages to a server.

Parameters:
  • function – String or list of strings following the format server.function.
  • generator – A generator that yields a series of binary messages.
  • messages – Number of messages expected to be sent back to the client. Defaults to infinity (sys.maxsize)
  • cache – Cache data included in the message
Returns:

An iterator with the messages that are sent back to the client.
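
A sketch of a multi-message job using the documented job signature. The generator payloads, the server name my_server, the function name foo and the address are hypothetical. Passing messages=n assumes the server sends back exactly one message per input, which depends on the server.

```python
def payloads(n: int):
    """Yield n binary messages: b'0', b'1', ..."""
    for i in range(n):
        yield str(i).encode("utf-8")


def run_job(n=10):
    # Imported here so the sketch loads without pylm installed.
    from pylm.clients import Client

    client = Client("my_server", "tcp://127.0.0.1:5555")
    # job returns an iterator over the messages sent back to the client.
    for result in client.job("my_server.foo", payloads(n), messages=n):
        print(result)


print(list(payloads(3)))  # [b'0', b'1', b'2']
```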

set(value: bytes, key=None)[source]

Sets a key-value pair in the remote database. If the key is not set, the function returns a new key. Note that the order of the arguments is reversed from the usual: the value comes first, then the key.

Warning

If the session attribute is specified, all the keys will be prepended with the session id.

Parameters:
  • value – Value to be stored
  • key – Key for the k-v storage
Returns:

New key or the same key
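
The call shapes of set, get and delete can be tried without a running server using an in-memory stand-in. DictCache below is hypothetical and is not part of pylm. It mirrors only the argument order and the "new key or the same key" return value, and it does not model the session prefix. Using a UUID for generated keys is an assumption.

```python
import uuid


class DictCache:
    """Hypothetical in-memory stand-in mirroring set/get/delete call shapes."""

    def __init__(self):
        self.data = {}

    def set(self, value: bytes, key=None):
        if key is None:
            key = uuid.uuid4().hex  # assumption: any unique string will do
        self.data[key] = value
        return key  # new key, or the same key that was passed in

    def get(self, key):
        return self.data.get(key)

    def delete(self, key):
        self.data.pop(key, None)


cache = DictCache()
generated = cache.set(b"some bytes")         # no key given: a new key comes back
named = cache.set(b"other bytes", "my_key")  # value first, then key
print(named)                # my_key
print(cache.get("my_key"))  # b'other bytes'
```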