mirror of https://github.com/CIRCL/AIL-framework
197 lines
7.0 KiB
Python
197 lines
7.0 KiB
Python
|
#!/usr/bin/env python2
|
||
|
# -*-coding:UTF-8 -*
|
||
|
|
||
|
"""
|
||
|
The Duplicate module
|
||
|
====================
|
||
|
|
||
|
This huge module is, in short term, checking duplicates.
|
||
|
|
||
|
Requirements:
|
||
|
-------------
|
||
|
|
||
|
|
||
|
"""
|
||
|
import redis, zmq, ConfigParser, time, datetime, pprint, time, os
|
||
|
from packages import Paste as P
|
||
|
from packages import ZMQ_PubSub
|
||
|
from datetime import date
|
||
|
from pubsublogger import publisher
|
||
|
from pybloomfilter import BloomFilter
|
||
|
|
||
|
configfile = './packages/config.cfg'
|
||
|
|
||
|
def main():
|
||
|
"""Main Function"""
|
||
|
|
||
|
# CONFIG #
|
||
|
cfg = ConfigParser.ConfigParser()
|
||
|
cfg.read(configfile)
|
||
|
|
||
|
# REDIS #
|
||
|
# DB QUEUE ( MEMORY )
|
||
|
r_Q_serv = redis.StrictRedis(
|
||
|
host = cfg.get("Redis_Queues", "host"),
|
||
|
port = cfg.getint("Redis_Queues", "port"),
|
||
|
db = cfg.getint("Redis_Queues", "db"))
|
||
|
|
||
|
r_serv_merge = redis.StrictRedis(
|
||
|
host = cfg.get("Redis_Data_Merging", "host"),
|
||
|
port = cfg.getint("Redis_Data_Merging", "port"),
|
||
|
db = cfg.getint("Redis_Data_Merging", "db"))
|
||
|
|
||
|
|
||
|
# REDIS #
|
||
|
# DB OBJECT & HASHS ( DISK )
|
||
|
dico_redis = {}
|
||
|
for year in xrange(2013, 2015):
|
||
|
for month in xrange(0,16):
|
||
|
dico_redis[str(year)+str(month).zfill(2)] = redis.StrictRedis(
|
||
|
host = cfg.get("Redis_Level_DB", "host"),
|
||
|
port = year,
|
||
|
db = month)
|
||
|
|
||
|
# LOGGING #
|
||
|
publisher.channel = "Script"
|
||
|
|
||
|
# ZMQ #
|
||
|
channel = cfg.get("PubSub_Global", "channel")
|
||
|
subscriber_name = "duplicate"
|
||
|
subscriber_config_section = "PubSub_Global"
|
||
|
|
||
|
Sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
|
||
|
|
||
|
# FUNCTIONS #
|
||
|
publisher.info("""Script duplicate subscribed to channel {0}""".format(cfg.get("PubSub_Global", "channel")))
|
||
|
|
||
|
set_limit = 100
|
||
|
|
||
|
while True:
|
||
|
try:
|
||
|
super_dico = {}
|
||
|
hash_dico = {}
|
||
|
dupl = []
|
||
|
nb_hash_current = 0
|
||
|
|
||
|
x = time.time()
|
||
|
|
||
|
message = Sub.get_msg_from_queue(r_Q_serv)
|
||
|
if message != None:
|
||
|
path = message.split(" ",-1)[-1]
|
||
|
PST = P.Paste(path)
|
||
|
else:
|
||
|
publisher.debug("Script Attribute is idling 10s")
|
||
|
time.sleep(10)
|
||
|
if r_Q_serv.sismember("SHUTDOWN_FLAGS", "Duplicate"):
|
||
|
r_Q_serv.srem("SHUTDOWN_FLAGS", "Duplicate")
|
||
|
print "Shutdown Flag Up: Terminating"
|
||
|
publisher.warning("Shutdown Flag Up: Terminating.")
|
||
|
break
|
||
|
continue
|
||
|
|
||
|
PST._set_p_hash_kind("md5")
|
||
|
|
||
|
#Assignate the correct redis connexion
|
||
|
r_serv1 = dico_redis[PST.p_date.year + PST.p_date.month]
|
||
|
|
||
|
#Creating the bloom filter name: bloomyyyymm
|
||
|
bloomname = 'bloom' + PST.p_date.year + PST.p_date.month
|
||
|
|
||
|
bloompath = cfg.get("Directories", "bloomfilters")
|
||
|
|
||
|
filebloompath = bloompath + bloomname
|
||
|
|
||
|
#datetime.date(int(PST.p_date.year),int(PST.p_date.month),int(PST.p_date.day)).timetuple().tm_yday % 7
|
||
|
|
||
|
if os.path.exists(filebloompath):
|
||
|
bloom = BloomFilter.open(filebloompath)
|
||
|
else:
|
||
|
bloom = BloomFilter(100000000, 0.01, filebloompath)
|
||
|
r_Q_serv.sadd("bloomlist", filebloompath)
|
||
|
|
||
|
# UNIQUE INDEX HASHS TABLE
|
||
|
r_serv0 = dico_redis["201300"]
|
||
|
r_serv0.incr("current_index")
|
||
|
index = r_serv0.get("current_index")+str(PST.p_date)
|
||
|
# HASHTABLES PER MONTH (because of r_serv1 changing db)
|
||
|
r_serv1.set(index, PST.p_path)
|
||
|
r_serv1.sadd("INDEX", index)
|
||
|
|
||
|
#For each bloom filter
|
||
|
opened_bloom = []
|
||
|
for bloo in r_Q_serv.smembers("bloomlist"):
|
||
|
#Opening blooms
|
||
|
opened_bloom.append(BloomFilter.open(bloo))
|
||
|
|
||
|
# For each hash of the paste
|
||
|
for hash in PST._get_hash_lines(min = 5, start = 1, jump = 0):
|
||
|
nb_hash_current += 1
|
||
|
|
||
|
#Adding the hash in Redis & limiting the set
|
||
|
if r_serv1.scard(hash) <= set_limit:
|
||
|
r_serv1.sadd(hash, index)
|
||
|
r_serv1.sadd("HASHS", hash)
|
||
|
#Adding the hash in the bloom of the month
|
||
|
bloom.add(hash)
|
||
|
|
||
|
#Go throught the Database of the bloom filter (of the month)
|
||
|
for bloo in opened_bloom:
|
||
|
if hash in bloo:
|
||
|
db = bloo.name[-6:]
|
||
|
#Go throught the Database of the bloom filter (of the month)
|
||
|
r_serv_bloom = dico_redis[db]
|
||
|
|
||
|
#set of index paste: set([1,2,4,65])
|
||
|
hash_current = r_serv_bloom.smembers(hash)
|
||
|
#removing itself from the list
|
||
|
hash_current = hash_current - set([index])
|
||
|
|
||
|
# if the hash is present at least in 1 files (already processed)
|
||
|
if len(hash_current) != 0:
|
||
|
hash_dico[hash] = hash_current
|
||
|
|
||
|
#if there is data in this dictionnary
|
||
|
if len(hash_dico) != 0:
|
||
|
super_dico[index] = hash_dico
|
||
|
else:
|
||
|
# The hash is not in this bloom
|
||
|
pass
|
||
|
|
||
|
###########################################################################################
|
||
|
|
||
|
#if there is data in this dictionnary
|
||
|
if len(super_dico) != 0:
|
||
|
# current = current paste, phash_dico = {hash: set, ...}
|
||
|
occur_dico = {}
|
||
|
for current, phash_dico in super_dico.items():
|
||
|
nb_similar_hash = len(phash_dico)
|
||
|
# phash = hash, pset = set([ pastes ...])
|
||
|
for phash, pset in hash_dico.items():
|
||
|
|
||
|
for p_fname in pset:
|
||
|
occur_dico.setdefault(p_fname, 0)
|
||
|
# Count how much hash is similar per file occuring in the dictionnary
|
||
|
if occur_dico[p_fname] >= 0:
|
||
|
occur_dico[p_fname] = occur_dico[p_fname] + 1
|
||
|
|
||
|
for paste, count in occur_dico.items():
|
||
|
percentage = round((count/float(nb_hash_current))*100, 2)
|
||
|
if percentage >= 50:
|
||
|
dupl.append((paste, percentage))
|
||
|
|
||
|
# Creating the object attribute and save it.
|
||
|
if dupl != []:
|
||
|
PST.__setattr__("p_duplicate", dupl)
|
||
|
PST.save_attribute_redis(r_serv_merge, "p_duplicate", dupl)
|
||
|
|
||
|
y = time.time()
|
||
|
|
||
|
publisher.debug('{0};{1};{2};{3};{4}'.format("Duplicate", PST.p_source, PST.p_date, PST.p_name, "Processed in "+str(y-x)+ " sec" ))
|
||
|
except IOError:
|
||
|
print "CRC Checksum Failed on :", PST.p_path
|
||
|
publisher.error('{0};{1};{2};{3};{4}'.format("Duplicate", PST.p_source, PST.p_date, PST.p_name, "CRC Checksum Failed" ))
|
||
|
pass
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|