mirror of https://github.com/CIRCL/AIL-framework
227 lines
8.2 KiB
Python
Executable File
227 lines
8.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*-coding:UTF-8 -*
|
|
"""
|
|
The Mixer Module
|
|
================
|
|
|
|
This module is consuming the Redis-list created by the ZMQ_Feed_Q Module.
|
|
|
|
This module take all the feeds provided in the config.
|
|
|
|
|
|
Depending on the configuration, this module will process the feed as follows:
|
|
operation_mode 1: "Avoid any duplicate from any sources"
|
|
- The module maintain a list of content for each item
|
|
- If the content is new, process it
|
|
- Else, do not process it but keep track for statistics on duplicate
|
|
|
|
DISABLED
|
|
operation_mode 2: "Keep duplicate coming from different sources"
|
|
- The module maintain a list of name given to the item by the feeder
|
|
- If the name has not yet been seen, process it
|
|
- Elseif, the saved content associated with the item is not the same, process it
|
|
- Else, do not process it but keep track for statistics on duplicate
|
|
|
|
operation_mode 3: "Don't look if duplicated content"
|
|
- SImply do not bother to check if it is a duplicate
|
|
- Simply do not bother to check if it is a duplicate
|
|
|
|
Note that the hash of the content is defined as the sha1(gzip64encoded).
|
|
|
|
"""
|
|
import os
|
|
import sys
|
|
|
|
import hashlib
|
|
import time
|
|
|
|
sys.path.append(os.environ['AIL_BIN'])
|
|
##################################
|
|
# Import Project packages
|
|
##################################
|
|
from modules.abstract_module import AbstractModule
|
|
from lib.ConfigLoader import ConfigLoader
|
|
|
|
|
|
class Mixer(AbstractModule):
|
|
"""docstring for Mixer module."""
|
|
|
|
def __init__(self):
|
|
super(Mixer, self).__init__()
|
|
|
|
config_loader = ConfigLoader()
|
|
self.r_cache = config_loader.get_redis_conn("Redis_Mixer_Cache")
|
|
# self.r_cache_s = config_loader.get_redis_conn("Redis_Log_submit")
|
|
|
|
self.pending_seconds = 5
|
|
|
|
self.refresh_time = 30
|
|
self.last_refresh = time.time()
|
|
|
|
self.operation_mode = config_loader.get_config_int("Module_Mixer", "operation_mode")
|
|
print(f'Operation mode {self.operation_mode}')
|
|
|
|
self.ttl_key = config_loader.get_config_int("Module_Mixer", "ttl_duplicate")
|
|
self.default_feeder_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name")
|
|
|
|
self.nb_processed_items = 0
|
|
self.feeders_processed = {}
|
|
self.feeders_duplicate = {}
|
|
|
|
self.logger.info(f"Module: {self.module_name} Launched")
|
|
|
|
# TODO Save stats in cache
|
|
# def get_feeders(self):
|
|
# return self.r_cache_s.smembers("mixer_cache:feeders")
|
|
#
|
|
# def get_feeder_nb_last_processed(self, feeder):
|
|
# nb = self.r_cache_s.hget("mixer_cache:feeders:last_processed", feeder)
|
|
# if nb:
|
|
# return int(nb)
|
|
# else:
|
|
# return 0
|
|
#
|
|
# def get_cache_feeders_nb_last_processed(self):
|
|
# feeders = {}
|
|
# for feeder in self.get_feeders():
|
|
# feeders[feeder] = self.get_feeder_nb_last_processed(feeder)
|
|
# return feeders
|
|
|
|
def clear_feeders_stat(self):
|
|
pass
|
|
# self.r_cache_s.delete("mixer_cache:feeders:last_processed")
|
|
|
|
def increase_stat_processed(self, feeder):
|
|
self.nb_processed_items += 1
|
|
try:
|
|
self.feeders_processed[feeder] += 1
|
|
except KeyError:
|
|
self.feeders_processed[feeder] = 1
|
|
|
|
def increase_stat_duplicate(self, feeder):
|
|
self.nb_processed_items += 1
|
|
try:
|
|
self.feeders_duplicate[feeder] += 1
|
|
except KeyError:
|
|
self.feeders_duplicate[feeder] = 1
|
|
|
|
# TODO Save stats in cache
|
|
def refresh_stats(self):
|
|
if int(time.time() - self.last_refresh) > self.refresh_time:
|
|
# update internal feeder
|
|
to_print = f'Mixer; ; ; ;mixer_all All_feeders Processed {self.nb_processed_items} item(s) in {self.refresh_time}sec'
|
|
print(to_print)
|
|
self.redis_logger.info(to_print)
|
|
self.nb_processed_items = 0
|
|
|
|
for feeder in self.feeders_processed:
|
|
to_print = f'Mixer; ; ; ;mixer_{feeder} {feeder} Processed {self.feeders_processed[feeder]} item(s) in {self.refresh_time}sec'
|
|
print(to_print)
|
|
self.redis_logger.info(to_print)
|
|
self.feeders_processed[feeder] = 0
|
|
|
|
for feeder in self.feeders_duplicate:
|
|
to_print = f'Mixer; ; ; ;mixer_{feeder} {feeder} Duplicated {self.feeders_duplicate[feeder]} item(s) in {self.refresh_time}sec'
|
|
print(to_print)
|
|
self.redis_logger.info(to_print)
|
|
self.feeders_duplicate[feeder] = 0
|
|
|
|
self.last_refresh = time.time()
|
|
self.clear_feeders_stat()
|
|
time.sleep(0.5)
|
|
|
|
def computeNone(self):
|
|
self.refresh_stats()
|
|
|
|
def compute(self, message):
|
|
self.refresh_stats()
|
|
# obj = self.obj
|
|
# TODO CHECK IF NOT self.object -> get object global ID from message
|
|
|
|
splitted = message.split()
|
|
# message -> feeder_name - content
|
|
# or message -> feeder_name
|
|
|
|
# feeder_name - object
|
|
if len(splitted) == 1: # feeder_name - object (content already saved)
|
|
feeder_name = message
|
|
gzip64encoded = None
|
|
|
|
# Feeder name in message: "feeder obj_id gzip64encoded"
|
|
elif len(splitted) == 2: # gzip64encoded content
|
|
feeder_name, gzip64encoded = splitted
|
|
else:
|
|
self.logger.warning(f'Invalid Message: {splitted} not processed')
|
|
return None
|
|
|
|
if self.obj.type == 'item':
|
|
# Remove ITEMS_FOLDER from item path (crawled item + submitted)
|
|
# Limit basename length
|
|
obj_id = self.obj.id
|
|
self.obj.sanitize_id()
|
|
if self.obj.id != obj_id:
|
|
self.queue.rename_message_obj(self.obj.id, obj_id)
|
|
|
|
|
|
relay_message = gzip64encoded
|
|
# print(relay_message)
|
|
|
|
# TODO only work for item object
|
|
# Avoid any duplicate coming from any sources
|
|
if self.operation_mode == 1:
|
|
digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
|
|
if self.r_cache.exists(digest): # Content already exists
|
|
# STATS
|
|
self.increase_stat_duplicate(feeder_name)
|
|
else: # New content
|
|
self.r_cache.sadd(digest, feeder_name)
|
|
self.r_cache.expire(digest, self.ttl_key)
|
|
|
|
self.increase_stat_processed(feeder_name)
|
|
self.add_message_to_queue(message=relay_message)
|
|
|
|
# Need To Be Fixed, Currently doesn't check the source (-> same as operation 1)
|
|
# # Keep duplicate coming from different sources
|
|
# elif self.operation_mode == 2:
|
|
# digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
|
|
# # Filter to avoid duplicate
|
|
# older_digest = self.r_cache.get(f'HASH_{item_id}')
|
|
# if older_digest is None:
|
|
# # New content
|
|
# # Store in redis for filtering
|
|
# self.r_cache.set(f'HASH_{item_id}', digest)
|
|
# self.r_cache.sadd(item_id, feeder_name)
|
|
# self.r_cache.expire(item_id, self.ttl_key)
|
|
# self.r_cache.expire(f'HASH_{item_id}', self.ttl_key)
|
|
#
|
|
# self.add_message_to_queue(relay_message)
|
|
#
|
|
# else:
|
|
# if digest != older_digest:
|
|
# # Same item name but different content
|
|
# # STATS
|
|
# self.increase_stat_duplicate(feeder_name)
|
|
# self.r_cache.sadd(item_id, feeder_name)
|
|
# self.r_cache.expire(item_id, ttl_key)
|
|
#
|
|
# self.add_message_to_queue(relay_message)
|
|
#
|
|
# else:
|
|
# # Already processed
|
|
# # Keep track of processed items
|
|
# # STATS
|
|
# self.increase_stat_duplicate(feeder_name)
|
|
|
|
# No Filtering
|
|
else:
|
|
self.increase_stat_processed(feeder_name)
|
|
if self.obj.type == 'item':
|
|
self.add_message_to_queue(obj=self.obj, message=gzip64encoded)
|
|
else:
|
|
self.add_message_to_queue(obj=self.obj, message=gzip64encoded)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
module = Mixer()
|
|
module.run()
|