# Pylm, a framework to build components for high performance distributed
# applications. Copyright (C) 2016 NFQ Solutions
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import zmq
import sys
import concurrent.futures
import traceback
from pylm.parts.core import Inbound, Outbound, \
BypassInbound, BypassOutbound, zmq_context
from urllib.request import Request, urlopen
from pylm.parts.messages_pb2 import PalmMessage
[docs]class RepConnection(Inbound):
"""
RepConnection is a component that connects a REQ socket to the broker, and a REP
socket to an external service.
:param name: Name of the component
:param listen_address: ZMQ socket address to listen to
:param broker_address: ZMQ socket address for the broker
:param logger: Logger instance
:param messages: Maximum number of inbound messages. Defaults to infinity.
"""
def __init__(self, name, listen_address, broker_address="inproc://broker",
logger=None, cache=None, messages=sys.maxsize):
super(RepConnection, self).__init__(
name,
listen_address,
zmq.REP,
reply=True,
broker_address=broker_address,
logger=logger,
cache=cache,
messages=messages
)
[docs]class SubConnection(Inbound):
"""
Part that connects to a Pub service and subscribes to its message queue
:param name:
:param listen_address:
:param previous:
:param broker_address:
:param logger:
:param cache:
:param messages:
"""
def __init__(self, name, listen_address, previous,
broker_address="inproc://broker", logger=None, cache=None,
messages=sys.maxsize):
super(SubConnection, self).__init__(
name,
listen_address,
zmq.SUB,
reply=False,
broker_address=broker_address,
logger=logger,
cache=cache,
messages=messages
)
self.previous = previous
[docs] def start(self):
"""
Call this function to start the component
"""
message = PalmMessage()
self.listen_to.setsockopt_string(zmq.SUBSCRIBE, self.previous)
self.listen_to.connect(self.listen_address)
self.logger.info('{} successfully started'.format(self.name))
for i in range(self.messages):
self.logger.debug('{} blocked waiting messages'.format(self.name))
message_data = self.listen_to.recv_multipart()[1]
self.logger.debug('{} Got inbound message'.format(self.name))
try:
message.ParseFromString(message_data)
for scattered in self.scatter(message):
scattered = self._translate_to_broker(scattered)
self.broker.send(scattered.SerializeToString())
self.logger.debug('{} blocked waiting for broker'.format(
self.name))
self.handle_feedback(self.broker.recv())
if self.reply:
self.listen_to.send(self.reply_feedback())
except:
self.logger.error('Error in scatter function in {}'.format(
self.name))
lines = traceback.format_exception(*sys.exc_info())
self.logger.exception(lines[0])
if self.reply:
self.listen_to.send(b'0')
return self.name
def cleanup(self):
self.broker.close()
self.listen_to.close()
[docs]class PullConnection(Inbound):
"""
PullConnection is a component that connects a REQ socket to the broker,
and a PULL socket to an external service.
:param name: Name of the component
:param listen_address: ZMQ socket address to listen to
:param broker_address: ZMQ socket address for the broker
:param logger: Logger instance
:param messages: Maximum number of inbound messages. Defaults to infinity.
"""
def __init__(self, name, listen_address, broker_address="inproc://broker",
logger=None, cache=None, messages=sys.maxsize):
super(PullConnection, self).__init__(
name,
listen_address,
zmq.PULL,
reply=False,
broker_address=broker_address,
logger=logger,
cache=cache,
messages=messages
)
[docs]class PushConnection(Outbound):
"""
PushConnection is a component that connects a REQ socket to the broker, and a PUSH
socket to an external service.
"""
def __init__(self, name, listen_address, broker_address="inproc://broker",
logger=None, cache=None, messages=sys.maxsize):
"""
:param name: Name of the component
:param listen_address: ZMQ socket address to listen to
:param broker_address: ZMQ socket address for the broker
:param logger: Logger instance
:param messages: Maximum number of inbound messages. Defaults to infinity.
:return:
"""
super(PushConnection, self).__init__(
name,
listen_address,
zmq.PUSH,
reply=False,
broker_address=broker_address,
logger=logger,
cache=cache,
messages=messages
)
[docs]class PushBypassConnection(BypassOutbound):
"""
Generic connection that sends a message to a sub service. Good for logs or metrics.
"""
def __init__(self, name, listen_address, logger=None, messages=sys.maxsize):
"""
:param name: Name of the connection
:param listen_address: ZMQ socket address to listen to.
:param logger: Logger instance
:return:
"""
super(PushBypassConnection, self).__init__(name, listen_address, zmq.PUSH,
reply=False, bind=False,
logger=logger)
[docs]class PullBypassConnection(BypassInbound):
"""
Generic connection that opens a Sub socket and bypasses the broker.
"""
def __init__(self, name, listen_address, logger=None, messages=sys.maxsize):
"""
:param name: Name of the connection
:param listen_address: ZMQ socket address to listen to
:param logger: Logger instance
:param messages:
:return:
"""
super(PullBypassConnection, self).__init__(name, listen_address, zmq.PULL,
reply=False, bind=False,
logger=logger)
[docs]class HttpConnection(Outbound):
"""
Similar to PushConnection. An HTTP client deals with outbound messages.
"""
def __init__(self,
name,
listen_address,
reply=True,
broker_address="inproc://broker",
logger=None,
cache=None,
max_workers=4,
messages=sys.maxsize):
self.name = name.encode('utf-8')
self.broker = zmq_context.socket(zmq.REP)
self.broker.identity = self.name
self.broker.connect(broker_address)
self.logger = logger
self.cache = cache
self.messages = messages
self.reply = reply
self.last_message = b''
self.url = listen_address
self.max_workers = max_workers
[docs] def start(self):
"""
Call this function to start the component
"""
message = PalmMessage()
def load_url(url, data):
request = Request(url, data=data)
response = urlopen(request)
return response.read()
for i in range(self.messages):
self.logger.debug('{} blocked waiting for broker'.format(self.name))
message_data = self.broker.recv()
self.logger.debug('{} Got message from broker'.format(self.name))
message_data = self._translate_from_broker(message_data)
message.ParseFromString(message_data)
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers) as executor:
future_to = [executor.submit(
load_url,
self.url,
scattered.SerializeToString()
) for scattered in self.scatter(message)]
for future in concurrent.futures.as_completed(future_to):
try:
feedback = future.result()
except Exception as exc:
self.logger.error('HttpConnection generated an error')
lines = traceback.format_exception(*sys.exc_info())
self.logger.exception(lines[0])
feedback = None
if self.reply:
feedback = self._translate_to_broker(feedback)
self.handle_feedback(feedback)
if feedback:
self.broker.send(self.reply_feedback())
else:
self.broker.send(message_data)