Source code for pylm.parts.core

# Pylm, a framework to build components for high performance distributed
# applications. Copyright (C) 2016 NFQ Solutions
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from pylm.parts.messages_pb2 import PalmMessage
from uuid import uuid4
import traceback
import zmq
import sys


zmq_context = zmq.Context.instance()


[docs]class Router(object): """ Router for the internal event-loop. It is a ROUTER socket that blocks waiting for the parts to send something. This is more a bus than a broker. :param inbound_address: Valid ZMQ bind address for inbound parts :param outbound_address: Valid ZMQ bind address for outbound parts :param logger: Logger instance :param cache: Global cache of the server :param messages: Maximum number of inbound messages. Defaults to infinity. :param messages: Number of messages allowed before the router starts buffering. """ def __init__(self, inbound_address="inproc://inbound", outbound_address="inproc://outbound", logger=None, cache=None, messages=sys.maxsize): # Socket that will listen to the inbound parts self.inbound = zmq_context.socket(zmq.ROUTER) self.inbound.bind(inbound_address) self.inbound_address = inbound_address self.inbound_components = {} # Socket that listens to the outbound parts self.outbound = zmq_context.socket(zmq.ROUTER) self.outbound.bind(outbound_address) self.outbound_address = outbound_address self.outbound_components = {} # Other utilities self.logger = logger self.messages = messages # Cache for the server self.cache = cache
[docs] def register_inbound(self, name, route='', block=False, log=''): """ Register component by name. :param name: Name of the component. Each component has a name, that uniquely identifies it to the broker :param route: Each message that the broker gets from the component may be routed to another component. This argument gives the name of the target component for the message. :param block: Register if the component is waiting for a reply. :param log: Log message for each inbound connection. :return: """ self.inbound_components[name.encode('utf-8')] = { 'route': route.encode('utf-8'), 'block': block, 'log': log }
[docs] def register_outbound(self, name, route='', log=''): """ Register outbound component by name :param name: Name of the component :param route: Each message sent back to the component can be routed :param log: Logging for each message that comes from the router. :return: """ self.outbound_components[name.encode('utf-8')] = { 'route': route.encode('utf-8'), 'log': log }
def start(self): self.logger.info('Launch router') for part in self.inbound_components: route = self.inbound_components[part]['route'].decode('utf-8') if not route: route = 'exterior' self.logger.info( 'Inbound {} connects to {}'.format( part.decode('utf-8'), route ) ) for part in self.outbound_components: route = self.outbound_components[part]['route'].decode('utf-8') if not route: route = 'exterior' self.logger.info( 'Outbound {} connects to {}'.format( part.decode('utf-8'), route ) ) for i in range(self.messages): component, empty, message_data = self.inbound.recv_multipart() # Routing from inbound to outbound route_to = self.inbound_components[component]['route'] block = self.inbound_components[component]['block'] if route_to: self.logger.debug('Router: {} routing to {}'.format(component, route_to)) self.outbound.send_multipart([route_to, empty, message_data]) from_outbound = self.outbound.recv_multipart() if len(from_outbound) == 3: # From a REP socket [route_to, empty, feedback] = from_outbound elif len(from_outbound) == 2: # From a DEALER socket [route_to, feedback] = from_outbound else: self.logger.error('Error in Router:') self.logger.error('Message badly formatted from {}'.format(route_to)) # And now drop the message continue # Rerouting from outbound to a different outbound. # You can reroute only once, and if inbound blocks it gets # the message from the first outbound. This can be used to # implement weird routing schemes. reroute = self.outbound_components[route_to]['route'] if reroute: self.outbound.send_multipart([reroute, empty, feedback]) self.outbound.recv_multipart() # The inbound is blocked, but sometimes it can process the # feedback. You can use this to remove the block thing, because # now you can redirect from outbound to outbound. if block: self.inbound.send_multipart([component, empty, feedback]) else: self.inbound.send_multipart([component, empty, b'1']) else: self.inbound.send_multipart([component, empty, b'1']) return b'router' def cleanup(self): self.inbound.close() self.outbound.close()
[docs]class Inbound(object): """ Generic part that connects a REQ socket to the broker, and a socket to an inbound external service. :param name: Name of the component :param listen_address: ZMQ socket address to listen to :param socket_type: ZMQ inbound socket type :param reply: True if the listening socket blocks waiting a reply :param broker_address: ZMQ socket address for the broker :param bind: True if socket has to bind, instead of connect. :param logger: Logger instance :param cache: Cache for shared data in the server :param messages: Maximum number of inbound messages. Defaults to infinity. """ def __init__(self, name, listen_address, socket_type, reply=True, broker_address="inproc://broker", bind=False, logger=None, cache=None, messages=sys.maxsize): self.name = name.encode('utf-8') self.listen_to = zmq_context.socket(socket_type) self.bind = bind self.listen_address = listen_address self.broker = zmq_context.socket(zmq.REQ) self.broker.identity = self.name self.broker.connect(broker_address) self.logger = logger self.cache = cache self.messages = messages self.reply = reply self.last_message = b'' def _translate_to_broker(self, message): """ Translate the message that the component has got to be digestible by the router. To be refactored :param message_data: Message data from the component to the router """ if not message.cache: message.cache = str(uuid4()) return message def _translate_from_broker(self, message_data): """ Translate the message that the component gets from the router to the output format :param message_data: Data from the router """ return message_data
[docs] def scatter(self, message_data): """ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker. :param message_data: """ yield message_data
[docs] def handle_feedback(self, message_data): """ Abstract method. Handles the feedback from the broker :param message_data: """ self.last_message = message_data
[docs] def reply_feedback(self): """ Abstract method. Returns the feedback if the component has to reply. """ return self.last_message
[docs] def start(self): """ Call this function to start the component """ message = PalmMessage() if self.bind: self.listen_to.bind(self.listen_address) else: self.listen_to.connect(self.listen_address) self.logger.info('{} successfully started'.format(self.name)) for i in range(self.messages): self.logger.debug('{} blocked waiting messages'.format(self.name)) message_data = self.listen_to.recv() self.logger.debug('{} Got inbound message'.format(self.name)) try: message.ParseFromString(message_data) for scattered in self.scatter(message): scattered = self._translate_to_broker(scattered) self.broker.send(scattered.SerializeToString()) self.logger.debug('{} blocked waiting for broker'.format( self.name)) self.handle_feedback(self.broker.recv()) if self.reply: self.listen_to.send(self.reply_feedback()) except: self.logger.error('Exception in scatter or routing.') lines = traceback.format_exception(*sys.exc_info()) self.logger.exception(lines[0]) if self.reply: self.listen_to.send(b'0') return self.name
def cleanup(self): self.broker.close() self.listen_to.close()
[docs]class Outbound(object): """ Generic part that connects a REQ socket to the broker, and a socket to an inbound external service. :param name: Name of the component :param listen_address: ZMQ socket address to listen to :param socket_type: ZMQ inbound socket type :param reply: True if the listening socket blocks waiting a reply :param broker_address: ZMQ socket address for the broker, :param bind: True if the socket has to bind instead of connect. :param logger: Logger instance :param cache: Access to the cache of the server :param messages: Maximum number of inbound messages. Defaults to infinity. """ def __init__(self, name, listen_address, socket_type, reply=True, broker_address="inproc://broker", bind=False, logger=None, cache=None, messages=sys.maxsize): self.name = name.encode('utf-8') self.listen_to = zmq_context.socket(socket_type) self.bind = bind self.listen_address = listen_address self.broker = zmq_context.socket(zmq.REP) self.broker.identity = self.name self.broker.connect(broker_address) self.logger = logger self.cache = cache self.messages = messages self.reply = reply self.last_message = b'' def _translate_to_broker(self, message: PalmMessage): """ Translate the message that the component has got to be digestible by the broker :param message: :return: """ return message def _translate_from_broker(self, message: PalmMessage): """ Translate the message that the component gets from the router to the output format :param message: """ return message
[docs] def scatter(self, message_data): """ Abstract method. Picks a message and returns a generator that multiplies the messages to the broker. :param message_data: """ yield message_data
[docs] def handle_feedback(self, message_data): """ Abstract method. Handles the feedback from the broker :param message_data: """ self.last_message = message_data
[docs] def reply_feedback(self): """ Abstract method. Returns the feedback if the component has to reply. """ return self.last_message
[docs] def start(self): """ Call this function to start the component """ message = PalmMessage() if self.bind: self.listen_to.bind(self.listen_address) else: self.listen_to.connect(self.listen_address) self.logger.info('{} successfully started'.format(self.name)) for i in range(self.messages): self.logger.debug('{} blocked waiting for broker'.format(self.name)) message_data = self.broker.recv() self.logger.debug('{} Got message from broker'.format(self.name)) message_data = self._translate_from_broker(message_data) message.ParseFromString(message_data) for scattered in self.scatter(message): self.listen_to.send(scattered.SerializeToString()) self.logger.debug('{} Sent message'.format(self.name)) if self.reply: feedback = self.listen_to.recv() feedback = self._translate_to_broker(feedback) self.handle_feedback(feedback) self.broker.send(self.reply_feedback()) return self.name
def cleanup(self): self.listen_to.close() self.broker.close()
[docs]class BypassInbound(object): """ Generic inbound part that does not connect to the router. :param name: Name of the component :param listen_address: ZMQ socket address to listen to :param socket_type: ZMQ inbound socket type :param reply: True if the listening socket blocks waiting a reply :param bind: True if the component has to bind instead of connect. :param logger: Logger instance :param cache: Access to the server cache """ def __init__(self, name, listen_address, socket_type, reply=True, bind=False, logger=None, cache=None, messages=sys.maxsize): self.name = name.encode('utf-8') self.listen_to = zmq_context.socket(socket_type) self.bind = bind self.listen_address = listen_address self.logger = logger self.reply = reply self.cache = cache self.messages = messages
[docs] def recv(self, reply_data=None): """ Receives, yields and returns reply_data if needed :param reply_data: Message to send if connection needs an answer. """ message_data = self.listen_to.recv() if self.reply: self.listen_to.send(reply_data) return message_data
def start(self): if self.bind: self.listen_to.bind(self.listen_address) else: self.listen_to.connect(self.listen_address) for i in range(self.messages): self.recv() return self.name def cleanup(self): self.listen_to.close()
[docs]class BypassOutbound(object): """ Generic inbound component that does not connect to the broker. :param name: Name of the component :param listen_address: ZMQ socket address to listen to :param socket_type: ZMQ inbound socket type :param reply: True if the listening socket blocks waiting a reply :param bind: True if the socket has to bind instead of connect :param logger: Logger instance :param cache: Access to the cache of the server """ def __init__(self, name, listen_address, socket_type, reply=True, bind=False, logger=None, cache=None, messages=sys.maxsize): self.name = name.encode('utf-8') self.listen_to = zmq_context.socket(socket_type) if bind: self.listen_to.bind(listen_address) else: self.listen_to.connect(listen_address) self.listen_address = listen_address self.logger = logger self.reply = reply self.cache = cache self.messages = messages def send(self, message_data): self.listen_to.send(message_data) if self.reply: message_data = self.listen_to.recv() return message_data