From f1753d67c6779257483c9f2a7110573fb44b7e11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Mon, 18 Aug 2014 18:35:08 +0200 Subject: [PATCH] Cleanup the queues. --- .gitignore | 6 ++++ bin/Helper.py | 61 ++++++++++++++++++++++++++++++++++++ bin/ZMQ_Feed_Q.py | 47 ++++++--------------------- bin/ZMQ_PubSub_Categ_Q.py | 49 ++++++----------------------- bin/ZMQ_PubSub_Lines_Q.py | 49 ++++++----------------------- bin/ZMQ_PubSub_Tokenize_Q.py | 50 ++++++----------------------- bin/ZMQ_Sub_Attributes_Q.py | 48 ++++++---------------------- bin/ZMQ_Sub_CreditCards_Q.py | 45 ++++++-------------------- bin/ZMQ_Sub_Curve_Q.py | 49 ++++++----------------------- bin/ZMQ_Sub_Duplicate_Q.py | 47 ++++++--------------------- bin/ZMQ_Sub_Indexer_Q.py | 50 ++++++----------------------- bin/ZMQ_Sub_Mails_Q.py | 46 ++++++--------------------- bin/ZMQ_Sub_Onion_Q.py | 45 ++++++-------------------- bin/ZMQ_Sub_Urls_Q.py | 45 ++++++-------------------- bin/packages/ZMQ_PubSub.py | 24 ++++++-------- bin/packages/config.cfg | 6 +++- 16 files changed, 193 insertions(+), 474 deletions(-) create mode 100644 .gitignore create mode 100755 bin/Helper.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..a27512e0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*.swp + +# Install Dirs +AILENV +redis-leveldb +redis diff --git a/bin/Helper.py b/bin/Helper.py new file mode 100755 index 00000000..ab8a896f --- /dev/null +++ b/bin/Helper.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python2 +# -*-coding:UTF-8 -* +""" +Queue helper module +============================ + +This module subscribe to a Publisher stream and put the received messages +into a Redis-list waiting to be popped later by others scripts. + +..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put +the same Subscriber name in both of them. + +Requirements +------------ + +*Running Redis instances. +*Should register to the Publisher "ZMQ_PubSub_Line" channel 1 + +""" +import redis +import ConfigParser +import os +from packages import ZMQ_PubSub + + +class Queues(object): + + def __init__(self): + configfile = os.join(os.environ('AIL_BIN'), 'packages/config.cfg') + if not os.exists(configfile): + raise Exception('Unable to find the configuration file. Did you set environment variables? Or activate the virtualenv.') + self.config = ConfigParser.ConfigParser() + self.config.read(self.configfile) + + def _queue_init_redis(self): + config_section = "Redis_Queues" + self.r_queues = redis.StrictRedis( + host=self.config.get(config_section, "host"), + port=self.config.getint(config_section, "port"), + db=self.config.getint(config_section, "db")) + + def _queue_shutdown(self): + # FIXME: Why not just a key? + if self.r_queues.sismember("SHUTDOWN_FLAGS", "Feed_Q"): + self.r_queues.srem("SHUTDOWN_FLAGS", "Feed_Q") + return True + return False + + def queue_subscribe(self, publisher, config_section, channel, + subscriber_name): + channel = self.config.get(config_section, channel) + zmq_sub = ZMQ_PubSub.ZMQSub(self.config, config_section, + channel, subscriber_name) + publisher.info("""Suscribed to channel {}""".format(channel)) + self._queue_init_redis() + while True: + zmq_sub.get_and_lpush(self.r_queues) + if self._queues_shutdown(): + print "Shutdown Flag Up: Terminating" + publisher.warning("Shutdown Flag Up: Terminating.") + break diff --git a/bin/ZMQ_Feed_Q.py b/bin/ZMQ_Feed_Q.py index ab9ed09a..a81d1d57 100755 --- a/bin/ZMQ_Feed_Q.py +++ b/bin/ZMQ_Feed_Q.py @@ -20,45 +20,18 @@ Requirements "channel_name"+" "+/path/to/the/paste.gz+" "base64_data_encoded_paste" """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("Feed", "topicfilter") - sub = ZMQ_PubSub.ZMQSub(configfile, "Feed", channel, "feed") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Feed_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Feed_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'Feed' + config_channel = 'topicfilter' + subscriber_name = 'feed' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_PubSub_Categ_Q.py b/bin/ZMQ_PubSub_Categ_Q.py index 45e0b563..c0651565 100755 --- a/bin/ZMQ_PubSub_Categ_Q.py +++ b/bin/ZMQ_PubSub_Categ_Q.py @@ -17,47 +17,18 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Tokenize" """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") - subscriber_name = "categ" - subscriber_config_section = "PubSub_Words" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Categ_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Categ_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = 'Queuing' + + config_section = 'PubSub_Words' + config_channel = 'channel_0' + subscriber_name = 'categ' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_PubSub_Lines_Q.py b/bin/ZMQ_PubSub_Lines_Q.py index 61f29443..fe7cd606 100755 --- a/bin/ZMQ_PubSub_Lines_Q.py +++ b/bin/ZMQ_PubSub_Lines_Q.py @@ -18,47 +18,16 @@ Requirements """ -import redis -import ConfigParser from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "line" - - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Lines_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Lines_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break +import Helper if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = "PubSub_Global" + config_channel = 'channel' + subscriber_name = 'line' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_PubSub_Tokenize_Q.py b/bin/ZMQ_PubSub_Tokenize_Q.py index c5a5791f..73459e49 100755 --- a/bin/ZMQ_PubSub_Tokenize_Q.py +++ b/bin/ZMQ_PubSub_Tokenize_Q.py @@ -17,48 +17,18 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Line" channel 1 """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Longlines", "channel_1") - subscriber_name = "tokenize" - subscriber_config_section = "PubSub_Longlines" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Tokenize_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Tokenize_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Longlines' + config_channel = 'channel_1' + subscriber_name = 'tokenize' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_Sub_Attributes_Q.py b/bin/ZMQ_Sub_Attributes_Q.py index 4396a6bc..5d0c2cd5 100755 --- a/bin/ZMQ_Sub_Attributes_Q.py +++ b/bin/ZMQ_Sub_Attributes_Q.py @@ -18,47 +18,17 @@ Requirements """ -import redis -import ConfigParser from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "attributes" - - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Attributes_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Attributes_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'attributes' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_Sub_CreditCards_Q.py b/bin/ZMQ_Sub_CreditCards_Q.py index 7ef4a9b9..5df01cf8 100755 --- a/bin/ZMQ_Sub_CreditCards_Q.py +++ b/bin/ZMQ_Sub_CreditCards_Q.py @@ -1,44 +1,17 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "creditcard_categ", "cards") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format("creditcard_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Creditcards_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Creditcards_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_0' + subscriber_name = 'creditcard_categ' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_Sub_Curve_Q.py b/bin/ZMQ_Sub_Curve_Q.py index ba6aa67c..aed0caad 100755 --- a/bin/ZMQ_Sub_Curve_Q.py +++ b/bin/ZMQ_Sub_Curve_Q.py @@ -17,47 +17,18 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Tokenize" """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") - subscriber_name = "curve" - subscriber_config_section = "PubSub_Words" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Curve_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Curve_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Words' + config_channel = 'channel_0' + subscriber_name = 'curve' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_Sub_Duplicate_Q.py b/bin/ZMQ_Sub_Duplicate_Q.py index 1f4b8ef6..2a9a4212 100755 --- a/bin/ZMQ_Sub_Duplicate_Q.py +++ b/bin/ZMQ_Sub_Duplicate_Q.py @@ -1,45 +1,16 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, "duplicate") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Duplicate_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Duplicate_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break +import Helper if __name__ == "__main__": - main() + publisher.channel = 'Queuing' + + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'duplicate' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_Sub_Indexer_Q.py b/bin/ZMQ_Sub_Indexer_Q.py index 004c0d57..65d951e7 100755 --- a/bin/ZMQ_Sub_Indexer_Q.py +++ b/bin/ZMQ_Sub_Indexer_Q.py @@ -12,49 +12,17 @@ handling the indexing process of the files seen. """ -import redis -import ConfigParser from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "indexer" - - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) - - publisher.info("""Suscribed to channel {0}""".format(channel)) - - # Until the service is requested to be shutdown, the service - # will get the data from the global ZMQ queue and buffer it in Redis. - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Indexer_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Indexer_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'indexer' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_Sub_Mails_Q.py b/bin/ZMQ_Sub_Mails_Q.py index d57e63c6..044fc352 100755 --- a/bin/ZMQ_Sub_Mails_Q.py +++ b/bin/ZMQ_Sub_Mails_Q.py @@ -1,44 +1,16 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "mails_categ", "emails") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format("mails_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Mails_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Mails_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break +import Helper if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_1' + subscriber_name = 'mails_categ' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_Sub_Onion_Q.py b/bin/ZMQ_Sub_Onion_Q.py index c1f559da..5668240e 100755 --- a/bin/ZMQ_Sub_Onion_Q.py +++ b/bin/ZMQ_Sub_Onion_Q.py @@ -17,44 +17,17 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Categ" """ -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "onion_categ", "tor") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format("onion_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Onion_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Onion_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_2' + subscriber_name = 'onion_categ' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/ZMQ_Sub_Urls_Q.py b/bin/ZMQ_Sub_Urls_Q.py index 4d4e2931..0bdeed66 100755 --- a/bin/ZMQ_Sub_Urls_Q.py +++ b/bin/ZMQ_Sub_Urls_Q.py @@ -1,44 +1,17 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "web_categ", "urls") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format("web_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Urls_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Urls_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_3' + subscriber_name = 'web_categ' + + h = Helper.Queues() + h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) diff --git a/bin/packages/ZMQ_PubSub.py b/bin/packages/ZMQ_PubSub.py index b7c65231..14b20440 100755 --- a/bin/packages/ZMQ_PubSub.py +++ b/bin/packages/ZMQ_PubSub.py @@ -6,7 +6,6 @@ The ``ZMQ PubSub`` Modules """ import zmq -import ConfigParser class PubSub(object): @@ -14,7 +13,7 @@ class PubSub(object): The PubSub class is a ``Virtual Class`` which regroup the shared attribute of a Publisher ZeroMQ and a Subcriber ZeroMQ - :param file_conf: -- (str) The filepath of the configuration file used (.cfg) + :param config: -- (ConfigParser) Handle on the parsed config file :param log_channel: -- (str) The channel used as a log channel :param ps_name: -- (str) The "ID" of the Publisher/Subcriber @@ -27,18 +26,13 @@ class PubSub(object): ..todo:: Create Implementing a log channel as an attribute of this virtual class. """ - def __init__(self, file_conf, log_channel, ps_name): + def __init__(self, config, log_channel, ps_name): self._ps_name = ps_name - self._config_parser = ConfigParser.ConfigParser() - self._config_file = file_conf # "./packages/config.cfg" - self._config_parser.read(self._config_file) + self._config_parser = config self._context_zmq = zmq.Context() - # self._logging_publisher_channel = log_channel # "Default" - # publisher.channel(self._logging_publisher_channel) - class ZMQPub(PubSub): """ @@ -63,14 +57,14 @@ class ZMQPub(PubSub): instantiated correctly. """ - def __init__(self, file_conf, pub_config_section, ps_name): - super(ZMQPub, self).__init__(file_conf, "Default", ps_name) + def __init__(self, config, pub_config_section, ps_name): + super(ZMQPub, self).__init__(config, "Default", ps_name) self._pub_config_section = pub_config_section self._pubsocket = self._context_zmq.socket(zmq.PUB) self._pub_adress = self._config_parser.get(self._pub_config_section, "adress") - self._pubsocket.bind(self._config_parser.get(self._pub_config_section, "adress")) + self._pubsocket.bind(self._pub_adress) def send_message(self, message): """Send a message throught the publisher socket""" @@ -120,14 +114,14 @@ class ZMQSub(PubSub): ..note:: If you don't want any redis buffering simply use the "get_message" method """ - def __init__(self, file_conf, sub_config_section, channel, ps_name): - super(ZMQSub, self).__init__(file_conf, "Default", ps_name) + def __init__(self, config, sub_config_section, channel, ps_name): + super(ZMQSub, self).__init__(config, "Default", ps_name) self._sub_config_section = sub_config_section self._subsocket = self._context_zmq.socket(zmq.SUB) self._sub_adress = self._config_parser.get(self._sub_config_section, "adress") - self._subsocket.connect(self._config_parser.get(self._sub_config_section, "adress")) + self._subsocket.connect(self._sub_adress) self._channel = channel self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel) diff --git a/bin/packages/config.cfg b/bin/packages/config.cfg index fd43b98b..5ef8a9a5 100644 --- a/bin/packages/config.cfg +++ b/bin/packages/config.cfg @@ -54,7 +54,11 @@ channel_0 = words [PubSub_Categ] adress = tcp://127.0.0.1:5003 -#Channels are dynamic (1 channel per categ) +channel_0 = cards +channel_1 = emails +channel_2 = tor +channel_3 = urls +#Channels are dynamic (1 channel per categ) <= FIXME: no it's not. [PubSub_Url] adress = tcp://127.0.0.1:5004