diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..6c5564af --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +*.swp + +# Install Dirs +AILENV +redis-leveldb +redis + +# Local config +bin/packages/config.cfg diff --git a/bin/Helper.py b/bin/Helper.py new file mode 100755 index 00000000..8e3bade2 --- /dev/null +++ b/bin/Helper.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python2 +# -*-coding:UTF-8 -* +""" +Queue helper module +============================ + +This module subscribes to a Publisher stream and puts the received messages +into a Redis-list, waiting to be popped later by other scripts. + +..note:: Modules ZMQ_Something_Q and ZMQ_Something are closely bound: always put +the same Subscriber name in both of them. + +""" +import redis +import ConfigParser +import os +import zmq + + +class Redis_Queues(object): + + def __init__(self, conf_section, conf_channel, subscriber_name): + configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg') + if not os.path.exists(configfile): + raise Exception('Unable to find the configuration file. \ + Did you set the environment variables \ + and activate the virtualenv?') + self.config = ConfigParser.ConfigParser() + self.config.read(configfile) + self.subscriber_name = subscriber_name + + self.sub_channel = self.config.get(conf_section, conf_channel) + + # Redis Queue + config_section = "Redis_Queues" + self.r_queues = redis.StrictRedis( + host=self.config.get(config_section, "host"), + port=self.config.getint(config_section, "port"), + db=self.config.getint(config_section, "db")) + + def zmq_sub(self, conf_section): + sub_address = self.config.get(conf_section, 'adress') + context = zmq.Context() + self.sub_socket = context.socket(zmq.SUB) + self.sub_socket.connect(sub_address) + self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel) + + def zmq_pub(self, config_section, config_channel): + context = zmq.Context() + self.pub_socket = context.socket(zmq.PUB) + self.pub_socket.bind(self.config.get(config_section, 'adress')) + if config_channel is not None: + self.pub_channel = self.config.get(config_section, config_channel) + else: + # The publishing channel is defined dynamically + self.pub_channel = None + + def zmq_pub_send(self, msg): + if self.pub_channel is None: + raise Exception('A channel is required to send a message.') + self.pub_socket.send('{} {}'.format(self.pub_channel, msg)) + + def redis_rpop(self): + return self.r_queues.rpop(self.sub_channel + self.subscriber_name) + + def redis_queue_shutdown(self, is_queue=False): + if is_queue: + flag = self.subscriber_name + '_Q' + else: + flag = self.subscriber_name + # srem returns False if the element does not exist + return self.r_queues.srem('SHUTDOWN_FLAGS', flag) + + def redis_queue_subscribe(self, publisher): + self.redis_channel = self.sub_channel + self.subscriber_name + publisher.info("Subscribed to channel {}".format(self.sub_channel)) + while True: + msg = self.sub_socket.recv() + p = self.r_queues.pipeline() + p.sadd("queues", self.redis_channel) + p.lpush(self.redis_channel, msg) + p.execute() + if self.redis_queue_shutdown(True): + print "Shutdown Flag Up: Terminating" + publisher.warning("Shutdown Flag Up: Terminating.") + break diff --git a/bin/Shutdown.py b/bin/Shutdown.py index e2474c32..f197e5c8 100755 --- a/bin/Shutdown.py +++ b/bin/Shutdown.py @@ -38,6 +38,7 @@ def main(): port=cfg.getint("Redis_Queues", "port"), db=cfg.getint("Redis_Queues", "db")) + # FIXME: automatic based on the queue name.
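# --- Editor's note: minimal usage sketch for the Redis_Queues helper above,
# not part of the patch. A ZMQ_Something_Q / ZMQ_Something pair shares one
# subscriber name: the _Q side buffers the ZMQ stream into the Redis list
# <sub_channel><subscriber_name> (zmq_sub + redis_queue_subscribe), and the
# worker side drains it with redis_rpop. The 'Example' section, channel and
# names below are hypothetical placeholders, not real config.cfg entries.
import time

from pubsublogger import publisher

import Helper

if __name__ == "__main__":
    publisher.channel = "Script"
    h = Helper.Redis_Queues('Example', 'channel', 'example')
    while True:
        message = h.redis_rpop()
        if message is not None:
            print message  # a real module would parse and process it here
        else:
            if h.redis_queue_shutdown():
                publisher.warning("Shutdown Flag Up: Terminating.")
                break
            time.sleep(10)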
# ### SCRIPTS #### r_serv.sadd("SHUTDOWN_FLAGS", "Feed") r_serv.sadd("SHUTDOWN_FLAGS", "Categ") diff --git a/bin/ZMQ_Feed.py b/bin/ZMQ_Feed.py index be1e3eff..eeee987e 100755 --- a/bin/ZMQ_Feed.py +++ b/bin/ZMQ_Feed.py @@ -20,50 +20,34 @@ Requirements *Need the ZMQ_Feed_Q Module running to be able to work properly. """ -import redis -import ConfigParser import base64 import os import time from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" +if __name__ == "__main__": + publisher.channel = "Script" - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + config_section = 'Feed' + config_channel = 'topicfilter' + subscriber_name = 'feed' - # REDIS - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - # ZMQ # - channel = cfg.get("Feed", "topicfilter") - - # Subscriber - subscriber_name = "feed" - subscriber_config_section = "Feed" # Publisher - publisher_name = "pubfed" - publisher_config_section = "PubSub_Global" - - Sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - PubGlob = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) + pub_config_section = "PubSub_Global" + pub_config_channel = 'channel' + h.zmq_pub(pub_config_section, pub_config_channel) # LOGGING # - publisher.channel = "Script" publisher.info("Feed Script started to receive & publish.") while True: - message = Sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() # Recovering the streamed message informations. if message is not None: if len(message.split()) == 3: @@ -75,8 +59,7 @@ def main(): publisher.debug("Empty Paste: {0} not processed".format(paste)) continue else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Feed"): - r_serv.srem("SHUTDOWN_FLAGS", "Feed") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -84,24 +67,13 @@ def main(): time.sleep(10) continue # Creating the full filepath - filename = cfg.get("Directories", "pastes") + paste + filename = os.path.join(os.environ['AIL_BIN'], + h.config.get("Directories", "pastes"), paste) + dirname = os.path.dirname(filename) + if not os.path.exists(dirname): + os.makedirs(dirname) - if not os.path.exists(filename.rsplit("/", 1)[0]): - os.makedirs(filename.rsplit("/", 1)[0]) - else: - # Path already existing - pass + with open(filename, 'wb') as f: + f.write(base64.standard_b64decode(gzip64encoded)) - decoded_gzip = base64.standard_b64decode(gzip64encoded) - # paste, zlib.decompress(decoded_gzip, zlib.MAX_WBITS|16) - - with open(filename, 'wb') as F: - F.write(decoded_gzip) - - msg = cfg.get("PubSub_Global", "channel")+" "+filename - PubGlob.send_message(msg) - publisher.debug("{0} Published".format(msg)) - - -if __name__ == "__main__": - main() + h.zmq_pub_send(filename) diff --git a/bin/ZMQ_Feed_Q.py b/bin/ZMQ_Feed_Q.py index ab9ed09a..ba7c4a2d 100755 --- a/bin/ZMQ_Feed_Q.py +++ b/bin/ZMQ_Feed_Q.py @@ -20,45 +20,19 @@ Requirements "channel_name"+" "+/path/to/the/paste.gz+" "base64_data_encoded_paste" """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() -
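# --- Editor's note, not part of the patch: the Feed module above consumes
# messages shaped as "<channel> <path/to/paste.gz> <base64(gzipped paste)>"
# and writes the decoded payload under $AIL_BIN. A feeder could build such a
# message like this (the channel name and path are made-up examples):
import base64
import gzip
import StringIO


def make_feed_message(channel, path, content):
    buf = StringIO.StringIO()
    g = gzip.GzipFile(fileobj=buf, mode='wb')
    g.write(content)
    g.close()
    return '{} {} {}'.format(channel, path,
                             base64.standard_b64encode(buf.getvalue()))

print make_feed_message('102', '2014/05/22/example.gz', 'some paste content')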
cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("Feed", "topicfilter") - sub = ZMQ_PubSub.ZMQSub(configfile, "Feed", channel, "feed") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Feed_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Feed_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'Feed' + config_channel = 'topicfilter' + subscriber_name = 'feed' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Categ.py b/bin/ZMQ_PubSub_Categ.py index 71477b9a..fa0ef533 100755 --- a/bin/ZMQ_PubSub_Categ.py +++ b/bin/ZMQ_PubSub_Categ.py @@ -36,111 +36,78 @@ Requirements *Need the ZMQ_PubSub_Tokenize_Q Module running to be able to work properly. """ -import redis +import glob +import os import argparse -import ConfigParser import time -from packages import ZMQ_PubSub from pubsublogger import publisher from packages import Paste -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Words' + config_channel = 'channel_0' + subscriber_name = 'categ' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Publisher + pub_config_section = 'PubSub_Categ' + + h.zmq_pub(pub_config_section, None) # SCRIPT PARSER # parser = argparse.ArgumentParser( - description='''This script is a part of the Analysis Information Leak framework.''', - epilog='''''') + description='This script is a part of the Analysis Information \ + Leak framework.') parser.add_argument( - '-l', type=str, default="../files/list_categ_files", - help='Path to the list_categ_files (../files/list_categ_files)', + '-d', type=str, default="../files/", + help='Path to the directory containing the category files.', action='store') args = parser.parse_args() - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") - subscriber_name = "categ" - subscriber_config_section = "PubSub_Words" - - publisher_name = "pubcateg" - publisher_config_section = "PubSub_Categ" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, - subscriber_name) - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, - publisher_name) - # FUNCTIONS # - publisher.info("Script Categ subscribed to channel {0}".format( - cfg.get("PubSub_Words", "channel_0"))) + publisher.info( + "Script Categ subscribed to channel {}".format(h.sub_channel)) - with open(args.l, 'rb') as L: - tmp_dict = {} + tmp_dict = {} + for filename in glob.glob(os.path.join(args.d, '*')): + bname = os.path.basename(filename) + tmp_dict[bname] = [] + with open(filename, 'r') as f: + for l in f: + tmp_dict[bname].append(l.strip()) - for num, fname in enumerate(L): - # keywords temp list - tmp_list = [] - - with
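# --- Editor's note, not part of the patch: the -d directory scanned above is
# expected to hold one plain-text keyword file per category, the basename
# being the category name. A hypothetical files/ layout and the resulting
# dict:
#   files/creditcard_categ  ->  "visa\nmastercard\n"
#   files/mails_categ       ->  "@gmail.com\n@yahoo.com\n"
import glob
import os

tmp_dict = {}
for filename in glob.glob(os.path.join('../files/', '*')):
    bname = os.path.basename(filename)
    tmp_dict[bname] = [line.strip() for line in open(filename, 'r')]
print tmp_dict  # e.g. {'creditcard_categ': ['visa', 'mastercard'], ...}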
open(fname[:-1], 'rb') as LS: - - for num, kword in enumerate(LS): - tmp_list.append(kword[:-1]) - - tmp_dict[fname.split('/')[-1][:-1]] = tmp_list - - message = sub.get_msg_from_queue(r_serv) prec_filename = None while True: + message = h.redis_rpop() if message is not None: channel, filename, word, score = message.split() if prec_filename is None or filename != prec_filename: PST = Paste.Paste(filename) + prec_filename = filename - prec_filename = filename + for categ, words_list in tmp_dict.items(): - for categ, list in tmp_dict.items(): - - if word.lower() in list: - channel = categ - msg = channel+" "+PST.p_path+" "+word+" "+score - pub.send_message(msg) - # dico_categ.add(categ) + if word.lower() in words_list: + h.pub_channel = categ + h.zmq_pub_send('{} {} {}'.format(PST.p_path, word, score)) publisher.info( 'Categ;{};{};{};Detected {} "{}"'.format( PST.p_source, PST.p_date, PST.p_name, score, word)) else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Categ"): - r_serv.srem("SHUTDOWN_FLAGS", "Categ") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script Categ is Idling 10s") time.sleep(10) - - message = sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_PubSub_Categ_Q.py b/bin/ZMQ_PubSub_Categ_Q.py index 45e0b563..7780dd2f 100755 --- a/bin/ZMQ_PubSub_Categ_Q.py +++ b/bin/ZMQ_PubSub_Categ_Q.py @@ -17,47 +17,19 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Tokenize" """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") - subscriber_name = "categ" - subscriber_config_section = "PubSub_Words" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Categ_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Categ_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = 'Queuing' + + config_section = 'PubSub_Words' + config_channel = 'channel_0' + subscriber_name = 'categ' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Lines.py b/bin/ZMQ_PubSub_Lines.py index d13103c6..1cb38fe5 100755 --- a/bin/ZMQ_PubSub_Lines.py +++ b/bin/ZMQ_PubSub_Lines.py @@ -5,10 +5,11 @@ The ZMQ_PubSub_Lines Module ============================ -This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q Module. +This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q +Module. 
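# --- Editor's note, not part of the patch: the Categ loop above publishes on
# a channel chosen at runtime, which is what zmq_pub(section, None) enables:
# pub_channel stays None until the module assigns it per message. A minimal
# sketch of that pattern with the helper (message content is a placeholder):
import Helper

if __name__ == "__main__":
    h = Helper.Redis_Queues('PubSub_Words', 'channel_0', 'categ')
    h.zmq_pub('PubSub_Categ', None)  # channel defined dynamically
    h.pub_channel = 'mails_categ'    # picked from the matching category
    h.zmq_pub_send('/path/to/paste.gz word 3')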
-It perform a sorting on the line's length and publish/forward them to differents -channels: +It performs a sorting on the line's length and publishes/forwards them to +different channels: *Channel 1 if max length(line) < max *Channel 2 if max length(line) > max @@ -28,79 +29,62 @@ Requirements """ import redis import argparse -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'line' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Publisher + pub_config_section = 'PubSub_Longlines' + h.zmq_pub(pub_config_section, None) + + # Subscriber + h.zmq_sub(config_section) # SCRIPT PARSER # parser = argparse.ArgumentParser( - description='''This script is a part of the Analysis Information Leak framework.''', - epilog='''''') + description='''This script is a part of the Analysis Information \ + Leak framework.''') - parser.add_argument('-max', type=int, default=500, - help='The limit between "short lines" and "long lines" (500)', - action='store') + parser.add_argument( + '-max', type=int, default=500, + help='The limit between "short lines" and "long lines"', + action='store') args = parser.parse_args() # REDIS # + # FIXME move it in the Paste object r_serv = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - # Subscriber - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "line" - subscriber_config_section = "PubSub_Global" - - # Publisher - publisher_config_section = "PubSub_Longlines" - publisher_name = "publine" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) - - channel_0 = cfg.get("PubSub_Longlines", "channel_0") - channel_1 = cfg.get("PubSub_Longlines", "channel_1") + channel_0 = h.config.get("PubSub_Longlines", "channel_0") + channel_1 = h.config.get("PubSub_Longlines", "channel_1") # FUNCTIONS # - tmp_string = "Lines script Subscribed to channel {} and Start to publish on channel {}, {}" - publisher.info(tmp_string.format( - cfg.get("PubSub_Global", "channel"), - cfg.get("PubSub_Longlines", "channel_0"), - cfg.get("PubSub_Longlines", "channel_1"))) + tmp_string = "Lines script Subscribed to channel {} and Start to publish \ + on channel {}, {}" + publisher.info(tmp_string.format(h.sub_channel, channel_0, channel_1)) while True: try: - message = sub.get_msg_from_queue(r_serv1) + message = h.redis_rpop() if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) else: - if r_serv1.sismember("SHUTDOWN_FLAGS", "Lines"): - r_serv1.srem("SHUTDOWN_FLAGS", "Lines") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up:
Terminating.") break @@ -111,18 +95,14 @@ def main(): lines_infos = PST.get_lines_info() PST.save_attribute_redis(r_serv, "p_nb_lines", lines_infos[0]) - PST.save_attribute_redis(r_serv, "p_max_length_line", lines_infos[1]) + PST.save_attribute_redis(r_serv, "p_max_length_line", + lines_infos[1]) r_serv.sadd("Pastes_Objects", PST.p_path) if lines_infos[1] >= args.max: - msg = channel_0+" "+PST.p_path + h.pub_channel = channel_0 else: - msg = channel_1+" "+PST.p_path - - pub.send_message(msg) + h.pub_channel = channel_1 + h.zmq_pub_send(PST.p_path) except IOError: print "CRC Checksum Error on : ", PST.p_path - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_PubSub_Lines_Q.py b/bin/ZMQ_PubSub_Lines_Q.py index 61f29443..abf1fd35 100755 --- a/bin/ZMQ_PubSub_Lines_Q.py +++ b/bin/ZMQ_PubSub_Lines_Q.py @@ -18,47 +18,17 @@ Requirements """ -import redis -import ConfigParser from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "line" - - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Lines_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Lines_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break +import Helper if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = "PubSub_Global" + config_channel = 'channel' + subscriber_name = 'line' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Tokenize.py b/bin/ZMQ_PubSub_Tokenize.py index 701968d0..2fe3de5b 100755 --- a/bin/ZMQ_PubSub_Tokenize.py +++ b/bin/ZMQ_PubSub_Tokenize.py @@ -4,9 +4,11 @@ The ZMQ_PubSub_Lines Module ============================ -This module is consuming the Redis-list created by the ZMQ_PubSub_Tokenize_Q Module. +This module is consuming the Redis-list created by the ZMQ_PubSub_Tokenize_Q +Module. -It tokenize the content of the paste and publish the result in the following format: +It tokenizes the content of the paste and publishes the result in the following +format: channel_name+' '+/path/of/the/paste.gz+' '+tokenized_word+' '+scoring ..seealso:: Paste method (_get_top_words) @@ -21,72 +23,43 @@ Requirements *Need the ZMQ_PubSub_Tokenize_Q Module running to be able to work properly.
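# --- Editor's note, not part of the patch: what the Lines dispatch above
# boils down to. get_lines_info() presumably returns (number of lines,
# longest line length); pastes whose longest line reaches args.max go to the
# long-lines channel, the rest to the short-lines one. The channel names
# below stand in for the real [PubSub_Longlines] values:
def get_lines_info(content):
    lines = content.splitlines()
    longest = max(len(line) for line in lines) if lines else 0
    return len(lines), longest

nb_lines, max_line_length = get_lines_info("short\n" + "x" * 600)
channel = 'channel_0' if max_line_length >= 500 else 'channel_1'
print nb_lines, max_line_length, channel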
""" -import redis -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - channel = cfg.get("PubSub_Longlines", "channel_1") - subscriber_name = "tokenize" - subscriber_config_section = "PubSub_Longlines" + config_section = 'PubSub_Longlines' + config_channel = 'channel_1' + subscriber_name = 'tokenize' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) # Publisher - publisher_config_section = "PubSub_Words" - publisher_name = "pubtokenize" + pub_config_section = 'PubSub_Words' + pub_config_channel = 'channel_0' + h.zmq_pub(pub_config_section, pub_config_channel) - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) - - channel_0 = cfg.get("PubSub_Words", "channel_0") - - # FUNCTIONS # - publisher.info("Tokeniser subscribed to channel {0}".format(cfg.get("PubSub_Longlines", "channel_1"))) + # LOGGING # + publisher.info("Tokeniser subscribed to channel {}".format(h.sub_channel)) while True: - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() print message if message is not None: - PST = Paste.Paste(message.split(" ", -1)[-1]) + paste = Paste.Paste(message.split(" ", -1)[-1]) + for word, score in paste._get_top_words().items(): + if len(word) >= 4: + h.zmq_pub_send('{} {} {}'.format(paste.p_path, word, + score)) else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Tokenize"): - r_serv.srem("SHUTDOWN_FLAGS", "Tokenize") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Tokeniser is idling 10s") time.sleep(10) print "sleepin" - continue - - for word, score in PST._get_top_words().items(): - if len(word) >= 4: - msg = channel_0+' '+PST.p_path+' '+str(word)+' '+str(score) - pub.send_message(msg) - print msg - else: - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_PubSub_Tokenize_Q.py b/bin/ZMQ_PubSub_Tokenize_Q.py index c5a5791f..7594c37a 100755 --- a/bin/ZMQ_PubSub_Tokenize_Q.py +++ b/bin/ZMQ_PubSub_Tokenize_Q.py @@ -17,48 +17,19 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Line" channel 1 """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Longlines", "channel_1") - subscriber_name = "tokenize" - subscriber_config_section = "PubSub_Longlines" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if 
r_serv.sismember("SHUTDOWN_FLAGS", "Tokenize_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Tokenize_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Longlines' + config_channel = 'channel_1' + subscriber_name = 'tokenize' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Pub_Global.py b/bin/ZMQ_Pub_Global.py index d47ee730..7ac5ede7 100755 --- a/bin/ZMQ_Pub_Global.py +++ b/bin/ZMQ_Pub_Global.py @@ -21,49 +21,33 @@ Requirements *Need running Redis instances. (Redis) """ -import redis -import ConfigParser import time -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read('./packages/config.cfg') - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Global" - # ZMQ # - pub_glob = ZMQ_PubSub.ZMQPub(configfile, "PubSub_Global", "global") + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'global' - # FONCTIONS # + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Publisher + pub_config_section = 'PubSub_Global' + pub_config_channel = 'channel' + h.zmq_pub(pub_config_section, pub_config_channel) + + # LOGGING # publisher.info("Starting to publish.") while True: - filename = r_serv.lpop("filelist") + filename = h.redis_rpop() if filename is not None: - - msg = cfg.get("PubSub_Global", "channel")+" "+filename - pub_glob.send_message(msg) - publisher.debug("{0} Published".format(msg)) + h.zmq_pub_send(filename) else: time.sleep(10) publisher.debug("Nothing to publish") - - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Attributes.py b/bin/ZMQ_Sub_Attributes.py index e8a59071..4e36a7bc 100755 --- a/bin/ZMQ_Sub_Attributes.py +++ b/bin/ZMQ_Sub_Attributes.py @@ -27,56 +27,41 @@ Requirements """ import redis -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'attributes' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) # REDIS # r_serv = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - # Subscriber - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "attributes" - subscriber_config_section = "PubSub_Global" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) + host=h.config.get("Redis_Data_Merging", "host"), + 
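# --- Editor's note, not part of the patch: the Tokenize loop above keeps
# words of length >= 4 together with their occurrence count. A rough
# stand-in for Paste._get_top_words() (an illustration, not the real
# implementation):
import re
from collections import Counter


def get_top_words(content):
    return Counter(re.findall(r'\w+', content.lower()))

for word, score in get_top_words('pass pass password secret a b').items():
    if len(word) >= 4:
        print word, score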
port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) # FUNCTIONS # publisher.info("""ZMQ Attribute is Running""") while True: try: - message = sub.get_msg_from_queue(r_serv1) + message = h.redis_rpop() if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) else: - if r_serv1.sismember("SHUTDOWN_FLAGS", "Attributes"): - r_serv1.srem("SHUTDOWN_FLAGS", "Attributes") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -95,9 +80,6 @@ def main(): PST.save_all_attributes_redis(r_serv) except IOError: print "CRC Checksum Failed on :", PST.p_path - publisher.error('{0};{1};{2};{3};{4}'.format("Duplicate", PST.p_source, PST.p_date, PST.p_name, "CRC Checksum Failed")) - pass - - -if __name__ == "__main__": - main() + publisher.error('{0};{1};{2};{3};{4}'.format( + "Duplicate", PST.p_source, PST.p_date, PST.p_name, + "CRC Checksum Failed")) diff --git a/bin/ZMQ_Sub_Attributes_Q.py b/bin/ZMQ_Sub_Attributes_Q.py index 4396a6bc..6117174d 100755 --- a/bin/ZMQ_Sub_Attributes_Q.py +++ b/bin/ZMQ_Sub_Attributes_Q.py @@ -18,47 +18,18 @@ Requirements """ -import redis -import ConfigParser from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "attributes" - - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Attributes_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Attributes_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'attributes' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_CreditCards.py b/bin/ZMQ_Sub_CreditCards.py index 54021144..084a7b64 100755 --- a/bin/ZMQ_Sub_CreditCards.py +++ b/bin/ZMQ_Sub_CreditCards.py @@ -1,53 +1,44 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* import redis -import ConfigParser import pprint import time from packages import Paste from packages import lib_refine -from packages import ZMQ_PubSub from pubsublogger import publisher +import Helper -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ",
"creditcard_categ", "cards") + config_section = 'PubSub_Categ' + config_channel = 'channel_0' + subscriber_name = 'cards' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv1 = redis.StrictRedis( + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) # FUNCTIONS # publisher.info("Creditcard script subscribed to channel creditcard_categ") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None creditcard_regex = "4[0-9]{12}(?:[0-9]{3})?" # mastercard_regex = "5[1-5]\d{2}([\ \-]?)\d{4}\1\d{4}\1\d{4}" # visa_regex = "4\d{3}([\ \-]?)\d{4}\1\d{4}\1\d{4}" - # discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]|[3-9]\d)|[2-8]\d\d|9(?:[01]\d|2[0-5])))\d{10}" + # discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]| + # [3-9]\d)|[2-8]\d\d|9(?:[01]\d|2[0-5])))\d{10}" # jcb_regex = "35(?:2[89]|[3-8]\d)([\ \-]?)\d{4}\1\d{4}\1\d{4}" # amex_regex = "3[47]\d\d([\ \-]?)\d{6}\1\d{5}" # chinaUP_regex = "62[0-5]\d{13,16}" @@ -69,25 +60,22 @@ def main(): PST.save_attribute_redis(r_serv1, channel, creditcard_set) pprint.pprint(creditcard_set) - to_print = 'CreditCard;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name) + to_print = 'CreditCard;{};{};{};'.format( + PST.p_source, PST.p_date, PST.p_name) if (len(creditcard_set) > 0): - publisher.critical('{}Checked {} valid number(s)'.format(to_print, len(creditcard_set))) + publisher.critical('{}Checked {} valid number(s)'.format( + to_print, len(creditcard_set))) else: publisher.info('{}CreditCard related'.format(to_print)) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Creditcards"): - r_serv.srem("SHUTDOWN_FLAGS", "Creditcards") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script creditcard is idling 1m") time.sleep(60) - message = sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() + message = h.redis_rpop() diff --git a/bin/ZMQ_Sub_CreditCards_Q.py b/bin/ZMQ_Sub_CreditCards_Q.py index 7ef4a9b9..46bc33e9 100755 --- a/bin/ZMQ_Sub_CreditCards_Q.py +++ b/bin/ZMQ_Sub_CreditCards_Q.py @@ -1,44 +1,18 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "creditcard_categ", "cards") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format("creditcard_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Creditcards_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Creditcards_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_0' + subscriber_name = 'cards' + + h = Helper.Redis_Queues(config_section,
config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Curve.py b/bin/ZMQ_Sub_Curve.py index 7a065afb..5a0b94e1 100755 --- a/bin/ZMQ_Sub_Curve.py +++ b/bin/ZMQ_Sub_Curve.py @@ -6,7 +6,8 @@ The ZMQ_Sub_Curve Module This module is consuming the Redis-list created by the ZMQ_Sub_Curve_Q Module. -This modules update a .csv file used to draw curves representing selected words and their occurency per day. +This module updates a .csv file used to draw curves representing selected +words and their occurrence per day. ..note:: The channel will have the name of the file created. @@ -22,72 +23,60 @@ Requirements """ import redis -import ConfigParser import time -from packages import Paste as P -from packages import ZMQ_PubSub +from packages import Paste from pubsublogger import publisher from packages import lib_words -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Level_DB", "host"), - port=cfg.get("Redis_Level_DB", "port"), - db=0) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") + config_section = 'PubSub_Words' + config_channel = 'channel_0' subscriber_name = "curve" - subscriber_config_section = "PubSub_Words" - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv1 = redis.StrictRedis( + host=h.config.get("Redis_Level_DB", "host"), + port=h.config.get("Redis_Level_DB", "port"), + db=h.config.get("Redis_Level_DB", "db")) # FUNCTIONS # - publisher.info("Script Curve subscribed to channel {0}".format(cfg.get("PubSub_Words", "channel_0"))) + publisher.info("Script Curve subscribed to {}".format(h.sub_channel)) # FILE CURVE SECTION # - csv_path = cfg.get("Directories", "wordtrending_csv") - wordfile_path = cfg.get("Directories", "wordsfile") + csv_path = h.config.get("Directories", "wordtrending_csv") + wordfile_path = h.config.get("Directories", "wordsfile") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None while True: if message is not None: channel, filename, word, score = message.split() if prec_filename is None or filename != prec_filename: - PST = P.Paste(filename) - lib_words.create_curve_with_word_file(r_serv1, csv_path, wordfile_path, int(PST.p_date.year), int(PST.p_date.month)) + PST = Paste.Paste(filename) + lib_words.create_curve_with_word_file( + r_serv1, csv_path, wordfile_path, int(PST.p_date.year), + int(PST.p_date.month)) prec_filename = filename prev_score = r_serv1.hget(word.lower(), PST.p_date) print prev_score if prev_score is not None: - r_serv1.hset(word.lower(), PST.p_date, int(prev_score) + int(score)) + r_serv1.hset(word.lower(), PST.p_date, + int(prev_score) + int(score)) else: r_serv1.hset(word.lower(), PST.p_date, score) - # r_serv.expire(word,86400) #1day else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Curve"): - r_serv.srem("SHUTDOWN_FLAGS", "Curve") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@
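# --- Editor's note, not part of the patch: the hget/hset pair above is a
# per-word, per-day occurrence counter. Redis can do that read-modify-write
# atomically with HINCRBY, which would also stay correct with several Curve
# workers running. A sketch, assuming a local redis instance; the key and
# date below are made up:
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
r.hincrby('password', '2014-05-22', 3)  # word -> {date: occurrences}
print r.hget('password', '2014-05-22')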
-95,8 +84,4 @@ def main(): print "sleepin" time.sleep(1) - message = sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() + message = h.redis_rpop() diff --git a/bin/ZMQ_Sub_Curve_Q.py b/bin/ZMQ_Sub_Curve_Q.py index ba6aa67c..572c9c79 100755 --- a/bin/ZMQ_Sub_Curve_Q.py +++ b/bin/ZMQ_Sub_Curve_Q.py @@ -17,47 +17,19 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Tokenize" """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") - subscriber_name = "curve" - subscriber_config_section = "PubSub_Words" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Curve_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Curve_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Words' + config_channel = 'channel_0' + subscriber_name = 'curve' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Duplicate.py b/bin/ZMQ_Sub_Duplicate.py index 5bcaf4c7..ccaf6f7d 100755 --- a/bin/ZMQ_Sub_Duplicate.py +++ b/bin/ZMQ_Sub_Duplicate.py @@ -13,61 +13,51 @@ Requirements: """ import redis -import ConfigParser import os import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher from pybloomfilter import BloomFilter -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'duplicate' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) # REDIS # - # DB QUEUE ( MEMORY ) - r_Q_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - r_serv_merge = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) # REDIS # # DB OBJECT & HASHS ( DISK ) + # FIXME increase flexibility dico_redis = {} for year in xrange(2013, 2015): for month in xrange(0, 16): dico_redis[str(year)+str(month).zfill(2)] = redis.StrictRedis( - host=cfg.get("Redis_Level_DB", "host"), - port=year, + host=h.config.get("Redis_Level_DB", "host"), port=year, db=month) - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name =
"duplicate" - subscriber_config_section = "PubSub_Global" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - # FUNCTIONS # - publisher.info("""Script duplicate subscribed to channel {0}""".format(cfg.get("PubSub_Global", "channel"))) + publisher.info("""Script duplicate subscribed to channel {0}""".format( + h.config.get("PubSub_Global", "channel"))) set_limit = 100 + bloompath = os.path.join(os.environ['AIL_BIN'], + h.config.get("Directories", "bloomfilters")) + bloom_path_set = set() while True: try: super_dico = {} @@ -77,15 +67,14 @@ def main(): x = time.time() - message = sub.get_msg_from_queue(r_Q_serv) + message = h.redis_rpop() if message is not None: path = message.split(" ", -1)[-1] PST = Paste.Paste(path) else: publisher.debug("Script Attribute is idling 10s") time.sleep(10) - if r_Q_serv.sismember("SHUTDOWN_FLAGS", "Duplicate"): - r_Q_serv.srem("SHUTDOWN_FLAGS", "Duplicate") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -97,19 +86,14 @@ def main(): r_serv1 = dico_redis[PST.p_date.year + PST.p_date.month] # Creating the bloom filter name: bloomyyyymm - bloomname = 'bloom' + PST.p_date.year + PST.p_date.month - - bloompath = cfg.get("Directories", "bloomfilters") - - filebloompath = bloompath + bloomname - - # datetime.date(int(PST.p_date.year),int(PST.p_date.month),int(PST.p_date.day)).timetuple().tm_yday % 7 + filebloompath = os.path.join(bloompath, 'bloom' + PST.p_date.year + + PST.p_date.month) if os.path.exists(filebloompath): bloom = BloomFilter.open(filebloompath) else: bloom = BloomFilter(100000000, 0.01, filebloompath) - r_Q_serv.sadd("bloomlist", filebloompath) + bloom_path_set.add(filebloompath) # UNIQUE INDEX HASHS TABLE r_serv0 = dico_redis["201300"] @@ -121,45 +105,43 @@ def main(): # For each bloom filter opened_bloom = [] - for bloo in r_Q_serv.smembers("bloomlist"): + for bloo in bloom_path_set: # Opening blooms opened_bloom.append(BloomFilter.open(bloo)) # For each hash of the paste - for hash in PST._get_hash_lines(min=5, start=1, jump=0): + for line_hash in PST._get_hash_lines(min=5, start=1, jump=0): nb_hash_current += 1 # Adding the hash in Redis & limiting the set - if r_serv1.scard(hash) <= set_limit: - r_serv1.sadd(hash, index) - r_serv1.sadd("HASHS", hash) + if r_serv1.scard(line_hash) <= set_limit: + r_serv1.sadd(line_hash, index) + r_serv1.sadd("HASHS", line_hash) # Adding the hash in the bloom of the month - bloom.add(hash) + bloom.add(line_hash) # Go throught the Database of the bloom filter (of the month) for bloo in opened_bloom: - if hash in bloo: + if line_hash in bloo: db = bloo.name[-6:] - # Go throught the Database of the bloom filter (of the month) + # Go through the Database of the bloom filter (month) r_serv_bloom = dico_redis[db] # set of index paste: set([1,2,4,65]) - hash_current = r_serv_bloom.smembers(hash) + hash_current = r_serv_bloom.smembers(line_hash) # removing itself from the list hash_current = hash_current - set([index]) - # if the hash is present at least in 1 files (already processed) + # if the hash is present in at least 1 file + # (already processed) if len(hash_current) != 0: - hash_dico[hash] = hash_current + hash_dico[line_hash] = hash_current # if there is data in this dictionnary if len(hash_dico) != 0: super_dico[index] = hash_dico - else: - # The hash is not in this bloom - pass - ########################################################################################### +
########################################################################### # if there is data in this dictionnary if len(super_dico) != 0: @@ -171,7 +153,8 @@ def main(): for p_fname in pset: occur_dico.setdefault(p_fname, 0) - # Count how much hash is similar per file occuring in the dictionnary + # Count how many hashes are similar per file occurring + # in the dictionary if occur_dico[p_fname] >= 0: occur_dico[p_fname] = occur_dico[p_fname] + 1 @@ -181,7 +164,8 @@ def main(): dupl.append((paste, percentage)) # Creating the object attribute and save it. - to_print = 'Duplicate;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name) + to_print = 'Duplicate;{};{};{};'.format( + PST.p_source, PST.p_date, PST.p_name) if dupl != []: PST.__setattr__("p_duplicate", dupl) PST.save_attribute_redis(r_serv_merge, "p_duplicate", dupl) @@ -193,7 +177,3 @@ def main(): except IOError: print "CRC Checksum Failed on :", PST.p_path publisher.error('{}CRC Checksum Failed'.format(to_print)) - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Duplicate_Q.py b/bin/ZMQ_Sub_Duplicate_Q.py index 1f4b8ef6..24af023e 100755 --- a/bin/ZMQ_Sub_Duplicate_Q.py +++ b/bin/ZMQ_Sub_Duplicate_Q.py @@ -1,45 +1,17 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, "duplicate") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Duplicate_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Duplicate_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break +import Helper if __name__ == "__main__": - main() + publisher.channel = 'Queuing' + + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'duplicate' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Indexer.py b/bin/ZMQ_Sub_Indexer.py index 3fc3f56e..6fa4b233 100755 --- a/bin/ZMQ_Sub_Indexer.py +++ b/bin/ZMQ_Sub_Indexer.py @@ -9,38 +9,37 @@ The ZMQ_Sub_Indexer modules is fetching the list of files to be processed and index each file with a full-text indexer (Whoosh until now).
""" -import redis -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher from whoosh.index import create_in, exists_in, open_dir from whoosh.fields import Schema, TEXT, ID import os -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" +if __name__ == "__main__": + publisher.channel = "Script" - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - # Redis - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'indexer' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) # Indexer configuration - index dir and schema setup - indexpath = cfg.get("Indexer", "path") - indexertype = cfg.get("Indexer", "type") + indexpath = h.config.get("Indexer", "path") + indexertype = h.config.get("Indexer", "type") if indexertype == "whoosh": - schema = Schema(title=TEXT(stored=True), path=ID(stored=True, unique=True), content=TEXT) + schema = Schema(title=TEXT(stored=True), path=ID(stored=True, + unique=True), + content=TEXT) if not os.path.exists(indexpath): os.mkdir(indexpath) if not exists_in(indexpath): @@ -49,29 +48,16 @@ def main(): ix = open_dir(indexpath) # LOGGING # - publisher.channel = "Script" - - # ZMQ # - # Subscriber - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "indexer" - subscriber_config_section = "PubSub_Global" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - - # FUNCTIONS # publisher.info("""ZMQ Indexer is Running""") while True: try: - message = sub.get_msg_from_queue(r_serv1) + message = h.redis_rpop() if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) else: - if r_serv1.sismember("SHUTDOWN_FLAGS", "Indexer"): - r_serv1.srem("SHUTDOWN_FLAGS", "Indexer") - publisher.warning("Shutdown Flag Up: Terminating.") + if h.redis_queue_shutdown(): + publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script Indexer is idling 10s") time.sleep(1) @@ -88,9 +74,5 @@ def main(): indexwriter.commit() except IOError: print "CRC Checksum Failed on :", PST.p_path - publisher.error('Duplicate;{};{};{};CRC Checksum Failed'.format(PST.p_source, PST.p_date, PST.p_name)) - pass - - -if __name__ == "__main__": - main() + publisher.error('Duplicate;{};{};{};CRC Checksum Failed'.format( + PST.p_source, PST.p_date, PST.p_name)) diff --git a/bin/ZMQ_Sub_Indexer_Q.py b/bin/ZMQ_Sub_Indexer_Q.py index 004c0d57..cbdb4a05 100755 --- a/bin/ZMQ_Sub_Indexer_Q.py +++ b/bin/ZMQ_Sub_Indexer_Q.py @@ -12,49 +12,18 @@ handling the indexing process of the files seen.
""" -import redis -import ConfigParser from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "indexer" - - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) - - publisher.info("""Suscribed to channel {0}""".format(channel)) - - # Until the service is requested to be shutdown, the service - # will get the data from the global ZMQ queue and buffer it in Redis. - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Indexer_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Indexer_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'indexer' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Mails.py b/bin/ZMQ_Sub_Mails.py index b406a152..ee7c7f92 100755 --- a/bin/ZMQ_Sub_Mails.py +++ b/bin/ZMQ_Sub_Mails.py @@ -2,53 +2,47 @@ # -*-coding:UTF-8 -* import redis -import ConfigParser import pprint import time import dns.exception -from packages import Paste as P +from packages import Paste from packages import lib_refine -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - r_serv2 = redis.StrictRedis( - host=cfg.get("Redis_Cache", "host"), - port=cfg.getint("Redis_Cache", "port"), - db=cfg.getint("Redis_Cache", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "mails_categ", "emails") + config_section = 'PubSub_Categ' + config_channel = 'channel_1' + subscriber_name = 'emails' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv1 = redis.StrictRedis( + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) + + r_serv2 = redis.StrictRedis( + host=h.config.get("Redis_Cache", "host"), + port=h.config.getint("Redis_Cache", "port"), + db=h.config.getint("Redis_Cache", "db")) # FUNCTIONS # publisher.info("Suscribed to channel mails_categ") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None + # Log as critical if there are more than this amount of valid emails + is_critical = 10 + email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"
while True: @@ -57,36 +51,36 @@ def main(): channel, filename, word, score = message.split() if prec_filename is None or filename != prec_filename: - PST = P.Paste(filename) - MX_values = lib_refine.checking_MX_record(r_serv2, PST.get_regex(email_regex)) + PST = Paste.Paste(filename) + MX_values = lib_refine.checking_MX_record( + r_serv2, PST.get_regex(email_regex)) if MX_values[0] >= 1: PST.__setattr__(channel, MX_values) - PST.save_attribute_redis(r_serv1, channel, (MX_values[0], list(MX_values[1]))) + PST.save_attribute_redis(r_serv1, channel, + (MX_values[0], + list(MX_values[1]))) pprint.pprint(MX_values) - to_print = 'Mails;{};{};{};Checked {} e-mail(s)'.format(PST.p_source, PST.p_date, PST.p_name, MX_values[0]) - if MX_values[0] > 10: + to_print = 'Mails;{};{};{};Checked {} e-mail(s)'.\ + format(PST.p_source, PST.p_date, PST.p_name, + MX_values[0]) + if MX_values[0] > is_critical: publisher.warning(to_print) else: publisher.info(to_print) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Mails"): - r_serv.srem("SHUTDOWN_FLAGS", "Mails") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script Mails is Idling 10s") time.sleep(10) - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() except dns.exception.Timeout: + # FIXME retry! print "dns.exception.Timeout" - pass - - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Mails_Q.py b/bin/ZMQ_Sub_Mails_Q.py index d57e63c6..4fe630b5 100755 --- a/bin/ZMQ_Sub_Mails_Q.py +++ b/bin/ZMQ_Sub_Mails_Q.py @@ -1,44 +1,17 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "mails_categ", "emails") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format("mails_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Mails_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Mails_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break +import Helper if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_1' + subscriber_name = 'emails' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Onion.py b/bin/ZMQ_Sub_Onion.py index 57d8e17f..c8ad9149 100755 --- a/bin/ZMQ_Sub_Onion.py +++ b/bin/ZMQ_Sub_Onion.py @@ -6,8 +6,8 @@ The ZMQ_Sub_Onion Module This module is consuming the Redis-list created by the ZMQ_Sub_Onion_Q Module.
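# --- Editor's note, not part of the patch: the Mails module above extracts
# addresses with email_regex and then asks lib_refine.checking_MX_record
# (results cached in r_serv2) how many are deliverable. A rough stand-in for
# the two steps with dnspython, illustration only, not lib_refine's code:
import re

import dns.resolver
import dns.exception

email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"


def has_mx_record(domain):
    try:
        return len(dns.resolver.query(domain, 'MX')) > 0
    except dns.exception.DNSException:
        return False

body = "leak: alice@example.com, bob@mail.example.org, not-an-email@@nope"
for mail in re.findall(email_regex, body):
    print mail, has_mx_record(mail.split('@')[1])  # needs network access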
-It trying to extract url from paste and returning only ones which are tor related
-(.onion)
+It tries to extract URLs from pastes and returns only the ones which are tor
+related (.onion)
 ..seealso:: Paste method (get_regex)
@@ -22,45 +22,37 @@ Requirements
 """
 import redis
-import ConfigParser
 import pprint
 import time
 from packages import Paste
-from packages import ZMQ_PubSub
 from pubsublogger import publisher
-configfile = './packages/config.cfg'
+import Helper
-
-def main():
-    """Main Function"""
-
-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
-
-    # REDIS #
-    r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    r_serv1 = redis.StrictRedis(
-        host=cfg.get("Redis_Data_Merging", "host"),
-        port=cfg.getint("Redis_Data_Merging", "port"),
-        db=cfg.getint("Redis_Data_Merging", "db"))
-
-    # LOGGING #
+if __name__ == "__main__":
     publisher.channel = "Script"
-    # ZMQ #
-    Sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "onion_categ", "tor")
+    config_section = 'PubSub_Categ'
+    config_channel = 'channel_2'
+    subscriber_name = 'tor'
+
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+
+    # Subscriber
+    h.zmq_sub(config_section)
+
+    # REDIS #
+    r_serv1 = redis.StrictRedis(
+        host=h.config.get("Redis_Data_Merging", "host"),
+        port=h.config.getint("Redis_Data_Merging", "port"),
+        db=h.config.getint("Redis_Data_Merging", "db"))
     # FUNCTIONS #
     publisher.info("Script subscribed to channel onion_categ")
     # Getting the first message from redis.
-    message = Sub.get_msg_from_queue(r_serv)
+    message = h.redis_rpop()
     prec_filename = None
     # Thanks to Faup project for this regex
@@ -78,7 +70,8 @@ def main():
             for x in PST.get_regex(url_regex):
                 # Extracting url with regex
-                credential, subdomain, domain, host, tld, port, resource_path, query_string, f1, f2, f3, f4 = x
+                credential, subdomain, domain, host, tld, port, \
+                    resource_path, query_string, f1, f2, f3, f4 = x
                 if f1 == "onion":
                     domains_list.append(domain)
@@ -88,25 +81,22 @@ def main():
             PST.save_attribute_redis(r_serv1, channel, domains_list)
             pprint.pprint(domains_list)
             print PST.p_path
-            to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name)
+            to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date,
+                                                PST.p_name)
             if len(domains_list) > 0:
-                publisher.warning('{}Detected {} .onion(s)'.format(to_print, len(domains_list)))
+                publisher.warning('{}Detected {} .onion(s)'.format(
+                    to_print, len(domains_list)))
             else:
                 publisher.info('{}Onion related'.format(to_print))
         prec_filename = filename
     else:
-        if r_serv.sismember("SHUTDOWN_FLAGS", "Onion"):
-            r_serv.srem("SHUTDOWN_FLAGS", "Onion")
+        if h.redis_queue_shutdown():
             print "Shutdown Flag Up: Terminating"
             publisher.warning("Shutdown Flag Up: Terminating.")
             break
         publisher.debug("Script url is Idling 10s")
         time.sleep(10)
-        message = Sub.get_msg_from_queue(r_serv)
-
-
-if __name__ == "__main__":
-    main()
+        message = h.redis_rpop()
diff --git a/bin/ZMQ_Sub_Onion_Q.py b/bin/ZMQ_Sub_Onion_Q.py
index c1f559da..6d239203 100755
--- a/bin/ZMQ_Sub_Onion_Q.py
+++ b/bin/ZMQ_Sub_Onion_Q.py
@@ -17,44 +17,18 @@ Requirements
 *Should register to the Publisher "ZMQ_PubSub_Categ"
 """
-import redis
-import ConfigParser
-from packages import ZMQ_PubSub
 from pubsublogger import publisher
-configfile = './packages/config.cfg'
+import Helper
-
-def main():
-    """Main Function"""
-
-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
-
-    # REDIS #
-    r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    # LOGGING #
-    publisher.channel = "Queuing"
-
-    # ZMQ #
-    sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "onion_categ", "tor")
-
-    # FUNCTIONS #
-    publisher.info("""Suscribed to channel {0}""".format("onion_categ"))
-
-    while True:
-        sub.get_and_lpush(r_serv)
-
-        if r_serv.sismember("SHUTDOWN_FLAGS", "Onion_Q"):
-            r_serv.srem("SHUTDOWN_FLAGS", "Onion_Q")
-            print "Shutdown Flag Up: Terminating"
-            publisher.warning("Shutdown Flag Up: Terminating.")
-            break
-
 if __name__ == "__main__":
-    main()
+    publisher.channel = "Queuing"
+
+    config_section = 'PubSub_Categ'
+    config_channel = 'channel_2'
+    # Same subscriber name as in ZMQ_Sub_Onion.py (shared Redis list)
+    subscriber_name = 'tor'
+
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+    h.zmq_sub(config_section)
+    h.redis_queue_subscribe(publisher)
diff --git a/bin/ZMQ_Sub_Urls.py b/bin/ZMQ_Sub_Urls.py
index 1ba8f892..963a75b3 100755
--- a/bin/ZMQ_Sub_Urls.py
+++ b/bin/ZMQ_Sub_Urls.py
@@ -1,13 +1,11 @@
 #!/usr/bin/env python2
 # -*-coding:UTF-8 -*
 import redis
-import ConfigParser
 import pprint
 import time
 import dns.exception
 from packages import Paste
 from packages import lib_refine
-from packages import ZMQ_PubSub
 from pubsublogger import publisher

 # Country and ASN lookup
@@ -16,55 +14,43 @@
 import socket
 import pycountry
 import ipaddress
-configfile = './packages/config.cfg'
+import Helper
-
-def main():
-    """Main Function"""
-
-    # CONFIG #
-    cfg = ConfigParser.ConfigParser()
-    cfg.read(configfile)
-
-    # REDIS #
-    r_serv = redis.StrictRedis(
-        host=cfg.get("Redis_Queues", "host"),
-        port=cfg.getint("Redis_Queues", "port"),
-        db=cfg.getint("Redis_Queues", "db"))
-
-    r_serv1 = redis.StrictRedis(
-        host=cfg.get("Redis_Data_Merging", "host"),
-        port=cfg.getint("Redis_Data_Merging", "port"),
-        db=cfg.getint("Redis_Data_Merging", "db"))
-
-    r_serv2 = redis.StrictRedis(
-        host=cfg.get("Redis_Cache", "host"),
-        port=cfg.getint("Redis_Cache", "port"),
-        db=cfg.getint("Redis_Cache", "db"))
-
-    # LOGGING #
+if __name__ == "__main__":
     publisher.channel = "Script"
-    # ZMQ #
-    # Subscriber
+    config_section = 'PubSub_Categ'
+    config_channel = 'channel_3'
     subscriber_name = "urls"
-    subscriber_config_section = "PubSub_Categ"
+
+    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
     # Publisher
-    publisher_config_section = "PubSub_Url"
-    publisher_name = "adress"
-    pubchannel = cfg.get("PubSub_Url", "channel")
+    pub_config_section = "PubSub_Url"
+    pub_config_channel = 'channel'
+    h.zmq_pub(pub_config_section, pub_config_channel)
+
+    # Subscriber
+    h.zmq_sub(config_section)
+
+    # REDIS #
+    r_serv1 = redis.StrictRedis(
+        host=h.config.get("Redis_Data_Merging", "host"),
+        port=h.config.getint("Redis_Data_Merging", "port"),
+        db=h.config.getint("Redis_Data_Merging", "db"))
+
+    r_serv2 = redis.StrictRedis(
+        host=h.config.get("Redis_Cache", "host"),
+        port=h.config.getint("Redis_Cache", "port"),
+        db=h.config.getint("Redis_Cache", "db"))
     # Country to log as critical
-    cc_critical = cfg.get("PubSub_Url", "cc_critical")
-
-    sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, "web_categ", subscriber_name)
-    pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name)
+    cc_critical = h.config.get("PubSub_Url", "cc_critical")
     # FUNCTIONS #
     publisher.info("Script URL subscribed to channel web_categ")
-    message = sub.get_msg_from_queue(r_serv)
+    message = h.redis_rpop()
     prec_filename = None
= "(http|https|ftp)\://([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|[a-zA-Z]{2}))(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*" @@ -79,11 +65,12 @@ def main(): PST = Paste.Paste(filename) client = ip2asn() for x in PST.get_regex(url_regex): - scheme, credential, subdomain, domain, host, tld, port, resource_path, query_string, f1, f2, f3, f4 = x + scheme, credential, subdomain, domain, host, tld, \ + port, resource_path, query_string, f1, f2, f3, \ + f4 = x domains_list.append(domain) - msg = pubchannel + " " + str(x) - pub.send_message(msg) - publisher.debug('{0} Published'.format(x)) + h.zmq_pub_send(str(x)) + publisher.debug('{} Published'.format(x)) if f1 == "onion": print domain @@ -107,35 +94,38 @@ def main(): # EU is not an official ISO 3166 code (but used by RIPE # IP allocation) if cc is not None and cc != "EU": - print hostl, asn, cc, pycountry.countries.get(alpha2=cc).name + print hostl, asn, cc, \ + pycountry.countries.get(alpha2=cc).name if cc == cc_critical: # FIXME: That's going to fail. - publisher.warning('{0};{1};{2};{3};{4}'.format("Url", PST.p_source, PST.p_date, PST.p_name, "Detected " + str(A_values[0]) + " " + hostl + " " + cc)) + publisher.warning( + 'Url;{};{};{};Detected {} {} {}'.format( + PST.p_source, PST.p_date, PST.p_name, + A_values[0], hostl, cc)) else: print hostl, asn, cc - A_values = lib_refine.checking_A_record(r_serv2, domains_list) + A_values = lib_refine.checking_A_record(r_serv2, + domains_list) if A_values[0] >= 1: PST.__setattr__(channel, A_values) - PST.save_attribute_redis(r_serv1, channel, (A_values[0], list(A_values[1]))) + PST.save_attribute_redis(r_serv1, channel, + (A_values[0], + list(A_values[1]))) pprint.pprint(A_values) - publisher.info('{0};{1};{2};{3};{4}'.format("Url", PST.p_source, PST.p_date, PST.p_name, "Checked " + str(A_values[0]) + " URL")) + publisher.info('Url;{};{};{};Checked {} URL'.format( + PST.p_source, PST.p_date, PST.p_name, A_values[0])) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Urls"): - r_serv.srem("SHUTDOWN_FLAGS", "Urls") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script url is Idling 10s") time.sleep(10) - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() except dns.exception.Timeout: print "dns.exception.Timeout", A_values - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Urls_Q.py b/bin/ZMQ_Sub_Urls_Q.py index 4d4e2931..b083090e 100755 --- a/bin/ZMQ_Sub_Urls_Q.py +++ b/bin/ZMQ_Sub_Urls_Q.py @@ -1,44 +1,18 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, 
"PubSub_Categ", "web_categ", "urls") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format("web_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Urls_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Urls_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_3' + subscriber_name = 'web_categ' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/indexer_lookup.py b/bin/indexer_lookup.py index 73eed9f2..d89b0155 100644 --- a/bin/indexer_lookup.py +++ b/bin/indexer_lookup.py @@ -13,6 +13,7 @@ import ConfigParser import argparse import gzip +import os def readdoc(path=None): @@ -21,7 +22,7 @@ def readdoc(path=None): f = gzip.open(path, 'r') return f.read() -configfile = './packages/config.cfg' +configfile = os.path.join(os.environ('AIL_BIN'), 'packages/config.cfg') cfg = ConfigParser.ConfigParser() cfg.read(configfile) diff --git a/bin/packages/ZMQ_PubSub.py b/bin/packages/ZMQ_PubSub.py deleted file mode 100755 index b7c65231..00000000 --- a/bin/packages/ZMQ_PubSub.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/python2.7 -""" -The ``ZMQ PubSub`` Modules -========================== - -""" - -import zmq -import ConfigParser - - -class PubSub(object): - """ - The PubSub class is a ``Virtual Class`` which regroup the shared attribute - of a Publisher ZeroMQ and a Subcriber ZeroMQ - - :param file_conf: -- (str) The filepath of the configuration file used (.cfg) - :param log_channel: -- (str) The channel used as a log channel - :param ps_name: -- (str) The "ID" of the Publisher/Subcriber - - :return: PubSub Object - - ..note:: The ps_name was implemented to separate subscriber queues in redis - when they are listening on a same "stream" - ..seealso:: Method of the ZMQSub class - - ..todo:: Create Implementing a log channel as an attribute of this virtual class. - - """ - def __init__(self, file_conf, log_channel, ps_name): - self._ps_name = ps_name - self._config_parser = ConfigParser.ConfigParser() - self._config_file = file_conf # "./packages/config.cfg" - - self._config_parser.read(self._config_file) - - self._context_zmq = zmq.Context() - - # self._logging_publisher_channel = log_channel # "Default" - # publisher.channel(self._logging_publisher_channel) - - -class ZMQPub(PubSub): - """ - This class create a ZMQ Publisher which is able to send_message to a choosen socket. - - :param pub_config_section: -- (str) The name of the section in the config file to get the settings - - :return: ZMQPub Object - - :Example: - Extract of the config file: - [PubSub_Categ] - adress = tcp://127.0.0.1:5003 - - Creating the object and sending message: - MyPublisher = ZMQPub('./packages/config.cfg', 'PubSub_Categ', 'pubcateg') - - msg = "categ1"+" "+"Im the data sent on the categ1 channel" - MyPublisher.send_message(msg) - - ..note:: The ps_name attribute for a publisher is "optionnal" but still required to be - instantiated correctly. 
- - """ - def __init__(self, file_conf, pub_config_section, ps_name): - super(ZMQPub, self).__init__(file_conf, "Default", ps_name) - - self._pub_config_section = pub_config_section - self._pubsocket = self._context_zmq.socket(zmq.PUB) - self._pub_adress = self._config_parser.get(self._pub_config_section, "adress") - - self._pubsocket.bind(self._config_parser.get(self._pub_config_section, "adress")) - - def send_message(self, message): - """Send a message throught the publisher socket""" - self._pubsocket.send(message) - - -class ZMQSub(PubSub): - """ - This class create a ZMQ Subcriber which is able to receive message directly or - throught redis as a buffer. - - The redis buffer is usefull when the subcriber do a time consuming job which is - desynchronising it from the stream of data received. - The redis buffer ensures that no data will be loss waiting to be processed. - - :param sub_config_section: -- (str) The name of the section in the config file to get the settings - :param channel: -- (str) The listening channel of the Subcriber. - - :return: ZMQSub Object - - :Example: - Extract of the config file: - [PubSub_Global] - adress = tcp://127.0.0.1:5000 - channel = filelist - - Creating the object and receiving data + pushing to redis (redis buffering): - - r_serv = redis.StrictRedis( - host = 127.0.0.1, - port = 6387, - db = 0) - - channel = cfg.get("PubSub_Global", "channel") - MySubscriber = ZMQSub('./packages/config.cfg',"PubSub_Global", channel, "duplicate") - - while True: - MySubscriber.get_and_lpush(r_serv) - - - Inside another script use this line to retrive the data from redis. - ... - while True: - MySubscriber.get_msg_from_queue(r_serv) - ... - - ..note:: If you don't want any redis buffering simply use the "get_message" method - - """ - def __init__(self, file_conf, sub_config_section, channel, ps_name): - super(ZMQSub, self).__init__(file_conf, "Default", ps_name) - - self._sub_config_section = sub_config_section - self._subsocket = self._context_zmq.socket(zmq.SUB) - self._sub_adress = self._config_parser.get(self._sub_config_section, "adress") - - self._subsocket.connect(self._config_parser.get(self._sub_config_section, "adress")) - - self._channel = channel - self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel) - - def get_message(self): - """ - Get the first sent message from a Publisher. 
-
-        :return: (str) Message from Publisher
-
-        """
-        return self._subsocket.recv()
-
-    def get_and_lpush(self, r_serv):
-        """
-        Get the first sent message from a Publisher and storing it in redis
-
-        ..note:: This function also create a set named "queue" for monitoring needs
-
-        """
-        r_serv.sadd("queues", self._channel+self._ps_name)
-        r_serv.lpush(self._channel+self._ps_name, self._subsocket.recv())
-
-    def get_msg_from_queue(self, r_serv):
-        """
-        Get the first sent message from a Redis List
-
-        :return: (str) Message from Publisher
-
-        """
-        return r_serv.rpop(self._channel+self._ps_name)
diff --git a/bin/packages/config.cfg b/bin/packages/config.cfg
deleted file mode 100644
index fd43b98b..00000000
--- a/bin/packages/config.cfg
+++ /dev/null
@@ -1,61 +0,0 @@
-[Directories]
-bloomfilters = /home/user/Blooms/
-pastes = /home/user/PASTES/
-
-##### Redis #####
-[Redis_Cache]
-host = localhost
-port = 6379
-db = 0
-
-[Redis_Log]
-host = localhost
-port = 6380
-db = 0
-
-[Redis_Queues]
-host = localhost
-port = 6381
-db = 0
-
-[Redis_Data_Merging]
-host = localhost
-port = 6379
-db = 1
-
-##### LevelDB #####
-[Redis_Level_DB]
-host = localhost
-port = 2013
-db = 0
-
-[Redis_Level_DB_Hashs]
-host = localhost
-port = 2013
-db = 1
-
-# PUB / SUB : ZMQ
-[Feed]
-adress = tcp://crf.circl.lu:5556
-topicfilter = 102
-
-[PubSub_Global]
-adress = tcp://127.0.0.1:5000
-channel = filelist
-
-[PubSub_Longlines]
-adress = tcp://127.0.0.1:5001
-channel_0 = Longlines
-channel_1 = Shortlines
-
-[PubSub_Words]
-adress = tcp://127.0.0.1:5002
-channel_0 = words
-
-[PubSub_Categ]
-adress = tcp://127.0.0.1:5003
-#Channels are dynamic (1 channel per categ)
-
-[PubSub_Url]
-adress = tcp://127.0.0.1:5004
-channel = urls
diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample
index 76a3ee20..daeabd83 100644
--- a/bin/packages/config.cfg.sample
+++ b/bin/packages/config.cfg.sample
@@ -1,6 +1,6 @@
 [Directories]
 bloomfilters = /home/user/Blooms/
-pastes = /home/user/PASTES/
+pastes = PASTES

 wordtrending_csv = /home/user/AIL/var/www/static/csv/wordstrendingdata
 wordsfile = /home/user/AIL/files/wordfile
diff --git a/bin/packages/lib_words.py b/bin/packages/lib_words.py
index 0acea7c8..9446a8ec 100644
--- a/bin/packages/lib_words.py
+++ b/bin/packages/lib_words.py
@@ -45,7 +45,7 @@ def create_dirfile(r_serv, directory, overwrite):
         r_serv.delete("filelist")

         for x in listdirectory(directory):
-            r_serv.rpush("filelist", x)
+            r_serv.lpush("filelist", x)

         publisher.info("The list was overwritten")

@@ -53,13 +53,13 @@ def create_dirfile(r_serv, directory, overwrite):
         if r_serv.llen("filelist") == 0:
             for x in listdirectory(directory):
-                r_serv.rpush("filelist", x)
+                r_serv.lpush("filelist", x)

             publisher.info("New list created")
         else:
             for x in listdirectory(directory):
-                r_serv.rpush("filelist", x)
+                r_serv.lpush("filelist", x)

             publisher.info("The list was updated with new elements")
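Note: the lib_words.py change switches create_dirfile() from RPUSH to LPUSH, presumably so the "filelist" producer follows the same LPUSH-in / RPOP-out convention the rest of the patch uses (Helper buffers with lpush and workers drain with redis_rpop). LPUSH plus RPOP consumes a list first-in-first-out, whereas RPUSH plus RPOP would hand back the newest element first. A quick demonstration against a local Redis (host, port and the "demo" key are assumptions for the example):

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
r.delete('demo')

for x in ('first', 'second', 'third'):
    r.lpush('demo', x)  # pushes each element onto the head of the list

# rpop takes from the tail, i.e. the oldest element, giving FIFO order.
print [r.rpop('demo') for _ in range(3)]  # ['first', 'second', 'third']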