AIL-framework/bin/Duplicate.py

165 lines
5.8 KiB
Python
Raw Normal View History

#!/usr/bin/env python2
# -*-coding:UTF-8 -*
"""
The Duplicate module
====================
This huge module is, in short term, checking duplicates.
Requirements:
-------------
"""
2014-08-14 17:55:18 +02:00
import redis
import os
import time
from packages import Paste
from pubsublogger import publisher
from pybloomfilter import BloomFilter
from Helper import Process
2014-08-20 15:14:57 +02:00
if __name__ == "__main__":
publisher.port = 6380
2014-08-20 15:14:57 +02:00
publisher.channel = "Script"
2014-08-14 17:55:18 +02:00
config_section = 'Duplicates'
p = Process(config_section)
# REDIS #
# DB OBJECT & HASHS ( DISK )
2014-08-20 15:14:57 +02:00
# FIXME increase flexibility
dico_redis = {}
for year in xrange(2013, 2015):
2014-08-14 17:55:18 +02:00
for month in xrange(0, 16):
dico_redis[str(year)+str(month).zfill(2)] = redis.StrictRedis(
host=p.config.get("Redis_Level_DB", "host"), port=year,
2014-08-14 17:55:18 +02:00
db=month)
# FUNCTIONS #
publisher.info("Script duplicate started")
set_limit = 100
bloompath = os.path.join(os.environ['AIL_HOME'],
p.config.get("Directories", "bloomfilters"))
2014-08-20 15:14:57 +02:00
bloop_path_set = set()
while True:
try:
super_dico = {}
hash_dico = {}
dupl = []
nb_hash_current = 0
x = time.time()
message = p.get_from_set()
2014-08-14 17:55:18 +02:00
if message is not None:
path = message
2014-08-14 17:55:18 +02:00
PST = Paste.Paste(path)
else:
publisher.debug("Script Attribute is idling 10s")
time.sleep(10)
continue
PST._set_p_hash_kind("md5")
2014-08-14 17:55:18 +02:00
# Assignate the correct redis connexion
r_serv1 = dico_redis[PST.p_date.year + PST.p_date.month]
2014-08-14 17:55:18 +02:00
# Creating the bloom filter name: bloomyyyymm
2014-08-20 15:14:57 +02:00
filebloompath = os.path.join(bloompath, 'bloom' + PST.p_date.year +
PST.p_date.month)
if os.path.exists(filebloompath):
bloom = BloomFilter.open(filebloompath)
else:
bloom = BloomFilter(100000000, 0.01, filebloompath)
2014-08-20 15:14:57 +02:00
bloop_path_set.add(filebloompath)
# UNIQUE INDEX HASHS TABLE
r_serv0 = dico_redis["201300"]
r_serv0.incr("current_index")
index = r_serv0.get("current_index")+str(PST.p_date)
# HASHTABLES PER MONTH (because of r_serv1 changing db)
r_serv1.set(index, PST.p_path)
r_serv1.sadd("INDEX", index)
2014-08-14 17:55:18 +02:00
# For each bloom filter
opened_bloom = []
2014-08-20 15:14:57 +02:00
for bloo in bloop_path_set:
2014-08-14 17:55:18 +02:00
# Opening blooms
opened_bloom.append(BloomFilter.open(bloo))
# For each hash of the paste
2014-08-20 15:14:57 +02:00
for line_hash in PST._get_hash_lines(min=5, start=1, jump=0):
nb_hash_current += 1
2014-08-14 17:55:18 +02:00
# Adding the hash in Redis & limiting the set
2014-08-20 15:14:57 +02:00
if r_serv1.scard(line_hash) <= set_limit:
r_serv1.sadd(line_hash, index)
r_serv1.sadd("HASHS", line_hash)
2014-08-14 17:55:18 +02:00
# Adding the hash in the bloom of the month
2014-08-20 15:14:57 +02:00
bloom.add(line_hash)
2014-08-14 17:55:18 +02:00
# Go throught the Database of the bloom filter (of the month)
for bloo in opened_bloom:
2014-08-20 15:14:57 +02:00
if line_hash in bloo:
db = bloo.name[-6:]
2014-08-20 15:14:57 +02:00
# Go throught the Database of the bloom filter (month)
r_serv_bloom = dico_redis[db]
2014-08-14 17:55:18 +02:00
# set of index paste: set([1,2,4,65])
2014-08-20 15:14:57 +02:00
hash_current = r_serv_bloom.smembers(line_hash)
2014-08-14 17:55:18 +02:00
# removing itself from the list
hash_current = hash_current - set([index])
2014-08-20 15:14:57 +02:00
# if the hash is present at least in 1 files
# (already processed)
if len(hash_current) != 0:
2014-08-20 15:14:57 +02:00
hash_dico[line_hash] = hash_current
2014-08-14 17:55:18 +02:00
# if there is data in this dictionnary
if len(hash_dico) != 0:
super_dico[index] = hash_dico
2014-08-20 15:14:57 +02:00
###########################################################################
2014-08-14 17:55:18 +02:00
# if there is data in this dictionnary
if len(super_dico) != 0:
# current = current paste, phash_dico = {hash: set, ...}
occur_dico = {}
for current, phash_dico in super_dico.items():
# phash = hash, pset = set([ pastes ...])
for phash, pset in hash_dico.items():
for p_fname in pset:
occur_dico.setdefault(p_fname, 0)
2014-08-20 15:14:57 +02:00
# Count how much hash is similar per file occuring
# in the dictionnary
if occur_dico[p_fname] >= 0:
occur_dico[p_fname] = occur_dico[p_fname] + 1
for paste, count in occur_dico.items():
percentage = round((count/float(nb_hash_current))*100, 2)
if percentage >= 50:
dupl.append((paste, percentage))
# Creating the object attribute and save it.
2014-08-20 15:14:57 +02:00
to_print = 'Duplicate;{};{};{};'.format(
PST.p_source, PST.p_date, PST.p_name)
if dupl != []:
PST.__setattr__("p_duplicate", dupl)
2014-08-21 12:22:07 +02:00
PST.save_attribute_redis("p_duplicate", dupl)
2014-08-14 17:55:18 +02:00
publisher.info('{}Detected {}'.format(to_print, len(dupl)))
y = time.time()
2014-08-14 17:55:18 +02:00
publisher.debug('{}Processed in {} sec'.format(to_print, y-x))
except IOError:
print "CRC Checksum Failed on :", PST.p_path
2014-08-14 17:55:18 +02:00
publisher.error('{}CRC Checksum Failed'.format(to_print))