diff --git a/README.md b/README.md index ccb2f80b..f3ee46d0 100644 --- a/README.md +++ b/README.md @@ -52,8 +52,8 @@ Then these modules need to be install with pip inside the virtual environment: ``` You'll need to clone langid: -[https://github.com/saffsd/langid.py] -And install it: +[https://github.com/saffsd/langid.py] +And install it: ``` python setup.py install ``` @@ -63,6 +63,7 @@ That's all the packages you can install with pip: ``` pip install redis pip install logbook +pip install pubsublogger pip install networkx pip install crcmod pip install mmh3 @@ -179,7 +180,7 @@ Those two files are there as an example. Overview -------- -Here is a "chained tree" to show how all ZMQ Modules are linked and how the informations +Here is a "chained tree" to show how all ZMQ Modules are linked and how the informations (mainly the paste) is going through them. The onion module is interfaced at top down level of this tree (like the ZMQ_Sub_Urls module). diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index 53a5dc59..0d71248f 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -78,9 +78,9 @@ function launching_logs { screen -dmS "Logging" sleep 0.1 echo -e $GREEN"\t* Launching logging process"$DEFAULT - screen -S "Logging" -X screen -t "LogQueue" bash -c './log_subscriber -p 6380 -c Queuing -l ../logs/; read x' + screen -S "Logging" -X screen -t "LogQueue" bash -c 'log_subscriber -p 6380 -c Queuing -l ../logs/; read x' sleep 0.1 - screen -S "Logging" -X screen -t "LogScript" bash -c './log_subscriber -p 6380 -c Script -l ../logs/; read x' + screen -S "Logging" -X screen -t "LogScript" bash -c 'log_subscriber -p 6380 -c Script -l ../logs/; read x' } function launching_queues { diff --git a/bin/log_subscriber b/bin/log_subscriber deleted file mode 100755 index 1d9c7413..00000000 --- a/bin/log_subscriber +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding: utf-8 -*- - -import argparse -import signal -from pubsublogger import subscriber - - -def signal_handler(signal, frame): - if subscriber.pubsub is not None: - subscriber.stop() - print "Subscriber closed." - -signal.signal(signal.SIGINT, signal_handler) - -if __name__ == '__main__': - - parser = argparse.ArgumentParser(description='Configure a logging subscriber.') - - parser.add_argument("-H", "--hostname", default='localhost', - type=str, help='Set the hostname of the server.') - parser.add_argument("-p", "--port", default=6379, - type=int, help='Set the server port.') - parser.add_argument('-s', '--use_unix_socket', action='store_true', - help='Use a unix socket path instead of a tcp socket.') - parser.add_argument("--unix_socket_path", default='/tmp/redis.sock', - type=str, help='Unix socket path.') - parser.add_argument("-c", "--channel", - type=str, required=True, help='Channel to subscribe to.') - parser.add_argument("-l", "--log_path", - required=True, help='Path where the logs will be written') - parser.add_argument("-d", "--debug", action="store_true", - help='Also log debug messages.') - parser.add_argument("-m", "--mail", type=file, default=None, - help='Path to the config file used to send errors by email.') - - args = parser.parse_args() - - if args.use_unix_socket: - subscriber.unix_socket = args.unix_socket_path - else: - subscriber.hostname = args.hostname - subscriber.port = args.port - subscriber.run(args.channel, args.log_path, args.debug, args.mail) - diff --git a/bin/packages/Paste.py b/bin/packages/Paste.py index 4e5d809c..f8c6dffa 100755 --- a/bin/packages/Paste.py +++ b/bin/packages/Paste.py @@ -16,7 +16,17 @@ Conditions to fulfill to be able to use this class correctly: """ -import os, magic, gzip, langid, pprint, redis, operator, string, re, json, ConfigParser +import os +import magic +import gzip +import pprint +import redis +import operator +import string +import re +import json +import ConfigParser +import cStringIO from Date import Date from Hash import Hash @@ -25,11 +35,10 @@ from langid.langid import LanguageIdentifier, model from nltk.tokenize import RegexpTokenizer from textblob import TextBlob -from lib_refine import * - clean = lambda dirty: ''.join(filter(string.printable.__contains__, dirty)) """It filters out non-printable characters from the string it receives.""" + class Paste(object): """ This class representing a Paste as an object. @@ -50,38 +59,29 @@ class Paste(object): configfile = './packages/config.cfg' cfg = ConfigParser.ConfigParser() cfg.read(configfile) + self.cache = redis.StrictRedis( + host=cfg.get("Redis_Queues", "host"), + port=cfg.getint("Redis_Queues", "port"), + db=cfg.getint("Redis_Queues", "db")) self.p_path = p_path - self.p_name = self.p_path.split('/')[-1] + self.p_size = round(os.path.getsize(self.p_path)/1024.0, 2) + self.p_mime = magic.from_buffer(self.get_p_content(), mime=True) - self.p_size = round(os.path.getsize(self.p_path)/1024.0,2) - - self.cache = redis.StrictRedis( - host = cfg.get("Redis_Queues", "host"), - port = cfg.getint("Redis_Queues", "port"), - db = cfg.getint("Redis_Queues", "db")) - - self.p_mime = magic.from_buffer(self.get_p_content(), mime = True) - - self.p_encoding = None - - #Assuming that the paste will alway be in a day folder which is itself + # Assuming that the paste will alway be in a day folder which is itself # in a month folder which is itself in a year folder. # /year/month/day/paste.gz var = self.p_path.split('/') self.p_date = Date(var[-4], var[-3], var[-2]) - - self.p_hash_kind = None - self.p_hash = None - - self.p_langage = None - - self.p_nb_lines = None - self.p_max_length_line = None - self.p_source = var[-5] + self.p_encoding = None + self.p_hash_kind = None + self.p_hash = None + self.p_langage = None + self.p_nb_lines = None + self.p_max_length_line = None def get_p_content(self): """ @@ -92,16 +92,18 @@ class Paste(object): PST.get_p_content() """ - r_serv = self.cache - paste = r_serv.get(self.p_path) + paste = self.cache.get(self.p_path) if paste is None: - with gzip.open(self.p_path, 'rb') as F: - paste = F.read() - r_serv.set(self.p_path, paste) - r_serv.expire(self.p_path, 300) + with gzip.open(self.p_path, 'rb') as f: + paste = f.read() + self.cache.set(self.p_path, paste) + self.cache.expire(self.p_path, 300) return paste + def get_p_content_as_file(self): + return cStringIO.StringIO(self.get_p_content()) + def get_lines_info(self): """ Returning and setting the number of lines and the maximum lenght of the @@ -112,15 +114,17 @@ class Paste(object): :Example: PST.get_lines_info() """ - max_length_line = 0 - with gzip.open(self.p_path, 'rb') as F: - for nb_line in enumerate(F): - if len(nb_line[1]) >= max_length_line: - max_length_line = len(nb_line[1]) - - self.p_nb_lines = nb_line[0] - self.p_max_length_line = max_length_line - return (nb_line[0], max_length_line) + if self.p_nb_lines is None or self.p_max_length_line is None: + max_length_line = 0 + f = self.get_p_content_as_file() + for line_id, line in enumerate(f): + length = len(line) + if length >= max_length_line: + max_length_line = length + f.close() + self.p_nb_lines = line_id + self.p_max_length_line = max_length_line + return (self.p_nb_lines, self.p_max_length_line) def _get_p_encoding(self): """ @@ -130,11 +134,10 @@ class Paste(object): """ try: - return magic.Magic(mime_encoding = True).from_buffer(self.get_p_content()) + return magic.Magic(mime_encoding=True).from_buffer(self.get_p_content()) except magic.MagicException: pass - def _set_p_hash_kind(self, hashkind): """ Setting the hash (as an object) used for futur operation on it. @@ -173,9 +176,7 @@ class Paste(object): ..seealso: git@github.com:saffsd/langid.py.git """ - identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True) - return identifier.classify(self.get_p_content()) def _get_p_hash_kind(self): @@ -184,7 +185,7 @@ class Paste(object): def _get_p_date(self): return self.p_date - def _get_hash_lines(self, min = 1, start = 1, jump = 10): + def _get_hash_lines(self, min=1, start=1, jump=10): """ Returning all the lines of the paste hashed. @@ -210,20 +211,17 @@ class Paste(object): """ S = set([]) - with gzip.open(self.p_path, 'rb') as F: - - for num, line in enumerate(F, start): - - if len(line) >= min: - if jump > 1: - if (num % jump) == 1 : - S.add(self.p_hash_kind.Calculate(line)) - else: + f = self.get_p_content_as_file() + for num, line in enumerate(f, start): + if len(line) >= min: + if jump > 1: + if (num % jump) == 1: S.add(self.p_hash_kind.Calculate(line)) + else: + S.add(self.p_hash_kind.Calculate(line)) return S - - def is_duplicate(self, obj, min = 1, percent = 50, start = 1, jump = 10): + def is_duplicate(self, obj, min=1, percent=50, start=1, jump=10): """ Returning the percent of similarity with another paste. ( Using the previous hashing method ) @@ -264,8 +262,7 @@ class Paste(object): else: return False, var - - def save_all_attributes_redis(self, r_serv, key = None): + def save_all_attributes_redis(self, r_serv, key=None): """ Saving all the attributes in a "Redis-like" Database (Redis, LevelDB) @@ -281,23 +278,25 @@ class Paste(object): PST.save_all_attributes_redis(r_serv) """ - #LevelDB Compatibility - r_serv.hset(self.p_path, "p_name", self.p_name) - r_serv.hset(self.p_path, "p_size", self.p_size) - r_serv.hset(self.p_path, "p_mime", self.p_mime) - #r_serv.hset(self.p_path, "p_encoding", self.p_encoding) - r_serv.hset(self.p_path, "p_date", self._get_p_date()) - r_serv.hset(self.p_path, "p_hash_kind", self._get_p_hash_kind()) - r_serv.hset(self.p_path, "p_hash", self.p_hash) - #r_serv.hset(self.p_path, "p_langage", self.p_langage) - #r_serv.hset(self.p_path, "p_nb_lines", self.p_nb_lines) - #r_serv.hset(self.p_path, "p_max_length_line", self.p_max_length_line) - #r_serv.hset(self.p_path, "p_categories", self.p_categories) - r_serv.hset(self.p_path, "p_source", self.p_source) - if key != None: - r_serv.sadd(key, self.p_path) + # LevelDB Compatibility + p = r_serv.pipeline(False) + p.hset(self.p_path, "p_name", self.p_name) + p.hset(self.p_path, "p_size", self.p_size) + p.hset(self.p_path, "p_mime", self.p_mime) + # p.hset(self.p_path, "p_encoding", self.p_encoding) + p.hset(self.p_path, "p_date", self._get_p_date()) + p.hset(self.p_path, "p_hash_kind", self._get_p_hash_kind()) + p.hset(self.p_path, "p_hash", self.p_hash) + # p.hset(self.p_path, "p_langage", self.p_langage) + # p.hset(self.p_path, "p_nb_lines", self.p_nb_lines) + # p.hset(self.p_path, "p_max_length_line", self.p_max_length_line) + # p.hset(self.p_path, "p_categories", self.p_categories) + p.hset(self.p_path, "p_source", self.p_source) + if key is not None: + p.sadd(key, self.p_path) else: pass + p.execute() def save_attribute_redis(self, r_serv, attr_name, value): """ @@ -308,11 +307,10 @@ class Paste(object): else: r_serv.hset(self.p_path, attr_name, json.dumps(value)) - def _get_from_redis(self,r_serv): + def _get_from_redis(self, r_serv): return r_serv.hgetall(self.p_hash) - - def _get_top_words(self, sort = False): + def _get_top_words(self, sort=False): """ Tokenising method: Returning a sorted list or a set of paste's words @@ -325,28 +323,23 @@ class Paste(object): """ words = {} tokenizer = RegexpTokenizer('[\&\~\:\;\,\.\(\)\{\}\|\[\]\\\\/\-/\=\'\"\%\$\?\@\+\#\_\^\<\>\!\*\n\r\t\s]+', - gaps = True, - discard_empty = True) + gaps=True, discard_empty=True) - blob = TextBlob(clean(self.get_p_content()), - tokenizer = tokenizer) + blob = TextBlob(clean(self.get_p_content()), tokenizer=tokenizer) for word in blob.tokens: if word in words.keys(): num = words[word] else: num = 0 - words[word] = num + 1 - if sort: - var = sorted(words.iteritems(), key = operator.itemgetter(1), reverse = True) + var = sorted(words.iteritems(), key=operator.itemgetter(1), reverse=True) else: var = words return var - def _get_word(self, word): """ Returning a specific word and his occurence if present in the paste @@ -358,7 +351,6 @@ class Paste(object): """ return [item for item in self._get_top_words() if item[0] == word] - def get_regex(self, regex): """ Returning matches with the regex given as an argument. diff --git a/bin/pubsublogger/__init__.py b/bin/pubsublogger/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/bin/pubsublogger/exceptions.py b/bin/pubsublogger/exceptions.py deleted file mode 100644 index adab92db..00000000 --- a/bin/pubsublogger/exceptions.py +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -"Core exceptions raised by the PubSub module" - -class PubSubError(Exception): - pass - -class InvalidErrorLevel(PubSubError): - pass - -class NoChannelError(PubSubError): - pass diff --git a/bin/pubsublogger/publisher.py b/bin/pubsublogger/publisher.py deleted file mode 100644 index 7b30f293..00000000 --- a/bin/pubsublogger/publisher.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -""" -:mod:`publisher` -- Publish logging messages on a redis channel - -To use this module, you have to define at least a channel name. - -.. note:: - The channel name should represent the area of the program you want - to log. It can be whatever you want. - - -""" - -import redis - -from pubsublogger.exceptions import InvalidErrorLevel, NoChannelError - -# use a TCP Socket by default -use_tcp_socket = True - -#default config for a UNIX socket -unix_socket = '/tmp/redis.sock' -# default config for a TCP socket -hostname = 'localhost' -port = 6380 - -channel = None -redis_instance = None - -__error_levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL') - - -def __connect(): - """ - Connect to a redis instance. - """ - global redis_instance - if use_tcp_socket: - redis_instance = redis.StrictRedis(host=hostname, port=port) - else: - redis_instance = redis.StrictRedis(unix_socket_path = unix_socket) - - -def log(level, message): - """ - Publish `message` with the `level` the redis `channel`. - - :param level: the level of the message - :param message: the message you want to log - """ - if redis_instance is None: - __connect() - - if level not in __error_levels: - raise InvalidErrorLevel('You have used an invalid error level. \ - Please choose in: ' + ', '.join(__error_levels)) - if channel is None: - raise NoChannelError('Please set a channel.') - c = '{channel}.{level}'.format(channel=channel, level=level) - redis_instance.publish(c, message) - - -def debug(message): - """ - Publush a DEBUG `message` - """ - log('DEBUG', message) - - -def info(message): - """ - Publush an INFO `message` - """ - log('INFO', message) - - -def warning(message): - """ - Publush a WARNING `message` - """ - log('WARNING', message) - - -def error(message): - """ - Publush an ERROR `message` - """ - log('ERROR', message) - - -def critical(message): - """ - Publush a CRITICAL `message` - """ - log('CRITICAL', message) - - diff --git a/bin/pubsublogger/subscriber.py b/bin/pubsublogger/subscriber.py deleted file mode 100644 index a150cc68..00000000 --- a/bin/pubsublogger/subscriber.py +++ /dev/null @@ -1,152 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -""" -:mod:`subscriber` -- Subscribe to a redis channel and gather logging messages. - -To use this module, you have to define at least a channel name. -""" - - -import redis -from logbook import Logger -import ConfigParser -from logbook import NestedSetup -from logbook import NullHandler -from logbook import TimedRotatingFileHandler -from logbook import MailHandler -import os - -# use a TCP Socket by default -use_tcp_socket = True - -# default config for a UNIX socket -unix_socket = '/tmp/redis.sock' -# default config for a TCP socket -hostname = 'localhost' -port = 6379 - -pubsub = None -channel = None - -# Required only if you want to send emails -dest_mails = [] -smtp_server = None -smtp_port = 0 -src_server = None - - -def setup(name, path='log', enable_debug=False): - """ - Prepare a NestedSetup. - - :param name: the channel name - :param path: the path where the logs will be written - :param enable_debug: do we want to save the message at the DEBUG level - - :return a nested Setup - """ - path_tmpl = os.path.join(path, '{name}_{level}.log') - info = path_tmpl.format(name=name, level='info') - warn = path_tmpl.format(name=name, level='warn') - err = path_tmpl.format(name=name, level='err') - crit = path_tmpl.format(name=name, level='crit') - # a nested handler setup can be used to configure more complex setups - setup = [ - # make sure we never bubble up to the stderr handler - # if we run out of setup handling - NullHandler(), - # then write messages that are at least info to to a logfile - TimedRotatingFileHandler(info, level='INFO', encoding='utf-8', - date_format='%Y-%m-%d'), - # then write messages that are at least warnings to to a logfile - TimedRotatingFileHandler(warn, level='WARNING', encoding='utf-8', - date_format='%Y-%m-%d'), - # then write messages that are at least errors to to a logfile - TimedRotatingFileHandler(err, level='ERROR', encoding='utf-8', - date_format='%Y-%m-%d'), - # then write messages that are at least critical errors to to a logfile - TimedRotatingFileHandler(crit, level='CRITICAL', encoding='utf-8', - date_format='%Y-%m-%d'), - ] - if enable_debug: - debug = path_tmpl.format(name=name, level='debug') - setup.insert(1, TimedRotatingFileHandler(debug, level='DEBUG', - encoding='utf-8', date_format='%Y-%m-%d')) - if src_server is not None and smtp_server is not None \ - and smtp_port != 0 and len(dest_mails) != 0: - mail_tmpl = '{name}_error@{src}' - from_mail = mail_tmpl.format(name=name, src=src_server) - subject = 'Error in {}'.format(name) - # errors should then be delivered by mail and also be kept - # in the application log, so we let them bubble up. - setup.append(MailHandler(from_mail, dest_mails, subject, - level='ERROR', bubble=True, - server_addr=(smtp_server, smtp_port))) - - return NestedSetup(setup) - - -def mail_setup(path): - """ - Set the variables to be able to send emails. - - :param path: path to the config file - """ - global dest_mails - global smtp_server - global smtp_port - global src_server - config = ConfigParser.RawConfigParser() - config.readfp(path) - dest_mails = config.get('mail', 'dest_mail').split(',') - smtp_server = config.get('mail', 'smtp_server') - smtp_port = config.get('mail', 'smtp_port') - src_server = config.get('mail', 'src_server') - - -def run(log_name, path, debug=False, mail=None): - """ - Run a subscriber and pass the messages to the logbook setup. - Stays alive as long as the pubsub instance listen to something. - - :param log_name: the channel to listen to - :param path: the path where the log files will be written - :param debug: True if you want to save the debug messages too - :param mail: Path to the config file for the mails - - """ - global pubsub - global channel - channel = log_name - if use_tcp_socket: - r = redis.StrictRedis(host=hostname, port=port) - else: - r = redis.StrictRedis(unix_socket_path=unix_socket) - pubsub = r.pubsub() - pubsub.psubscribe(channel + '.*') - - logger = Logger(channel) - if mail is not None: - mail_setup(mail) - if os.path.exists(path) and not os.path.isdir(path): - raise Exception("The path you want to use to save the file is invalid (not a directory).") - if not os.path.exists(path): - os.mkdir(path) - with setup(channel, path, debug): - for msg in pubsub.listen(): - if msg['type'] == 'pmessage': - level = msg['channel'].split('.')[1] - message = msg['data'] - try: - message = message.decode('utf-8') - except: - pass - logger.log(level, message) - - -def stop(): - """ - Unsubscribe to the channel, stop the script. - """ - pubsub.punsubscribe(channel + '.*') diff --git a/pip_packages_requirement.txt b/pip_packages_requirement.txt index d8a8a5a7..46b2088d 100644 --- a/pip_packages_requirement.txt +++ b/pip_packages_requirement.txt @@ -3,6 +3,7 @@ redis pyzmq dnspython logbook +pubsublogger #Graph