AIL-framework/bin/packages/lib_words.py

615 lines
20 KiB
Python

import redis, gzip
import numpy as np
import matplotlib.pyplot as plt
from pylab import *
from textblob import TextBlob
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from lib_redis_insert import clean, listdirectory
from lib_jobs import *
from pubsublogger import publisher
import calendar as cal
from datetime import date, timedelta
from dateutil.rrule import rrule, DAILY
from packages import *
def redis_words_ranking(pipe, r_serv, nb, minlength, maxlength):
    """Rank the words of up to ``nb`` pastes taken from the "filelist" list.

    :param pipe: -- Redis pipe, handed over to redis_zincr_words.
    :param r_serv: -- Redis connexion holding the "filelist" and "processed" lists.
    :param nb: -- (int) Maximum number of pastes handled by this call
    :param minlength: -- (int) passed to redis_zincr_words
    :param maxlength: -- (int) passed to redis_zincr_words

    Each path popped from "filelist" is processed by redis_zincr_words and
    then pushed on the "processed" list. The loop ends early as soon as
    "filelist" is empty.

    On KeyboardInterrupt / SystemExit the pid list is flushed and the
    function returns normally.

    """
    try:
        for _ in range(nb):
            path = r_serv.lpop("filelist")

            # Nothing left to process: stop before the nb iterations are used up.
            if path is None:
                publisher.debug("Empty list")
                break

            # presumably registers this script as working on `path` — confirm in lib_jobs
            set_listof_pid(r_serv, path, sys.argv[0])

            redis_zincr_words(pipe, path, minlength, maxlength)

            update_listof_pid(r_serv)

            r_serv.lpush("processed", path)
            publisher.debug(path)

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
def redis_zincr_words(pipe, filename, minlength, maxlength):
    """Count the words of one gzipped paste in a per-day redis sorted set.

    :param pipe: -- Redis pipe used to queue the increments.
    :param filename: -- The absolute path to the file.gz to process.
    :param minlength: -- (int) Minimum words length inserted
    :param maxlength: -- (int) Maximum words length inserted

    Representation of the set in redis:

    +------------+------------+-----------+
    |     Keys   | Members    | Scores    |
    +============+============+===========+
    | 20131001   | word1      | 142       |
    +------------+------------+-----------+
    | ...        | word2      | 120       |
    +------------+------------+-----------+
    | 20131002   | ...        | ...       |
    +------------+------------+-----------+

    Every token whose length is between minlength and maxlength (both
    included) increments its score by 1 in the sorted set of the paste's day.
    Tokens strictly longer than maxlength are not stored; they are reported
    through the publisher instead.

    All increments are queued on the pipe and sent with a single execute().

    """
    tokenizer = RegexpTokenizer('[\&\~\:\;\,\.\(\)\{\}\|\[\]\\\\/\-/\=\'\"\%\$\?\@\+\#\_\^\<\>\!\*\n\r\t\s]+', gaps = True, discard_empty = True)

    # The sorted-set key is the 10 characters that sit 22 to 12 characters
    # before the end of the path, with the slashes removed.
    # assumes paths end with YYYY/MM/DD/<8 chars>.gz — TODO confirm
    day_key = filename[-22:-12].replace('/','')

    with gzip.open(filename, 'rb') as paste:
        blob = TextBlob(clean(paste.read()), tokenizer = tokenizer)

        for word in blob.tokens:
            length = len(word)

            if length > maxlength:
                # Strictly longer only: a word of exactly maxlength is a valid
                # member and must not be reported as "bigger than" the limit.
                publisher.info("word bigger than {0} detected at {1}".format(maxlength, filename))
                publisher.info(word)

            elif length >= minlength:
                pipe.zincrby(day_key, word, 1)

        pipe.execute()
def classify_token_paste(r_serv, listname, choicedatastruct, nb, r_set):
    """Tokenizing on word category

    :param r_serv: -- Redis database connexion
    :param listname: -- (str) path to the file containing the list of path of category files
    :param choicedatastruct: -- (bool) Changing the index of datastructure
    :param nb: -- (int) Number of pastes proceeded by function
    :param r_set: -- (str) Name of the redis list the paste filenames are popped from

    Redis data structures can be chosen as follow:

    +---------------+------------+-----------+
    |     Keys      | Members    | Scores    |
    +===============+============+===========+
    | mails_categ   | filename   | 25000     |
    +---------------+------------+-----------+
    | ...           | filename2  | 2400      |
    +---------------+------------+-----------+
    | web_categ     | ...        | ...       |
    +---------------+------------+-----------+

    Or (when choicedatastruct is true)

    +--------------+-------------+-----------+
    |     Keys     | Members     | Scores    |
    +==============+=============+===========+
    | filename     | mails_categ | 100000    |
    +--------------+-------------+-----------+
    | ...          | web_categ   | 24050     |
    +--------------+-------------+-----------+
    | filename2    | ...         | ...       |
    +--------------+-------------+-----------+

    This function tokenise on all special characters like: @^\|[{#~}]!:;$^=
    And insert data in redis if the (lowercased) token match the keywords in
    a list previously created.

    These lists of keywords can be list of everything you want but it's better
    to create "category" of keywords.

    The category name is the last '/'-separated component of the category
    file path. Every line of listname and of the category files is expected
    to end with a newline: the last character of each line is dropped.

    On KeyboardInterrupt / SystemExit the pid list is flushed and the
    function returns normally.

    """
    # The tokenizer is the same for every paste: build it once per call.
    tokenizer = RegexpTokenizer('[\&\~\:\;\,\.\(\)\{\}\|\[\]\\\\/\-/\=\'\"\%\$\?\@\+\#\_\^\<\>\!\*\n\r\t\s]+', gaps = True, discard_empty = True)

    try:
        for _ in range(nb):
            filename = r_serv.lpop(r_set)

            if filename is None:
                publisher.debug("Empty list")
                break

            set_listof_pid(r_serv, filename, sys.argv[0])

            # Decompress and tokenize the paste a single time; the tokens are
            # then matched against every category instead of re-reading the
            # archive once per category.
            with gzip.open(filename, 'rb') as paste:
                blob = TextBlob(clean(paste.read()), tokenizer = tokenizer)
            tokens = blob.tokens.lower()

            with open(listname, 'rb') as categories:
                # for each "categ" listed in the file
                for fname in categories:
                    categ = fname.split('/')[-1][:-1]

                    # keywords of the categ, as a set for O(1) membership tests
                    with open(fname[:-1], 'rb') as keyword_file:
                        keywords = set(kword[:-1] for kword in keyword_file)

                    # for each paste token
                    for word in tokens:
                        if word in keywords:
                            # choosing between two data structures.
                            if choicedatastruct:
                                r_serv.zincrby(filename, categ, 1)
                            else:
                                r_serv.zincrby(categ, filename, 1)

            update_listof_pid(r_serv)

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
def dectect_longlines(r_serv, r_key, store = False, maxlength = 500):
    """Store longlines's linenumbers in redis

    :param r_serv: -- The redis connexion database
    :param r_key: -- (str) The key name in redis
    :param store: -- (bool) Store the line numbers or not.
    :param maxlength: -- The limit between "short lines" and "long lines"

    :returns: -- False once the r_key list is empty; None when the loop was
        interrupted by KeyboardInterrupt / SystemExit.

    This function pops paste filenames from the redis list r_key until the
    list is empty. Each paste is opened and checked for lines with a
    length >= to maxlength.

    If there is one, the filename is pushed (once) on the "longlines" redis
    list, and the 0-based line number of every long line is added to a set
    named after the file when the argument store is at True.

    """
    try:
        while True:
            filename = r_serv.lpop(r_key)

            if filename is None:
                publisher.debug("Empty list")
                return False

            set_listof_pid(r_serv, filename, sys.argv[0])

            # for each pastes
            with gzip.open(filename, 'rb') as paste:
                tagged = False

                for num, line in enumerate(paste):
                    if len(line) < maxlength:
                        continue

                    #publisher.debug("Longline:{0}".format(line))
                    if not tagged:
                        r_serv.rpush("longlines", filename)
                        tagged = True

                    if not store:
                        # The paste is tagged and no line number is wanted:
                        # reading the rest of the file would change nothing.
                        publisher.debug("Line numbers of longlines not stored")
                        break

                    r_serv.sadd(filename, num)

            update_listof_pid(r_serv)

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
# NOT USED RIGHT NOW #
def recovering_longlines(r_serv, nb=None):
    """Get longlines with linenumbers

    :param r_serv: -- The redis connexion database
    :param nb: -- (int) Maximum number of pastes to pop from the "longlines"
        list. None (the default) keeps going until the list is empty.

    For each filename popped from "longlines", the line numbers stored in
    the redis set named after the file are read and the paste is scanned
    once to reach those lines. The per-line treatment itself is still a
    placeholder.

    When the list is found empty, the redis database is saved and the loop
    stops. On KeyboardInterrupt / SystemExit the pid list is flushed and the
    function returns normally.

    """
    try:
        processed = 0

        while nb is None or processed < nb:
            filename = r_serv.lpop("longlines")

            if filename is None:
                publisher.debug("Empty list")
                r_serv.save()
                break

            # Values in redis are the longline's line numbers (0-based).
            wanted = set(int(numline) for numline in r_serv.smembers(filename))

            if wanted:
                # One pass over the paste serves every stored line number.
                with gzip.open(filename, 'rb') as paste:
                    for num, line in enumerate(paste):
                        if num in wanted:
                            # TREATMENT
                            pass

            processed += 1

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
def remove_longline_from_categ(r_serv, r_key, delete, store, maxlength):
    """Drop the pastes containing long lines from a category sorted set.

    :param r_serv: -- The redis connexion database
    :param r_key: -- (str) The key name in redis
    :param delete: -- (bool) If true, also delete the redis key named after
        each removed paste.
    :param store: -- (bool) Store the line numbers or not.
    :param maxlength: -- The limit between "short lines" and "long lines"

    The members of the r_key sorted set are copied to the list
    r_key + "_list", which is handed to dectect_longlines. Every filename
    found afterwards in the "longlines" list is removed from r_key.

    """
    work_list = r_key+"_list"

    publisher.info("Number of file before:{0}".format(r_serv.zcard(r_key)))

    # Build the list of files to inspect from the whole sorted set.
    for member in r_serv.zrange(r_key, 0, -1):
        r_serv.rpush(work_list, member)

    # Detecting longlines in pastes
    dectect_longlines(r_serv, work_list, store, maxlength)

    # Drain "longlines": each entry is a false positive of the category.
    for fp_filename in iter(lambda: r_serv.lpop("longlines"), None):
        r_serv.zrem(r_key, fp_filename)

        # If wanted, delete in addition the set with linenumbers (created with store)
        if delete:
            r_serv.delete(fp_filename)

    publisher.info("Longline file removed from {0}, {1} Files remaining".format(r_key, r_serv.zcard(r_key)))
def detect_longline_from_list(r_serv, nb):
    """Run the long-line detection on the "filelist" redis list.

    :param r_serv: -- The redis connexion database
    :param nb: -- (int) Maximum number of calls made to dectect_longlines

    dectect_longlines is called with line-number storage enabled; the loop
    stops as soon as one call returns a false value.

    On KeyboardInterrupt / SystemExit the pid list is flushed and the
    function returns normally.

    """
    try:
        for _ in range(nb):
            keep_going = dectect_longlines(r_serv, "filelist", True)
            if not keep_going:
                break

    except (KeyboardInterrupt, SystemExit):
        flush_list_of_pid(r_serv)
        publisher.debug("Pid list flushed")
def create_dirfile(r_serv, directory, overwrite):
    """Fill the "filelist" redis list with the files found under a directory.

    :param r_serv: -- connexion to redis database
    :param directory: -- The folder where to launch the listing of the .gz files
    :param overwrite: -- (bool) Delete the existing "filelist" before filling it

    Every entry returned by listdirectory(directory) is appended to
    "filelist", the list consumed by the functions run in parallel
    (like redis_words_ranking). Only the logged message depends on the state
    of the list: overwritten, newly created, or updated.

    """
    # Decide the outcome before pushing anything: the emptiness check has to
    # look at the list as it was on entry.
    if overwrite:
        r_serv.delete("filelist")
        outcome = "The list was overwritten"
    elif r_serv.llen("filelist") == 0:
        outcome = "New list created"
    else:
        outcome = "The list was updated with new elements"

    for path in listdirectory(directory):
        r_serv.rpush("filelist", path)

    publisher.info(outcome)
def redis_interbargraph_set(r_serv, year, month, overwrite):
    """Build the day/day+1 intersection sorted sets of a month.

    :param r_serv: -- connexion to redis database
    :param year: -- (integer) The year to process
    :param month: -- (integer) The month to process
    :param overwrite: -- (bool) trigger the overwrite mode

    For every day of the month except the last one, the sorted sets of the
    day and of the following day are intersected (weights 1 and -1) into a
    set named <YYYYMMDD of day><YYYYMMDD of day+1>, and that name is added
    to the "InterSet" sorted set with a score of 1.

    Example:
        For a month of 31days it will create 30 sorted set between day and
        day+1 until the last day.

    The overwrite mode deletes "InterSet" and each interset before
    re-creating them. Without it, an interset that already has members is
    left untouched and a warning is logged for it.

    """
    first_day = date(year, month, 1)
    last_day = date(year, month, cal.monthrange(year, month)[1])

    if overwrite:
        r_serv.delete("InterSet")

    for dt in rrule(DAILY, dtstart = first_day, until = last_day - timedelta(1)):
        day_key = dt.strftime("%Y%m%d")
        next_key = (dt + timedelta(1)).strftime("%Y%m%d")
        inter_key = day_key + next_key

        if overwrite:
            r_serv.delete(inter_key)
        elif r_serv.zcard(inter_key) != 0:
            publisher.warning("Data already exist, operation aborted.")
            continue

        r_serv.zinterstore(inter_key, {day_key: 1, next_key: -1})
        r_serv.zadd("InterSet", 1, inter_key)

        # Only the non-overwrite mode reports each created intersection.
        if not overwrite:
            publisher.info(inter_key + " Intersection Created")
def word_bar_graph(r_serv, year, month, filename):
    """Create an histogram.

    :param r_serv: -- connexion to redis database
    :param year: -- (integer) The year to process
    :param month: -- (integer) The month to process
    :param filename: -- The absolute path where to save the figure (".png" is appended)

    This function use matplotlib to create an histogram with, for each day
    of the month, the cardinality of the day's sorted set and the
    cardinality of the intersection set shared with the previous day.

    The redis database need obviously to be populated first
    with functions: redis_words_ranking and redis_interbargraph_set.

    """
    plt.rcParams['figure.figsize'] = 15, 10

    last_day = cal.monthrange(year, month)[1]
    days = [date(year, month, day) for day in range(1, last_day + 1)]
    day_keys = [day.strftime("%Y%m%d") for day in days]
    day_labels = tuple(day.strftime("%d") for day in days)

    card_words = tuple(r_serv.zcard(key) for key in day_keys)

    # Interset names are <day><day+1>, as written by redis_interbargraph_set.
    # They are rebuilt from the requested month instead of being read from
    # "InterSet", which may hold other months as well. A missing interset
    # counts as 0, so both series always have one value per day; the first
    # day has no previous day, hence the leading 0.
    card_interword = (0,) + tuple(
        r_serv.zcard(today + tomorrow)
        for today, tomorrow in zip(day_keys, day_keys[1:]))

    index = np.arange(len(card_words))
    bar_width = 0.5
    opacity = 0.6

    plt.bar(index, card_words, bar_width,
            alpha=opacity,
            color='g',
            label='Words/day')

    plt.bar(index - 0.5, card_interword, bar_width,
            alpha=opacity,
            color='r',
            label='Intersection')

    plt.plot(card_interword, 'b--')
    plt.xlabel(str(year)+'/'+str(month)+' Days')
    plt.ylabel('Words')
    plt.title('Words Cardinality & Intersection Histogram')
    plt.xticks(index + bar_width/2 , day_labels)

    plt.legend()
    plt.grid()
    plt.tight_layout()

    # NOTE(review): papertype / frameon are only accepted by older matplotlib
    # releases — confirm the pinned version before upgrading.
    plt.savefig(filename+".png", dpi=None, facecolor='w', edgecolor='b',
                orientation='portrait', papertype=None, format="png",
                transparent=False, bbox_inches=None, pad_inches=0.1,
                frameon=True)

    publisher.info(filename+".png"+" saved!")
def create_data_words_curve(r_serv, r_serv2, year, month, filename):
    """Create a Redis hashes.

    :param r_serv: -- connexion to redis database (read)
    :param r_serv2: -- connexion to redis database (write)
    :param year: -- (integer) The year to process
    :param month: -- (integer) The month to process
    :param filename: -- the path to the file which contain a list of words.

    The hashes of redis is created as follow:

    +------------+------------+-----------+
    |     Keys   | Field      | Values    |
    +============+============+===========+
    | word1      | 20131001   | 150       |
    +------------+------------+-----------+
    | ...        | 20131002   | 145       |
    +------------+------------+-----------+
    | word2      | ...        | ...       |
    +------------+------------+-----------+

    The filename need to be a list of words, one per line. Surrounding
    whitespace is stripped and blank lines are skipped, so a missing final
    newline does not truncate the last word.

    For each word and each day of the month, the score of the word in the
    day's sorted set (read from r_serv) is copied into the word's hash
    (written to r_serv2). Days where the word has no score are skipped.

    This function create datas which is used by the function
    create_curve_with_word_file which create a csv file.

    """
    last_day = cal.monthrange(year, month)[1]
    day_keys = [date(year, month, day).strftime("%Y%m%d")
                for day in range(1, last_day + 1)]

    # Text mode keeps the words as native strings on Python 2 and Python 3.
    with open(filename, 'r') as feeder:
        for line in feeder:
            word = line.strip()
            if not word:
                continue

            for day_key in day_keys:
                score = r_serv.zscore(day_key, word)

                if score is not None:
                    # TODO: optionally check whether the field already exists
                    # and log a WARNING.
                    r_serv2.hmset(word, {day_key: score})
def create_curve_with_word_file(r_serv, csvfilename, feederfilename, year, month):
    """Create a csv file used with dygraph.

    :param r_serv: -- connexion to redis database
    :param csvfilename: -- the path to the .csv file created (".csv" is appended)
    :param feederfilename: -- the path to the file which contain a list of words.
    :param year: -- (integer) The year to process
    :param month: -- (integer) The month to process

    This function create a .csv file using datas in redis.
    The header is "Date," followed by the sorted words; then comes one row
    per day of the month, in chronological order, holding the value of the
    field <YYYYMMDD> of each word's hash.

    If a value is missing (Word not present during a day) it's will
    automatically put a 0 to keep the timeline of the curve correct.

    Words are read one per line from feederfilename; surrounding whitespace
    is stripped and blank lines are skipped.

    """
    # Text mode keeps words and rows as native strings on Python 2 and 3.
    with open(feederfilename, 'r') as feeder:
        words = sorted(word for word in (line.strip() for line in feeder) if word)

    last_day = cal.monthrange(year, month)[1]

    with open(csvfilename+".csv", 'w') as out:
        out.write("Date," + ",".join(words) + '\n')

        # Days are emitted in calendar order: the rows of the curve must
        # follow the timeline, which a dict iteration does not guarantee.
        for day in range(1, last_day + 1):
            day_key = date(year, month, day).strftime("%Y%m%d")

            values = []
            for word in words:
                # hget returns None when the word has no value for the day.
                value = r_serv.hget(word, day_key)
                values.append("0" if value is None else str(value))

            # Values are separated by ", " to keep the layout of the rows
            # produced so far.
            out.write(day_key + ',' + ", ".join(values) + '\n')