mirror of https://github.com/CIRCL/AIL-framework
615 lines
20 KiB
Python
615 lines
20 KiB
Python
import redis, gzip
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
from pylab import *
|
|
|
|
from textblob import TextBlob
|
|
from nltk.corpus import stopwords
|
|
from nltk.tokenize import RegexpTokenizer
|
|
|
|
from lib_redis_insert import clean, listdirectory
|
|
from lib_jobs import *
|
|
|
|
from pubsublogger import publisher
|
|
|
|
import calendar as cal
|
|
from datetime import date, timedelta
|
|
from dateutil.rrule import rrule, DAILY
|
|
|
|
from packages import *
|
|
|
|
def redis_words_ranking(pipe, r_serv, nb, minlength, maxlength):
    """Rank the words of up to ``nb`` pastes taken from the "filelist" list.

    :param pipe: -- Redis pipe, handed over to redis_zincr_words.
    :param r_serv: -- Redis connexion holding the "filelist" and
        "processed" lists.
    :param nb: -- (int) Maximum number of pastes proceeded by this call
    :param minlength: -- (int) passed to redis_zincr_words
    :param maxlength: -- (int) passed to redis_zincr_words

    Each path popped from the head of "filelist" is given to
    redis_zincr_words and then pushed on the "processed" list. The loop
    stops early as soon as "filelist" is empty.
    On KeyboardInterrupt or SystemExit, flush_list_of_pid is called and
    the exception is not re-raised.

    """
    try:
        for _ in xrange(0, nb):
            path = r_serv.lpop("filelist")

            if path is None:
                publisher.debug("Empty list")
                break

            # The actual work is bracketed by the pid bookkeeping helpers.
            set_listof_pid(r_serv, path, sys.argv[0])
            redis_zincr_words(pipe, path, minlength, maxlength)
            update_listof_pid(r_serv)

            r_serv.lpush("processed", path)
            publisher.debug(path)

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
|
|
|
|
|
|
|
|
|
|
|
|
def redis_zincr_words(pipe, filename, minlength, maxlength):
    """Increment, in a per-day redis sorted set, the count of each word.

    :param pipe: -- Redis pipe used to queue the ZINCRBY commands.
    :param filename: -- The absolute path to the file.gz to process.
    :param minlength: -- (int) Minimum words length inserted
    :param maxlength: -- (int) Maximum words length inserted

    Representation of the set in redis:

    +------------+------------+-----------+
    |     Keys   | Members    | Scores    |
    +============+============+===========+
    | 20131001   | word1      | 142       |
    +------------+------------+-----------+
    | ...        | word2      | 120       |
    +------------+------------+-----------+
    | 20131002   | ...        | ...       |
    +------------+------------+-----------+

    Every token whose length is between minlength and maxlength (both
    included) gets its score incremented by one in the sorted set of the
    day. Tokens strictly longer than maxlength are not inserted; they are
    reported through publisher.info instead.
    The queued commands are sent with a single pipe.execute() once the
    whole paste has been read.

    """
    tokenizer = RegexpTokenizer('[\&\~\:\;\,\.\(\)\{\}\|\[\]\\\\/\-/\=\'\"\%\$\?\@\+\#\_\^\<\>\!\*\n\r\t\s]+', gaps = True, discard_empty = True)

    # The sorted-set key is cut out of the path at fixed offsets and the
    # slashes are removed.
    # NOTE(review): presumably expects paths ending in
    # "YYYY/MM/DD/<8 chars>.gz" -- confirm against the callers.
    day_key = filename[-22:-12].replace('/','')

    with gzip.open(filename, 'rb') as paste:

        blob = TextBlob(clean(paste.read()), tokenizer = tokenizer)

        for word in blob.tokens:
            length = len(word)

            if minlength <= length <= maxlength:
                pipe.zincrby(day_key, word, 1)

            elif length > maxlength:
                # Strictly greater: a word of exactly maxlength characters
                # has just been inserted and is not "bigger than" the limit.
                publisher.info("word bigger than {0} detected at {1}".format(maxlength, filename))
                publisher.info(word)

    pipe.execute()
|
|
|
|
|
|
|
|
|
|
def classify_token_paste(r_serv, listname, choicedatastruct, nb, r_set):
    """Tokenizing on word category

    :param r_serv: -- Redis database connexion
    :param listname: -- (str) path to the file containing, one per line,
        the paths of the category (keyword) files
    :param choicedatastruct: -- (bool) Selects which of the two data
        structures below is written
    :param nb: -- (int) Maximum number of pastes proceeded by this call
    :param r_set: -- (str) Name of the redis list the paste filenames are
        popped from

    The redis data structure can be chosen as follows.

    When choicedatastruct is false:

    +---------------+------------+-----------+
    |     Keys      | Members    | Scores    |
    +===============+============+===========+
    | mails_categ   | filename   | 25000     |
    +---------------+------------+-----------+
    | ...           | filename2  | 2400      |
    +---------------+------------+-----------+
    | web_categ     | ...        | ...       |
    +---------------+------------+-----------+

    When choicedatastruct is true:

    +--------------+-------------+-----------+
    |     Keys     | Members     | Scores    |
    +==============+=============+===========+
    | filename     | mails_categ | 100000    |
    +--------------+-------------+-----------+
    | ...          | web_categ   | 24050     |
    +--------------+-------------+-----------+
    | filename2    | ...         | ...       |
    +--------------+-------------+-----------+

    The category name is the last path component of the category file.

    This function tokenises on all special characters like: @^\|[{#~}]!:;$^=
    and increments a score in redis each time a lowercased token of the
    paste matches a keyword of a category file.
    These lists of keywords can be lists of anything you want, but it is
    better to create "categories" of keywords.

    The loop stops early when the r_set list is empty. On KeyboardInterrupt
    or SystemExit, flush_list_of_pid is called and the exception is not
    re-raised.

    """
    tokenizer = RegexpTokenizer('[\&\~\:\;\,\.\(\)\{\}\|\[\]\\\\/\-/\=\'\"\%\$\?\@\+\#\_\^\<\>\!\*\n\r\t\s]+', gaps = True, discard_empty = True)

    try:
        for _ in xrange(0, nb):
            filename = r_serv.lpop(r_set)

            if filename is None:
                publisher.debug("Empty list")
                break

            set_listof_pid(r_serv, filename, sys.argv[0])

            # Read and tokenise the paste once; the tokens do not depend
            # on the category being checked.
            with gzip.open(filename, 'rb') as paste:
                blob = TextBlob(clean(paste.read()), tokenizer = tokenizer)
            tokens = blob.tokens.lower()

            with open(listname, 'rb') as categ_list:
                # for each "categ" listed in the file
                for categ_line in categ_list:
                    # Only the line terminator is removed, so a last line
                    # without a trailing newline keeps its final character.
                    categ_path = categ_line.rstrip('\n')
                    if not categ_path:
                        # A blank line cannot name a category file.
                        continue
                    categ_name = categ_path.split('/')[-1]

                    # Keywords of the category, as a set for O(1) lookups.
                    with open(categ_path, 'rb') as categ_file:
                        keywords = set(kword.rstrip('\n') for kword in categ_file)

                    # for each paste token
                    for word in tokens:
                        if word not in keywords:
                            continue

                        # choosing between the two data structures.
                        if choicedatastruct:
                            r_serv.zincrby(filename, categ_name, 1)
                        else:
                            r_serv.zincrby(categ_name, filename, 1)

            update_listof_pid(r_serv)

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
|
|
|
|
|
|
|
|
|
|
def dectect_longlines(r_serv, r_key, store = False, maxlength = 500):
    """Store longlines's linenumbers in redis

    :param r_serv: -- The redis connexion database
    :param r_key: -- (str) The key name in redis (a list of paste filenames)
    :param store: -- (bool) Store the line numbers or not.
    :param maxlength: -- The limit between "short lines" and "long lines"
    :return: -- False once the r_key list is empty; None when the run was
        interrupted by KeyboardInterrupt or SystemExit.

    This function pops filenames from the redis list r_key until the list
    is empty. Each paste is opened and checked for lines whose length
    (as read from the file, line terminator included) is >= maxlength.
    If there is at least one, the filename is pushed once on the
    "longlines" redis list. When store is True, the 0-based number of each
    long line is also added to a redis set named after the file.

    """
    try:
        while True:
            filename = r_serv.lpop(r_key)

            if filename is None:
                publisher.debug("Empty list")
                return False

            set_listof_pid(r_serv, filename, sys.argv[0])

            # for each paste
            with gzip.open(filename, 'rb') as paste:
                tagged = False

                for num, line in enumerate(paste):
                    if len(line) < maxlength:
                        continue

                    # The paste is tagged only once, at its first long line.
                    if not tagged:
                        r_serv.rpush("longlines", filename)
                        tagged = True

                    if store:
                        r_serv.sadd(filename, num)
                    else:
                        publisher.debug("Line numbers of longlines not stored")

            update_listof_pid(r_serv)

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
|
|
|
|
|
|
|
|
|
|
# NOT USED RIGHT NOW #
|
|
def recovering_longlines(r_serv, nb=None):
    """Get longlines with linenumbers

    :param r_serv: -- The redis connexion database
    :param nb: -- (int) Maximum number of pastes to handle. None (the
        default) keeps going until the "longlines" list is empty.

    For each filename popped from the "longlines" redis list, the line
    numbers stored in the redis set named after the file are read back and
    matched against the lines of the paste. No treatment is applied to the
    matching lines yet (placeholder).
    When "longlines" is found empty, r_serv.save() is called and the loop
    ends. On KeyboardInterrupt or SystemExit, flush_list_of_pid is called
    and the exception is not re-raised.

    """
    handled = 0

    try:
        # ``nb`` used to be read without ever being defined, which raised
        # a NameError on every call; it is now an optional parameter.
        while nb is None or handled < nb:
            filename = r_serv.lpop("longlines")

            if filename is None:
                publisher.debug("Empty list")
                r_serv.save()
                break

            handled += 1

            # Line numbers of the long lines, as stored in redis.
            wanted = set(int(numline) for numline in r_serv.smembers(filename))

            if not wanted:
                # Nothing stored for this paste: no need to open it.
                continue

            # One single pass over the paste, whatever the number of
            # stored line numbers.
            with gzip.open(filename, 'rb') as paste:
                for num, line in enumerate(paste):
                    # When corresponding.
                    if num in wanted:
                        # TREATMENT
                        pass

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
|
|
|
|
|
|
|
|
|
|
def remove_longline_from_categ(r_serv, r_key, delete, store, maxlength):
    """Remove the files containing long lines from a sorted set.

    :param r_serv: -- The redis connexion database
    :param r_key: -- (str) The key name in redis (sorted set of filenames)
    :param delete: -- (bool) If true, also delete the redis key named after
        each removed file (the set of line numbers created with store).
    :param store: -- (bool) Store the line numbers or not.
    :param maxlength: -- The limit between "short lines" and "long lines"

    Every member of r_key is copied to the list r_key + "_list", which is
    then consumed by dectect_longlines. Each filename found afterwards in
    the "longlines" list is removed from r_key.

    """
    work_list = r_key + "_list"

    publisher.info("Number of file before:{0}".format(r_serv.zcard(r_key)))

    # Build the list of files to proceed.
    for member in r_serv.zrange(r_key, 0, -1):
        r_serv.rpush(work_list, member)

    # Detect the long lines in the pastes.
    dectect_longlines(r_serv, work_list, store, maxlength)

    # Remove the false positive members, until "longlines" is drained.
    fp_filename = r_serv.lpop("longlines")
    while fp_filename is not None:
        # The file with long lines always leaves the r_key zset ...
        r_serv.zrem(r_key, fp_filename)

        # ... and, if wanted, its set of line numbers is deleted as well.
        if delete:
            r_serv.delete(fp_filename)

        fp_filename = r_serv.lpop("longlines")

    publisher.info("Longline file removed from {0}, {1} Files remaining".format(r_key, r_serv.zcard(r_key)))
|
|
|
|
|
|
|
|
|
|
def detect_longline_from_list(r_serv, nb):
    """Run dectect_longlines on the "filelist" list, storing line numbers.

    :param r_serv: -- The redis connexion database
    :param nb: -- (int) Maximum number of calls to dectect_longlines

    The loop stops at the first call returning a false value.
    On KeyboardInterrupt or SystemExit, flush_list_of_pid is called and
    the exception is not re-raised.

    """
    try:
        for _ in xrange(0, nb):
            keep_going = dectect_longlines(r_serv, "filelist", True)

            if not keep_going:
                break

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
|
|
|
|
|
|
|
|
|
|
def create_dirfile(r_serv, directory, overwrite):
    """Fill the "filelist" redis list with the paths found in a directory.

    :param r_serv: -- connexion to redis database
    :param directory: -- The folder given to listdirectory to list the files
    :param overwrite: -- (bool) If true, "filelist" is deleted before being
        filled again.

    Every path returned by listdirectory(directory) is pushed at the tail
    of "filelist", which is the list consumed by the functions run in
    parallel (like redis_words_ranking).
    Without overwrite the paths are appended to whatever the list already
    holds; only the logged message differs between an empty and a
    non-empty list.

    """
    # Pick the message first: it depends on the state of the list before
    # anything is pushed.
    if overwrite:
        r_serv.delete("filelist")
        outcome = "The list was overwritten"
    elif r_serv.llen("filelist") == 0:
        outcome = "New list created"
    else:
        outcome = "The list was updated with new elements"

    for path in listdirectory(directory):
        r_serv.rpush("filelist", path)

    publisher.info(outcome)
|
|
|
|
|
|
|
|
|
|
def redis_interbargraph_set(r_serv, year, month, overwrite):
    """Create the day-to-day intersection sorted sets of a month.

    :param r_serv: -- connexion to redis database
    :param year: -- (integer) The year to process
    :param month: -- (integer) The month to process
    :param overwrite: -- (bool) trigger the overwrite mode

    This function creates inside redis the intersection of all the days of
    a month, two by two.
    Example:
    For a month of 31 days it will create 30 sorted sets, between day and
    day+1, until the last day.

    Each intersection is stored under the key "<day><day+1>" (both as
    YYYYMMDD) with ZINTERSTORE, using the weights 1 and -1, and its key is
    added to the "InterSet" sorted set with the score 1.

    In overwrite mode "InterSet" and the intersections are deleted and
    re-created. Otherwise an intersection is only created when its key is
    empty or missing; existing ones are left as they are and a warning is
    logged.

    """
    # Plain ``1``: the former ``01`` literal is an octal in Python 2 and a
    # syntax error in Python 3.
    first_day = date(year, month, 1)
    last_day = date(year, month, cal.monthrange(year, month)[1])

    if overwrite:
        r_serv.delete("InterSet")

    for dt in rrule(DAILY, dtstart = first_day, until = last_day - timedelta(1)):
        day = dt.strftime("%Y%m%d")
        dayafter = (dt + timedelta(1)).strftime("%Y%m%d")
        interkey = day + dayafter

        if overwrite:
            r_serv.delete(interkey)
        elif r_serv.zcard(interkey) != 0:
            # Only this pair of days is skipped; the loop goes on.
            publisher.warning("Data already exist, operation aborted.")
            continue

        r_serv.zinterstore(interkey, {day: 1, dayafter: -1})
        r_serv.zadd("InterSet", 1, interkey)

        # The creation is only logged outside of the overwrite mode.
        if not overwrite:
            publisher.info(interkey + " Intersection Created")
|
|
|
|
|
|
|
|
|
|
|
|
def word_bar_graph(r_serv, year, month, filename):
    """Create an histogram.

    :param r_serv: -- connexion to redis database
    :param year: -- (integer) The year to process
    :param month: -- (integer) The month to process
    :param filename: -- The absolute path where to save the figure
        (".png" is appended to it)

    This function uses matplotlib to draw, for each day of the month, the
    cardinality of the sorted set of the day and the cardinality of its
    intersection with the previous day. The intersection bars are drawn
    half a unit to the left of the day bars.
    The redis database obviously needs to be populated first with the
    functions redis_words_ranking and redis_interbargraph_set.

    """
    rcParams['figure.figsize'] = 15, 10

    # Plain ``1``: the former ``01`` literal is an octal in Python 2 and a
    # syntax error in Python 3.
    first_day = date(year, month, 1)
    last_day = date(year, month, cal.monthrange(year, month)[1])

    day_keys = []
    day_labels = []
    for dt in rrule(DAILY, dtstart = first_day, until = last_day):
        day_keys.append(dt.strftime("%Y%m%d"))
        day_labels.append(dt.strftime("%d"))

    card_words = tuple(r_serv.zcard(key) for key in day_keys)

    # The intersections are read from the "<day><day+1>" keys of this very
    # month (the keys written by redis_interbargraph_set) instead of the
    # first members of "InterSet", which is not restricted to one month.
    # The leading 0 stands for the first day, which has no previous day,
    # so this series always has one value per day.
    card_interword = (0,) + tuple(
        r_serv.zcard(today + tomorrow)
        for today, tomorrow in zip(day_keys, day_keys[1:]))

    n_groups = len(card_words)

    index = np.arange(n_groups)
    bar_width = 0.5
    opacity = 0.6

    plt.bar(index, card_words, bar_width,
            alpha=opacity,
            color='g',
            label='Words/day')

    plt.bar(index - 0.5, card_interword, bar_width,
            alpha=opacity,
            color='r',
            label='Intersection')

    plt.plot(card_interword, 'b--')
    plt.xlabel(str(year)+'/'+str(month)+' Days')
    plt.ylabel('Words')
    plt.title('Words Cardinality & Intersection Histogram')
    plt.xticks(index + bar_width/2 , tuple(day_labels))

    plt.legend()
    plt.grid()

    plt.tight_layout()

    plt.savefig(filename+".png", dpi=None, facecolor='w', edgecolor='b',
                orientation='portrait', papertype=None, format="png",
                transparent=False, bbox_inches=None, pad_inches=0.1,
                frameon=True)

    publisher.info(filename+".png"+" saved!")
|
|
|
|
|
|
|
|
|
|
def create_data_words_curve(r_serv, r_serv2, year, month, filename):
    """Create a Redis hashes.

    :param r_serv: -- connexion to redis database (read)
    :param r_serv2: -- connexion to redis database (write)
    :param year: -- (integer) The year to process
    :param month: -- (integer) The month to process
    :param filename: -- the path to the file which contains a list of words.


    The hashes of redis are created as follows:

    +------------+------------+-----------+
    |     Keys   | Field      | Values    |
    +============+============+===========+
    | word1      | 20131001   | 150       |
    +------------+------------+-----------+
    | ...        | 20131002   | 145       |
    +------------+------------+-----------+
    | word2      | ...        | ...       |
    +------------+------------+-----------+

    The file must hold one word per line. For each word and each day of
    the month, the score of the word in the sorted set of the day (read
    from r_serv) is copied into the hash of the word (written to r_serv2).
    Days where the word has no score are skipped.
    This function creates the data used by the function
    create_curve_with_word_file, which creates a csv file.

    """
    # Plain ``1``: the former ``01`` literal is an octal in Python 2 and a
    # syntax error in Python 3.
    first_day = date(year, month, 1)
    last_day = date(year, month, cal.monthrange(year, month)[1])

    # The days are the same for every word: compute their keys once.
    day_keys = [dt.strftime("%Y%m%d")
                for dt in rrule(DAILY, dtstart = first_day, until = last_day)]

    with open(filename, 'rb') as wordfile:

        for line in wordfile:
            # Only the line terminator is removed, so a last line without
            # a trailing newline keeps its final character.
            word = line.rstrip('\n')
            if not word:
                continue

            for day in day_keys:
                # One single lookup: the score is both the test and the
                # value to store.
                score = r_serv.zscore(day, word)

                if score is not None:
                    # TODO: optionally check whether the field already
                    # exists and log a WARNING.
                    r_serv2.hmset(word, {day: score})
|
|
|
|
|
|
|
|
|
|
def create_curve_with_word_file(r_serv, csvfilename, feederfilename, year, month):
    """Create a csv file used with dygraph.

    :param r_serv: -- connexion to redis database
    :param csvfilename: -- the path to the csv file to create (".csv" is
        appended to it)
    :param feederfilename: -- the path to the file which contains a list
        of words, one per line.
    :param year: -- (integer) The year to process
    :param month: -- (integer) The month to process

    This function creates a .csv file using the data in redis.
    The first line is "Date," followed by the words in sorted order. Then
    comes one line per day of the month, in chronological order, holding
    the day (YYYYMMDD) and, for each word, the value stored in the hash of
    the word for that day. If the value is missing (word not present
    during that day) a 0 is written instead, to keep the timeline of the
    curve correct.

    """
    # Plain ``1``: the former ``01`` literal is an octal in Python 2 and a
    # syntax error in Python 3.
    first_day = date(year, month, 1)
    last_day = date(year, month, cal.monthrange(year, month)[1])

    with open(feederfilename, 'rb') as feeder:
        # Only the line terminator is removed, so a last line without a
        # trailing newline keeps its final character. Blank lines are
        # skipped: they would create a column without a name.
        words = sorted(word for word in
                       (line.rstrip('\n') for line in feeder) if word)

    # Every row is built before the csv file is opened, so a failure on
    # the redis side does not leave a truncated file behind.
    rows = []
    for dt in rrule(DAILY, dtstart = first_day, until = last_day):
        day = dt.strftime("%Y%m%d")

        values = []
        for word in words:
            # hget returns None when the word has no value for the day.
            value = r_serv.hget(word, day)
            values.append(0 if value is None else value)

        # ", " is kept as the separator between the values, as in the
        # files produced so far.
        rows.append(day + ',' + ", ".join(str(value) for value in values) + '\n')

    with open(csvfilename+".csv", 'wb') as csvfile:
        csvfile.write("Date," + ",".join(words) + '\n')

        # The rows were built day after day: the file is chronological.
        for row in rows:
            csvfile.write(row)
|