Low level API documentation

The router and the parts

class pylm.parts.core.BypassInbound(name, listen_address, socket_type, reply=True, bind=False, logger=None, cache=None, messages=9223372036854775807)[source]

Generic inbound part that does not connect to the router.

Parameters:
  • name – Name of the component
  • listen_address – ZMQ socket address to listen to
  • socket_type – ZMQ inbound socket type
  • reply – True if the listening socket blocks waiting a reply
  • bind – True if the component has to bind instead of connect.
  • logger – Logger instance
  • cache – Access to the server cache
recv(reply_data=None)[source]

Receives, yields and returns reply_data if needed

Parameters:reply_data – Message to send if connection needs an answer.
class pylm.parts.core.BypassOutbound(name, listen_address, socket_type, reply=True, bind=False, logger=None, cache=None, messages=9223372036854775807)[source]

Generic inbound component that does not connect to the broker.

Parameters:
  • name – Name of the component
  • listen_address – ZMQ socket address to listen to
  • socket_type – ZMQ inbound socket type
  • reply – True if the listening socket blocks waiting a reply
  • bind – True if the socket has to bind instead of connect
  • logger – Logger instance
  • cache – Access to the cache of the server
class pylm.parts.core.Inbound(name, listen_address, socket_type, reply=True, broker_address='inproc://broker', bind=False, logger=None, cache=None, messages=9223372036854775807)[source]

Generic part that connects a REQ socket to the broker, and a socket to an inbound external service.

Parameters:
  • name – Name of the component
  • listen_address – ZMQ socket address to listen to
  • socket_type – ZMQ inbound socket type
  • reply – True if the listening socket blocks waiting a reply
  • broker_address – ZMQ socket address for the broker
  • bind – True if socket has to bind, instead of connect.
  • logger – Logger instance
  • cache – Cache for shared data in the server
  • messages – Maximum number of inbound messages. Defaults to infinity.
handle_feedback(message_data)[source]

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()[source]

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)[source]

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()[source]

Call this function to start the component

class pylm.parts.core.Outbound(name, listen_address, socket_type, reply=True, broker_address='inproc://broker', bind=False, logger=None, cache=None, messages=9223372036854775807)[source]

Generic part that connects a REQ socket to the broker, and a socket to an inbound external service.

Parameters:
  • name – Name of the component
  • listen_address – ZMQ socket address to listen to
  • socket_type – ZMQ inbound socket type
  • reply – True if the listening socket blocks waiting a reply
  • broker_address – ZMQ socket address for the broker,
  • bind – True if the socket has to bind instead of connect.
  • logger – Logger instance
  • cache – Access to the cache of the server
  • messages – Maximum number of inbound messages. Defaults to infinity.
handle_feedback(message_data)[source]

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()[source]

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)[source]

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()[source]

Call this function to start the component

class pylm.parts.core.Router(inbound_address='inproc://inbound', outbound_address='inproc://outbound', logger=None, cache=None, messages=9223372036854775807)[source]

Router for the internal event-loop. It is a ROUTER socket that blocks waiting for the parts to send something. This is more a bus than a broker.

Parameters:
  • inbound_address – Valid ZMQ bind address for inbound parts
  • outbound_address – Valid ZMQ bind address for outbound parts
  • logger – Logger instance
  • cache – Global cache of the server
  • messages – Maximum number of inbound messages. Defaults to infinity.
  • messages – Number of messages allowed before the router starts buffering.
register_inbound(name, route='', block=False, log='')[source]

Register component by name.

Parameters:
  • name – Name of the component. Each component has a name, that uniquely identifies it to the broker
  • route – Each message that the broker gets from the component may be routed to another component. This argument gives the name of the target component for the message.
  • block – Register if the component is waiting for a reply.
  • log – Log message for each inbound connection.
Returns:

register_outbound(name, route='', log='')[source]

Register outbound component by name

Parameters:
  • name – Name of the component
  • route – Each message sent back to the component can be routed
  • log – Logging for each message that comes from the router.
Returns:

The server templates

class pylm.parts.servers.ServerTemplate(logging_level=20, router_messages=9223372036854775807)[source]

Low-level tool to build a server from parts.

Parameters:logging_level – A correct logging level from the logging module. Defaults to INFO.

It has important attributes that you may want to override, like

Cache:The key-value database that the server should use
Logging_level:Controls the log output of the server.
Router:Here’s the router, you may want to change its attributes too.
preset_cache(**kwargs)[source]

Send the following keyword arguments as cache variables. Useful for configuration variables that the workers or the clients fetch straight from the cache.

Parameters:kwargs
register_bypass(part, name='', listen_address='', **kwargs)[source]

Register a bypass part to this server

Parameters:
  • part – part class
  • name – part name
  • listen_address – Valid ZeroMQ address listening to the exterior
  • kwargs – Additional keyword arguments to pass to the part
register_inbound(part, name='', listen_address='', route='', block=False, log='', **kwargs)[source]

Register inbound part to this server.

Parameters:
  • part – part class
  • name – Name of the part
  • listen_address – Valid ZeroMQ address listening to the exterior
  • route – Outbound part it routes to
  • block – True if the part blocks waiting for a response
  • log – Log message in DEBUG level for each message processed.
  • kwargs – Additional keyword arguments to pass to the part
register_outbound(part, name='', listen_address='', route='', log='', **kwargs)[source]

Register outbound part to this server

Parameters:
  • part – part class
  • name – Name of the part
  • listen_address – Valid ZeroMQ address listening to the exterior
  • route – Outbound part it routes the response (if there is) to
  • log – Log message in DEBUG level for each message processed
  • kwargs – Additional keyword arguments to pass to the part
start()[source]

Start the server with all its parts.

Services

class pylm.parts.services.CacheService(name, listen_address, logger=None, cache=None, messages=9223372036854775807)[source]

Cache service for clients and workers

class pylm.parts.services.HttpService(name, hostname, port, broker_address='inproc://broker', logger=None, cache=None)[source]

Similar to PullService, but the connection offered is an HTTP server that deals with inbound messages.

ACHTUNG: this thing is deliberately single threaded

debug()[source]

Starts the component and serves the http server forever.

handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()[source]

Starts the component and serves the http server forever.

class pylm.parts.services.PubService(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807, pipelined=False, server=None)[source]

PullService binds to a socket waits for messages from a push-pull queue.

Parameters:
  • name – Name of the service
  • listen_address – ZMQ socket address to bind to
  • broker_address – ZMQ socket address of the broker
  • logger – Logger instance
  • messages – Maximum number of messages. Defaults to infinity.
  • pipelined – Defaults to False. Pipelined if publishes to a server, False if publishes to a client.
  • server – Name of the server, necessary to pipeline messages.
handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
handle_stream(message)[source]

Handle the stream of messages.

Parameters:message – The message about to be sent to the next step in the cluster
Returns:topic (str) and message (PalmMessage)

The default behaviour is the following. If you leave this function unchanged and pipeline is set to False, the topic is the ID of the client, which makes the message return to the client. If the pipeline parameter is set to True, the topic is set as the name of the server and the step of the message is incremented by one.

You can alter this default behaviour by overriding this function. Take into account that the message is also available in this function, and you can change other parameters like the stage or the function.

reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()[source]

Call this function to start the component

class pylm.parts.services.PullService(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

PullService binds to a socket waits for messages from a push-pull queue.

handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()

Call this function to start the component

class pylm.parts.services.PushPullService(name, push_address, pull_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

Push-Pull Service to connect to workers

handle_feedback(message_data)[source]

To be overriden. Handles the feedback from the broker :param message_data: :return:

reply_feedback()[source]

To be overriden. Returns the feedback if the component has to reply. :return:

scatter(message_data)[source]

To be overriden. Picks a message and returns a generator that multiplies the messages to the broker. :param message_data: :return:

class pylm.parts.services.PushService(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

PullService binds to a socket waits for messages from a push-pull queue.

handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()

Call this function to start the component

class pylm.parts.services.RepBypassService(name, listen_address, logger=None, cache=None, messages=9223372036854775807)[source]

Generic connection that opens a Rep socket and bypasses the broker.

recv(reply_data=None)

Receives, yields and returns reply_data if needed

Parameters:reply_data – Message to send if connection needs an answer.
class pylm.parts.services.RepService(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

RepService binds to a given socket and returns something.

handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()

Call this function to start the component

class pylm.parts.services.WorkerPullService(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

This is a particular pull service that does not modify the messages that the broker sends.

handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()

Call this function to start the component

class pylm.parts.services.WorkerPushService(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

This is a particular push service that does not modify the messages that the broker sends.

handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()

Call this function to start the component

Gateways

class pylm.parts.gateways.GatewayDealer(name='', listen_address='inproc://gateway_router', broker_address='inproc://broker', cache=None, logger=None, messages=9223372036854775807)[source]

Generic component that connects a REQ socket to the broker, and a socket to an inbound external service.

This part is a companion for the gateway router, and has to connect to it to work properly:

-->|         v--------------------------------------|
   |-->Gateway Router ---> |-\  /->| --> *Dealer* --|
<--|                       |  \/   |
                           |  /\   |
     Workers -> Inbound -> |-/  \->| --> Outbound --> Workers
Parameters:
  • broker_address – ZMQ socket address for the broker,
  • logger – Logger instance
  • cache – Access to the cache of the server
  • messages – Maximum number of inbound messages. Defaults to infinity.
handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()[source]

Call this function to start the component

class pylm.parts.gateways.GatewayRouter(name='gateway_router', listen_address='inproc://gateway_router', broker_address='inproc://broker', cache=<pylm.persistence.kv.DictDB object>, logger=None, messages=9223372036854775807)[source]

Router that allows a parallel server to connect to multiple clients. It also allows to recv messages from a dealer socket that feeds back the output from the same router. The goal is to provide blocking jobs to multiple clients.

Parameters:
  • broker_address – Broker address
  • cache – K-v database for the cache
  • logger – Logger class
  • messages – Number of messages until it is shut down
handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()[source]

Call this function to start the component

class pylm.parts.gateways.HttpGateway(name='', listen_address='inproc://gateway_router', hostname='', port=8888, cache=<pylm.persistence.kv.DictDB object>, logger=None)[source]

HTTP Gateway that adapts an HTTP server to a PALM master

Parameters:
  • name – Name of the part
  • listen_address – Address listening for reentrant messages
  • hostname – Hostname for the HTTP server
  • port – Port for the HTTP server
  • cache – Cache of the master
  • logger – Logger class
class pylm.parts.gateways.MyServer(server_address, RequestHandlerClass, bind_and_activate=True)[source]

Server that handles multiple requests

close_request(request)

Called to clean up an individual request.

fileno()

Return socket file number.

Interface required by selector.

finish_request(request, client_address)

Finish one request by instantiating RequestHandlerClass.

get_request()

Get the request and client address from the socket.

May be overridden.

handle_error(request, client_address)

Handle an error gracefully. May be overridden.

The default is to print a traceback and continue.

handle_request()

Handle one request, possibly blocking.

Respects self.timeout.

handle_timeout()

Called if no new request arrives within self.timeout.

Overridden by ForkingMixIn.

process_request(request, client_address)

Start a new thread to process the request.

process_request_thread(request, client_address)

Same as in BaseServer but as a thread.

In addition, exception handling is done here.

serve_forever(poll_interval=0.5)

Handle one request at a time until shutdown.

Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread.

server_activate()

Called by constructor to activate the server.

May be overridden.

server_bind()

Override server_bind to store the server name.

server_close()

Called to clean-up the server.

May be overridden.

service_actions()

Called by the serve_forever() loop.

May be overridden by a subclass / Mixin to implement any code that needs to be run during the loop.

shutdown()

Stops the serve_forever loop.

Blocks until the loop has finished. This must be called while serve_forever() is running in another thread, or it will deadlock.

shutdown_request(request)

Called to shutdown and close an individual request.

verify_request(request, client_address)

Verify the request. May be overridden.

Return True if we should proceed with this request.

Connections

class pylm.parts.connections.HttpConnection(name, listen_address, reply=True, broker_address='inproc://broker', logger=None, cache=None, max_workers=4, messages=9223372036854775807)[source]

Similar to PushConnection. An HTTP client deals with outbound messages.

handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()[source]

Call this function to start the component

class pylm.parts.connections.PullBypassConnection(name, listen_address, logger=None, messages=9223372036854775807)[source]

Generic connection that opens a Sub socket and bypasses the broker.

recv(reply_data=None)

Receives, yields and returns reply_data if needed

Parameters:reply_data – Message to send if connection needs an answer.
class pylm.parts.connections.PullConnection(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

PullConnection is a component that connects a REQ socket to the broker, and a PULL socket to an external service.

Parameters:
  • name – Name of the component
  • listen_address – ZMQ socket address to listen to
  • broker_address – ZMQ socket address for the broker
  • logger – Logger instance
  • messages – Maximum number of inbound messages. Defaults to infinity.
handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()

Call this function to start the component

class pylm.parts.connections.PushBypassConnection(name, listen_address, logger=None, messages=9223372036854775807)[source]

Generic connection that sends a message to a sub service. Good for logs or metrics.

class pylm.parts.connections.PushConnection(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

PushConnection is a component that connects a REQ socket to the broker, and a PUSH socket to an external service.

handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()

Call this function to start the component

class pylm.parts.connections.RepConnection(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

RepConnection is a component that connects a REQ socket to the broker, and a REP socket to an external service.

Parameters:
  • name – Name of the component
  • listen_address – ZMQ socket address to listen to
  • broker_address – ZMQ socket address for the broker
  • logger – Logger instance
  • messages – Maximum number of inbound messages. Defaults to infinity.
handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()

Call this function to start the component

class pylm.parts.connections.SubConnection(name, listen_address, previous, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]

Part that connects to a Pub service and subscribes to its message queue

Parameters:
  • name
  • listen_address
  • previous
  • broker_address
  • logger
  • cache
  • messages
handle_feedback(message_data)

Abstract method. Handles the feedback from the broker

Parameters:message_data
reply_feedback()

Abstract method. Returns the feedback if the component has to reply.

scatter(message_data)

Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.

Parameters:message_data
start()[source]

Call this function to start the component