AIL-framework/bin/CurveManageTopSets.py

167 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
This module manage top sets for terms frequency.
Every 'refresh_rate' update the weekly and monthly set
"""
import redis
import time
import datetime
import copy
from pubsublogger import publisher
from packages import lib_words
import datetime
import calendar
import os
import configparser
# Config Variables
Refresh_rate = 60*5 #sec
BlackListTermsSet_Name = "BlackListSetTermSet"
TrackedTermsSet_Name = "TrackedSetTermSet"
top_term_freq_max_set_cardinality = 20 # Max cardinality of the terms frequences set
oneDay = 60*60*24
num_day_month = 31
num_day_week = 7
top_termFreq_setName_day = ["TopTermFreq_set_day_", 1]
top_termFreq_setName_week = ["TopTermFreq_set_week", 7]
top_termFreq_setName_month = ["TopTermFreq_set_month", 31]
top_termFreq_set_array = [top_termFreq_setName_day,top_termFreq_setName_week, top_termFreq_setName_month]
def manage_top_set():
startDate = datetime.datetime.now()
startDate = startDate.replace(hour=0, minute=0, second=0, microsecond=0)
startDate = calendar.timegm(startDate.timetuple())
blacklist_size = int(server_term.scard(BlackListTermsSet_Name))
dico = {}
dico_per_paste = {}
# Retreive top data (max_card + blacklist_size) from days sets
for timestamp in range(startDate, startDate - top_termFreq_setName_month[1]*oneDay, -oneDay):
curr_set = top_termFreq_setName_day[0] + str(timestamp)
array_top_day = server_term.zrevrangebyscore(curr_set, '+inf', '-inf', withscores=True, start=0, num=top_term_freq_max_set_cardinality+blacklist_size)
array_top_day_per_paste = server_term.zrevrangebyscore("per_paste_" + curr_set, '+inf', '-inf', withscores=True, start=0, num=top_term_freq_max_set_cardinality+blacklist_size)
for word, value in array_top_day:
if word not in server_term.smembers(BlackListTermsSet_Name):
if word in dico.keys():
dico[word] += value
else:
dico[word] = value
for word, value in array_top_day_per_paste:
if word not in server_term.smembers(BlackListTermsSet_Name):
if word in dico_per_paste.keys():
dico_per_paste[word] += value
else:
dico_per_paste[word] = value
if timestamp == startDate - num_day_week*oneDay:
dico_week = copy.deepcopy(dico)
dico_week_per_paste = copy.deepcopy(dico_per_paste)
# convert dico into sorted array
array_month = []
for w, v in dico.items():
array_month.append((w, v))
array_month.sort(key=lambda tup: -tup[1])
array_month = array_month[0:20]
array_week = []
for w, v in dico_week.items():
array_week.append((w, v))
array_week.sort(key=lambda tup: -tup[1])
array_week = array_week[0:20]
# convert dico_per_paste into sorted array
array_month_per_paste = []
for w, v in dico_per_paste.items():
array_month_per_paste.append((w, v))
array_month_per_paste.sort(key=lambda tup: -tup[1])
array_month_per_paste = array_month_per_paste[0:20]
array_week_per_paste = []
for w, v in dico_week_per_paste.items():
array_week_per_paste.append((w, v))
array_week_per_paste.sort(key=lambda tup: -tup[1])
array_week_per_paste = array_week_per_paste[0:20]
# suppress every terms in top sets
for curr_set, curr_num_day in top_termFreq_set_array[1:3]:
for w in server_term.zrange(curr_set, 0, -1):
server_term.zrem(curr_set, w)
for w in server_term.zrange("per_paste_" + curr_set, 0, -1):
server_term.zrem("per_paste_" + curr_set, w)
# Add top term from sorted array in their respective sorted sets
for elem in array_week:
server_term.zadd(top_termFreq_setName_week[0], float(elem[1]), elem[0])
for elem in array_week_per_paste:
server_term.zadd("per_paste_" + top_termFreq_setName_week[0], float(elem[1]), elem[0])
for elem in array_month:
server_term.zadd(top_termFreq_setName_month[0], float(elem[1]), elem[0])
for elem in array_month_per_paste:
server_term.zadd("per_paste_" + top_termFreq_setName_month[0], float(elem[1]), elem[0])
timestamp = int(time.mktime(datetime.datetime.now().timetuple()))
value = str(timestamp) + ", " + "-"
r_temp.set("MODULE_"+ "CurveManageTopSets" + "_" + str(os.getpid()), value)
print("refreshed module")
if __name__ == '__main__':
# If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh)
# Port of the redis instance used by pubsublogger
publisher.port = 6380
# Script is the default channel used for the modules.
publisher.channel = 'Script'
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
if not os.path.exists(configfile):
raise Exception('Unable to find the configuration file. \
Did you set environment variables? \
Or activate the virtualenv.')
cfg = configparser.ConfigParser()
cfg.read(configfile)
# For Module Manager
r_temp = redis.StrictRedis(
host=cfg.get('RedisPubSub', 'host'),
port=cfg.getint('RedisPubSub', 'port'),
db=cfg.getint('RedisPubSub', 'db'),
decode_responses=True)
timestamp = int(time.mktime(datetime.datetime.now().timetuple()))
value = str(timestamp) + ", " + "-"
r_temp.set("MODULE_"+ "CurveManageTopSets" + "_" + str(os.getpid()), value)
r_temp.sadd("MODULE_TYPE_"+ "CurveManageTopSets" , str(os.getpid()))
server_term = redis.StrictRedis(
host=cfg.get("ARDB_TermFreq", "host"),
port=cfg.getint("ARDB_TermFreq", "port"),
db=cfg.getint("ARDB_TermFreq", "db"),
decode_responses=True)
publisher.info("Script Curve_manage_top_set started")
# Sent to the logging a description of the module
publisher.info("Manage the top sets with the data created by the module curve.")
manage_top_set()
while True:
# Get one message from the input queue (module only work if linked with a queue)
time.sleep(Refresh_rate) # sleep a long time then manage the set
manage_top_set()