mirror of https://github.com/CIRCL/AIL-framework
				
				
				
			
		
			
				
	
	
		
			159 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			159 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
| #!/usr/bin/python2.7
 | |
| """
 | |
| The ``ZMQ PubSub`` Modules
 | |
| ==========================
 | |
| 
 | |
| """
 | |
| 
 | |
| import zmq, ConfigParser, redis, pprint, os, sys
 | |
| #from pubsublogger import publisher
 | |
| 
 | |
| class PubSub(object):
 | |
|     """
 | |
|     The PubSub class is a ``Virtual Class`` which regroup the shared attribute
 | |
|     of a Publisher ZeroMQ and a Subcriber ZeroMQ
 | |
| 
 | |
|     :param file_conf: -- (str) The filepath of the configuration file used (.cfg)
 | |
|     :param log_channel: -- (str) The channel used as a log channel
 | |
|     :param ps_name: -- (str) The "ID" of the Publisher/Subcriber
 | |
| 
 | |
|     :return: PubSub Object
 | |
| 
 | |
|     ..note:: The ps_name was implemented to separate subscriber queues in redis
 | |
|     when they are listening on a same "stream"
 | |
|     ..seealso:: Method of the ZMQSub class
 | |
| 
 | |
|     ..todo:: Create Implementing a log channel as an attribute of this virtual class.
 | |
| 
 | |
|     """
 | |
|     def __init__(self, file_conf, log_channel, ps_name):
 | |
|         self._ps_name = ps_name
 | |
|         self._config_parser = ConfigParser.ConfigParser()
 | |
|         self._config_file = file_conf # "./packages/config.cfg"
 | |
| 
 | |
|         self._config_parser.read(self._config_file)
 | |
| 
 | |
|         self._context_zmq = zmq.Context()
 | |
| 
 | |
|         #self._logging_publisher_channel = log_channel # "Default"
 | |
|         #publisher.channel(self._logging_publisher_channel)
 | |
| 
 | |
| 
 | |
| class ZMQPub(PubSub):
 | |
|     """
 | |
|     This class create a ZMQ Publisher which is able to send_message to a choosen socket.
 | |
| 
 | |
|     :param pub_config_section: -- (str) The name of the section in the config file to get the settings
 | |
| 
 | |
|     :return: ZMQPub Object
 | |
| 
 | |
|     :Example:
 | |
|     Extract of the config file:
 | |
|         [PubSub_Categ]
 | |
|         adress = tcp://127.0.0.1:5003
 | |
| 
 | |
|     Creating the object and sending message:
 | |
|         MyPublisher = ZMQPub('./packages/config.cfg', 'PubSub_Categ', 'pubcateg')
 | |
| 
 | |
|         msg = "categ1"+" "+"Im the data sent on the categ1 channel"
 | |
|         MyPublisher.send_message(msg)
 | |
| 
 | |
|     ..note:: The ps_name attribute for a publisher is "optionnal" but still required to be
 | |
|     instantiated correctly.
 | |
| 
 | |
|     """
 | |
|     def __init__(self, file_conf, pub_config_section, ps_name):
 | |
|         super(ZMQPub, self).__init__(file_conf, "Default", ps_name)
 | |
| 
 | |
|         self._pub_config_section = pub_config_section
 | |
|         self._pubsocket = self._context_zmq.socket(zmq.PUB)
 | |
|         self._pub_adress = self._config_parser.get(self._pub_config_section, "adress")
 | |
| 
 | |
|         self._pubsocket.bind(self._config_parser.get(self._pub_config_section, "adress"))
 | |
| 
 | |
|     def send_message(self, message):
 | |
|         """Send a message throught the publisher socket"""
 | |
|         self._pubsocket.send(message)
 | |
| 
 | |
| 
 | |
| class ZMQSub(PubSub):
 | |
|     """
 | |
|     This class create a ZMQ Subcriber which is able to receive message directly or
 | |
|     throught redis as a buffer.
 | |
| 
 | |
|     The redis buffer is usefull when the subcriber do a time consuming job which is
 | |
|     desynchronising it from the stream of data received.
 | |
|     The redis buffer ensures that no data will be loss waiting to be processed.
 | |
| 
 | |
|     :param sub_config_section: -- (str) The name of the section in the config file to get the settings
 | |
|     :param channel: -- (str) The listening channel of the Subcriber.
 | |
| 
 | |
|     :return: ZMQSub Object
 | |
| 
 | |
|     :Example:
 | |
|     Extract of the config file:
 | |
|         [PubSub_Global]
 | |
|         adress = tcp://127.0.0.1:5000
 | |
|         channel = filelist
 | |
| 
 | |
|     Creating the object and receiving data + pushing to redis (redis buffering):
 | |
| 
 | |
|         r_serv = redis.StrictRedis(
 | |
|             host = 127.0.0.1,
 | |
|             port = 6387,
 | |
|             db = 0)
 | |
| 
 | |
|         channel = cfg.get("PubSub_Global", "channel")
 | |
|         MySubscriber = ZMQSub('./packages/config.cfg',"PubSub_Global", channel, "duplicate")
 | |
| 
 | |
|         while True:
 | |
|             MySubscriber.get_and_lpush(r_serv)
 | |
| 
 | |
| 
 | |
|     Inside another script use this line to retrive the data from redis.
 | |
|         ...
 | |
|         while True:
 | |
|             MySubscriber.get_msg_from_queue(r_serv)
 | |
|         ...
 | |
| 
 | |
|     ..note:: If you don't want any redis buffering simply use the "get_message" method
 | |
| 
 | |
|     """
 | |
|     def __init__(self, file_conf, sub_config_section, channel, ps_name):
 | |
|         super(ZMQSub, self).__init__(file_conf, "Default", ps_name)
 | |
| 
 | |
|         self._sub_config_section = sub_config_section
 | |
|         self._subsocket = self._context_zmq.socket(zmq.SUB)
 | |
|         self._sub_adress = self._config_parser.get(self._sub_config_section, "adress")
 | |
| 
 | |
|         self._subsocket.connect(self._config_parser.get(self._sub_config_section, "adress"))
 | |
| 
 | |
|         self._channel = channel
 | |
|         self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel)
 | |
| 
 | |
|     def get_message(self):
 | |
|         """
 | |
|         Get the first sent message from a Publisher.
 | |
|         :return: (str) Message from Publisher
 | |
| 
 | |
|         """
 | |
|         return self._subsocket.recv()
 | |
| 
 | |
|     def get_and_lpush(self, r_serv):
 | |
|         """
 | |
|         Get the first sent message from a Publisher and storing it in redis
 | |
| 
 | |
|         ..note:: This function also create a set named "queue" for monitoring needs
 | |
| 
 | |
|         """
 | |
|         r_serv.sadd("queues",self._channel+self._ps_name)
 | |
|         r_serv.lpush(self._channel+self._ps_name, self._subsocket.recv())
 | |
| 
 | |
|     def get_msg_from_queue(self, r_serv):
 | |
|         """
 | |
|         Get the first sent message from a Redis List
 | |
| 
 | |
|         :return: (str) Message from Publisher
 | |
| 
 | |
|         """
 | |
|         return r_serv.rpop(self._channel+self._ps_name) |