mirror of https://github.com/CIRCL/AIL-framework
unnecessarily parenthesis removed
parent
7fb796172c
commit
4af54b7ff2
|
@ -57,14 +57,14 @@ def is_in_role(user_id, role):
|
||||||
|
|
||||||
def check_term_uuid_valid_access(term_uuid, user_id):
|
def check_term_uuid_valid_access(term_uuid, user_id):
|
||||||
if not is_valid_uuid_v4(term_uuid):
|
if not is_valid_uuid_v4(term_uuid):
|
||||||
return ({"status": "error", "reason": "Invalid uuid"}, 400)
|
return {"status": "error", "reason": "Invalid uuid"}, 400
|
||||||
level = r_serv_term.hget('tracker:{}'.format(term_uuid), 'level')
|
level = r_serv_term.hget('tracker:{}'.format(term_uuid), 'level')
|
||||||
if not level:
|
if not level:
|
||||||
return ({"status": "error", "reason": "Unknown uuid"}, 404)
|
return {"status": "error", "reason": "Unknown uuid"}, 404
|
||||||
if level == 0:
|
if level == 0:
|
||||||
if r_serv_term.hget('tracker:{}'.format(term_uuid), 'user_id') != user_id:
|
if r_serv_term.hget('tracker:{}'.format(term_uuid), 'user_id') != user_id:
|
||||||
if not is_in_role(user_id, 'admin'):
|
if not is_in_role(user_id, 'admin'):
|
||||||
return ({"status": "error", "reason": "Unknown uuid"}, 404)
|
return {"status": "error", "reason": "Unknown uuid"}, 404
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ def is_valid_mail(email):
|
||||||
def verify_mail_list(mail_list):
|
def verify_mail_list(mail_list):
|
||||||
for mail in mail_list:
|
for mail in mail_list:
|
||||||
if not is_valid_mail(mail):
|
if not is_valid_mail(mail):
|
||||||
return ({'status': 'error', 'reason': 'Invalid email', 'value': mail}, 400)
|
return {'status': 'error', 'reason': 'Invalid email', 'value': mail}, 400
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def is_valid_regex(term_regex):
|
def is_valid_regex(term_regex):
|
||||||
|
@ -148,10 +148,10 @@ def is_term_tracked_in_user_level(term, term_type, user_id):
|
||||||
def parse_json_term_to_add(dict_input, user_id):
|
def parse_json_term_to_add(dict_input, user_id):
|
||||||
term = dict_input.get('term', None)
|
term = dict_input.get('term', None)
|
||||||
if not term:
|
if not term:
|
||||||
return ({"status": "error", "reason": "Term not provided"}, 400)
|
return {"status": "error", "reason": "Term not provided"}, 400
|
||||||
term_type = dict_input.get('type', None)
|
term_type = dict_input.get('type', None)
|
||||||
if not term_type:
|
if not term_type:
|
||||||
return ({"status": "error", "reason": "Term type not provided"}, 400)
|
return {"status": "error", "reason": "Term type not provided"}, 400
|
||||||
nb_words = dict_input.get('nb_words', 1)
|
nb_words = dict_input.get('nb_words', 1)
|
||||||
description = dict_input.get('description', '')
|
description = dict_input.get('description', '')
|
||||||
description = escape(description)
|
description = escape(description)
|
||||||
|
@ -180,27 +180,27 @@ def parse_json_term_to_add(dict_input, user_id):
|
||||||
# check if term already tracked in global
|
# check if term already tracked in global
|
||||||
if level==1:
|
if level==1:
|
||||||
if is_term_tracked_in_global_level(term, term_type):
|
if is_term_tracked_in_global_level(term, term_type):
|
||||||
return ({"status": "error", "reason": "Term already tracked"}, 409)
|
return {"status": "error", "reason": "Term already tracked"}, 409
|
||||||
else:
|
else:
|
||||||
if is_term_tracked_in_user_level(term, term_type, user_id):
|
if is_term_tracked_in_user_level(term, term_type, user_id):
|
||||||
return ({"status": "error", "reason": "Term already tracked"}, 409)
|
return {"status": "error", "reason": "Term already tracked"}, 409
|
||||||
|
|
||||||
term_uuid = add_tracked_term(term , term_type, user_id, level, tags, mails, description)
|
term_uuid = add_tracked_term(term , term_type, user_id, level, tags, mails, description)
|
||||||
|
|
||||||
return ({'term': term, 'type': term_type, 'uuid': term_uuid}, 200)
|
return {'term': term, 'type': term_type, 'uuid': term_uuid}, 200
|
||||||
|
|
||||||
|
|
||||||
def parse_tracked_term_to_add(term , term_type, nb_words=1):
|
def parse_tracked_term_to_add(term , term_type, nb_words=1):
|
||||||
if term_type=='regex':
|
if term_type=='regex':
|
||||||
if not is_valid_regex(term):
|
if not is_valid_regex(term):
|
||||||
return ({"status": "error", "reason": "Invalid regex"}, 400)
|
return {"status": "error", "reason": "Invalid regex"}, 400
|
||||||
elif term_type=='word' or term_type=='set':
|
elif term_type=='word' or term_type=='set':
|
||||||
# force lowercase
|
# force lowercase
|
||||||
term = term.lower()
|
term = term.lower()
|
||||||
word_set = set(term)
|
word_set = set(term)
|
||||||
set_inter = word_set.intersection(special_characters)
|
set_inter = word_set.intersection(special_characters)
|
||||||
if set_inter:
|
if set_inter:
|
||||||
return ({"status": "error", "reason": "special character not allowed", "message": "Please use a regex or remove all special characters"}, 400)
|
return {"status": "error", "reason": "special character not allowed", "message": "Please use a regex or remove all special characters"}, 400
|
||||||
words = term.split()
|
words = term.split()
|
||||||
# not a word
|
# not a word
|
||||||
if term_type=='word' and len(words)>1:
|
if term_type=='word' and len(words)>1:
|
||||||
|
@ -226,13 +226,13 @@ def parse_tracked_term_to_add(term , term_type, nb_words=1):
|
||||||
|
|
||||||
elif term_type=='yara_custom':
|
elif term_type=='yara_custom':
|
||||||
if not Tracker.is_valid_yara_rule(term):
|
if not Tracker.is_valid_yara_rule(term):
|
||||||
return ({"status": "error", "reason": "Invalid custom Yara Rule"}, 400)
|
return {"status": "error", "reason": "Invalid custom Yara Rule"}, 400
|
||||||
elif term_type=='yara_default':
|
elif term_type=='yara_default':
|
||||||
if not Tracker.is_valid_default_yara_rule(term):
|
if not Tracker.is_valid_default_yara_rule(term):
|
||||||
return ({"status": "error", "reason": "The Yara Rule doesn't exist"}, 400)
|
return {"status": "error", "reason": "The Yara Rule doesn't exist"}, 400
|
||||||
else:
|
else:
|
||||||
return ({"status": "error", "reason": "Incorrect type"}, 400)
|
return {"status": "error", "reason": "Incorrect type"}, 400
|
||||||
return ({"status": "success", "term": term, "type": term_type}, 200)
|
return {"status": "success", "term": term, "type": term_type}, 200
|
||||||
|
|
||||||
def add_tracked_term(term , term_type, user_id, level, tags, mails, description, dashboard=0):
|
def add_tracked_term(term , term_type, user_id, level, tags, mails, description, dashboard=0):
|
||||||
|
|
||||||
|
@ -281,14 +281,15 @@ def add_tracked_term(term , term_type, user_id, level, tags, mails, description,
|
||||||
|
|
||||||
return term_uuid
|
return term_uuid
|
||||||
|
|
||||||
|
|
||||||
def parse_tracked_term_to_delete(dict_input, user_id):
|
def parse_tracked_term_to_delete(dict_input, user_id):
|
||||||
term_uuid = dict_input.get("uuid", None)
|
term_uuid = dict_input.get("uuid", None)
|
||||||
res = check_term_uuid_valid_access(term_uuid, user_id)
|
res = check_term_uuid_valid_access(term_uuid, user_id)
|
||||||
if res:
|
if res:
|
||||||
return res
|
return res
|
||||||
|
|
||||||
delete_term(term_uuid)
|
delete_term(term_uuid)
|
||||||
return ({"uuid": term_uuid}, 200)
|
return {"uuid": term_uuid}, 200
|
||||||
|
|
||||||
|
|
||||||
# # TODO: MOVE IN TRACKER
|
# # TODO: MOVE IN TRACKER
|
||||||
def delete_term(term_uuid):
|
def delete_term(term_uuid):
|
||||||
|
@ -328,7 +329,6 @@ def delete_term(term_uuid):
|
||||||
r_serv_term.delete('tracker:sources:{}'.format(term_uuid))
|
r_serv_term.delete('tracker:sources:{}'.format(term_uuid))
|
||||||
|
|
||||||
# remove item set
|
# remove item set
|
||||||
#########################3
|
|
||||||
all_item_date = r_serv_term.zrange(f'tracker:stat:{term_uuid}', 0, -1, withscores=True)
|
all_item_date = r_serv_term.zrange(f'tracker:stat:{term_uuid}', 0, -1, withscores=True)
|
||||||
if all_item_date:
|
if all_item_date:
|
||||||
all_item_date = dict(all_item_date)
|
all_item_date = dict(all_item_date)
|
||||||
|
@ -426,12 +426,8 @@ def parse_get_tracker_term_item(dict_input, user_id):
|
||||||
all_item_id = Tracker.get_tracker_items_by_daterange(term_uuid, date_from, date_to)
|
all_item_id = Tracker.get_tracker_items_by_daterange(term_uuid, date_from, date_to)
|
||||||
all_item_id = Item.get_item_list_desc(all_item_id)
|
all_item_id = Item.get_item_list_desc(all_item_id)
|
||||||
|
|
||||||
res_dict = {}
|
res_dict = {'uuid': term_uuid, 'date_from': date_from, 'date_to': date_to, 'items': all_item_id}
|
||||||
res_dict['uuid'] = term_uuid
|
return res_dict, 200
|
||||||
res_dict['date_from'] = date_from
|
|
||||||
res_dict['date_to'] = date_to
|
|
||||||
res_dict['items'] = all_item_id
|
|
||||||
return (res_dict, 200)
|
|
||||||
|
|
||||||
def get_tracked_term_first_seen(term_uuid):
|
def get_tracked_term_first_seen(term_uuid):
|
||||||
return Tracker.get_tracker_first_seen(term_uuid)
|
return Tracker.get_tracker_first_seen(term_uuid)
|
||||||
|
@ -444,7 +440,6 @@ def get_term_metedata(term_uuid, user_id=False, description=False, level=False,
|
||||||
dict_uuid['term'] = r_serv_term.hget('tracker:{}'.format(term_uuid), 'tracked')
|
dict_uuid['term'] = r_serv_term.hget('tracker:{}'.format(term_uuid), 'tracked')
|
||||||
dict_uuid['type'] = r_serv_term.hget('tracker:{}'.format(term_uuid), 'type')
|
dict_uuid['type'] = r_serv_term.hget('tracker:{}'.format(term_uuid), 'type')
|
||||||
dict_uuid['date'] = r_serv_term.hget('tracker:{}'.format(term_uuid), 'date')
|
dict_uuid['date'] = r_serv_term.hget('tracker:{}'.format(term_uuid), 'date')
|
||||||
dict_uuid['description'] = r_serv_term.hget('tracker:{}'.format(term_uuid), 'description')
|
|
||||||
dict_uuid['first_seen'] = get_tracked_term_first_seen(term_uuid)
|
dict_uuid['first_seen'] = get_tracked_term_first_seen(term_uuid)
|
||||||
dict_uuid['last_seen'] = get_tracked_term_last_seen(term_uuid)
|
dict_uuid['last_seen'] = get_tracked_term_last_seen(term_uuid)
|
||||||
if user_id:
|
if user_id:
|
||||||
|
@ -457,6 +452,8 @@ def get_term_metedata(term_uuid, user_id=False, description=False, level=False,
|
||||||
dict_uuid['tags'] = get_list_trackeed_term_tags(term_uuid)
|
dict_uuid['tags'] = get_list_trackeed_term_tags(term_uuid)
|
||||||
if sparkline:
|
if sparkline:
|
||||||
dict_uuid['sparkline'] = get_tracked_term_sparkline(term_uuid)
|
dict_uuid['sparkline'] = get_tracked_term_sparkline(term_uuid)
|
||||||
|
if description:
|
||||||
|
dict_uuid['description'] = r_serv_term.hget('tracker:{}'.format(term_uuid), 'description')
|
||||||
dict_uuid['uuid'] = term_uuid
|
dict_uuid['uuid'] = term_uuid
|
||||||
return dict_uuid
|
return dict_uuid
|
||||||
|
|
||||||
|
|
|
@ -112,11 +112,9 @@ def add_tracked_menu():
|
||||||
else:
|
else:
|
||||||
tracker = yara_default_rule
|
tracker = yara_default_rule
|
||||||
tracker_type='yara_default'
|
tracker_type='yara_default'
|
||||||
# #
|
|
||||||
|
|
||||||
if level == 'on':
|
if level == 'on':
|
||||||
level = 1
|
level = 1
|
||||||
|
|
||||||
if mails:
|
if mails:
|
||||||
mails = mails.split()
|
mails = mails.split()
|
||||||
if tags:
|
if tags:
|
||||||
|
|
Loading…
Reference in New Issue