diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..8fac5792 --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +# Temp files +*.swp +*.pyc + +# Install Dirs +AILENV +redis-leveldb +redis +Blooms +LEVEL_DB_DATA +PASTES +bin/indexdir/ +logs/ + +# Webstuff +var/www/static/ +!var/www/static/css/dygraph_gallery.css +!var/www/static/js/indexjavascript.js + +# Local config +bin/packages/config.cfg diff --git a/bin/Dir.py b/bin/Dir.py index 37354f6e..6156c579 100755 --- a/bin/Dir.py +++ b/bin/Dir.py @@ -38,6 +38,7 @@ def main(): port=cfg.getint("Redis_Queues", "port"), db=cfg.getint("Redis_Queues", "db")) + publisher.port = 6380 publisher.channel = "Script" create_dirfile(r_serv, args.directory, args.ow) diff --git a/bin/Helper.py b/bin/Helper.py new file mode 100755 index 00000000..92ce56c1 --- /dev/null +++ b/bin/Helper.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python2 +# -*-coding:UTF-8 -* +""" +Queue helper module +============================ + +This module subscribe to a Publisher stream and put the received messages +into a Redis-list waiting to be popped later by others scripts. + +..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put +the same Subscriber name in both of them. + +""" +import redis +import ConfigParser +import os +import zmq + + +class Redis_Queues(object): + + def __init__(self, conf_section, conf_channel, subscriber_name): + configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg') + print configfile + if not os.path.exists(configfile): + raise Exception('Unable to find the configuration file. \ + Did you set environment variables? \ + Or activate the virtualenv.') + self.config = ConfigParser.ConfigParser() + self.config.read(configfile) + self.subscriber_name = subscriber_name + + self.sub_channel = self.config.get(conf_section, conf_channel) + self.redis_channel = self.sub_channel + self.subscriber_name + + # Redis Queue + config_section = "Redis_Queues" + self.r_queues = redis.StrictRedis( + host=self.config.get(config_section, "host"), + port=self.config.getint(config_section, "port"), + db=self.config.getint(config_section, "db")) + + def zmq_sub(self, conf_section): + sub_address = self.config.get(conf_section, 'adress') + context = zmq.Context() + self.sub_socket = context.socket(zmq.SUB) + self.sub_socket.connect(sub_address) + self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel) + + def zmq_pub(self, config_section, config_channel): + context = zmq.Context() + self.pub_socket = context.socket(zmq.PUB) + self.pub_socket.bind(self.config.get(config_section, 'adress')) + if config_channel is not None: + self.pub_channel = self.config.get(config_section, config_channel) + else: + # The publishing channel is defined dynamically + self.pub_channel = None + + def zmq_pub_send(self, msg): + if self.pub_channel is None: + raise Exception('A channel is reqired to send a message.') + self.pub_socket.send('{} {}'.format(self.pub_channel, msg)) + + def redis_rpop(self): + return self.r_queues.rpop(self.redis_channel) + + def redis_queue_shutdown(self, is_queue=False): + if is_queue: + flag = self.subscriber_name + '_Q' + else: + flag = self.subscriber_name + # srem returns False if the element does not exists + return self.r_queues.srem('SHUTDOWN_FLAGS', flag) + + def redis_queue_subscribe(self, publisher): + publisher.info("Suscribed to channel {}".format(self.sub_channel)) + while True: + msg = self.sub_socket.recv() + p = self.r_queues.pipeline() + p.sadd("queues", self.redis_channel) + 
p.lpush(self.redis_channel, msg) + p.execute() + if self.redis_queue_shutdown(True): + print "Shutdown Flag Up: Terminating" + publisher.warning("Shutdown Flag Up: Terminating.") + break diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index 0d71248f..d7424603 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -9,10 +9,13 @@ WHITE="\\033[0;02m" YELLOW="\\033[1;33m" CYAN="\\033[1;36m" -#Modify these PATH -export PATH=$(pwd):$PATH -export PATH=/opt/redis-2.8.12/src/:$PATH -export PATH=/opt/redis-leveldb/:$PATH +[ -z "$AIL_HOME" ] && echo "Needs the env var AIL_HOME. Run the script from the virtual environment." && exit 1; +[ -z "$AIL_REDIS" ] && echo "Needs the env var AIL_REDIS. Run the script from the virtual environment." && exit 1; +[ -z "$AIL_LEVELDB" ] && echo "Needs the env var AIL_LEVELDB. Run the script from the virtual environment." && exit 1; + +export PATH=$AIL_HOME:$PATH +export PATH=$AIL_REDIS:$PATH +export PATH=$AIL_LEVELDB:$PATH function helptext { echo -e $YELLOW" @@ -45,7 +48,7 @@ function helptext { } function launching_redis { - conf_dir='/home/user/AIL-framework/configs/' + conf_dir="${AIL_HOME}/configs/" screen -dmS "Redis" sleep 0.1 @@ -60,7 +63,7 @@ function launching_redis { function launching_lvldb { #Want to launch more level_db? lvdbhost='127.0.0.1' - lvdbdir='/home/user/AIL-framework/LEVEL_DB_DATA/' + lvdbdir="${AIL_HOME}/LEVEL_DB_DATA/" db1_y='2013' db2_y='2014' nb_db=13 diff --git a/bin/Queues_Monitoring.py b/bin/Queues_Monitoring.py index 2439e838..955f3ed5 100755 --- a/bin/Queues_Monitoring.py +++ b/bin/Queues_Monitoring.py @@ -33,6 +33,7 @@ def main(): db=cfg.getint("Redis_Queues", "db")) # LOGGING # + publisher.port = 6380 publisher.channel = "Queuing" # ZMQ # diff --git a/bin/Repartition_graph.py b/bin/Repartition_graph.py index 89a661c7..f1955a0e 100755 --- a/bin/Repartition_graph.py +++ b/bin/Repartition_graph.py @@ -34,6 +34,7 @@ def main(): db=cfg.getint("Redis_Level_DB_Hashs", "db")) # LOGGING # + publisher.port = 6380 publisher.channel = "Graph" # FUNCTIONS # diff --git a/bin/Shutdown.py b/bin/Shutdown.py index e2474c32..8467dafb 100755 --- a/bin/Shutdown.py +++ b/bin/Shutdown.py @@ -22,8 +22,9 @@ Requirements """ import redis import ConfigParser +import os -configfile = './packages/config.cfg' +configfile = os.path.join(os.environ['AIL_BIN'], './packages/config.cfg') def main(): @@ -38,6 +39,7 @@ def main(): port=cfg.getint("Redis_Queues", "port"), db=cfg.getint("Redis_Queues", "db")) + # FIXME: automatic based on the queue name. # ### SCRIPTS #### r_serv.sadd("SHUTDOWN_FLAGS", "Feed") r_serv.sadd("SHUTDOWN_FLAGS", "Categ") diff --git a/bin/ZMQ_Feed.py b/bin/ZMQ_Feed.py index be1e3eff..aac05045 100755 --- a/bin/ZMQ_Feed.py +++ b/bin/ZMQ_Feed.py @@ -20,50 +20,35 @@ Requirements *Need the ZMQ_Feed_Q Module running to be able to work properly. 
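For reference, a minimal sketch of how a queue script is expected to use this new helper (the pattern every ZMQ_*_Q script below follows). The config section and channel names already exist in this patch; the module name and subscriber name here are purely illustrative:

#!/usr/bin/env python2
# -*-coding:UTF-8 -*
# Hypothetical ZMQ_Something_Q.py: buffer the ZMQ stream into a Redis list.
from pubsublogger import publisher

import Helper

if __name__ == "__main__":
    publisher.port = 6380
    publisher.channel = "Queuing"

    h = Helper.Redis_Queues('PubSub_Global', 'channel', 'something')
    h.zmq_sub('PubSub_Global')          # SUB socket on that section's 'adress'
    h.redis_queue_subscribe(publisher)  # lpush every received message into Redis

The matching worker script then pops the buffered messages with h.redis_rpop() and stops once h.redis_queue_shutdown() returns True, exactly as ZMQ_Feed.py does below.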
""" -import redis -import ConfigParser import base64 import os import time from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" +if __name__ == "__main__": + publisher.port = 6380 + publisher.channel = "Script" - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + config_section = 'Feed' + config_channel = 'topicfilter' + subscriber_name = 'feed' - # REDIS - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - # ZMQ # - channel = cfg.get("Feed", "topicfilter") - - # Subscriber - subscriber_name = "feed" - subscriber_config_section = "Feed" # Publisher - publisher_name = "pubfed" - publisher_config_section = "PubSub_Global" - - Sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - PubGlob = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) + pub_config_section = "PubSub_Global" + pub_config_channel = 'channel' + h.zmq_pub(pub_config_section, pub_config_channel) # LOGGING # - publisher.channel = "Script" publisher.info("Feed Script started to receive & publish.") while True: - message = Sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() # Recovering the streamed message informations. if message is not None: if len(message.split()) == 3: @@ -75,8 +60,7 @@ def main(): publisher.debug("Empty Paste: {0} not processed".format(paste)) continue else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Feed"): - r_serv.srem("SHUTDOWN_FLAGS", "Feed") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -84,24 +68,13 @@ def main(): time.sleep(10) continue # Creating the full filepath - filename = cfg.get("Directories", "pastes") + paste + filename = os.path.join(os.environ['AIL_HOME'], + h.config.get("Directories", "pastes"), paste) + dirname = os.path.dirname(filename) + if not os.path.exists(dirname): + os.makedirs(dirname) - if not os.path.exists(filename.rsplit("/", 1)[0]): - os.makedirs(filename.rsplit("/", 1)[0]) - else: - # Path already existing - pass + with open(filename, 'wb') as f: + f.write(base64.standard_b64decode(gzip64encoded)) - decoded_gzip = base64.standard_b64decode(gzip64encoded) - # paste, zlib.decompress(decoded_gzip, zlib.MAX_WBITS|16) - - with open(filename, 'wb') as F: - F.write(decoded_gzip) - - msg = cfg.get("PubSub_Global", "channel")+" "+filename - PubGlob.send_message(msg) - publisher.debug("{0} Published".format(msg)) - - -if __name__ == "__main__": - main() + h.zmq_pub_send(filename) diff --git a/bin/ZMQ_Feed_Q.py b/bin/ZMQ_Feed_Q.py index ab9ed09a..9767c059 100755 --- a/bin/ZMQ_Feed_Q.py +++ b/bin/ZMQ_Feed_Q.py @@ -20,45 +20,19 @@ Requirements "channel_name"+" "+/path/to/the/paste.gz+" "base64_data_encoded_paste" """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("Feed", "topicfilter") - 
sub = ZMQ_PubSub.ZMQSub(configfile, "Feed", channel, "feed") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Feed_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Feed_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.channel = "Queuing" + + config_section = 'Feed' + config_channel = 'topicfilter' + subscriber_name = 'feed' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Categ.py b/bin/ZMQ_PubSub_Categ.py index 71477b9a..71c4c164 100755 --- a/bin/ZMQ_PubSub_Categ.py +++ b/bin/ZMQ_PubSub_Categ.py @@ -36,111 +36,77 @@ Requirements *Need the ZMQ_PubSub_Tokenize_Q Module running to be able to work properly. """ -import redis +import os import argparse -import ConfigParser import time -from packages import ZMQ_PubSub from pubsublogger import publisher from packages import Paste -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.port = 6380 + publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Words' + config_channel = 'channel_0' + subscriber_name = 'categ' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Publisher + pub_config_section = 'PubSub_Categ' + h.zmq_pub(pub_config_section, None) # SCRIPT PARSER # parser = argparse.ArgumentParser( - description='''This script is a part of the Analysis Information Leak framework.''', - epilog='''''') + description='This script is a part of the Analysis Information \ + Leak framework.') parser.add_argument( - '-l', type=str, default="../files/list_categ_files", - help='Path to the list_categ_files (../files/list_categ_files)', + '-d', type=str, default="../files/", + help='Path to the directory containing the category files.', action='store') args = parser.parse_args() - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") - subscriber_name = "categ" - subscriber_config_section = "PubSub_Words" - - publisher_name = "pubcateg" - publisher_config_section = "PubSub_Categ" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, - subscriber_name) - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, - publisher_name) - # FUNCTIONS # - publisher.info("Script Categ subscribed to channel {0}".format( - cfg.get("PubSub_Words", "channel_0"))) + publisher.info( + "Script Categ subscribed to channel {}".format(h.sub_channel)) - with open(args.l, 'rb') as L: - tmp_dict = {} + tmp_dict = {} + for filename in os.listdir(args.d): + bname = os.path.basename(filename) + tmp_dict[bname] = [] + with open(os.path.join(args.d, filename), 'r') as f: + for l in f: + tmp_dict[bname].append(l.strip()) - for num, fname in enumerate(L): - # keywords temp list - tmp_list = [] - - with open(fname[:-1], 'rb') as LS: - - for num, kword in enumerate(LS): - tmp_list.append(kword[:-1]) - - tmp_dict[fname.split('/')[-1][:-1]] = tmp_list - - message = sub.get_msg_from_queue(r_serv) prec_filename = None while True: + message = 
h.redis_rpop() if message is not None: channel, filename, word, score = message.split() if prec_filename is None or filename != prec_filename: PST = Paste.Paste(filename) + prec_filename = filename - prec_filename = filename + for categ, words_list in tmp_dict.items(): - for categ, list in tmp_dict.items(): - - if word.lower() in list: - channel = categ - msg = channel+" "+PST.p_path+" "+word+" "+score - pub.send_message(msg) - # dico_categ.add(categ) + if word.lower() in words_list: + h.pub_channel = categ + h.zmq_pub_send('{} {} {}'.format(PST.p_path, word, score)) publisher.info( 'Categ;{};{};{};Detected {} "{}"'.format( PST.p_source, PST.p_date, PST.p_name, score, word)) else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Categ"): - r_serv.srem("SHUTDOWN_FLAGS", "Categ") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script Categ is Idling 10s") time.sleep(10) - - message = sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_PubSub_Categ_Q.py b/bin/ZMQ_PubSub_Categ_Q.py index 45e0b563..08a93e2f 100755 --- a/bin/ZMQ_PubSub_Categ_Q.py +++ b/bin/ZMQ_PubSub_Categ_Q.py @@ -17,47 +17,20 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Tokenize" """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") - subscriber_name = "categ" - subscriber_config_section = "PubSub_Words" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Categ_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Categ_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = 'Queuing' + + config_section = 'PubSub_Words' + config_channel = 'channel_0' + subscriber_name = 'categ' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Lines.py b/bin/ZMQ_PubSub_Lines.py index d13103c6..abef2e95 100755 --- a/bin/ZMQ_PubSub_Lines.py +++ b/bin/ZMQ_PubSub_Lines.py @@ -5,10 +5,11 @@ The ZMQ_PubSub_Lines Module ============================ -This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q Module. +This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q +Module. -It perform a sorting on the line's length and publish/forward them to differents -channels: +It perform a sorting on the line's length and publish/forward them to +differents channels: *Channel 1 if max length(line) < max *Channel 2 if max length(line) > max @@ -26,81 +27,57 @@ Requirements *Need the ZMQ_PubSub_Line_Q Module running to be able to work properly. 
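The publisher side above is opened with config_channel=None, so the output channel is picked per message. A minimal sketch of that pattern, using category channel names that appear elsewhere in this patch (the message content is illustrative):

import Helper

h = Helper.Redis_Queues('PubSub_Words', 'channel_0', 'categ')
h.zmq_pub('PubSub_Categ', None)   # no fixed channel: pub_channel starts as None

for categ in ('creditcard_categ', 'mails_categ', 'onion_categ'):
    h.pub_channel = categ                        # route this message to one category
    h.zmq_pub_send('/path/to/paste.gz word 1')   # wire format: "<channel> <message>"

Calling zmq_pub_send() while pub_channel is still None raises an exception, which is why ZMQ_PubSub_Categ.py sets h.pub_channel before every send.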
""" -import redis import argparse -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.port = 6380 + publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'line' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Publisher + pub_config_section = 'PubSub_Longlines' + h.zmq_pub(pub_config_section, None) + + # Subscriber + h.zmq_sub(config_section) # SCRIPT PARSER # parser = argparse.ArgumentParser( - description='''This script is a part of the Analysis Information Leak framework.''', - epilog='''''') + description='''This script is a part of the Analysis Information \ + Leak framework.''') - parser.add_argument('-max', type=int, default=500, - help='The limit between "short lines" and "long lines" (500)', - action='store') + parser.add_argument( + '-max', type=int, default=500, + help='The limit between "short lines" and "long lines"', + action='store') args = parser.parse_args() - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - # Subscriber - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "line" - subscriber_config_section = "PubSub_Global" - - # Publisher - publisher_config_section = "PubSub_Longlines" - publisher_name = "publine" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) - - channel_0 = cfg.get("PubSub_Longlines", "channel_0") - channel_1 = cfg.get("PubSub_Longlines", "channel_1") + channel_0 = h.config.get("PubSub_Longlines", "channel_0") + channel_1 = h.config.get("PubSub_Longlines", "channel_1") # FUNCTIONS # - tmp_string = "Lines script Subscribed to channel {} and Start to publish on channel {}, {}" - publisher.info(tmp_string.format( - cfg.get("PubSub_Global", "channel"), - cfg.get("PubSub_Longlines", "channel_0"), - cfg.get("PubSub_Longlines", "channel_1"))) + tmp_string = "Lines script Subscribed to channel {} and Start to publish \ + on channel {}, {}" + publisher.info(tmp_string.format(h.sub_channel, channel_0, channel_1)) while True: try: - message = sub.get_msg_from_queue(r_serv1) + message = h.redis_rpop() if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) else: - if r_serv1.sismember("SHUTDOWN_FLAGS", "Lines"): - r_serv1.srem("SHUTDOWN_FLAGS", "Lines") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -108,21 +85,17 @@ def main(): time.sleep(10) continue + # FIXME do it in the paste class lines_infos = PST.get_lines_info() + PST.save_attribute_redis("p_nb_lines", lines_infos[0]) + PST.save_attribute_redis("p_max_length_line", lines_infos[1]) - PST.save_attribute_redis(r_serv, "p_nb_lines", lines_infos[0]) - PST.save_attribute_redis(r_serv, "p_max_length_line", lines_infos[1]) - - r_serv.sadd("Pastes_Objects", PST.p_path) + # FIXME 
Not used. + PST.store.sadd("Pastes_Objects", PST.p_path) if lines_infos[1] >= args.max: - msg = channel_0+" "+PST.p_path + h.pub_channel = channel_0 else: - msg = channel_1+" "+PST.p_path - - pub.send_message(msg) + h.pub_channel = channel_1 + h.zmq_pub_send(PST.p_path) except IOError: print "CRC Checksum Error on : ", PST.p_path - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_PubSub_Lines_Q.py b/bin/ZMQ_PubSub_Lines_Q.py index 61f29443..3e366d82 100755 --- a/bin/ZMQ_PubSub_Lines_Q.py +++ b/bin/ZMQ_PubSub_Lines_Q.py @@ -18,47 +18,18 @@ Requirements """ -import redis -import ConfigParser from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "line" - - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Lines_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Lines_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break +import Helper if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = "Queuing" + + config_section = "PubSub_Global" + config_channel = 'channel' + subscriber_name = 'line' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Tokenize.py b/bin/ZMQ_PubSub_Tokenize.py index 701968d0..7e1f45e1 100755 --- a/bin/ZMQ_PubSub_Tokenize.py +++ b/bin/ZMQ_PubSub_Tokenize.py @@ -4,9 +4,11 @@ The ZMQ_PubSub_Lines Module ============================ -This module is consuming the Redis-list created by the ZMQ_PubSub_Tokenize_Q Module. +This module is consuming the Redis-list created by the ZMQ_PubSub_Tokenize_Q +Module. -It tokenize the content of the paste and publish the result in the following format: +It tokenize the content of the paste and publish the result in the following +format: channel_name+' '+/path/of/the/paste.gz+' '+tokenized_word+' '+scoring ..seealso:: Paste method (_get_top_words) @@ -21,72 +23,44 @@ Requirements *Need the ZMQ_PubSub_Tokenize_Q Module running to be able to work properly. 
""" -import redis -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # +if __name__ == "__main__": + publisher.port = 6380 publisher.channel = "Script" - # ZMQ # - channel = cfg.get("PubSub_Longlines", "channel_1") - subscriber_name = "tokenize" - subscriber_config_section = "PubSub_Longlines" + config_section = 'PubSub_Longlines' + config_channel = 'channel_1' + subscriber_name = 'tokenize' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) # Publisher - publisher_config_section = "PubSub_Words" - publisher_name = "pubtokenize" + pub_config_section = 'PubSub_Words' + pub_config_channel = 'channel_0' + h.zmq_pub(pub_config_section, pub_config_channel) - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) - - channel_0 = cfg.get("PubSub_Words", "channel_0") - - # FUNCTIONS # - publisher.info("Tokeniser subscribed to channel {0}".format(cfg.get("PubSub_Longlines", "channel_1"))) + # LOGGING # + publisher.info("Tokeniser subscribed to channel {}".format(h.sub_channel)) while True: - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() print message if message is not None: - PST = Paste.Paste(message.split(" ", -1)[-1]) + paste = Paste.Paste(message.split(" ", -1)[-1]) + for word, score in paste._get_top_words().items(): + if len(word) >= 4: + h.zmq_pub_send('{} {} {}'.format(paste.p_path, word, + score)) else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Tokenize"): - r_serv.srem("SHUTDOWN_FLAGS", "Tokenize") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Tokeniser is idling 10s") time.sleep(10) print "sleepin" - continue - - for word, score in PST._get_top_words().items(): - if len(word) >= 4: - msg = channel_0+' '+PST.p_path+' '+str(word)+' '+str(score) - pub.send_message(msg) - print msg - else: - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_PubSub_Tokenize_Q.py b/bin/ZMQ_PubSub_Tokenize_Q.py index c5a5791f..459ef88e 100755 --- a/bin/ZMQ_PubSub_Tokenize_Q.py +++ b/bin/ZMQ_PubSub_Tokenize_Q.py @@ -17,48 +17,20 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Line" channel 1 """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Longlines", "channel_1") - subscriber_name = "tokenize" - subscriber_config_section = "PubSub_Longlines" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - 
sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Tokenize_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Tokenize_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = "Queuing" + + config_section = 'PubSub_Longlines' + config_channel = 'channel_1' + subscriber_name = 'tokenize' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Pub_Global.py b/bin/ZMQ_Pub_Global.py index d47ee730..2e687432 100755 --- a/bin/ZMQ_Pub_Global.py +++ b/bin/ZMQ_Pub_Global.py @@ -21,49 +21,34 @@ Requirements *Need running Redis instances. (Redis) """ -import redis -import ConfigParser import time -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read('./packages/config.cfg') - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # +if __name__ == "__main__": + publisher.port = 6380 publisher.channel = "Global" - # ZMQ # - pub_glob = ZMQ_PubSub.ZMQPub(configfile, "PubSub_Global", "global") + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'global' - # FONCTIONS # + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Publisher + pub_config_section = 'PubSub_Global' + pub_config_channel = 'channel' + h.zmq_pub(pub_config_section, pub_config_channel) + + # LOGGING # publisher.info("Starting to publish.") while True: - filename = r_serv.lpop("filelist") + filename = h.redis_rpop() if filename is not None: - - msg = cfg.get("PubSub_Global", "channel")+" "+filename - pub_glob.send_message(msg) - publisher.debug("{0} Published".format(msg)) + h.zmq_pub_send(filename) else: time.sleep(10) publisher.debug("Nothing to publish") - - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Attributes.py b/bin/ZMQ_Sub_Attributes.py index e8a59071..912f40e6 100755 --- a/bin/ZMQ_Sub_Attributes.py +++ b/bin/ZMQ_Sub_Attributes.py @@ -5,10 +5,10 @@ The ZMQ_Sub_Attribute Module ============================ -This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q Module. +This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q Module -It perform a sorting on the line's length and publish/forward them to differents -channels: +It perform a sorting on the line's length and publish/forward them to +differents channels: *Channel 1 if max length(line) < max *Channel 2 if max length(line) > max @@ -26,57 +26,36 @@ Requirements *Need the ZMQ_PubSub_Line_Q Module running to be able to work properly. 
""" -import redis -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # +if __name__ == "__main__": + publisher.port = 6380 publisher.channel = "Script" - # ZMQ # - # Subscriber - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "attributes" - subscriber_config_section = "PubSub_Global" + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'attributes' - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) # FUNCTIONS # publisher.info("""ZMQ Attribute is Running""") while True: try: - message = sub.get_msg_from_queue(r_serv1) + message = h.redis_rpop() if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) else: - if r_serv1.sismember("SHUTDOWN_FLAGS", "Attributes"): - r_serv1.srem("SHUTDOWN_FLAGS", "Attributes") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -84,20 +63,14 @@ def main(): time.sleep(10) continue - encoding = PST._get_p_encoding() - language = PST._get_p_language() - - PST.save_attribute_redis(r_serv, "p_encoding", encoding) - PST.save_attribute_redis(r_serv, "p_language", language) - - r_serv.sadd("Pastes_Objects", PST.p_path) - - PST.save_all_attributes_redis(r_serv) + # FIXME do it directly in the class + PST.save_attribute_redis("p_encoding", PST._get_p_encoding()) + PST.save_attribute_redis("p_language", PST._get_p_language()) + # FIXME why not all saving everything there. + PST.save_all_attributes_redis() + # FIXME Not used. 
+ PST.store.sadd("Pastes_Objects", PST.p_path) except IOError: print "CRC Checksum Failed on :", PST.p_path - publisher.error('{0};{1};{2};{3};{4}'.format("Duplicate", PST.p_source, PST.p_date, PST.p_name, "CRC Checksum Failed")) - pass - - -if __name__ == "__main__": - main() + publisher.error('Duplicate;{};{};{};CRC Checksum Failed'.format( + PST.p_source, PST.p_date, PST.p_name)) diff --git a/bin/ZMQ_Sub_Attributes_Q.py b/bin/ZMQ_Sub_Attributes_Q.py index 4396a6bc..5d1a349f 100755 --- a/bin/ZMQ_Sub_Attributes_Q.py +++ b/bin/ZMQ_Sub_Attributes_Q.py @@ -18,47 +18,19 @@ Requirements """ -import redis -import ConfigParser from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "attributes" - - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Attributes_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Attributes_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = "Queuing" + + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'attributes' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_CreditCards.py b/bin/ZMQ_Sub_CreditCards.py index 54021144..22a8c5c5 100755 --- a/bin/ZMQ_Sub_CreditCards.py +++ b/bin/ZMQ_Sub_CreditCards.py @@ -1,53 +1,38 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser import pprint import time from packages import Paste from packages import lib_refine -from packages import ZMQ_PubSub from pubsublogger import publisher +import Helper -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - # LOGGING # +if __name__ == "__main__": + publisher.port = 6380 publisher.channel = "Script" - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "creditcard_categ", "cards") + config_section = 'PubSub_Categ' + config_channel = 'channel_0' + subscriber_name = 'cards' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) # FUNCTIONS # publisher.info("Creditcard script subscribed to channel creditcard_categ") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None creditcard_regex = "4[0-9]{12}(?:[0-9]{3})?" 
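# Illustrative aside: candidates matched by a regex like the one above are
# typically confirmed with a Luhn checksum before being counted as valid.
# A minimal standalone sketch, separate from the lib_refine helpers imported above:
import re

def luhn_valid(card_number):
    """Return True if the digit string passes the Luhn checksum."""
    digits = [int(d) for d in card_number]
    odd = digits[-1::-2]                                      # digits kept as-is
    even = [sum(divmod(2 * d, 10)) for d in digits[-2::-2]]   # doubled, then digit-summed
    return (sum(odd) + sum(even)) % 10 == 0

paste_content = "lorem 4532015112830366 ipsum 4111111111111112 dolor"
for candidate in re.findall("4[0-9]{12}(?:[0-9]{3})?", paste_content):
    if luhn_valid(candidate):
        print candidate, "passes the Luhn check"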
# mastercard_regex = "5[1-5]\d{2}([\ \-]?)\d{4}\1\d{4}\1\d{4}" # visa_regex = "4\d{3}([\ \-]?)\d{4}\1\d{4}\1\d{4}" - # discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]|[3-9]\d)|[2-8]\d\d|9(?:[01]\d|2[0-5])))\d{10}" + # discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]| + # [3-9]\d)|[2-8]\d\d|9(?:[01]\d|2[0-5])))\d{10}" # jcb_regex = "35(?:2[89]|[3-8]\d)([\ \-]?)\d{4}\1\d{4}\1\d{4}" # amex_regex = "3[47]\d\d([\ \-]?)\d{6}\1\d{5}" # chinaUP_regex = "62[0-5]\d{13,16}" @@ -66,28 +51,25 @@ def main(): creditcard_set.add(x) PST.__setattr__(channel, creditcard_set) - PST.save_attribute_redis(r_serv1, channel, creditcard_set) + PST.save_attribute_redis(channel, creditcard_set) pprint.pprint(creditcard_set) - to_print = 'CreditCard;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name) + to_print = 'CreditCard;{};{};{};'.format( + PST.p_source, PST.p_date, PST.p_name) if (len(creditcard_set) > 0): - publisher.critical('{}Checked {} valid number(s)'.format(to_print, len(creditcard_set))) + publisher.critical('{}Checked {} valid number(s)'.format( + to_print, len(creditcard_set))) else: publisher.info('{}CreditCard related'.format(to_print)) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Creditcards"): - r_serv.srem("SHUTDOWN_FLAGS", "Creditcards") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script creditcard is idling 1m") time.sleep(60) - message = sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() + message = h.redis_rpop() diff --git a/bin/ZMQ_Sub_CreditCards_Q.py b/bin/ZMQ_Sub_CreditCards_Q.py index 7ef4a9b9..81a79c7c 100755 --- a/bin/ZMQ_Sub_CreditCards_Q.py +++ b/bin/ZMQ_Sub_CreditCards_Q.py @@ -1,44 +1,19 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "creditcard_categ", "cards") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format("creditcard_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Creditcards_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Creditcards_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_0' + subscriber_name = 'cards' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Curve.py b/bin/ZMQ_Sub_Curve.py index 7a065afb..c62ae32c 100755 --- a/bin/ZMQ_Sub_Curve.py +++ b/bin/ZMQ_Sub_Curve.py @@ -6,7 +6,8 @@ The ZMQ_Sub_Curve Module This module is consuming the Redis-list created by the ZMQ_Sub_Curve_Q Module. -This modules update a .csv file used to draw curves representing selected words and their occurency per day. 
+This modules update a .csv file used to draw curves representing selected +words and their occurency per day. ..note:: The channel will have the name of the file created. @@ -22,72 +23,64 @@ Requirements """ import redis -import ConfigParser import time -from packages import Paste as P -from packages import ZMQ_PubSub +from packages import Paste from pubsublogger import publisher from packages import lib_words +import os -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Level_DB", "host"), - port=cfg.get("Redis_Level_DB", "port"), - db=0) - - # LOGGING # +if __name__ == "__main__": + publisher.port = 6380 publisher.channel = "Script" - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") + config_section = 'PubSub_Words' + config_channel = 'channel_0' subscriber_name = "curve" - subscriber_config_section = "PubSub_Words" - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv1 = redis.StrictRedis( + host=h.config.get("Redis_Level_DB", "host"), + port=h.config.get("Redis_Level_DB", "port"), + db=h.config.get("Redis_Level_DB", "db")) # FUNCTIONS # - publisher.info("Script Curve subscribed to channel {0}".format(cfg.get("PubSub_Words", "channel_0"))) + publisher.info("Script Curve subscribed to {}".format(h.sub_channel)) # FILE CURVE SECTION # - csv_path = cfg.get("Directories", "wordtrending_csv") - wordfile_path = cfg.get("Directories", "wordsfile") + csv_path = os.path.join(os.environ['AIL_HOME'], + h.config.get("Directories", "wordtrending_csv")) + wordfile_path = os.path.join(os.environ['AIL_HOME'], + h.config.get("Directories", "wordsfile")) - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None while True: if message is not None: channel, filename, word, score = message.split() if prec_filename is None or filename != prec_filename: - PST = P.Paste(filename) - lib_words.create_curve_with_word_file(r_serv1, csv_path, wordfile_path, int(PST.p_date.year), int(PST.p_date.month)) + PST = Paste.Paste(filename) + lib_words.create_curve_with_word_file( + r_serv1, csv_path, wordfile_path, int(PST.p_date.year), + int(PST.p_date.month)) prec_filename = filename prev_score = r_serv1.hget(word.lower(), PST.p_date) print prev_score if prev_score is not None: - r_serv1.hset(word.lower(), PST.p_date, int(prev_score) + int(score)) + r_serv1.hset(word.lower(), PST.p_date, + int(prev_score) + int(score)) else: r_serv1.hset(word.lower(), PST.p_date, score) - # r_serv.expire(word,86400) #1day else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Curve"): - r_serv.srem("SHUTDOWN_FLAGS", "Curve") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -95,8 +88,4 @@ def main(): print "sleepin" time.sleep(1) - message = sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() + message = h.redis_rpop() diff --git a/bin/ZMQ_Sub_Curve_Q.py b/bin/ZMQ_Sub_Curve_Q.py index ba6aa67c..b7264639 100755 --- a/bin/ZMQ_Sub_Curve_Q.py +++ b/bin/ZMQ_Sub_Curve_Q.py @@ -17,47 +17,20 @@ Requirements 
*Should register to the Publisher "ZMQ_PubSub_Tokenize" """ -import redis -import ConfigParser + from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") - subscriber_name = "curve" - subscriber_config_section = "PubSub_Words" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Curve_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Curve_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = "Queuing" + + config_section = 'PubSub_Words' + config_channel = 'channel_0' + subscriber_name = 'curve' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Duplicate.py b/bin/ZMQ_Sub_Duplicate.py index 5bcaf4c7..eca87934 100755 --- a/bin/ZMQ_Sub_Duplicate.py +++ b/bin/ZMQ_Sub_Duplicate.py @@ -13,61 +13,46 @@ Requirements: """ import redis -import ConfigParser import os import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher from pybloomfilter import BloomFilter -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.port = 6380 + publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'duplicate' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - # REDIS # - # DB QUEUE ( MEMORY ) - r_Q_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv_merge = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) + # Subscriber + h.zmq_sub(config_section) # REDIS # # DB OBJECT & HASHS ( DISK ) + # FIXME increase flexibility dico_redis = {} for year in xrange(2013, 2015): for month in xrange(0, 16): dico_redis[str(year)+str(month).zfill(2)] = redis.StrictRedis( - host=cfg.get("Redis_Level_DB", "host"), - port=year, + host=h.config.get("Redis_Level_DB", "host"), port=year, db=month) - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "duplicate" - subscriber_config_section = "PubSub_Global" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - # FUNCTIONS # - publisher.info("""Script duplicate subscribed to channel {0}""".format(cfg.get("PubSub_Global", "channel"))) + publisher.info("""Script duplicate subscribed to channel {0}""".format( + h.config.get("PubSub_Global", "channel"))) set_limit = 100 + bloompath = os.path.join(os.environ['AIL_HOME'], 
+ h.config.get("Directories", "bloomfilters")) + bloop_path_set = set() while True: try: super_dico = {} @@ -77,15 +62,14 @@ def main(): x = time.time() - message = sub.get_msg_from_queue(r_Q_serv) + message = h.redis_rpop() if message is not None: path = message.split(" ", -1)[-1] PST = Paste.Paste(path) else: publisher.debug("Script Attribute is idling 10s") time.sleep(10) - if r_Q_serv.sismember("SHUTDOWN_FLAGS", "Duplicate"): - r_Q_serv.srem("SHUTDOWN_FLAGS", "Duplicate") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -97,19 +81,14 @@ def main(): r_serv1 = dico_redis[PST.p_date.year + PST.p_date.month] # Creating the bloom filter name: bloomyyyymm - bloomname = 'bloom' + PST.p_date.year + PST.p_date.month - - bloompath = cfg.get("Directories", "bloomfilters") - - filebloompath = bloompath + bloomname - - # datetime.date(int(PST.p_date.year),int(PST.p_date.month),int(PST.p_date.day)).timetuple().tm_yday % 7 + filebloompath = os.path.join(bloompath, 'bloom' + PST.p_date.year + + PST.p_date.month) if os.path.exists(filebloompath): bloom = BloomFilter.open(filebloompath) else: bloom = BloomFilter(100000000, 0.01, filebloompath) - r_Q_serv.sadd("bloomlist", filebloompath) + bloop_path_set.add(filebloompath) # UNIQUE INDEX HASHS TABLE r_serv0 = dico_redis["201300"] @@ -121,45 +100,43 @@ def main(): # For each bloom filter opened_bloom = [] - for bloo in r_Q_serv.smembers("bloomlist"): + for bloo in bloop_path_set: # Opening blooms opened_bloom.append(BloomFilter.open(bloo)) # For each hash of the paste - for hash in PST._get_hash_lines(min=5, start=1, jump=0): + for line_hash in PST._get_hash_lines(min=5, start=1, jump=0): nb_hash_current += 1 # Adding the hash in Redis & limiting the set - if r_serv1.scard(hash) <= set_limit: - r_serv1.sadd(hash, index) - r_serv1.sadd("HASHS", hash) + if r_serv1.scard(line_hash) <= set_limit: + r_serv1.sadd(line_hash, index) + r_serv1.sadd("HASHS", line_hash) # Adding the hash in the bloom of the month - bloom.add(hash) + bloom.add(line_hash) # Go throught the Database of the bloom filter (of the month) for bloo in opened_bloom: - if hash in bloo: + if line_hash in bloo: db = bloo.name[-6:] - # Go throught the Database of the bloom filter (of the month) + # Go throught the Database of the bloom filter (month) r_serv_bloom = dico_redis[db] # set of index paste: set([1,2,4,65]) - hash_current = r_serv_bloom.smembers(hash) + hash_current = r_serv_bloom.smembers(line_hash) # removing itself from the list hash_current = hash_current - set([index]) - # if the hash is present at least in 1 files (already processed) + # if the hash is present at least in 1 files + # (already processed) if len(hash_current) != 0: - hash_dico[hash] = hash_current + hash_dico[line_hash] = hash_current # if there is data in this dictionnary if len(hash_dico) != 0: super_dico[index] = hash_dico - else: - # The hash is not in this bloom - pass - ########################################################################################### + ########################################################################### # if there is data in this dictionnary if len(super_dico) != 0: @@ -171,7 +148,8 @@ def main(): for p_fname in pset: occur_dico.setdefault(p_fname, 0) - # Count how much hash is similar per file occuring in the dictionnary + # Count how much hash is similar per file occuring + # in the dictionnary if occur_dico[p_fname] >= 0: occur_dico[p_fname] = occur_dico[p_fname] + 1 @@ -181,10 +159,11 @@ def 
main(): dupl.append((paste, percentage)) # Creating the object attribute and save it. - to_print = 'Duplicate;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name) + to_print = 'Duplicate;{};{};{};'.format( + PST.p_source, PST.p_date, PST.p_name) if dupl != []: PST.__setattr__("p_duplicate", dupl) - PST.save_attribute_redis(r_serv_merge, "p_duplicate", dupl) + PST.save_attribute_redis("p_duplicate", dupl) publisher.info('{}Detected {}'.format(to_print, len(dupl))) y = time.time() @@ -193,7 +172,3 @@ def main(): except IOError: print "CRC Checksum Failed on :", PST.p_path publisher.error('{}CRC Checksum Failed'.format(to_print)) - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Duplicate_Q.py b/bin/ZMQ_Sub_Duplicate_Q.py index 1f4b8ef6..9fc455ba 100755 --- a/bin/ZMQ_Sub_Duplicate_Q.py +++ b/bin/ZMQ_Sub_Duplicate_Q.py @@ -1,45 +1,18 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, "duplicate") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format(channel)) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Duplicate_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Duplicate_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break +import Helper if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = 'Queuing' + + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'duplicate' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Indexer.py b/bin/ZMQ_Sub_Indexer.py index 3fc3f56e..0eb6dcce 100755 --- a/bin/ZMQ_Sub_Indexer.py +++ b/bin/ZMQ_Sub_Indexer.py @@ -9,38 +9,38 @@ The ZMQ_Sub_Indexer modules is fetching the list of files to be processed and index each file with a full-text indexer (Whoosh until now). 
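For clarity, the per-month bloom filter handling used by the duplicate module above boils down to the following standalone sketch (path, date and hash are illustrative):

import os

from pybloomfilter import BloomFilter

filebloompath = '/tmp/bloom201407'               # one filter per yyyymm, as above
if os.path.exists(filebloompath):
    bloom = BloomFilter.open(filebloompath)      # reopen the on-disk filter
else:
    bloom = BloomFilter(100000000, 0.01, filebloompath)  # capacity, error rate, file

line_hash = 'ad0234829205b9033196ba818f7a872b'
if line_hash in bloom:
    print "hash already seen in this month's filter"
bloom.add(line_hash)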
""" -import redis -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher from whoosh.index import create_in, exists_in, open_dir from whoosh.fields import Schema, TEXT, ID import os -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" +if __name__ == "__main__": + publisher.port = 6380 + publisher.channel = "Script" - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + # Subscriber + sub_config_section = 'PubSub_Global' + sub_name = 'indexer' - # Redis - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'indexer' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) # Indexer configuration - index dir and schema setup - indexpath = cfg.get("Indexer", "path") - indexertype = cfg.get("Indexer", "type") + indexpath = h.config.get("Indexer", "path") + indexertype = h.config.get("Indexer", "type") if indexertype == "whoosh": - schema = Schema(title=TEXT(stored=True), path=ID(stored=True, unique=True), content=TEXT) + schema = Schema(title=TEXT(stored=True), path=ID(stored=True, + unique=True), + content=TEXT) if not os.path.exists(indexpath): os.mkdir(indexpath) if not exists_in(indexpath): @@ -49,29 +49,16 @@ def main(): ix = open_dir(indexpath) # LOGGING # - publisher.channel = "Script" - - # ZMQ # - # Subscriber - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "indexer" - subscriber_config_section = "PubSub_Global" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - - # FUNCTIONS # publisher.info("""ZMQ Indexer is Running""") while True: try: - message = sub.get_msg_from_queue(r_serv1) + message = h.redis_rpop() if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) else: - if r_serv1.sismember("SHUTDOWN_FLAGS", "Indexer"): - r_serv1.srem("SHUTDOWN_FLAGS", "Indexer") - publisher.warning("Shutdown Flag Up: Terminating.") + if h.redis_queue_shutdown(): break publisher.debug("Script Indexer is idling 10s") time.sleep(1) @@ -88,9 +75,5 @@ def main(): indexwriter.commit() except IOError: print "CRC Checksum Failed on :", PST.p_path - publisher.error('Duplicate;{};{};{};CRC Checksum Failed'.format(PST.p_source, PST.p_date, PST.p_name)) - pass - - -if __name__ == "__main__": - main() + publisher.error('Duplicate;{};{};{};CRC Checksum Failed'.format( + PST.p_source, PST.p_date, PST.p_name)) diff --git a/bin/ZMQ_Sub_Indexer_Q.py b/bin/ZMQ_Sub_Indexer_Q.py index 004c0d57..af7e076a 100755 --- a/bin/ZMQ_Sub_Indexer_Q.py +++ b/bin/ZMQ_Sub_Indexer_Q.py @@ -12,49 +12,19 @@ handling the indexing process of the files seen. 
""" -import redis -import ConfigParser from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "indexer" - - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) - - publisher.info("""Suscribed to channel {0}""".format(channel)) - - # Until the service is requested to be shutdown, the service - # will get the data from the global ZMQ queue and buffer it in Redis. - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Indexer_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Indexer_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = "Queuing" + + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'indexer' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Mails.py b/bin/ZMQ_Sub_Mails.py index b406a152..e3c7e91a 100755 --- a/bin/ZMQ_Sub_Mails.py +++ b/bin/ZMQ_Sub_Mails.py @@ -2,53 +2,43 @@ # -*-coding:UTF-8 -* import redis -import ConfigParser import pprint import time import dns.exception -from packages import Paste as P +from packages import Paste from packages import lib_refine -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - r_serv2 = redis.StrictRedis( - host=cfg.get("Redis_Cache", "host"), - port=cfg.getint("Redis_Cache", "port"), - db=cfg.getint("Redis_Cache", "db")) - - # LOGGING # +if __name__ == "__main__": + publisher.port = 6380 publisher.channel = "Script" - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "mails_categ", "emails") + config_section = 'PubSub_Categ' + config_channel = 'channel_1' + subscriber_name = 'emails' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv2 = redis.StrictRedis( + host=h.config.get("Redis_Cache", "host"), + port=h.config.getint("Redis_Cache", "port"), + db=h.config.getint("Redis_Cache", "db")) # FUNCTIONS # publisher.info("Suscribed to channel mails_categ") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None + # Log as critical if there are more that that amout of valid emails + is_critical = 10 + email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}" while True: @@ -57,36 +47,35 @@ def main(): channel, filename, word, score = message.split() if prec_filename is None or filename != 
prec_filename: - PST = P.Paste(filename) - MX_values = lib_refine.checking_MX_record(r_serv2, PST.get_regex(email_regex)) + PST = Paste.Paste(filename) + MX_values = lib_refine.checking_MX_record( + r_serv2, PST.get_regex(email_regex)) if MX_values[0] >= 1: PST.__setattr__(channel, MX_values) - PST.save_attribute_redis(r_serv1, channel, (MX_values[0], list(MX_values[1]))) + PST.save_attribute_redis(channel, (MX_values[0], + list(MX_values[1]))) pprint.pprint(MX_values) - to_print = 'Mails;{};{};{};Checked {} e-mail(s)'.format(PST.p_source, PST.p_date, PST.p_name, MX_values[0]) - if MX_values[0] > 10: + to_print = 'Mails;{};{};{};Checked {} e-mail(s)'.\ + format(PST.p_source, PST.p_date, PST.p_name, + MX_values[0]) + if MX_values[0] > is_critical: publisher.warning(to_print) else: publisher.info(to_print) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Mails"): - r_serv.srem("SHUTDOWN_FLAGS", "Mails") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script Mails is Idling 10s") time.sleep(10) - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() except dns.exception.Timeout: + # FIXME retry! print "dns.exception.Timeout" - pass - - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Mails_Q.py b/bin/ZMQ_Sub_Mails_Q.py index d57e63c6..7a6e66e9 100755 --- a/bin/ZMQ_Sub_Mails_Q.py +++ b/bin/ZMQ_Sub_Mails_Q.py @@ -1,44 +1,18 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "mails_categ", "emails") - - # FUNCTIONS # - publisher.info("""Suscribed to channel {0}""".format("mails_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Mails_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Mails_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break +import Helper if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_1' + subscriber_name = 'emails' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Onion.py b/bin/ZMQ_Sub_Onion.py index 57d8e17f..2bf27c85 100755 --- a/bin/ZMQ_Sub_Onion.py +++ b/bin/ZMQ_Sub_Onion.py @@ -6,8 +6,8 @@ The ZMQ_Sub_Onion Module This module is consuming the Redis-list created by the ZMQ_Sub_Onion_Q Module. -It trying to extract url from paste and returning only ones which are tor related -(.onion) +It tries to extract urls from the paste and returns only the ones which are +tor related (.onion) ..seealso:: Paste method (get_regex) @@ -21,46 +21,32 @@ Requirements *Need the ZMQ_Sub_Onion_Q Module running to be able to work properly.
""" -import redis -import ConfigParser import pprint import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - # LOGGING # +if __name__ == "__main__": + publisher.port = 6380 publisher.channel = "Script" - # ZMQ # - Sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "onion_categ", "tor") + config_section = 'PubSub_Categ' + config_channel = 'channel_2' + subscriber_name = 'tor' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) # FUNCTIONS # publisher.info("Script subscribed to channel onion_categ") # Getting the first message from redis. - message = Sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None # Thanks to Faup project for this regex @@ -78,35 +64,33 @@ def main(): for x in PST.get_regex(url_regex): # Extracting url with regex - credential, subdomain, domain, host, tld, port, resource_path, query_string, f1, f2, f3, f4 = x + credential, subdomain, domain, host, tld, port, \ + resource_path, query_string, f1, f2, f3, f4 = x if f1 == "onion": domains_list.append(domain) # Saving the list of extracted onion domains. PST.__setattr__(channel, domains_list) - PST.save_attribute_redis(r_serv1, channel, domains_list) + PST.save_attribute_redis(channel, domains_list) pprint.pprint(domains_list) print PST.p_path - to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name) + to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date, + PST.p_name) if len(domains_list) > 0: - publisher.warning('{}Detected {} .onion(s)'.format(to_print, len(domains_list))) + publisher.warning('{}Detected {} .onion(s)'.format( + to_print, len(domains_list))) else: publisher.info('{}Onion related'.format(to_print)) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Onion"): - r_serv.srem("SHUTDOWN_FLAGS", "Onion") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script url is Idling 10s") time.sleep(10) - message = Sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() + message = h.redis_rpop() diff --git a/bin/ZMQ_Sub_Onion_Q.py b/bin/ZMQ_Sub_Onion_Q.py index c1f559da..6cbc9938 100755 --- a/bin/ZMQ_Sub_Onion_Q.py +++ b/bin/ZMQ_Sub_Onion_Q.py @@ -17,44 +17,19 @@ Requirements *Should register to the Publisher "ZMQ_PubSub_Categ" """ -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "onion_categ", "tor") - - # FUNCTIONS # - 
publisher.info("""Suscribed to channel {0}""".format("onion_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Onion_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Onion_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_2' + subscriber_name = 'tor' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Urls.py b/bin/ZMQ_Sub_Urls.py index 89b556a4..b34de8fb 100755 --- a/bin/ZMQ_Sub_Urls.py +++ b/bin/ZMQ_Sub_Urls.py @@ -1,13 +1,11 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* import redis -import ConfigParser import pprint import time import dns.exception from packages import Paste from packages import lib_refine -from packages import ZMQ_PubSub from pubsublogger import publisher # Country and ASN lookup @@ -16,55 +14,39 @@ import socket import pycountry import ipaddress -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - r_serv2 = redis.StrictRedis( - host=cfg.get("Redis_Cache", "host"), - port=cfg.getint("Redis_Cache", "port"), - db=cfg.getint("Redis_Cache", "db")) - - # LOGGING # +if __name__ == "__main__": + publisher.port = 6380 publisher.channel = "Script" - # ZMQ # - # Subscriber + config_section = 'PubSub_Categ' + config_channel = 'channel_3' subscriber_name = "urls" - subscriber_config_section = "PubSub_Categ" + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) # Publisher - publisher_config_section = "PubSub_Url" - publisher_name = "adress" - pubchannel = cfg.get("PubSub_Url", "channel") + pub_config_section = "PubSub_Url" + pub_config_channel = 'channel' + h.zmq_pub(pub_config_section, pub_config_channel) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv2 = redis.StrictRedis( + host=h.config.get("Redis_Cache", "host"), + port=h.config.getint("Redis_Cache", "port"), + db=h.config.getint("Redis_Cache", "db")) # Country to log as critical - cc_critical = cfg.get("PubSub_Url", "cc_critical") - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, "web_categ", subscriber_name) - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) + cc_critical = h.config.get("PubSub_Url", "cc_critical") # FUNCTIONS # publisher.info("Script URL subscribed to channel web_categ") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None url_regex = 
"(http|https|ftp)\://([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|[a-zA-Z]{2}))(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*" @@ -79,11 +61,12 @@ def main(): PST = Paste.Paste(filename) client = ip2asn() for x in PST.get_regex(url_regex): - scheme, credential, subdomain, domain, host, tld, port, resource_path, query_string, f1, f2, f3, f4 = x + scheme, credential, subdomain, domain, host, tld, \ + port, resource_path, query_string, f1, f2, f3, \ + f4 = x domains_list.append(domain) - msg = pubchannel + " " + str(x) - pub.send_message(msg) - publisher.debug('{0} Published'.format(x)) + h.zmq_pub_send(str(x)) + publisher.debug('{} Published'.format(x)) if f1 == "onion": print domain @@ -107,34 +90,36 @@ def main(): # EU is not an official ISO 3166 code (but used by RIPE # IP allocation) if cc is not None and cc != "EU": - print hostl, asn, cc, pycountry.countries.get(alpha2=cc).name + print hostl, asn, cc, \ + pycountry.countries.get(alpha2=cc).name if cc == cc_critical: - publisher.warning('{0};{1};{2};{3};{4}'.format("Url", PST.p_source, PST.p_date, PST.p_name, "Detected " + hostl + " " + cc)) + publisher.warning( + 'Url;{};{};{};Detected {} {}'.format( + PST.p_source, PST.p_date, PST.p_name, + hostl, cc)) else: print hostl, asn, cc - A_values = lib_refine.checking_A_record(r_serv2, domains_list) + A_values = lib_refine.checking_A_record(r_serv2, + domains_list) if A_values[0] >= 1: PST.__setattr__(channel, A_values) - PST.save_attribute_redis(r_serv1, channel, (A_values[0], list(A_values[1]))) + PST.save_attribute_redis(channel, (A_values[0], + list(A_values[1]))) pprint.pprint(A_values) - publisher.info('{0};{1};{2};{3};{4}'.format("Url", PST.p_source, PST.p_date, PST.p_name, "Checked " + str(A_values[0]) + " URL")) + publisher.info('Url;{};{};{};Checked {} URL'.format( + PST.p_source, PST.p_date, PST.p_name, A_values[0])) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Urls"): - r_serv.srem("SHUTDOWN_FLAGS", "Urls") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script url is Idling 10s") time.sleep(10) - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() except dns.exception.Timeout: print "dns.exception.Timeout", A_values - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Urls_Q.py b/bin/ZMQ_Sub_Urls_Q.py index 4d4e2931..1e63ae8c 100755 --- a/bin/ZMQ_Sub_Urls_Q.py +++ b/bin/ZMQ_Sub_Urls_Q.py @@ -1,44 +1,19 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* -import redis -import ConfigParser -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Queuing" - - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "web_categ", "urls") - - # FUNCTIONS # - publisher.info("""Suscribed to 
channel {0}""".format("web_categ")) - - while True: - sub.get_and_lpush(r_serv) - - if r_serv.sismember("SHUTDOWN_FLAGS", "Urls_Q"): - r_serv.srem("SHUTDOWN_FLAGS", "Urls_Q") - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break - if __name__ == "__main__": - main() + publisher.port = 6380 + publisher.channel = "Queuing" + + config_section = 'PubSub_Categ' + config_channel = 'channel_3' + subscriber_name = 'urls' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) + h.redis_queue_subscribe(publisher) diff --git a/bin/indexer_lookup.py b/bin/indexer_lookup.py index 73eed9f2..b8fc6e1a 100644 --- a/bin/indexer_lookup.py +++ b/bin/indexer_lookup.py @@ -13,6 +13,7 @@ import ConfigParser import argparse import gzip +import os def readdoc(path=None): @@ -21,12 +22,12 @@ def readdoc(path=None): f = gzip.open(path, 'r') return f.read() -configfile = './packages/config.cfg' +configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg') cfg = ConfigParser.ConfigParser() cfg.read(configfile) # Indexer configuration - index dir and schema setup -indexpath = cfg.get("Indexer", "path") +indexpath = os.path.join(os.environ['AIL_HOME'], cfg.get("Indexer", "path")) indexertype = cfg.get("Indexer", "type") argParser = argparse.ArgumentParser(description='Fulltext search for AIL') diff --git a/bin/packages/Paste.py b/bin/packages/Paste.py index fddae0f9..de02b887 100755 --- a/bin/packages/Paste.py +++ b/bin/packages/Paste.py @@ -19,7 +19,6 @@ Conditions to fulfill to be able to use this class correctly: import os import magic import gzip -import pprint import redis import operator import string @@ -44,7 +43,8 @@ class Paste(object): This class representing a Paste as an object. When created, the object will have by default some "main attributes" such as the size or the date of the paste already calculated, whereas other - attributes are not set and need to be "asked to be calculated" by their methods. + attributes are not set and need to be "asked to be calculated" by their + methods. It was design like this because some attributes take time to be calculated such as the langage or the duplicate... @@ -56,16 +56,25 @@ class Paste(object): def __init__(self, p_path): - configfile = './packages/config.cfg' + configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg') + if not os.path.exists(configfile): + raise Exception('Unable to find the configuration file. \ + Did you set environment variables? 
\ + Or activate the virtualenv.') + cfg = ConfigParser.ConfigParser() cfg.read(configfile) self.cache = redis.StrictRedis( host=cfg.get("Redis_Queues", "host"), port=cfg.getint("Redis_Queues", "port"), db=cfg.getint("Redis_Queues", "db")) + self.store = redis.StrictRedis( + host=cfg.get("Redis_Data_Merging", "host"), + port=cfg.getint("Redis_Data_Merging", "port"), + db=cfg.getint("Redis_Data_Merging", "db")) self.p_path = p_path - self.p_name = self.p_path.split('/')[-1] + self.p_name = os.path.basename(self.p_path) self.p_size = round(os.path.getsize(self.p_path)/1024.0, 2) self.p_mime = magic.from_buffer(self.get_p_content(), mime=True) @@ -260,7 +269,7 @@ class Paste(object): else: return False, var - def save_all_attributes_redis(self, r_serv, key=None): + def save_all_attributes_redis(self, key=None): """ Saving all the attributes in a "Redis-like" Database (Redis, LevelDB) @@ -277,7 +286,7 @@ class Paste(object): """ # LevelDB Compatibility - p = r_serv.pipeline(False) + p = self.store.pipeline(False) p.hset(self.p_path, "p_name", self.p_name) p.hset(self.p_path, "p_size", self.p_size) p.hset(self.p_path, "p_mime", self.p_mime) @@ -296,14 +305,14 @@ class Paste(object): pass p.execute() - def save_attribute_redis(self, r_serv, attr_name, value): + def save_attribute_redis(self, attr_name, value): """ Save an attribute as a field """ if type(value) == set: - r_serv.hset(self.p_path, attr_name, json.dumps(list(value))) + self.store.hset(self.p_path, attr_name, json.dumps(list(value))) else: - r_serv.hset(self.p_path, attr_name, json.dumps(value)) + self.store.hset(self.p_path, attr_name, json.dumps(value)) def _get_from_redis(self, r_serv): return r_serv.hgetall(self.p_hash) @@ -366,8 +375,3 @@ class Paste(object): if match != '' and len(match) < 100: matchs.append(match) return matchs - - -if __name__ == "__main__": - pp = pprint.PrettyPrinter(indent=4) - main() diff --git a/bin/packages/ZMQ_PubSub.py b/bin/packages/ZMQ_PubSub.py deleted file mode 100755 index b7c65231..00000000 --- a/bin/packages/ZMQ_PubSub.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/python2.7 -""" -The ``ZMQ PubSub`` Modules -========================== - -""" - -import zmq -import ConfigParser - - -class PubSub(object): - """ - The PubSub class is a ``Virtual Class`` which regroup the shared attribute - of a Publisher ZeroMQ and a Subcriber ZeroMQ - - :param file_conf: -- (str) The filepath of the configuration file used (.cfg) - :param log_channel: -- (str) The channel used as a log channel - :param ps_name: -- (str) The "ID" of the Publisher/Subcriber - - :return: PubSub Object - - ..note:: The ps_name was implemented to separate subscriber queues in redis - when they are listening on a same "stream" - ..seealso:: Method of the ZMQSub class - - ..todo:: Create Implementing a log channel as an attribute of this virtual class. - - """ - def __init__(self, file_conf, log_channel, ps_name): - self._ps_name = ps_name - self._config_parser = ConfigParser.ConfigParser() - self._config_file = file_conf # "./packages/config.cfg" - - self._config_parser.read(self._config_file) - - self._context_zmq = zmq.Context() - - # self._logging_publisher_channel = log_channel # "Default" - # publisher.channel(self._logging_publisher_channel) - - -class ZMQPub(PubSub): - """ - This class create a ZMQ Publisher which is able to send_message to a choosen socket. 
- - :param pub_config_section: -- (str) The name of the section in the config file to get the settings - - :return: ZMQPub Object - - :Example: - Extract of the config file: - [PubSub_Categ] - adress = tcp://127.0.0.1:5003 - - Creating the object and sending message: - MyPublisher = ZMQPub('./packages/config.cfg', 'PubSub_Categ', 'pubcateg') - - msg = "categ1"+" "+"Im the data sent on the categ1 channel" - MyPublisher.send_message(msg) - - ..note:: The ps_name attribute for a publisher is "optionnal" but still required to be - instantiated correctly. - - """ - def __init__(self, file_conf, pub_config_section, ps_name): - super(ZMQPub, self).__init__(file_conf, "Default", ps_name) - - self._pub_config_section = pub_config_section - self._pubsocket = self._context_zmq.socket(zmq.PUB) - self._pub_adress = self._config_parser.get(self._pub_config_section, "adress") - - self._pubsocket.bind(self._config_parser.get(self._pub_config_section, "adress")) - - def send_message(self, message): - """Send a message throught the publisher socket""" - self._pubsocket.send(message) - - -class ZMQSub(PubSub): - """ - This class create a ZMQ Subcriber which is able to receive message directly or - throught redis as a buffer. - - The redis buffer is usefull when the subcriber do a time consuming job which is - desynchronising it from the stream of data received. - The redis buffer ensures that no data will be loss waiting to be processed. - - :param sub_config_section: -- (str) The name of the section in the config file to get the settings - :param channel: -- (str) The listening channel of the Subcriber. - - :return: ZMQSub Object - - :Example: - Extract of the config file: - [PubSub_Global] - adress = tcp://127.0.0.1:5000 - channel = filelist - - Creating the object and receiving data + pushing to redis (redis buffering): - - r_serv = redis.StrictRedis( - host = 127.0.0.1, - port = 6387, - db = 0) - - channel = cfg.get("PubSub_Global", "channel") - MySubscriber = ZMQSub('./packages/config.cfg',"PubSub_Global", channel, "duplicate") - - while True: - MySubscriber.get_and_lpush(r_serv) - - - Inside another script use this line to retrive the data from redis. - ... - while True: - MySubscriber.get_msg_from_queue(r_serv) - ... - - ..note:: If you don't want any redis buffering simply use the "get_message" method - - """ - def __init__(self, file_conf, sub_config_section, channel, ps_name): - super(ZMQSub, self).__init__(file_conf, "Default", ps_name) - - self._sub_config_section = sub_config_section - self._subsocket = self._context_zmq.socket(zmq.SUB) - self._sub_adress = self._config_parser.get(self._sub_config_section, "adress") - - self._subsocket.connect(self._config_parser.get(self._sub_config_section, "adress")) - - self._channel = channel - self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel) - - def get_message(self): - """ - Get the first sent message from a Publisher. 
- :return: (str) Message from Publisher - - """ - return self._subsocket.recv() - - def get_and_lpush(self, r_serv): - """ - Get the first sent message from a Publisher and storing it in redis - - ..note:: This function also create a set named "queue" for monitoring needs - - """ - r_serv.sadd("queues", self._channel+self._ps_name) - r_serv.lpush(self._channel+self._ps_name, self._subsocket.recv()) - - def get_msg_from_queue(self, r_serv): - """ - Get the first sent message from a Redis List - - :return: (str) Message from Publisher - - """ - return r_serv.rpop(self._channel+self._ps_name) diff --git a/bin/packages/config.cfg b/bin/packages/config.cfg deleted file mode 100644 index fd43b98b..00000000 --- a/bin/packages/config.cfg +++ /dev/null @@ -1,61 +0,0 @@ -[Directories] -bloomfilters = /home/user/Blooms/ -pastes = /home/user/PASTES/ - -##### Redis ##### -[Redis_Cache] -host = localhost -port = 6379 -db = 0 - -[Redis_Log] -host = localhost -port = 6380 -db = 0 - -[Redis_Queues] -host = localhost -port = 6381 -db = 0 - -[Redis_Data_Merging] -host = localhost -port = 6379 -db = 1 - -##### LevelDB ##### -[Redis_Level_DB] -host = localhost -port = 2013 -db = 0 - -[Redis_Level_DB_Hashs] -host = localhost -port = 2013 -db = 1 - -# PUB / SUB : ZMQ -[Feed] -adress = tcp://crf.circl.lu:5556 -topicfilter = 102 - -[PubSub_Global] -adress = tcp://127.0.0.1:5000 -channel = filelist - -[PubSub_Longlines] -adress = tcp://127.0.0.1:5001 -channel_0 = Longlines -channel_1 = Shortlines - -[PubSub_Words] -adress = tcp://127.0.0.1:5002 -channel_0 = words - -[PubSub_Categ] -adress = tcp://127.0.0.1:5003 -#Channels are dynamic (1 channel per categ) - -[PubSub_Url] -adress = tcp://127.0.0.1:5004 -channel = urls diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample index 76a3ee20..2483e6c9 100644 --- a/bin/packages/config.cfg.sample +++ b/bin/packages/config.cfg.sample @@ -1,8 +1,8 @@ [Directories] -bloomfilters = /home/user/Blooms/ -pastes = /home/user/PASTES/ -wordtrending_csv = /home/user/AIL/var/www/static/csv/wordstrendingdata -wordsfile = /home/user/AIL/files/wordfile +bloomfilters = Blooms +pastes = PASTES +wordtrending_csv = var/www/static/csv/wordstrendingdata +wordsfile = files/wordfile ##### Redis ##### [Redis_Cache] @@ -56,7 +56,10 @@ channel_0 = words [PubSub_Categ] adress = tcp://127.0.0.1:5003 -#Channels are dynamic (1 channel per categ) +channel_0 = creditcard_categ +channel_1 = mails_categ +channel_2 = onion_categ +channel_3 = web_categ [PubSub_Url] adress = tcp://127.0.0.1:5004 @@ -67,4 +70,4 @@ cc_critical = DE # Indexer configuration [Indexer] type = whoosh -path = /home/user/indexdir +path = indexdir diff --git a/bin/packages/lib_words.py b/bin/packages/lib_words.py index 0acea7c8..9446a8ec 100644 --- a/bin/packages/lib_words.py +++ b/bin/packages/lib_words.py @@ -45,7 +45,7 @@ def create_dirfile(r_serv, directory, overwrite): r_serv.delete("filelist") for x in listdirectory(directory): - r_serv.rpush("filelist", x) + r_serv.lpush("filelist", x) publisher.info("The list was overwritten") @@ -53,13 +53,13 @@ def create_dirfile(r_serv, directory, overwrite): if r_serv.llen("filelist") == 0: for x in listdirectory(directory): - r_serv.rpush("filelist", x) + r_serv.lpush("filelist", x) publisher.info("New list created") else: for x in listdirectory(directory): - r_serv.rpush("filelist", x) + r_serv.lpush("filelist", x) publisher.info("The list was updated with new elements") diff --git a/configs/6379.conf b/configs/6379.conf index 74482f4e..d799cd17 100644 --- 
a/configs/6379.conf +++ b/configs/6379.conf @@ -38,7 +38,7 @@ daemonize no # When running daemonized, Redis writes a pid file in /var/run/redis.pid by # default. You can specify a custom pid file location here. -pidfile /var/run/redis.pid +#pidfile /var/run/redis.pid # Accept connections on the specified port, default is 6379. # If port 0 is specified Redis will not listen on a TCP socket. @@ -67,8 +67,8 @@ tcp-backlog 511 # incoming connections. There is no default, so Redis will not listen # on a unix socket when not specified. # -unixsocket /tmp/redis.sock -unixsocketperm 755 +#unixsocket /tmp/redis.sock +#unixsocketperm 755 # Close the connection after a client is idle for N seconds (0 to disable) timeout 0 @@ -180,9 +180,9 @@ dbfilename dump.rdb # # The DB will be written inside this directory, with the filename specified # above using the 'dbfilename' configuration directive. -# +# # The Append Only File will also be created inside this directory. -# +# # Note that you must specify a directory here, not a file name. dir ./ @@ -331,7 +331,7 @@ slave-priority 100 # # This should stay commented out for backward compatibility and because most # people do not need auth (e.g. they run their own servers). -# +# # Warning: since Redis is pretty fast an outside user can try up to # 150k passwords per second against a good box. This means that you should # use a very strong password otherwise it will be very easy to break. @@ -397,14 +397,14 @@ slave-priority 100 # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory # is reached. You can select among five behaviors: -# +# # volatile-lru -> remove the key with an expire set using an LRU algorithm # allkeys-lru -> remove any key accordingly to the LRU algorithm # volatile-random -> remove a random key with an expire set # allkeys-random -> remove a random key, any key # volatile-ttl -> remove the key with the nearest expire time (minor TTL) # noeviction -> don't expire at all, just return an error on write operations -# +# # Note: with any of the above policies, Redis will return an error on write # operations, when there are not suitable keys for eviction. # @@ -453,7 +453,7 @@ appendonly no appendfilename "appendonly.aof" # The fsync() call tells the Operating System to actually write data on disk -# instead to wait for more data in the output buffer. Some OS will really flush +# instead to wait for more data in the output buffer. Some OS will really flush # data on disk, some other OS will just try to do it ASAP. # # Redis supports three different modes: @@ -494,7 +494,7 @@ appendfsync everysec # the same as "appendfsync none". In practical terms, this means that it is # possible to lose up to 30 seconds of log in the worst scenario (with the # default Linux settings). -# +# # If you have latency problems turn this to "yes". Otherwise leave it as # "no" that is the safest pick from the point of view of durability. @@ -503,7 +503,7 @@ no-appendfsync-on-rewrite no # Automatic rewrite of the append only file. # Redis is able to automatically rewrite the log file implicitly calling # BGREWRITEAOF when the AOF log size grows by the specified percentage. -# +# # This is how it works: Redis remembers the size of the AOF file after the # latest rewrite (if no rewrite has happened since the restart, the size of # the AOF at startup is used). 
@@ -546,7 +546,7 @@ lua-time-limit 5000 # but just the time needed to actually execute the command (this is the only # stage of command execution where the thread is blocked and can not serve # other requests in the meantime). -# +# # You can configure the slow log with two parameters: one tells Redis # what is the execution time, in microseconds, to exceed in order for the # command to get logged, and the other parameter is the length of the @@ -566,7 +566,7 @@ slowlog-max-len 128 # Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/keyspace-events -# +# # For instance if keyspace events notification is enabled, and a client # performs a DEL operation on key "foo" stored in the Database 0, two # messages will be published via Pub/Sub: @@ -642,7 +642,7 @@ zset-max-ziplist-value 64 # that is rehashing, the more rehashing "steps" are performed, so if the # server is idle the rehashing is never complete and some more memory is used # by the hash table. -# +# # The default is to use this millisecond 10 times every second in order to # active rehashing the main dictionaries, freeing memory when possible. # diff --git a/configs/6380.conf b/configs/6380.conf index 0b1be6a9..2a30b0d1 100644 --- a/configs/6380.conf +++ b/configs/6380.conf @@ -38,7 +38,7 @@ daemonize no # When running daemonized, Redis writes a pid file in /var/run/redis.pid by # default. You can specify a custom pid file location here. -pidfile /var/run/redis.pid +#pidfile /var/run/redis.pid # Accept connections on the specified port, default is 6379. # If port 0 is specified Redis will not listen on a TCP socket. @@ -67,8 +67,8 @@ tcp-backlog 511 # incoming connections. There is no default, so Redis will not listen # on a unix socket when not specified. # -unixsocket /tmp/redis.sock -unixsocketperm 755 +#unixsocket /tmp/redis.sock +#unixsocketperm 755 # Close the connection after a client is idle for N seconds (0 to disable) timeout 0 @@ -180,9 +180,9 @@ dbfilename dump2.rdb # # The DB will be written inside this directory, with the filename specified # above using the 'dbfilename' configuration directive. -# +# # The Append Only File will also be created inside this directory. -# +# # Note that you must specify a directory here, not a file name. dir ./ @@ -331,7 +331,7 @@ slave-priority 100 # # This should stay commented out for backward compatibility and because most # people do not need auth (e.g. they run their own servers). -# +# # Warning: since Redis is pretty fast an outside user can try up to # 150k passwords per second against a good box. This means that you should # use a very strong password otherwise it will be very easy to break. @@ -397,14 +397,14 @@ slave-priority 100 # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory # is reached. You can select among five behaviors: -# +# # volatile-lru -> remove the key with an expire set using an LRU algorithm # allkeys-lru -> remove any key accordingly to the LRU algorithm # volatile-random -> remove a random key with an expire set # allkeys-random -> remove a random key, any key # volatile-ttl -> remove the key with the nearest expire time (minor TTL) # noeviction -> don't expire at all, just return an error on write operations -# +# # Note: with any of the above policies, Redis will return an error on write # operations, when there are not suitable keys for eviction. 
# @@ -453,7 +453,7 @@ appendonly no appendfilename "appendonly.aof" # The fsync() call tells the Operating System to actually write data on disk -# instead to wait for more data in the output buffer. Some OS will really flush +# instead to wait for more data in the output buffer. Some OS will really flush # data on disk, some other OS will just try to do it ASAP. # # Redis supports three different modes: @@ -494,7 +494,7 @@ appendfsync everysec # the same as "appendfsync none". In practical terms, this means that it is # possible to lose up to 30 seconds of log in the worst scenario (with the # default Linux settings). -# +# # If you have latency problems turn this to "yes". Otherwise leave it as # "no" that is the safest pick from the point of view of durability. @@ -503,7 +503,7 @@ no-appendfsync-on-rewrite no # Automatic rewrite of the append only file. # Redis is able to automatically rewrite the log file implicitly calling # BGREWRITEAOF when the AOF log size grows by the specified percentage. -# +# # This is how it works: Redis remembers the size of the AOF file after the # latest rewrite (if no rewrite has happened since the restart, the size of # the AOF at startup is used). @@ -546,7 +546,7 @@ lua-time-limit 5000 # but just the time needed to actually execute the command (this is the only # stage of command execution where the thread is blocked and can not serve # other requests in the meantime). -# +# # You can configure the slow log with two parameters: one tells Redis # what is the execution time, in microseconds, to exceed in order for the # command to get logged, and the other parameter is the length of the @@ -566,7 +566,7 @@ slowlog-max-len 128 # Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/keyspace-events -# +# # For instance if keyspace events notification is enabled, and a client # performs a DEL operation on key "foo" stored in the Database 0, two # messages will be published via Pub/Sub: @@ -642,7 +642,7 @@ zset-max-ziplist-value 64 # that is rehashing, the more rehashing "steps" are performed, so if the # server is idle the rehashing is never complete and some more memory is used # by the hash table. -# +# # The default is to use this millisecond 10 times every second in order to # active rehashing the main dictionaries, freeing memory when possible. # diff --git a/configs/6381.conf b/configs/6381.conf index 8d47e895..95a5c07d 100644 --- a/configs/6381.conf +++ b/configs/6381.conf @@ -38,7 +38,7 @@ daemonize no # When running daemonized, Redis writes a pid file in /var/run/redis.pid by # default. You can specify a custom pid file location here. -pidfile /var/run/redis.pid +#pidfile /var/run/redis.pid # Accept connections on the specified port, default is 6379. # If port 0 is specified Redis will not listen on a TCP socket. @@ -67,8 +67,8 @@ tcp-backlog 511 # incoming connections. There is no default, so Redis will not listen # on a unix socket when not specified. # -unixsocket /tmp/redis.sock -unixsocketperm 755 +#unixsocket /tmp/redis.sock +#unixsocketperm 755 # Close the connection after a client is idle for N seconds (0 to disable) timeout 0 @@ -180,9 +180,9 @@ dbfilename dump3.rdb # # The DB will be written inside this directory, with the filename specified # above using the 'dbfilename' configuration directive. -# +# # The Append Only File will also be created inside this directory. -# +# # Note that you must specify a directory here, not a file name. 
dir ./ ################################# REPLICATION ################################# @@ -330,7 +330,7 @@ slave-priority 100 # # This should stay commented out for backward compatibility and because most # people do not need auth (e.g. they run their own servers). -# +# # Warning: since Redis is pretty fast an outside user can try up to # 150k passwords per second against a good box. This means that you should # use a very strong password otherwise it will be very easy to break. @@ -396,14 +396,14 @@ slave-priority 100 # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory # is reached. You can select among five behaviors: -# +# # volatile-lru -> remove the key with an expire set using an LRU algorithm # allkeys-lru -> remove any key accordingly to the LRU algorithm # volatile-random -> remove a random key with an expire set # allkeys-random -> remove a random key, any key # volatile-ttl -> remove the key with the nearest expire time (minor TTL) # noeviction -> don't expire at all, just return an error on write operations -# +# # Note: with any of the above policies, Redis will return an error on write # operations, when there are not suitable keys for eviction. # @@ -452,7 +452,7 @@ appendonly no appendfilename "appendonly.aof" # The fsync() call tells the Operating System to actually write data on disk -# instead to wait for more data in the output buffer. Some OS will really flush +# instead to wait for more data in the output buffer. Some OS will really flush # data on disk, some other OS will just try to do it ASAP. # # Redis supports three different modes: @@ -493,7 +493,7 @@ appendfsync everysec # the same as "appendfsync none". In practical terms, this means that it is # possible to lose up to 30 seconds of log in the worst scenario (with the # default Linux settings). -# +# # If you have latency problems turn this to "yes". Otherwise leave it as # "no" that is the safest pick from the point of view of durability. @@ -502,7 +502,7 @@ no-appendfsync-on-rewrite no # Automatic rewrite of the append only file. # Redis is able to automatically rewrite the log file implicitly calling # BGREWRITEAOF when the AOF log size grows by the specified percentage. -# +# # This is how it works: Redis remembers the size of the AOF file after the # latest rewrite (if no rewrite has happened since the restart, the size of # the AOF at startup is used). @@ -545,7 +545,7 @@ lua-time-limit 5000 # but just the time needed to actually execute the command (this is the only # stage of command execution where the thread is blocked and can not serve # other requests in the meantime). -# +# # You can configure the slow log with two parameters: one tells Redis # what is the execution time, in microseconds, to exceed in order for the # command to get logged, and the other parameter is the length of the @@ -565,7 +565,7 @@ slowlog-max-len 128 # Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/keyspace-events -# +# # For instance if keyspace events notification is enabled, and a client # performs a DEL operation on key "foo" stored in the Database 0, two # messages will be published via Pub/Sub: @@ -641,7 +641,7 @@ zset-max-ziplist-value 64 # that is rehashing, the more rehashing "steps" are performed, so if the # server is idle the rehashing is never complete and some more memory is used # by the hash table. 
-# +# # The default is to use this millisecond 10 times every second in order to # active rehashing the main dictionaries, freeing memory when possible. # diff --git a/installing_deps.sh b/installing_deps.sh index 17c2ac89..1d413622 100755 --- a/installing_deps.sh +++ b/installing_deps.sh @@ -5,15 +5,8 @@ set -x sudo apt-get update -sudo apt-get install python-pip python-virtualenv python-dev libfreetype6-dev screen - -virtualenv AILENV - -echo export AIL_HOME=$(pwd) >> ./AILENV/bin/activate -echo export AIL_BIN=$(pwd)/bin/ >> ./AILENV/bin/activate -echo export AIL_FLASK=$(pwd)/var/www/ >> ./AILENV/bin/activate - -sudo apt-get install g++ python-tk +sudo apt-get install python-pip python-virtualenv python-dev libfreetype6-dev \ + screen g++ python-tk #Needed for bloom filters sudo apt-get install libssl-dev libfreetype6-dev python-numpy @@ -24,31 +17,35 @@ sudo apt-get install libadns1 libadns1-dev #needed for mathplotlib test ! -L /usr/include/ft2build.h && sudo ln -s freetype2/ft2build.h /usr/include/ -. ./AILENV/bin/activate - -pip install -r pip_packages_requirement.txt --upgrade - -pip install -U textblob -python -m textblob.download_corpora - # REDIS # test ! -d redis/ && git clone https://github.com/antirez/redis.git pushd redis/ git checkout 2.8 -git pull make popd -echo export AIL_REDIS=$(pwd)/src/ >> ./AILENV/bin/activate - # REDIS LEVEL DB # test ! -d redis-leveldb/ && git clone https://github.com/KDr2/redis-leveldb.git pushd redis-leveldb/ git submodule init git submodule update +make popd -mkdir -p $AIL_HOME/{PASTES,Blooms,dumps} +virtualenv AILENV +echo export AIL_HOME=$(pwd) >> ./AILENV/bin/activate +echo export AIL_BIN=$(pwd)/bin/ >> ./AILENV/bin/activate +echo export AIL_FLASK=$(pwd)/var/www/ >> ./AILENV/bin/activate +echo export AIL_REDIS=$(pwd)/redis/src/ >> ./AILENV/bin/activate +echo export AIL_LEVELDB=$(pwd)/redis-leveldb/ >> ./AILENV/bin/activate + +. ./AILENV/bin/activate + +mkdir -p $AIL_HOME/{PASTES,Blooms,dumps} mkdir -p $AIL_HOME/LEVEL_DB_DATA/{2014,2013} +pip install -r pip_packages_requirement.txt + +# Download the necessary NLTK corpora +HOME=$(pwd) python -m textblob.download_corpora diff --git a/pip_packages_requirement.txt b/pip_packages_requirement.txt index 5505f554..75d5d866 100644 --- a/pip_packages_requirement.txt +++ b/pip_packages_requirement.txt @@ -4,7 +4,7 @@ pyzmq dnspython logbook pubsublogger - +textblob #Graph numpy @@ -29,6 +29,9 @@ texttable #Indexer requirements whoosh +ipaddress +pycountry + #ASN lookup requirements http://adns-python.googlecode.com/files/adns-python-1.2.1.tar.gz https://github.com/trolldbois/python-cymru-services/archive/master.zip
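All of the modules converted in this patch share the same two-script pattern: a *_Q script subscribes to the ZMQ stream and buffers messages into a Redis list through Helper.Redis_Queues (zmq_sub() followed by redis_queue_subscribe()), while the matching worker pops that list with redis_rpop() and stops when redis_queue_shutdown() reports the shutdown flag. The minimal sketch below is not part of this changeset; the config key 'channel_4' and the subscriber name 'example' do not exist in config.cfg.sample and are invented purely for illustration.

#!/usr/bin/env python2
# -*-coding:UTF-8 -*
# Hypothetical worker sketch -- illustrates the Helper-based pattern used by
# the converted modules; 'channel_4' and 'example' are assumed values only.
import time

from pubsublogger import publisher

import Helper

if __name__ == "__main__":
    publisher.port = 6380
    publisher.channel = "Script"

    config_section = 'PubSub_Categ'
    config_channel = 'channel_4'   # assumed key, would need to be added to config.cfg.sample
    subscriber_name = 'example'    # assumed subscriber name

    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)

    publisher.info("Example worker started")

    while True:
        # Pop the next message buffered in Redis by the matching *_Q script.
        message = h.redis_rpop()
        if message is not None:
            # The other category workers split this into
            # channel, filename, word, score before processing.
            print message
        else:
            if h.redis_queue_shutdown():
                publisher.warning("Shutdown Flag Up: Terminating.")
                break
            publisher.debug("Example worker is idling 10s")
            time.sleep(10)

The matching queue script would be the same three Helper calls used by ZMQ_Sub_Mails_Q.py and the other *_Q scripts above: construct Redis_Queues with the same section, channel key and subscriber name, call zmq_sub(config_section), then redis_queue_subscribe(publisher).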