From 4bbff479894c3f6648fe1f2a8aaeb33d80c495fc Mon Sep 17 00:00:00 2001 From: Terrtia Date: Fri, 14 May 2021 14:42:16 +0200 Subject: [PATCH] chg: [AIL items + Onion] create AIL item objects + Onion module refactor --- bin/Onion.py | 371 +++++++++++++--------------------- bin/lib/crawlers.py | 166 +++++++++++++-- bin/module/abstract_module.py | 23 ++- bin/packages/Item.py | 36 ++++ configs/core.cfg.sample | 1 + 5 files changed, 350 insertions(+), 247 deletions(-) diff --git a/bin/Onion.py b/bin/Onion.py index 2b6be55e..88ced41e 100755 --- a/bin/Onion.py +++ b/bin/Onion.py @@ -22,254 +22,165 @@ Requirements """ import time -from packages import Paste -from pubsublogger import publisher import datetime import os -import base64 -import subprocess -import redis -import signal +import sys import re -from pyfaup.faup import Faup +from module.abstract_module import AbstractModule +from lib.ConfigLoader import ConfigLoader +from lib import crawlers +from lib import regex_helper +from packages.Item import Item -from Helper import Process +## Manually fetch first page if crawler is disabled +# import base64 +# import subprocess +# +# torclient_host = '127.0.0.1' +# torclient_port = 9050 +# +# def fetch(p, r_cache, urls, domains): +# now = datetime.datetime.now() +# path = os.path.join('onions', str(now.year).zfill(4), +# str(now.month).zfill(2), +# str(now.day).zfill(2), +# str(int(time.mktime(now.utctimetuple())))) +# failed = [] +# downloaded = [] +# print('{} Urls to fetch'.format(len(urls))) +# for url, domain in zip(urls, domains): +# if r_cache.exists(url) or url in failed: +# continue +# to_fetch = base64.standard_b64encode(url.encode('utf8')) +# print('fetching url: {}'.format(to_fetch)) +# process = subprocess.Popen(["python", './tor_fetcher.py', to_fetch], +# stdout=subprocess.PIPE) +# while process.poll() is None: +# time.sleep(1) +# +# if process.returncode == 0: +# r_cache.setbit(url, 0, 1) +# r_cache.expire(url, 360000) +# downloaded.append(url) +# print('downloaded : {}'.format(downloaded)) +# '''tempfile = process.stdout.read().strip() +# tempfile = tempfile.decode('utf8') +# #with open(tempfile, 'r') as f: +# filename = path + domain + '.gz' +# fetched = f.read() +# content = base64.standard_b64decode(fetched) +# save_path = os.path.join(os.environ['AIL_HOME'], +# p.config.get("Directories", "pastes"), +# filename) +# dirname = os.path.dirname(save_path) +# if not os.path.exists(dirname): +# os.makedirs(dirname) +# with open(save_path, 'w') as ff: +# ff.write(content) +# p.populate_set_out(save_path, 'Global') +# p.populate_set_out(url, 'ValidOnion') +# p.populate_set_out(fetched, 'FetchedOnion')''' +# yield url +# #os.unlink(tempfile) +# else: +# r_cache.setbit(url, 0, 0) +# r_cache.expire(url, 3600) +# failed.append(url) +# print('Failed at downloading', url) +# print(process.stdout.read()) +# print('Failed:', len(failed), 'Downloaded:', len(downloaded)) -class TimeoutException(Exception): - pass -def timeout_handler(signum, frame): - raise TimeoutException +class Onion(AbstractModule): + """docstring for Onion module.""" -signal.signal(signal.SIGALRM, timeout_handler) + def __init__(self): + super(Onion, self).__init__() -def fetch(p, r_cache, urls, domains, path): - failed = [] - downloaded = [] - print('{} Urls to fetch'.format(len(urls))) - for url, domain in zip(urls, domains): - if r_cache.exists(url) or url in failed: - continue - to_fetch = base64.standard_b64encode(url.encode('utf8')) - print('fetching url: {}'.format(to_fetch)) - process = subprocess.Popen(["python", 
'./tor_fetcher.py', to_fetch],
-                                   stdout=subprocess.PIPE)
-        while process.poll() is None:
-            time.sleep(1)
 
+        config_loader = ConfigLoader()
+        self.r_cache = config_loader.get_redis_conn("Redis_Cache")
+        self.r_onion = config_loader.get_redis_conn("ARDB_Onion")
 
-        if process.returncode == 0:
-            r_cache.setbit(url, 0, 1)
-            r_cache.expire(url, 360000)
-            downloaded.append(url)
-            print('downloaded : {}'.format(downloaded))
-            '''tempfile = process.stdout.read().strip()
-            tempfile = tempfile.decode('utf8')
-            #with open(tempfile, 'r') as f:
-            filename = path + domain + '.gz'
-            fetched = f.read()
-            content = base64.standard_b64decode(fetched)
-            save_path = os.path.join(os.environ['AIL_HOME'],
-                                     p.config.get("Directories", "pastes"),
-                                     filename)
-            dirname = os.path.dirname(save_path)
-            if not os.path.exists(dirname):
-                os.makedirs(dirname)
-            with open(save_path, 'w') as ff:
-                ff.write(content)
-            p.populate_set_out(save_path, 'Global')
-            p.populate_set_out(url, 'ValidOnion')
-            p.populate_set_out(fetched, 'FetchedOnion')'''
-            yield url
-            #os.unlink(tempfile)
+        self.pending_seconds = config_loader.get_config_int("Onion", "max_execution_time")
+        # regex timeout
+        self.regex_timeout = 30
+
+        self.faup = crawlers.get_faup()
+        self.redis_cache_key = regex_helper.generate_redis_cache_key(self.module_name)
+
+        # activate_crawler = p.config.get("Crawler", "activate_crawler")
+
+
+        self.url_regex = "((http|https|ftp)?(?:\://)?([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.onion)(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*)"
+        self.i2p_regex = "((http|https|ftp)?(?:\://)?([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.i2p)(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*)"
+        re.compile(self.url_regex)
+        re.compile(self.i2p_regex)
+
+        self.redis_logger.info(f"Module: {self.module_name} Launched")
+
+        # TEMP var: SAVE I2P Domain (future I2P crawler)
+        self.save_i2p = config_loader.get_config_boolean("Onion", "save_i2p")
+
+    def compute(self, message):
+        # list of tuples: (url, subdomains, domain)
+        urls_to_crawl = []
+
+        print(message)
+        item_id, score = message.split()
+        item = Item(item_id)
+        item_content = item.get_content()
+        # item_content = 'http://33333333.kingdom7rv6wkfzn.onion?sdsd=ooooo http://2222222.kingdom7rv6wkfzn.onion'  # debug: hardcoded sample content, keep disabled
+
+        # max execution time on regex
+        res = regex_helper.regex_findall(self.module_name, self.redis_cache_key, self.url_regex, item.get_id(), item_content)
+        for x in res:
+            # String to tuple
+            x = x[2:-2].replace(" '", "").split("',")
+            url = x[0]
+            subdomain = x[4].lower()
+            self.faup.decode(url)
+            url_unpack = self.faup.get()
+            try: ## TODO: # FIXME: check faup version
+                domain = url_unpack['domain'].decode().lower()
+            except Exception as e:
+                domain = url_unpack['domain'].lower()
+            print('----')
+            print(url)
+            print(subdomain)
+            print(domain)
+
+            if crawlers.is_valid_onion_domain(domain):
+                urls_to_crawl.append((url, subdomain, domain))
+
+        to_print = 
f'Onion;{item.get_source()};{item.get_date()};{item.get_basename()};' + if not urls_to_crawl: + self.redis_logger.info(f'{to_print}Onion related;{item.get_id()}') + return + + # TAG Item + msg = f'infoleak:automatic-detection="onion";{item.get_id()}' + self.send_message_to_queue('Tags', msg) + + if crawlers.is_crawler_activated(): + for to_crawl in urls_to_crawl: + crawlers.add_item_to_discovery_queue('onion', to_crawl[2], to_crawl[1], to_crawl[0], item.get_id()) else: - r_cache.setbit(url, 0, 0) - r_cache.expire(url, 3600) - failed.append(url) - print('Failed at downloading', url) - print(process.stdout.read()) - print('Failed:', len(failed), 'Downloaded:', len(downloaded)) - + self.redis_logger.warning(f'{to_print}Detected {len(urls_to_crawl)} .onion(s);{item.get_id()}') + # keep manual fetcher ???? + ## Manually fetch first page if crawler is disabled + # for url in fetch(p, r_cache, urls, domains_list): + # publisher.info('{}Checked {};{}'.format(to_print, url, PST.p_rel_path)) if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Script" - torclient_host = '127.0.0.1' - torclient_port = 9050 - - config_section = 'Onion' - - p = Process(config_section) - r_cache = redis.StrictRedis( - host=p.config.get("Redis_Cache", "host"), - port=p.config.getint("Redis_Cache", "port"), - db=p.config.getint("Redis_Cache", "db"), - decode_responses=True) - - r_onion = redis.StrictRedis( - host=p.config.get("ARDB_Onion", "host"), - port=p.config.getint("ARDB_Onion", "port"), - db=p.config.getint("ARDB_Onion", "db"), - decode_responses=True) - - # FUNCTIONS # - publisher.info("Script subscribed to channel onion_categ") - - # FIXME For retro compatibility - channel = 'onion_categ' - - # Getting the first message from redis. - message = p.get_from_set() - prec_filename = None - - max_execution_time = p.config.getint("Onion", "max_execution_time") - - # send to crawler: - activate_crawler = p.config.get("Crawler", "activate_crawler") - if activate_crawler == 'True': - activate_crawler = True - print('Crawler enabled') - else: - activate_crawler = False - print('Crawler disabled') - - faup = Faup() - - # Thanks to Faup project for this regex - # https://github.com/stricaud/faup - url_regex = "((http|https|ftp)?(?:\://)?([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.onion)(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*)" - i2p_regex = "((http|https|ftp)?(?:\://)?([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.i2p)(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*)" - re.compile(url_regex) + module = Onion() + module.run() - while True: - message = p.get_from_set() - if message is not None: - print(message) - filename, score = message.split() - # "For each new paste" - if prec_filename is None or filename != prec_filename: - domains_list = [] - urls = [] - PST = Paste.Paste(filename) - # max execution time on regex - signal.alarm(max_execution_time) - try: - for x in PST.get_regex(url_regex): 
- print(x) - # Extracting url with regex - url, s, credential, subdomain, domain, host, port, \ - resource_path, query_string, f1, f2, f3, f4 = x - if '.onion' in url: - print(url) - domains_list.append(domain) - urls.append(url) - except TimeoutException: - encoded_list = [] - p.incr_module_timeout_statistic() - print ("{0} processing timeout".format(PST.p_rel_path)) - continue - signal.alarm(0) - ''' - for x in PST.get_regex(i2p_regex): - # Extracting url with regex - url, s, credential, subdomain, domain, host, port, \ - resource_path, query_string, f1, f2, f3, f4 = x - - if '.i2p' in url: - print('add i2p') - print(domain) - if not r_onion.sismember('i2p_domain', domain) and not r_onion.sismember('i2p_domain_crawler_queue', domain): - r_onion.sadd('i2p_domain', domain) - r_onion.sadd('i2p_link', url) - r_onion.sadd('i2p_domain_crawler_queue', domain) - msg = '{};{}'.format(url,PST.p_rel_path) - r_onion.sadd('i2p_crawler_queue', msg) - ''' - - to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date, - PST.p_name) - - print(len(domains_list)) - if len(domains_list) > 0: - - if not activate_crawler: - publisher.warning('{}Detected {} .onion(s);{}'.format( - to_print, len(domains_list),PST.p_rel_path)) - else: - publisher.info('{}Detected {} .onion(s);{}'.format( - to_print, len(domains_list),PST.p_rel_path)) - now = datetime.datetime.now() - path = os.path.join('onions', str(now.year).zfill(4), - str(now.month).zfill(2), - str(now.day).zfill(2), - str(int(time.mktime(now.utctimetuple())))) - to_print = 'Onion;{};{};{};'.format(PST.p_source, - PST.p_date, - PST.p_name) - - if activate_crawler: - date_month = datetime.datetime.now().strftime("%Y%m") - date = datetime.datetime.now().strftime("%Y%m%d") - for url in urls: - - faup.decode(url) - url_unpack = faup.get() - ## TODO: # FIXME: remove me - try: - domain = url_unpack['domain'].decode().lower() - except Exception as e: - domain = url_unpack['domain'].lower() - - ## TODO: blackilst by port ? 
- # check blacklist - if r_onion.sismember('blacklist_onion', domain): - continue - - subdomain = re.findall(url_regex, url) - if len(subdomain) > 0: - subdomain = subdomain[0][4].lower() - else: - continue - - # too many subdomain - if len(subdomain.split('.')) > 3: - subdomain = '{}.{}.onion'.format(subdomain[-3], subdomain[-2]) - - if not r_onion.sismember('month_onion_up:{}'.format(date_month), subdomain) and not r_onion.sismember('onion_down:'+date , subdomain): - if not r_onion.sismember('onion_domain_crawler_queue', subdomain): - print('send to onion crawler') - r_onion.sadd('onion_domain_crawler_queue', subdomain) - msg = '{};{}'.format(url,PST.p_rel_path) - if not r_onion.hexists('onion_metadata:{}'.format(subdomain), 'first_seen'): - r_onion.sadd('onion_crawler_discovery_queue', msg) - print('send to priority queue') - else: - r_onion.sadd('onion_crawler_queue', msg) - # tag if domain was up - if r_onion.sismember('full_onion_up', subdomain): - # TAG Item - msg = 'infoleak:automatic-detection="onion";{}'.format(PST.p_rel_path) - p.populate_set_out(msg, 'Tags') - - else: - for url in fetch(p, r_cache, urls, domains_list, path): - publisher.info('{}Checked {};{}'.format(to_print, url, PST.p_rel_path)) - - # TAG Item - msg = 'infoleak:automatic-detection="onion";{}'.format(PST.p_rel_path) - p.populate_set_out(msg, 'Tags') - else: - publisher.info('{}Onion related;{}'.format(to_print, PST.p_rel_path)) - - prec_filename = filename - else: - publisher.debug("Script url is Idling 10s") - #print('Sleeping') - time.sleep(10) +########################## diff --git a/bin/lib/crawlers.py b/bin/lib/crawlers.py index 69cce642..ee0b7379 100755 --- a/bin/lib/crawlers.py +++ b/bin/lib/crawlers.py @@ -19,7 +19,8 @@ import uuid import subprocess from datetime import datetime, timedelta -from urllib.parse import urlparse +from urllib.parse import urlparse, urljoin +from bs4 import BeautifulSoup from pyfaup.faup import Faup @@ -41,6 +42,7 @@ r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata") r_serv_onion = config_loader.get_redis_conn("ARDB_Onion") r_cache = config_loader.get_redis_conn("Redis_Cache") PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) +activate_crawler = config_loader.get_config_str("Crawler", "activate_crawler") config_loader = None faup = Faup() @@ -76,6 +78,64 @@ def is_valid_onion_domain(domain): def get_faup(): return faup +# # # # # # # # +# # +# FAVICON # +# # +# # # # # # # # + +def get_favicon_from_html(html, domain, url): + favicon_urls = extract_favicon_from_html(html, url) + # add root favicom + if not favicon_urls: + favicon_urls.add(f'{urlparse(url).scheme}://{domain}/favicon.ico') + print(favicon_urls) + return favicon_urls + +def extract_favicon_from_html(html, url): + favicon_urls = set() + soup = BeautifulSoup(html, 'html.parser') + set_icons = set() + # If there are multiple s, the browser uses their media, + # type, and sizes attributes to select the most appropriate icon. + # If several icons are equally appropriate, the last one is used. + # If the most appropriate icon is later found to be inappropriate, + # for example because it uses an unsupported format, + # the browser proceeds to the next-most appropriate, and so on. + # # DEBUG: /!\ firefox load all favicon ??? 
+ + # iOS Safari 'apple-touch-icon' + # Safari pinned tabs 'mask-icon' + # Android Chrome 'manifest' + # Edge and IE 12: + # - + # - + + # desktop browser 'shortcut icon' (older browser), 'icon' + for favicon_tag in ['icon', 'shortcut icon']: + if soup.head: + for icon in soup.head.find_all('link', attrs={'rel': lambda x : x and x.lower() == favicon_tag, 'href': True}): + set_icons.add(icon) + + # # TODO: handle base64 favicon + for tag in set_icons: + icon_url = tag.get('href') + if icon_url: + if icon_url.startswith('//'): + icon_url = icon_url.replace('//', '/') + if icon_url.startswith('data:'): + # # TODO: handle base64 favicon + pass + else: + icon_url = urljoin(url, icon_url) + icon_url = urlparse(icon_url, scheme=urlparse(url).scheme).geturl() + favicon_urls.add(icon_url) + return favicon_urls + + +# # # - - # # # + + ################################################################################ # # TODO: handle prefix cookies @@ -412,6 +472,13 @@ def api_create_cookie(user_id, cookiejar_uuid, cookie_dict): #### CRAWLER GLOBAL #### +## TODO: # FIXME: config db, dynamic load +def is_crawler_activated(): + return activate_crawler == 'True' + +def get_crawler_all_types(): + return ['onion', 'regular'] + def get_all_spash_crawler_status(): crawler_metadata = [] all_crawlers = r_cache.smembers('all_splash_crawlers') @@ -741,6 +808,83 @@ def api_add_crawled_item(dict_crawled): create_item_metadata(item_id, domain, 'last_url', port, 'father') #### CRAWLER QUEUES #### + +## queues priority: +# 1 - priority queue +# 2 - discovery queue +# 3 - default queue +## +def get_all_queues_names(): + return ['priority', 'discovery', 'default'] + +def get_all_queues_keys(): + return ['{}_crawler_priority_queue', '{}_crawler_discovery_queue', '{}_crawler_queue'] + +def get_queue_key_by_name(queue_name): + if queue_name == 'priority': + return '{}_crawler_priority_queue' + elif queue_name == 'discovery': + return '{}_crawler_discovery_queue' + else: # default + return '{}_crawler_queue' + +def get_stats_elem_to_crawl_by_queue_type(queue_type): + dict_stats = {} + for queue_name in get_all_queues_names(): + dict_stats[queue_name] = r_serv_onion.scard(get_queue_key_by_name(queue_name).format(queue_type)) + return dict_stats + +def get_all_queues_stats(): + print(get_all_crawlers_queues_types()) + dict_stats = {} + for queue_type in get_crawler_all_types(): + dict_stats[queue_type] = get_stats_elem_to_crawl_by_queue_type(queue_type) + for queue_type in get_all_splash(): + dict_stats[queue_type] = get_stats_elem_to_crawl_by_queue_type(queue_type) + return dict_stats + +def add_item_to_discovery_queue(queue_type, domain, subdomain, url, item_id): + date_month = datetime.now().strftime("%Y%m") + date = datetime.now().strftime("%Y%m%d") + + # check blacklist + if r_serv_onion.sismember(f'blacklist_{queue_type}', domain): + return + + # too many subdomain # # FIXME: move to crawler module ? 
+    if len(subdomain.split('.')) > 3:
+        subdomain = f"{subdomain.split('.')[-3]}.{subdomain.split('.')[-2]}.{queue_type}"  # keep only one subdomain level
+
+    if not r_serv_onion.sismember(f'month_{queue_type}_up:{date_month}', subdomain) and not r_serv_onion.sismember(f'{queue_type}_down:{date}', subdomain):
+        if not r_serv_onion.sismember(f'{queue_type}_domain_crawler_queue', subdomain):
+            r_serv_onion.sadd(f'{queue_type}_domain_crawler_queue', subdomain)
+            msg = f'{url};{item_id}'
+            # First time we see this domain => Add to discovery queue (priority=2)
+            if not r_serv_onion.hexists(f'{queue_type}_metadata:{subdomain}', 'first_seen'):
+                r_serv_onion.sadd(f'{queue_type}_crawler_discovery_queue', msg)
+                print(f'sent to priority queue: {subdomain}')
+            # Add to default queue (priority=3)
+            else:
+                r_serv_onion.sadd(f'{queue_type}_crawler_queue', msg)
+                print(f'sent to queue: {subdomain}')
+
+def remove_task_from_crawler_queue(queue_name, queue_type, key_to_remove):
+    r_serv_onion.srem(queue_name.format(queue_type), key_to_remove)
+
+# # TODO: keep auto crawler ?
+def clear_crawler_queues():
+    for queue_key in get_all_queues_keys():
+        for queue_type in get_crawler_all_types():
+            r_serv_onion.delete(queue_key.format(queue_type))
+
+###################################################################################
+def get_nb_elem_to_crawl_by_type(queue_type): # # TODO: rename me
+    nb = r_serv_onion.scard('{}_crawler_priority_queue'.format(queue_type))
+    nb += r_serv_onion.scard('{}_crawler_discovery_queue'.format(queue_type))
+    nb += r_serv_onion.scard('{}_crawler_queue'.format(queue_type))
+    return nb
+###################################################################################
+
 def get_all_crawlers_queues_types():
     all_queues_types = set()
     all_splash_name = get_all_crawlers_to_launch_splash_name()
@@ -782,9 +926,8 @@ def get_elem_to_crawl_by_queue_type(l_queue_type):
 #    2 - discovery queue
 #    3 - normal queue
 ##
-    all_queue_key = ['{}_crawler_priority_queue', '{}_crawler_discovery_queue', '{}_crawler_queue']
 
-    for queue_key in all_queue_key:
+    for queue_key in get_all_queues_keys():
         for queue_type in l_queue_type:
             message = r_serv_onion.spop(queue_key.format(queue_type))
             if message:
@@ -801,12 +944,6 @@ def get_elem_to_crawl_by_queue_type(l_queue_type):
             return {'url': url, 'paste': item_id, 'type_service': crawler_type, 'queue_type': queue_type, 'original_message': message}
     return None
 
-def get_nb_elem_to_crawl_by_type(queue_type):
-    nb = r_serv_onion.scard('{}_crawler_priority_queue'.format(queue_type))
-    nb += r_serv_onion.scard('{}_crawler_discovery_queue'.format(queue_type))
-    nb += r_serv_onion.scard('{}_crawler_queue'.format(queue_type))
-    return nb
-
 #### ---- ####
 
 # # # # # # # # # # # #
@@ -1281,8 +1418,9 @@ def test_ail_crawlers():
 #### ---- ####
 
 if __name__ == '__main__':
-    res = get_splash_manager_version()
-    res = test_ail_crawlers()
-    res = is_test_ail_crawlers_successful()
-    print(res)
-    print(get_test_ail_crawlers_message())
+    # res = get_splash_manager_version()
+    # res = test_ail_crawlers()
+    # res = is_test_ail_crawlers_successful()
+    # print(res)
+    # print(get_test_ail_crawlers_message())
+    pass  # print(get_all_queues_stats())
diff --git a/bin/module/abstract_module.py b/bin/module/abstract_module.py
index 7c853af6..60f9151e 100644
--- a/bin/module/abstract_module.py
+++ b/bin/module/abstract_module.py
@@ -15,7 +15,6 @@ import time
 from pubsublogger import publisher
 from Helper import Process
 
-
 class AbstractModule(ABC):
     """
     Abstract Module class
@@ -38,6 +37,7 @@ class AbstractModule(ABC):
         self.redis_logger.port = 6380
         # 
Channel name to publish logs self.redis_logger.channel = 'Script' + # # TODO: refactor logging # TODO modify generic channel Script to a namespaced channel like: # publish module logs to script: channel # self.redis_logger.channel = 'script:%s'%(self.module_name) @@ -51,6 +51,23 @@ class AbstractModule(ABC): # Setup the I/O queues self.process = Process(self.queue_name) + def get_message(self): + """ + Get message from the Redis Queue (QueueIn) + Input message can change between modules + ex: '' + """ + return self.process.get_from_set() + + def send_message_to_queue(self, queue_name, message): + """ + Send message to queue + :param queue_name: queue or module name + :param message: message to send in queue + + ex: send_to_queue(item_id, 'Global') + """ + self.process.populate_set_out(message, queue_name) def run(self): """ @@ -59,8 +76,8 @@ class AbstractModule(ABC): # Endless loop processing messages from the input queue while self.proceed: - # Get one message (paste) from the QueueIn (copy of Redis_Global publish) - message = self.process.get_from_set() + # Get one message (ex:item id) from the Redis Queue (QueueIn) + message = self.get_message() if message is None: self.computeNone() diff --git a/bin/packages/Item.py b/bin/packages/Item.py index 36a236e0..eb4939ca 100755 --- a/bin/packages/Item.py +++ b/bin/packages/Item.py @@ -25,6 +25,7 @@ import Decoded import Screenshot import Username +from ail_objects import AbstractObject from item_basic import * config_loader = ConfigLoader.ConfigLoader() @@ -549,7 +550,42 @@ def delete_domain_node(item_id): for child_id in get_all_domain_node_by_item_id(item_id): delete_item(child_id) + +class Item(AbstractObject): + """ + AIL Item Object. (strings) + """ + + def __init__(self, id): + super(Item, self).__init__('item', id) + + def get_date(self, separator=False): + """ + Returns Item date + """ + return item_basic.get_item_date(self.id, add_separator=separator) + + def get_source(self): + """ + Returns Item source/feeder name + """ + return item_basic.get_source(self.id) + + def get_basename(self): + return os.path.basename(self.id) + + def get_content(self): + """ + Returns Item content + """ + return item_basic.get_item_content(self.id) + # if __name__ == '__main__': +# +# item = Item('') +# res = item.get_date(separator=True) +# print(res) + # import Domain # domain = Domain.Domain('domain.onion') # for domain_history in domain.get_domain_history(): diff --git a/configs/core.cfg.sample b/configs/core.cfg.sample index df6fed66..dad35ce4 100644 --- a/configs/core.cfg.sample +++ b/configs/core.cfg.sample @@ -77,6 +77,7 @@ minTopPassList=5 max_execution_time = 90 [Onion] +save_i2p = False max_execution_time = 180 [PgpDump]
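
Note on the queue model introduced in bin/lib/crawlers.py: a task is routed to the discovery queue the first time a domain is seen and to the default queue afterwards, and queued tasks are later consumed in priority > discovery > default order. The standalone sketch below only illustrates that routing and draining logic under those assumptions; it replaces the ARDB_Onion sets with in-memory structures, and the helper names route_new_task and pop_next_task are invented for the example, they are not part of the AIL API.

# Minimal, illustrative sketch of the crawler queue routing (not the AIL implementation).
QUEUE_NAMES = ['priority', 'discovery', 'default']

queues = {name: [] for name in QUEUE_NAMES}   # stand-ins for {queue_type}_crawler_*_queue
known_domains = set()                         # stand-in for the per-domain 'first_seen' metadata
blacklist = set()                             # stand-in for blacklist_{queue_type}

def route_new_task(domain, url, item_id):
    """Mimic add_item_to_discovery_queue(): skip blacklisted domains, send
    never-seen domains to the discovery queue, others to the default queue."""
    if domain in blacklist:
        return None
    msg = f'{url};{item_id}'
    if domain not in known_domains:
        known_domains.add(domain)
        queues['discovery'].append(msg)
        return 'discovery'
    queues['default'].append(msg)
    return 'default'

def pop_next_task():
    """Mimic get_elem_to_crawl_by_queue_type(): drain queues in
    priority > discovery > default order."""
    for name in QUEUE_NAMES:
        if queues[name]:
            return name, queues[name].pop(0)
    return None

if __name__ == '__main__':
    route_new_task('kingdom7rv6wkfzn.onion', 'http://kingdom7rv6wkfzn.onion', 'item_00')
    route_new_task('kingdom7rv6wkfzn.onion', 'http://kingdom7rv6wkfzn.onion/page', 'item_01')
    print(pop_next_task())   # ('discovery', 'http://kingdom7rv6wkfzn.onion;item_00')
    print(pop_next_task())   # ('default', 'http://kingdom7rv6wkfzn.onion/page;item_01')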