diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index 966f2a3a..287fc39d 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -414,7 +414,7 @@ class Lookyloo(): to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url) return to_return - def get_misp_occurrences(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]: + def get_misp_occurrences(self, capture_uuid: str, /) -> Optional[Dict[str, Set[str]]]: if not self.misp.available: return None try: @@ -423,10 +423,14 @@ class Lookyloo(): self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.') return None nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node] - events = {} + to_return: Dict[str, Set[str]] = defaultdict(set) for node in nodes_to_lookup: - events[node.name] = self.misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid)) - return events + hits = self.misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid)) + for event_id, values in hits.items(): + if not isinstance(values, set): + continue + to_return[event_id].update(values) + return to_return def _set_capture_cache(self, capture_dir: Path, force: bool=False, redis_pipeline: Optional[Redis]=None) -> None: '''Populate the redis cache for a capture. Mostly used on the index page.''' diff --git a/lookyloo/modules.py b/lookyloo/modules.py index 3226472d..358e2c48 100644 --- a/lookyloo/modules.py +++ b/lookyloo/modules.py @@ -1,8 +1,9 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from typing import Dict, Any, Optional, List, Union, Iterable +from typing import Dict, Any, Optional, List, Union, Iterable, Set from datetime import date +from collections import defaultdict import hashlib import json from pathlib import Path @@ -121,19 +122,23 @@ class MISP(): return event return None - def lookup(self, node: URLNode, hostnode: HostNode) -> Union[List[str], Dict]: + def lookup(self, node: URLNode, hostnode: HostNode) -> Union[Dict[str, Set[str]], Dict[str, Any]]: if self.available and self.enable_lookup: to_lookup = [node.name, node.hostname] + hostnode.resolved_ips if hasattr(hostnode, 'cnames'): to_lookup += hostnode.cnames - if attributes := self.client.search(controller='attributes', value=to_lookup, pythonify=True): + if attributes := self.client.search(controller='attributes', value=to_lookup, + enforce_warninglist=True, pythonify=True): if isinstance(attributes, list): - # NOTE: We have MISPAttributes in that list - return list(set(attribute.event_id for attribute in attributes)) # type: ignore + to_return: Dict[str, Set[str]] = defaultdict(set) + # NOTE: We have MISPAttribute in that list + for a in attributes: + to_return[a.event_id].add(a.value) # type: ignore + return to_return else: # The request returned an error return attributes # type: ignore - return [] + return {'info': 'No hits.'} else: return {'error': 'Module not available or lookup not enabled.'} diff --git a/website/web/templates/misp_lookup.html b/website/web/templates/misp_lookup.html index fe2a4c46..50a75a73 100644 --- a/website/web/templates/misp_lookup.html +++ b/website/web/templates/misp_lookup.html @@ -1,23 +1,23 @@ {% from "macros.html" import shorten_string %}