Low level API documentation¶
The router and the parts¶
-
class
pylm.parts.core.
BypassInbound
(name, listen_address, socket_type, reply=True, bind=False, logger=None, cache=None, messages=9223372036854775807)[source]¶ Generic inbound part that does not connect to the router.
Parameters: - name – Name of the component
- listen_address – ZMQ socket address to listen to
- socket_type – ZMQ inbound socket type
- reply – True if the listening socket blocks waiting a reply
- bind – True if the component has to bind instead of connect.
- logger – Logger instance
- cache – Access to the server cache
-
class
pylm.parts.core.
BypassOutbound
(name, listen_address, socket_type, reply=True, bind=False, logger=None, cache=None, messages=9223372036854775807)[source]¶ Generic inbound component that does not connect to the broker.
Parameters: - name – Name of the component
- listen_address – ZMQ socket address to listen to
- socket_type – ZMQ inbound socket type
- reply – True if the listening socket blocks waiting a reply
- bind – True if the socket has to bind instead of connect
- logger – Logger instance
- cache – Access to the cache of the server
-
class
pylm.parts.core.
Inbound
(name, listen_address, socket_type, reply=True, broker_address='inproc://broker', bind=False, logger=None, cache=None, messages=9223372036854775807)[source]¶ Generic part that connects a REQ socket to the broker, and a socket to an inbound external service.
Parameters: - name – Name of the component
- listen_address – ZMQ socket address to listen to
- socket_type – ZMQ inbound socket type
- reply – True if the listening socket blocks waiting a reply
- broker_address – ZMQ socket address for the broker
- bind – True if socket has to bind, instead of connect.
- logger – Logger instance
- cache – Cache for shared data in the server
- messages – Maximum number of inbound messages. Defaults to infinity.
-
handle_feedback
(message_data)[source]¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
class
pylm.parts.core.
Outbound
(name, listen_address, socket_type, reply=True, broker_address='inproc://broker', bind=False, logger=None, cache=None, messages=9223372036854775807)[source]¶ Generic part that connects a REQ socket to the broker, and a socket to an inbound external service.
Parameters: - name – Name of the component
- listen_address – ZMQ socket address to listen to
- socket_type – ZMQ inbound socket type
- reply – True if the listening socket blocks waiting a reply
- broker_address – ZMQ socket address for the broker,
- bind – True if the socket has to bind instead of connect.
- logger – Logger instance
- cache – Access to the cache of the server
- messages – Maximum number of inbound messages. Defaults to infinity.
-
handle_feedback
(message_data)[source]¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
class
pylm.parts.core.
Router
(inbound_address='inproc://inbound', outbound_address='inproc://outbound', logger=None, cache=None, messages=9223372036854775807)[source]¶ Router for the internal event-loop. It is a ROUTER socket that blocks waiting for the parts to send something. This is more a bus than a broker.
Parameters: - inbound_address – Valid ZMQ bind address for inbound parts
- outbound_address – Valid ZMQ bind address for outbound parts
- logger – Logger instance
- cache – Global cache of the server
- messages – Maximum number of inbound messages. Defaults to infinity.
- messages – Number of messages allowed before the router starts buffering.
-
register_inbound
(name, route='', block=False, log='')[source]¶ Register component by name.
Parameters: - name – Name of the component. Each component has a name, that uniquely identifies it to the broker
- route – Each message that the broker gets from the component may be routed to another component. This argument gives the name of the target component for the message.
- block – Register if the component is waiting for a reply.
- log – Log message for each inbound connection.
Returns:
The server templates¶
-
class
pylm.parts.servers.
ServerTemplate
(logging_level=20, router_messages=9223372036854775807)[source]¶ Low-level tool to build a server from parts.
Parameters: logging_level – A correct logging level from the logging module. Defaults to INFO. It has important attributes that you may want to override, like
Cache: The key-value database that the server should use Logging_level: Controls the log output of the server. Router: Here’s the router, you may want to change its attributes too. -
preset_cache
(**kwargs)[source]¶ Send the following keyword arguments as cache variables. Useful for configuration variables that the workers or the clients fetch straight from the cache.
Parameters: kwargs –
-
register_bypass
(part, name='', listen_address='', **kwargs)[source]¶ Register a bypass part to this server
Parameters: - part – part class
- name – part name
- listen_address – Valid ZeroMQ address listening to the exterior
- kwargs – Additional keyword arguments to pass to the part
-
register_inbound
(part, name='', listen_address='', route='', block=False, log='', **kwargs)[source]¶ Register inbound part to this server.
Parameters: - part – part class
- name – Name of the part
- listen_address – Valid ZeroMQ address listening to the exterior
- route – Outbound part it routes to
- block – True if the part blocks waiting for a response
- log – Log message in DEBUG level for each message processed.
- kwargs – Additional keyword arguments to pass to the part
-
register_outbound
(part, name='', listen_address='', route='', log='', **kwargs)[source]¶ Register outbound part to this server
Parameters: - part – part class
- name – Name of the part
- listen_address – Valid ZeroMQ address listening to the exterior
- route – Outbound part it routes the response (if there is) to
- log – Log message in DEBUG level for each message processed
- kwargs – Additional keyword arguments to pass to the part
-
Services¶
-
class
pylm.parts.services.
CacheService
(name, listen_address, logger=None, cache=None, messages=9223372036854775807)[source]¶ Cache service for clients and workers
-
class
pylm.parts.services.
HttpService
(name, hostname, port, broker_address='inproc://broker', logger=None, cache=None)[source]¶ Similar to PullService, but the connection offered is an HTTP server that deals with inbound messages.
ACHTUNG: this thing is deliberately single threaded
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
-
class
pylm.parts.services.
PubService
(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807, pipelined=False, server=None)[source]¶ PullService binds to a socket waits for messages from a push-pull queue.
Parameters: - name – Name of the service
- listen_address – ZMQ socket address to bind to
- broker_address – ZMQ socket address of the broker
- logger – Logger instance
- messages – Maximum number of messages. Defaults to infinity.
- pipelined – Defaults to False. Pipelined if publishes to a server, False if publishes to a client.
- server – Name of the server, necessary to pipeline messages.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
handle_stream
(message)[source]¶ Handle the stream of messages.
Parameters: message – The message about to be sent to the next step in the cluster Returns: topic (str) and message (PalmMessage) The default behaviour is the following. If you leave this function unchanged and pipeline is set to False, the topic is the ID of the client, which makes the message return to the client. If the pipeline parameter is set to True, the topic is set as the name of the server and the step of the message is incremented by one.
You can alter this default behaviour by overriding this function. Take into account that the message is also available in this function, and you can change other parameters like the stage or the function.
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
class
pylm.parts.services.
PullService
(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ PullService binds to a socket waits for messages from a push-pull queue.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
start
()¶ Call this function to start the component
-
-
class
pylm.parts.services.
PushPullService
(name, push_address, pull_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ Push-Pull Service to connect to workers
-
handle_feedback
(message_data)[source]¶ To be overriden. Handles the feedback from the broker :param message_data: :return:
-
-
class
pylm.parts.services.
PushService
(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ PullService binds to a socket waits for messages from a push-pull queue.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
start
()¶ Call this function to start the component
-
-
class
pylm.parts.services.
RepBypassService
(name, listen_address, logger=None, cache=None, messages=9223372036854775807)[source]¶ Generic connection that opens a Rep socket and bypasses the broker.
-
recv
(reply_data=None)¶ Receives, yields and returns reply_data if needed
Parameters: reply_data – Message to send if connection needs an answer.
-
-
class
pylm.parts.services.
RepService
(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ RepService binds to a given socket and returns something.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
start
()¶ Call this function to start the component
-
-
class
pylm.parts.services.
WorkerPullService
(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ This is a particular pull service that does not modify the messages that the broker sends.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
start
()¶ Call this function to start the component
-
-
class
pylm.parts.services.
WorkerPushService
(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ This is a particular push service that does not modify the messages that the broker sends.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
start
()¶ Call this function to start the component
-
Gateways¶
-
class
pylm.parts.gateways.
GatewayDealer
(name='', listen_address='inproc://gateway_router', broker_address='inproc://broker', cache=None, logger=None, messages=9223372036854775807)[source]¶ Generic component that connects a REQ socket to the broker, and a socket to an inbound external service.
This part is a companion for the gateway router, and has to connect to it to work properly:
-->| v--------------------------------------| |-->Gateway Router ---> |-\ /->| --> *Dealer* --| <--| | \/ | | /\ | Workers -> Inbound -> |-/ \->| --> Outbound --> Workers
Parameters: - broker_address – ZMQ socket address for the broker,
- logger – Logger instance
- cache – Access to the cache of the server
- messages – Maximum number of inbound messages. Defaults to infinity.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
class
pylm.parts.gateways.
GatewayRouter
(name='gateway_router', listen_address='inproc://gateway_router', broker_address='inproc://broker', cache=<pylm.persistence.kv.DictDB object>, logger=None, messages=9223372036854775807)[source]¶ Router that allows a parallel server to connect to multiple clients. It also allows to recv messages from a dealer socket that feeds back the output from the same router. The goal is to provide blocking jobs to multiple clients.
Parameters: - broker_address – Broker address
- cache – K-v database for the cache
- logger – Logger class
- messages – Number of messages until it is shut down
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
class
pylm.parts.gateways.
HttpGateway
(name='', listen_address='inproc://gateway_router', hostname='', port=8888, cache=<pylm.persistence.kv.DictDB object>, logger=None)[source]¶ HTTP Gateway that adapts an HTTP server to a PALM master
Parameters: - name – Name of the part
- listen_address – Address listening for reentrant messages
- hostname – Hostname for the HTTP server
- port – Port for the HTTP server
- cache – Cache of the master
- logger – Logger class
-
class
pylm.parts.gateways.
MyServer
(server_address, RequestHandlerClass, bind_and_activate=True)[source]¶ Server that handles multiple requests
-
close_request
(request)¶ Called to clean up an individual request.
-
fileno
()¶ Return socket file number.
Interface required by selector.
-
finish_request
(request, client_address)¶ Finish one request by instantiating RequestHandlerClass.
-
get_request
()¶ Get the request and client address from the socket.
May be overridden.
-
handle_error
(request, client_address)¶ Handle an error gracefully. May be overridden.
The default is to print a traceback and continue.
-
handle_request
()¶ Handle one request, possibly blocking.
Respects self.timeout.
-
handle_timeout
()¶ Called if no new request arrives within self.timeout.
Overridden by ForkingMixIn.
-
process_request
(request, client_address)¶ Start a new thread to process the request.
-
process_request_thread
(request, client_address)¶ Same as in BaseServer but as a thread.
In addition, exception handling is done here.
-
serve_forever
(poll_interval=0.5)¶ Handle one request at a time until shutdown.
Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread.
-
server_activate
()¶ Called by constructor to activate the server.
May be overridden.
-
server_bind
()¶ Override server_bind to store the server name.
-
server_close
()¶ Called to clean-up the server.
May be overridden.
-
service_actions
()¶ Called by the serve_forever() loop.
May be overridden by a subclass / Mixin to implement any code that needs to be run during the loop.
-
shutdown
()¶ Stops the serve_forever loop.
Blocks until the loop has finished. This must be called while serve_forever() is running in another thread, or it will deadlock.
-
shutdown_request
(request)¶ Called to shutdown and close an individual request.
-
verify_request
(request, client_address)¶ Verify the request. May be overridden.
Return True if we should proceed with this request.
-
Connections¶
-
class
pylm.parts.connections.
HttpConnection
(name, listen_address, reply=True, broker_address='inproc://broker', logger=None, cache=None, max_workers=4, messages=9223372036854775807)[source]¶ Similar to PushConnection. An HTTP client deals with outbound messages.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
-
class
pylm.parts.connections.
PullBypassConnection
(name, listen_address, logger=None, messages=9223372036854775807)[source]¶ Generic connection that opens a Sub socket and bypasses the broker.
-
recv
(reply_data=None)¶ Receives, yields and returns reply_data if needed
Parameters: reply_data – Message to send if connection needs an answer.
-
-
class
pylm.parts.connections.
PullConnection
(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ PullConnection is a component that connects a REQ socket to the broker, and a PULL socket to an external service.
Parameters: - name – Name of the component
- listen_address – ZMQ socket address to listen to
- broker_address – ZMQ socket address for the broker
- logger – Logger instance
- messages – Maximum number of inbound messages. Defaults to infinity.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
start
()¶ Call this function to start the component
-
class
pylm.parts.connections.
PushBypassConnection
(name, listen_address, logger=None, messages=9223372036854775807)[source]¶ Generic connection that sends a message to a sub service. Good for logs or metrics.
-
class
pylm.parts.connections.
PushConnection
(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ PushConnection is a component that connects a REQ socket to the broker, and a PUSH socket to an external service.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
start
()¶ Call this function to start the component
-
-
class
pylm.parts.connections.
RepConnection
(name, listen_address, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ RepConnection is a component that connects a REQ socket to the broker, and a REP socket to an external service.
Parameters: - name – Name of the component
- listen_address – ZMQ socket address to listen to
- broker_address – ZMQ socket address for the broker
- logger – Logger instance
- messages – Maximum number of inbound messages. Defaults to infinity.
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –
-
start
()¶ Call this function to start the component
-
class
pylm.parts.connections.
SubConnection
(name, listen_address, previous, broker_address='inproc://broker', logger=None, cache=None, messages=9223372036854775807)[source]¶ Part that connects to a Pub service and subscribes to its message queue
Parameters: - name –
- listen_address –
- previous –
- broker_address –
- logger –
- cache –
- messages –
-
handle_feedback
(message_data)¶ Abstract method. Handles the feedback from the broker
Parameters: message_data –
-
reply_feedback
()¶ Abstract method. Returns the feedback if the component has to reply.
-
scatter
(message_data)¶ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker.
Parameters: message_data –