From c31aae4efc6c041d0a83e58486a4bc271206bb97 Mon Sep 17 00:00:00 2001
From: Terrtia
Date: Fri, 24 Jul 2020 08:54:54 +0200
Subject: [PATCH] chg: [crawler] crawler queue + restart docker on error

---
 bin/Crawler.py                        | 75 ++++++++-------------
 bin/core/Crawler_manager.py           | 36 +++++++++----
 bin/lib/crawlers.py                   | 65 +++++++++++++++++++++++
 etc/splash/proxy-profiles/default.ini |  4 --
 4 files changed, 112 insertions(+), 68 deletions(-)
 delete mode 100644 etc/splash/proxy-profiles/default.ini

diff --git a/bin/Crawler.py b/bin/Crawler.py
index a06b4698..c34f6f80 100755
--- a/bin/Crawler.py
+++ b/bin/Crawler.py
@@ -19,6 +19,9 @@ sys.path.append(os.environ['AIL_BIN'])
 from Helper import Process
 from pubsublogger import publisher
 
+sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib'))
+import crawlers
+
 # ======== FUNCTIONS ========
 
 def load_blacklist(service_type):
@@ -117,43 +120,6 @@ def unpack_url(url):
 
     return to_crawl
 
-# get url, paste and service_type to crawl
-def get_elem_to_crawl(rotation_mode):
-    message = None
-    domain_service_type = None
-
-    #load_priority_queue
-    for service_type in rotation_mode:
-        message = redis_crawler.spop('{}_crawler_priority_queue'.format(service_type))
-        if message is not None:
-            domain_service_type = service_type
-            break
-    #load_discovery_queue
-    if message is None:
-        for service_type in rotation_mode:
-            message = redis_crawler.spop('{}_crawler_discovery_queue'.format(service_type))
-            if message is not None:
-                domain_service_type = service_type
-                break
-    #load_normal_queue
-    if message is None:
-        for service_type in rotation_mode:
-            message = redis_crawler.spop('{}_crawler_queue'.format(service_type))
-            if message is not None:
-                domain_service_type = service_type
-                break
-
-    if message:
-        splitted = message.rsplit(';', 1)
-        if len(splitted) == 2:
-            url, paste = splitted
-            if paste:
-                paste = paste.replace(PASTES_FOLDER+'/', '')
-
-            message = {'url': url, 'paste': paste, 'type_service': domain_service_type, 'original_message': message}
-
-    return message
-
 def get_crawler_config(redis_server, mode, service_type, domain, url=None):
     crawler_options = {}
     if mode=='auto':
@@ -237,6 +203,9 @@ def crawl_onion(url, domain, port, type_service, message, crawler_config):
             # TODO: relaunch docker or send error message
             nb_retry += 1
 
+            if nb_retry == 2:
+                crawlers.restart_splash_docker(splash_url)
+
             if nb_retry == 6:
                 on_error_send_message_back_in_queue(type_service, domain, message)
                 publisher.error('{} SPASH DOWN'.format(splash_url))
@@ -304,11 +273,23 @@ def search_potential_source_domain(type_service, domain):
 
 if __name__ == '__main__':
 
-    if len(sys.argv) != 2 and len(sys.argv) != 3:
-        print('usage:', 'Crawler.py', 'splash_port')
-        print('usage:', 'Crawler.py', 'splash_name', 'splash_url')
+    if len(sys.argv) != 2:
+        print('usage:', 'Crawler.py', 'splash_url')
         exit(1)
 ##################################################
+    splash_url = sys.argv[1]
+
+    splash_name = crawlers.get_splash_name_by_url(splash_url)
+    crawler_type = crawlers.get_splash_crawler_type(splash_name)
+
+    print(splash_name)
+    print(crawler_type)
+
+    #rotation_mode = deque(['onion', 'regular'])
+    rotation_mode = deque(crawlers.get_crawler_queue_type_by_proxy(splash_name, crawler_type))
+
+    default_proto_map = {'http': 80, 'https': 443}
+######################################################## add ftp ???
 
     publisher.port = 6380
     publisher.channel = "Script"
@@ -318,20 +299,8 @@ if __name__ == '__main__':
 
     # Setup the I/O queues
     p = Process(config_section)
 
-    if len(sys.argv) == 2:
-        splash_port = sys.argv[1]
-        splash_url = '{}:{}'.format( p.config.get("Crawler", "splash_url"), splash_port)
-    else:
-        splash_name = sys.argv[1]
-        splash_url = sys.argv[2]
-        print(splash_name)
-        print('splash url: {}'.format(splash_url))
-    rotation_mode = deque(['onion', 'regular'])
-    default_proto_map = {'http': 80, 'https': 443}
-######################################################## add ftp ???
-
     PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], p.config.get("Directories", "pastes"))
 
     r_serv_metadata = redis.StrictRedis(
@@ -391,7 +360,7 @@ if __name__ == '__main__':
         update_auto_crawler()
 
         rotation_mode.rotate()
-        to_crawl = get_elem_to_crawl(rotation_mode)
+        to_crawl = crawlers.get_elem_to_crawl_by_queue_type(rotation_mode)
         if to_crawl:
             url_data = unpack_url(to_crawl['url'])
             # remove domain from queue
diff --git a/bin/core/Crawler_manager.py b/bin/core/Crawler_manager.py
index 3b64ae97..a5ac7dd6 100755
--- a/bin/core/Crawler_manager.py
+++ b/bin/core/Crawler_manager.py
@@ -1,9 +1,9 @@
 #!/usr/bin/env python3
 # -*-coding:UTF-8 -*
 
-import json
 import os
 import sys
+import time
 
 sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib'))
 import ConfigLoader
@@ -21,15 +21,7 @@ config_loader = None
 
 import screen
 
-if __name__ == '__main__':
-
-    if not crawlers.ping_splash_manager():
-        print('Error, Can\'t cnnect to Splash manager')
-
-    crawlers.reload_splash_and_proxies_list()
-
-    # # TODO: handle mutltiple splash_manager
-
+def launch_crawlers():
     for crawler_splash in crawlers_to_launch:
         splash_name = crawler_splash[0]
         nb_crawlers = int(crawler_splash[1])
@@ -44,4 +36,26 @@ if __name__ == '__main__':
             splash_url = all_crawler_urls[i]
             print(all_crawler_urls[i])
 
-            crawlers.launch_ail_splash_crawler('http://127.0.0.1:8054', script_options='{} {}'.format(splash_name, splash_url))
+            crawlers.launch_ail_splash_crawler(splash_url, script_options='{}'.format(splash_url))
+
+# # TODO: handle multiple splash_manager
+if __name__ == '__main__':
+
+    if not crawlers.ping_splash_manager():
+        print('Error, Can\'t connect to Splash manager')
+
+    crawlers.reload_splash_and_proxies_list()
+    launch_crawlers()
+    last_refresh = time.time()
+
+    while True:
+
+
+        # refresh splash and proxy list
+        if False:
+            crawlers.reload_splash_and_proxies_list()
+            print('list of splash and proxies refreshed')
+        else:
+            time.sleep(10)
+
+        # # TODO: handle multiple splash_manager
diff --git a/bin/lib/crawlers.py b/bin/lib/crawlers.py
index bb072068..06399658 100755
--- a/bin/lib/crawlers.py
+++ b/bin/lib/crawlers.py
@@ -34,6 +34,7 @@ config_loader = ConfigLoader.ConfigLoader()
 r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata")
 r_serv_onion = config_loader.get_redis_conn("ARDB_Onion")
 r_cache = config_loader.get_redis_conn("Redis_Cache")
+PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes"))
 config_loader = None
 
 # load crawler config
@@ -545,6 +546,48 @@ def save_har(har_dir, item_id, har_content):
     with open(filename, 'w') as f:
         f.write(json.dumps(har_content))
 
+#### CRAWLER QUEUES ####
+def get_crawler_queue_type_by_proxy(splash_name, proxy_type):
+    all_domain_type = []
+    if splash_name != 'default_splash' and splash_name != 'default_splash_tor':
+        all_domain_type.append(splash_name)
+        # check if can be used for discovery
+        if not is_splash_used_in_discovery(splash_name):
+            return all_domain_type
+    if proxy_type == 'tor':
+        all_domain_type.append('onion')
+        all_domain_type.append('regular')
+    # proxy_type = web
+    else:
+        all_domain_type.append('regular')
+    return all_domain_type
+
+def get_elem_to_crawl_by_queue_type(l_queue_type):
+    ## queues priority:
+    # 1 - priority queue
+    # 2 - discovery queue
+    # 3 - normal queue
+    ##
+    all_queue_key = ['{}_crawler_priority_queue', '{}_crawler_discovery_queue', '{}_crawler_queue']
+
+    for queue_key in all_queue_key:
+        for queue_type in l_queue_type:
+            message = r_serv_onion.spop(queue_key.format(queue_type))
+            if message:
+                dict_to_crawl = {}
+                splitted = message.rsplit(';', 1)
+                if len(splitted) == 2:
+                    url, item_id = splitted
+                    item_id = item_id.replace(PASTES_FOLDER+'/', '')
+                else:
+                    # # TODO: to check/refactor
+                    item_id = None
+                    url = message
+                return {'url': url, 'paste': item_id, 'type_service': queue_type, 'original_message': message}
+    return None
+
+#### ---- ####
+
 #### SPLASH MANAGER ####
 
 def get_splash_manager_url(reload=False): # TODO: add config reload
@@ -558,6 +601,17 @@ def get_splash_url_from_manager_url(splash_manager_url, splash_port):
     host = url.netloc.split(':', 1)[0]
     return 'http://{}:{}'.format(host, splash_port)
 
+def is_splash_used_in_discovery(splash_name):
+    res = r_serv_onion.hget('splash:metadata:{}'.format(splash_name), 'discovery_queue')
+    if res == 'True':
+        return True
+    else:
+        return False
+
+def restart_splash_docker(splash_url):
+    splash_port = splash_url.split(':')[-1]
+    return _restart_splash_docker(splash_port)
+
 ## API ##
 def ping_splash_manager():
     req = requests.get('{}/api/v1/ping'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
@@ -580,6 +634,14 @@ def get_all_splash_manager_proxies():
         return req.json()
     else:
         print(req.json())
+
+def _restart_splash_docker(splash_port):
+    dict_to_send = {'docker_port': splash_port}
+    req = requests.post('{}/api/v1/splash/restart'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False, json=dict_to_send)
+    if req.status_code == 200:
+        return req.json()
+    else:
+        print(req.json())
 ## -- ##
 
 ## SPLASH ##
@@ -648,6 +710,9 @@ def delete_all_proxies():
     for proxy_name in get_all_proxies():
         delete_proxy(proxy_name)
 
+def set_proxy_used_in_discovery(proxy_name, value):
+    r_serv_onion.hset('splash:metadata:{}'.format(proxy_name), 'discovery_queue', value)
+
 def delete_proxy(proxy_name): # # TODO: force delete (delete all proxy)
     proxy_splash = get_all_splash_by_proxy(proxy_name)
     if proxy_splash:
diff --git a/etc/splash/proxy-profiles/default.ini b/etc/splash/proxy-profiles/default.ini
deleted file mode 100644
index 91208135..00000000
--- a/etc/splash/proxy-profiles/default.ini
+++ /dev/null
@@ -1,4 +0,0 @@
-[proxy]
-host=localhost
-port=9050
-type=SOCKS5
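
Usage note: a minimal sketch, not part of the patch, showing how a domain enters the new per-type crawler queues that get_elem_to_crawl_by_queue_type() above consumes. It assumes a direct Redis/ARDB connection in place of AIL's ConfigLoader ("ARDB_Onion"); the host, port, URL, and item id below are illustrative placeholders only.

#!/usr/bin/env python3
# -*-coding:UTF-8 -*

# Sketch (assumed values): push one message onto the onion priority queue.
# Queue keys follow the '{type}_crawler_*_queue' pattern introduced in
# bin/lib/crawlers.py; a message is 'url;item_id'.
import redis

# Illustrative connection; AIL normally builds this via ConfigLoader ("ARDB_Onion").
r_serv_onion = redis.StrictRedis(host='localhost', port=6382, db=0, decode_responses=True)

message = 'http://example.onion;crawled/2020/07/24/example_item'
r_serv_onion.sadd('onion_crawler_priority_queue', message)

Once queued, the refactored Crawler.py pops it through crawlers.get_elem_to_crawl_by_queue_type(rotation_mode), trying priority, then discovery, then normal queues for each queue type in the rotation.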