mirror of https://github.com/CIRCL/AIL-framework
fix: [regex tracker] fix timeout
parent
2ad7d912b3
commit
b1d0d067f9
|
@ -10,7 +10,6 @@ import os
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import signal
|
|
||||||
|
|
||||||
from Helper import Process
|
from Helper import Process
|
||||||
from pubsublogger import publisher
|
from pubsublogger import publisher
|
||||||
|
@ -20,18 +19,15 @@ import NotificationHelper
|
||||||
from packages import Item
|
from packages import Item
|
||||||
from packages import Term
|
from packages import Term
|
||||||
|
|
||||||
|
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib'))
|
||||||
|
import regex_helper
|
||||||
|
|
||||||
full_item_url = "/showsavedpaste/?paste="
|
full_item_url = "/showsavedpaste/?paste="
|
||||||
mail_body_template = "AIL Framework,\nNew occurrence for term tracked regex: {}\nitem id: {}\nurl: {}{}"
|
mail_body_template = "AIL Framework,\nNew occurrence for term tracked regex: {}\nitem id: {}\nurl: {}{}"
|
||||||
|
|
||||||
dict_regex_tracked = Term.get_regex_tracked_words_dict()
|
dict_regex_tracked = Term.get_regex_tracked_words_dict()
|
||||||
last_refresh = time.time()
|
last_refresh = time.time()
|
||||||
|
|
||||||
class TimeoutException(Exception):
|
|
||||||
pass
|
|
||||||
def timeout_handler(signum, frame):
|
|
||||||
raise TimeoutException
|
|
||||||
signal.signal(signal.SIGALRM, timeout_handler)
|
|
||||||
|
|
||||||
def new_term_found(term, term_type, item_id, item_date):
|
def new_term_found(term, term_type, item_id, item_date):
|
||||||
uuid_list = Term.get_term_uuid_list(term, 'regex')
|
uuid_list = Term.get_term_uuid_list(term, 'regex')
|
||||||
print('new tracked term found: {} in {}'.format(term, item_id))
|
print('new tracked term found: {} in {}'.format(term, item_id))
|
||||||
|
@ -56,11 +52,14 @@ if __name__ == "__main__":
|
||||||
publisher.info("Script RegexTracker started")
|
publisher.info("Script RegexTracker started")
|
||||||
|
|
||||||
config_section = 'RegexTracker'
|
config_section = 'RegexTracker'
|
||||||
|
module_name = "RegexTracker"
|
||||||
p = Process(config_section)
|
p = Process(config_section)
|
||||||
max_execution_time = p.config.getint(config_section, "max_execution_time")
|
max_execution_time = p.config.getint(config_section, "max_execution_time")
|
||||||
|
|
||||||
ull_item_url = p.config.get("Notifications", "ail_domain") + full_item_url
|
ull_item_url = p.config.get("Notifications", "ail_domain") + full_item_url
|
||||||
|
|
||||||
|
redis_cache_key = regex_helper.generate_redis_cache_key(module_name)
|
||||||
|
|
||||||
# Regex Frequency
|
# Regex Frequency
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
|
@ -72,20 +71,10 @@ if __name__ == "__main__":
|
||||||
item_content = Item.get_item_content(item_id)
|
item_content = Item.get_item_content(item_id)
|
||||||
|
|
||||||
for regex in dict_regex_tracked:
|
for regex in dict_regex_tracked:
|
||||||
|
matched = regex_helper.regex_search(module_name, redis_cache_key, dict_regex_tracked[regex], item_id, item_content, max_time=max_execution_time)
|
||||||
signal.alarm(max_execution_time)
|
|
||||||
try:
|
|
||||||
matched = dict_regex_tracked[regex].search(item_content)
|
|
||||||
except TimeoutException:
|
|
||||||
print ("{0} processing timeout".format(item_id))
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
signal.alarm(0)
|
|
||||||
|
|
||||||
if matched:
|
if matched:
|
||||||
new_term_found(regex, 'regex', item_id, item_date)
|
new_term_found(regex, 'regex', item_id, item_date)
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
|
|
|
@ -73,3 +73,30 @@ def regex_findall(module_name, redis_key, regex, item_id, item_content, max_time
|
||||||
print("Caught KeyboardInterrupt, terminating workers")
|
print("Caught KeyboardInterrupt, terminating workers")
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
|
def _regex_search(redis_key, regex, item_content):
|
||||||
|
first_occ = regex.search(item_content)
|
||||||
|
if r_set:
|
||||||
|
r_serv_cache.set(redis_key, first_occ)
|
||||||
|
|
||||||
|
def regex_search(module_name, redis_key, regex, item_id, item_content, max_time=30):
|
||||||
|
proc = Proc(target=_regex_search, args=(redis_key, regex, item_content, ))
|
||||||
|
try:
|
||||||
|
proc.start()
|
||||||
|
proc.join(max_time)
|
||||||
|
if proc.is_alive():
|
||||||
|
proc.terminate()
|
||||||
|
Statistics.incr_module_timeout_statistic(module_name)
|
||||||
|
err_mess = "{}: processing timeout: {}".format(module_name, item_id)
|
||||||
|
print(err_mess)
|
||||||
|
publisher.info(err_mess)
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
first_occ = r_serv_cache.get(redis_key)
|
||||||
|
r_serv_cache.delete(redis_key)
|
||||||
|
proc.terminate()
|
||||||
|
return first_occ
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("Caught KeyboardInterrupt, terminating workers")
|
||||||
|
proc.terminate()
|
||||||
|
sys.exit(0)
|
||||||
|
|
Loading…
Reference in New Issue