fix: make mypy happy, always return attribute timestamp

pull/907/head
Raphaël Vinot 2024-04-09 11:42:23 +02:00
parent bebdbe77f6
commit c9ddeb0065
2 changed files with 28 additions and 22 deletions

View File

@ -765,21 +765,21 @@ class Lookyloo():
def takedown_filtered(self, hostnode: HostNode) -> dict[str, Any] | None: def takedown_filtered(self, hostnode: HostNode) -> dict[str, Any] | None:
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.optionxform = str config.optionxform = str # type: ignore[method-assign,assignment]
ignorelist_path = get_homedir() / 'config' / 'ignore_list.ini' ignorelist_path = get_homedir() / 'config' / 'ignore_list.ini'
config.read(ignorelist_path) config.read(ignorelist_path)
#checking if domain should be ignored # checking if domain should be ignored
domains = config['domain']['ignore'] domains = config['domain']['ignore']
pattern = r"(https?://)?(www\d?\.)?(?P<domain>[\w\.-]+\.\w+)(/\S*)?" pattern = r"(https?://)?(www\d?\.)?(?P<domain>[\w\.-]+\.\w+)(/\S*)?"
match = re.match(pattern, hostnode.name) match = re.match(pattern, hostnode.name)
if match: if match:
for regex in domains: for regex in domains:
ignore_domain = regex + "$" ignore_domain = regex + "$"
ignore_subdomain = ".*\." + regex + "$" ignore_subdomain = r".*\." + regex + "$"
if (re.match(ignore_domain, match.group("domain")) or re.match(ignore_subdomain, match.group("domain"))) and regex.strip(): if (re.match(ignore_domain, match.group("domain")) or re.match(ignore_subdomain, match.group("domain"))) and regex.strip():
return None return None
result = self.takedown_details(hostnode) result = self.takedown_details(hostnode)
#ignoring mails # ignoring mails
final_mails = [] final_mails = []
replacelist = config['replacelist'] replacelist = config['replacelist']
ignorelist = config['abuse']['ignore'].split('\n') ignorelist = config['abuse']['ignore'].split('\n')
@ -806,7 +806,7 @@ class Lookyloo():
result['all_emails'] = final_mails result['all_emails'] = final_mails
return result return result
def get_filtered_emails(self, capture_uuid, detailed=False) -> set[str] | dict[str, str]: def get_filtered_emails(self, capture_uuid: str, detailed: bool=False) -> set[str] | dict[str, str]:
info = self.contacts(capture_uuid) info = self.contacts(capture_uuid)
final_mails = set() final_mails = set()
for i in info: for i in info:
@ -856,13 +856,17 @@ class Lookyloo():
self.logger.info('There are no MISP instances available for a lookup.') self.logger.info('There are no MISP instances available for a lookup.')
else: else:
for instance_name in self.misps.keys(): for instance_name in self.misps.keys():
if occurrences := self.get_misp_occurrences(capture_uuid, instance_name=instance_name, time=True): if occurrences := self.get_misp_occurrences(capture_uuid, instance_name=instance_name):
misp_url = occurrences[1] misp_url = occurrences[1]
for element in occurrences[0]: for element in occurrences[0]:
for attribute in occurrences[0][element]: for attribute in occurrences[0][element]:
if attribute[0] == cache.url: if not isinstance(attribute, tuple):
# Issue with the response of the search, ignore
continue
value, timestamp = attribute
if value == initial_url:
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
diff = now - attribute[1] diff = now - timestamp
if diff.days < 1: # MISP event should not be older than 24hours if diff.days < 1: # MISP event should not be older than 24hours
misp += f"\n{attribute[1]:%a %m-%d-%y %I:%M%p(%z %Z)} : {misp_url}events/{element}" misp += f"\n{attribute[1]:%a %m-%d-%y %I:%M%p(%z %Z)} : {misp_url}events/{element}"
break # some events have more than just one timestamp, we just take the first one break # some events have more than just one timestamp, we just take the first one
@ -1147,7 +1151,7 @@ class Lookyloo():
return [event] return [event]
def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | None=None, time: bool=False) -> tuple[dict[str, set[str]], str] | None: def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | None=None) -> tuple[dict[int, set[tuple[str, datetime]]], str] | None:
if instance_name is None: if instance_name is None:
misp = self.misps.default_misp misp = self.misps.default_misp
elif self.misps.get(instance_name) is not None: elif self.misps.get(instance_name) is not None:
@ -1164,11 +1168,11 @@ class Lookyloo():
self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.') self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.')
return None return None
nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node] nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node]
to_return: dict[str, set[str]] = defaultdict(set) to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set)
for node in nodes_to_lookup: for node in nodes_to_lookup:
hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid), time) hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid))
for event_id, values in hits.items(): for event_id, values in hits.items():
if not isinstance(values, set): if not isinstance(event_id, int) or not isinstance(values, set):
continue continue
to_return[event_id].update(values) to_return[event_id].update(values)
return to_return, misp.client.root_url return to_return, misp.client.root_url

View File

@ -2,13 +2,13 @@
from __future__ import annotations from __future__ import annotations
import datetime from datetime import datetime
import re import re
from io import BytesIO from io import BytesIO
from collections import defaultdict from collections import defaultdict
from collections.abc import Mapping from collections.abc import Mapping
from typing import Any, TYPE_CHECKING, Iterator, Literal from typing import Any, TYPE_CHECKING, Iterator
import requests import requests
from har2tree import HostNode, URLNode, Har2TreeError from har2tree import HostNode, URLNode, Har2TreeError
@ -252,7 +252,7 @@ class MISP(AbstractModule):
return event return event
return None return None
def lookup(self, node: URLNode, hostnode: HostNode, time: bool=False) -> dict[str, set[str]] | dict[str, Any]: def lookup(self, node: URLNode, hostnode: HostNode) -> dict[int | str, str | set[tuple[str, datetime]]]:
if self.available and self.enable_lookup: if self.available and self.enable_lookup:
tld = self.psl.publicsuffix(hostnode.name) tld = self.psl.publicsuffix(hostnode.name)
domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1] domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1]
@ -268,14 +268,16 @@ class MISP(AbstractModule):
if attributes := self.client.search(controller='attributes', value=to_lookup, if attributes := self.client.search(controller='attributes', value=to_lookup,
enforce_warninglist=True, pythonify=True): enforce_warninglist=True, pythonify=True):
if isinstance(attributes, list): if isinstance(attributes, list):
to_return: dict[str, set[str]] = defaultdict(set) to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set)
# NOTE: We have MISPAttribute in that list a: MISPAttribute
for a in attributes: for a in attributes: # type: ignore[assignment]
if time: if isinstance(a.value, str):
to_return[a.event_id].add((a.value,a.timestamp)) # a.timestamp is always a datetime in this situation
to_return[a.event_id].add((a.value, a.timestamp)) # type: ignore[arg-type]
else: else:
to_return[a.event_id].add(a.value) # type: ignore[union-attr,index] # This shouldn't happen (?)
return to_return self.logger.warning(f'Unexpected value type in MISP lookup: {type(a.value)}')
return to_return # type: ignore[return-value]
else: else:
# The request returned an error # The request returned an error
return attributes # type: ignore[return-value] return attributes # type: ignore[return-value]