mirror of https://github.com/CIRCL/AIL-framework
88 lines
3.2 KiB
Python
Executable File
88 lines
3.2 KiB
Python
Executable File
#!/usr/bin/env python2
|
|
# -*-coding:UTF-8 -*
|
|
"""
|
|
Queue helper module
|
|
============================
|
|
|
|
This module subscribe to a Publisher stream and put the received messages
|
|
into a Redis-list waiting to be popped later by others scripts.
|
|
|
|
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
|
|
the same Subscriber name in both of them.
|
|
|
|
"""
|
|
import redis
|
|
import ConfigParser
|
|
import os
|
|
import zmq
|
|
|
|
|
|
class Redis_Queues(object):
|
|
|
|
def __init__(self, conf_section, conf_channel, subscriber_name):
|
|
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
|
|
print configfile
|
|
if not os.path.exists(configfile):
|
|
raise Exception('Unable to find the configuration file. \
|
|
Did you set environment variables? \
|
|
Or activate the virtualenv.')
|
|
self.config = ConfigParser.ConfigParser()
|
|
self.config.read(configfile)
|
|
self.subscriber_name = subscriber_name
|
|
|
|
self.sub_channel = self.config.get(conf_section, conf_channel)
|
|
self.redis_channel = self.sub_channel + self.subscriber_name
|
|
|
|
# Redis Queue
|
|
config_section = "Redis_Queues"
|
|
self.r_queues = redis.StrictRedis(
|
|
host=self.config.get(config_section, "host"),
|
|
port=self.config.getint(config_section, "port"),
|
|
db=self.config.getint(config_section, "db"))
|
|
|
|
def zmq_sub(self, conf_section):
|
|
sub_address = self.config.get(conf_section, 'adress')
|
|
context = zmq.Context()
|
|
self.sub_socket = context.socket(zmq.SUB)
|
|
self.sub_socket.connect(sub_address)
|
|
self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel)
|
|
|
|
def zmq_pub(self, config_section, config_channel):
|
|
context = zmq.Context()
|
|
self.pub_socket = context.socket(zmq.PUB)
|
|
self.pub_socket.bind(self.config.get(config_section, 'adress'))
|
|
if config_channel is not None:
|
|
self.pub_channel = self.config.get(config_section, config_channel)
|
|
else:
|
|
# The publishing channel is defined dynamically
|
|
self.pub_channel = None
|
|
|
|
def zmq_pub_send(self, msg):
|
|
if self.pub_channel is None:
|
|
raise Exception('A channel is reqired to send a message.')
|
|
self.pub_socket.send('{} {}'.format(self.pub_channel, msg))
|
|
|
|
def redis_rpop(self):
|
|
return self.r_queues.rpop(self.redis_channel)
|
|
|
|
def redis_queue_shutdown(self, is_queue=False):
|
|
if is_queue:
|
|
flag = self.subscriber_name + '_Q'
|
|
else:
|
|
flag = self.subscriber_name
|
|
# srem returns False if the element does not exists
|
|
return self.r_queues.srem('SHUTDOWN_FLAGS', flag)
|
|
|
|
def redis_queue_subscribe(self, publisher):
|
|
publisher.info("Suscribed to channel {}".format(self.sub_channel))
|
|
while True:
|
|
msg = self.sub_socket.recv()
|
|
p = self.r_queues.pipeline()
|
|
p.sadd("queues", self.redis_channel)
|
|
p.lpush(self.redis_channel, msg)
|
|
p.execute()
|
|
if self.redis_queue_shutdown(True):
|
|
print "Shutdown Flag Up: Terminating"
|
|
publisher.warning("Shutdown Flag Up: Terminating.")
|
|
break
|