adding filter and misp lookup

pull/907/head
Adrian Maraj 2024-04-04 14:31:22 +02:00 committed by Raphaël Vinot
parent 57b5b41016
commit a9cad984e5
3 changed files with 80 additions and 8 deletions

View File

@ -7,6 +7,8 @@ Initial URL: {initial_url}
{redirects}
{misp}
{comment}

View File

@ -3,18 +3,19 @@
from __future__ import annotations
import base64
import configparser
import copy
import gzip
import json
import logging
import operator
import shutil
import re
import smtplib
import ssl
import time
from collections import defaultdict
from datetime import date, datetime, timedelta
from datetime import date, datetime, timedelta, timezone
from email.message import EmailMessage
from functools import cached_property
from io import BytesIO
@ -761,13 +762,65 @@ class Lookyloo():
to_return['all_emails'] = list(to_return['all_emails'])
return to_return
def takedown_filtered(self, hostnode: HostNode) -> dict[str, Any] | None:
config = configparser.ConfigParser()
config.optionxform = str
config.read('config/domain.ini')
#checking if domain should be ignored
domains = config['domain']['ignore']
pattern = r"(https?://)?(www\d?\.)?(?P<domain>[\w\.-]+\.\w+)(/\S*)?"
match = re.match(pattern, hostnode.name)
if match:
if match.group("domain") in domains:
return None
result = self.takedown_details(hostnode)
#ignoring mails
final_mails = []
replacelist = config['replacelist']
ignorelist = config['abuse']['ignore'].split('\n')
for mail in result['all_emails']:
# ignoring mails
is_valid = True
for regex in ignorelist:
if regex.strip() == '':
continue
match = re.search(regex.strip(), mail)
if match:
is_valid = False
break
if is_valid:
# replacing emails
for replaceable in replacelist:
if mail == replaceable:
final_mails += replacelist[replaceable].split(',')
is_valid = False
break
if is_valid:
# mail is valid and can be added to the result
final_mails += [mail]
result['all_emails'] = final_mails
return result
def get_filtered_emails(self, capture_uuid, detailed=False) -> set[str] | dict[str, str]:
info = self.contacts(capture_uuid)
if detailed:
final_mails = {}
for i in info:
final_mails[i['hostname']] = i['all_emails']
else:
final_mails = set()
for i in info:
for mail in i['all_emails']:
final_mails.add(mail)
return final_mails
def contacts(self, capture_uuid: str, /) -> list[dict[str, Any]]:
capture = self.get_crawled_tree(capture_uuid)
rendered_hostnode = self.get_hostnode_from_tree(capture_uuid, capture.root_hartree.rendered_node.hostnode_uuid)
result = []
for node in reversed(rendered_hostnode.get_ancestors()):
result.append(self.takedown_details(node))
result.append(self.takedown_details(rendered_hostnode))
result.append(self.takedown_filtered(node))
result.append(self.takedown_filtered(rendered_hostnode))
return result
def send_mail(self, capture_uuid: str, /, email: str='', comment: str | None=None) -> bool | dict[str, Any]:
@ -798,7 +851,21 @@ class Lookyloo():
redirects += '\n'.join(cache.redirects)
else:
redirects = "No redirects."
misp = ''
if not self.misps.available:
self.logger.info('There are no MISP instances available for a lookup.')
else:
for instance_name in self.misps.keys():
if occurrences := self.get_misp_occurrences(capture_uuid, instance_name=instance_name, time=True):
misp_url = occurrences[1]
for element in occurrences[0]:
for attribute in occurrences[0][element]:
if isinstance(attribute, datetime):
now = datetime.now(timezone.utc)
diff = now - attribute
if diff.days < 1: # MISP event should not be older than 24hours
misp += str(attribute) + ': ' + misp_url + 'events/' + str(element) + '\n'
break # some events have more than just one timestamp, we just take the first one
msg = EmailMessage()
msg['From'] = email_config['from']
if email:
@ -813,6 +880,7 @@ class Lookyloo():
initial_url=initial_url,
redirects=redirects,
comment=comment if comment else '',
misp='MISP occurrences from the last 24h: \n' + misp if misp else '',
sender=msg['From'].addresses[0].display_name,
)
msg.set_content(body)
@ -1079,7 +1147,7 @@ class Lookyloo():
return [event]
def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | None=None) -> tuple[dict[str, set[str]], str] | None:
def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | None=None, time: bool=False) -> tuple[dict[str, set[str]], str] | None:
if instance_name is None:
misp = self.misps.default_misp
elif self.misps.get(instance_name) is not None:
@ -1098,7 +1166,7 @@ class Lookyloo():
nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node]
to_return: dict[str, set[str]] = defaultdict(set)
for node in nodes_to_lookup:
hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid))
hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid), time)
for event_id, values in hits.items():
if not isinstance(values, set):
continue

View File

@ -251,7 +251,7 @@ class MISP(AbstractModule):
return event
return None
def lookup(self, node: URLNode, hostnode: HostNode) -> dict[str, set[str]] | dict[str, Any]:
def lookup(self, node: URLNode, hostnode: HostNode, time: bool=False) -> dict[str, set[str]] | dict[str, Any]:
if self.available and self.enable_lookup:
tld = self.psl.publicsuffix(hostnode.name)
domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1]
@ -271,6 +271,8 @@ class MISP(AbstractModule):
# NOTE: We have MISPAttribute in that list
for a in attributes:
to_return[a.event_id].add(a.value) # type: ignore[union-attr,index]
if time:
to_return[a.event_id].add(a.timestamp)
return to_return
else:
# The request returned an error