chg: Improve MISP lookup

pull/210/head
Raphaël Vinot 2021-06-02 13:27:50 -07:00
parent 380e44cc5b
commit 387f2d3c3d
3 changed files with 34 additions and 25 deletions

View File

@ -414,7 +414,7 @@ class Lookyloo():
to_return['pi'][ct.root_hartree.har.root_url] = self.pi.get_url_lookup(ct.root_hartree.har.root_url)
return to_return
def get_misp_occurrences(self, capture_uuid: str, /) -> Optional[Dict[str, Any]]:
def get_misp_occurrences(self, capture_uuid: str, /) -> Optional[Dict[str, Set[str]]]:
if not self.misp.available:
return None
try:
@ -423,10 +423,14 @@ class Lookyloo():
self.logger.warning(f'Unable to get the modules responses unless the tree ({capture_uuid}) is cached.')
return None
nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node]
events = {}
to_return: Dict[str, Set[str]] = defaultdict(set)
for node in nodes_to_lookup:
events[node.name] = self.misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid))
return events
hits = self.misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid))
for event_id, values in hits.items():
if not isinstance(values, set):
continue
to_return[event_id].update(values)
return to_return
def _set_capture_cache(self, capture_dir: Path, force: bool=False, redis_pipeline: Optional[Redis]=None) -> None:
'''Populate the redis cache for a capture. Mostly used on the index page.'''

View File

@ -1,8 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Dict, Any, Optional, List, Union, Iterable
from typing import Dict, Any, Optional, List, Union, Iterable, Set
from datetime import date
from collections import defaultdict
import hashlib
import json
from pathlib import Path
@ -121,19 +122,23 @@ class MISP():
return event
return None
def lookup(self, node: URLNode, hostnode: HostNode) -> Union[List[str], Dict]:
def lookup(self, node: URLNode, hostnode: HostNode) -> Union[Dict[str, Set[str]], Dict[str, Any]]:
if self.available and self.enable_lookup:
to_lookup = [node.name, node.hostname] + hostnode.resolved_ips
if hasattr(hostnode, 'cnames'):
to_lookup += hostnode.cnames
if attributes := self.client.search(controller='attributes', value=to_lookup, pythonify=True):
if attributes := self.client.search(controller='attributes', value=to_lookup,
enforce_warninglist=True, pythonify=True):
if isinstance(attributes, list):
# NOTE: We have MISPAttributes in that list
return list(set(attribute.event_id for attribute in attributes)) # type: ignore
to_return: Dict[str, Set[str]] = defaultdict(set)
# NOTE: We have MISPAttribute in that list
for a in attributes:
to_return[a.event_id].add(a.value) # type: ignore
return to_return
else:
# The request returned an error
return attributes # type: ignore
return []
return {'info': 'No hits.'}
else:
return {'error': 'Module not available or lookup not enabled.'}

View File

@ -1,23 +1,23 @@
{% from "macros.html" import shorten_string %}
<div>
<center><h1 class="display-4">MISP hits</h1></center>
<center>
<h1 class="display-4">MISP hits</h1>
<h6>Searching on URL, domain, IPs, and CNAMEs for all the nodes up to the rendered page.</h6>
<h6>Skips the entries in warnings lists enabled on your MISP instance.</h6>
</center>
{% if hits %}
{% for value, entries in hits.items() %}
{% if entries %}
<h3>{{value}}</h3>
<hr>
{% if entries is mapping %}
{{ entries }}
{% elif entries is sequence %}
<ul>
{% for entry in entries %}
<li>{{misp_root_url}}/events/{{entry}}</li>
{% endfor %}
</ul>
{% endif %}
{% endif %}
<ul>
{% for event_id, values in hits.items() %}
<li><a href="{{misp_root_url}}/events/{{event_id}}">Event {{event_id}}</a>:
<ul>
{% for v in values|sort %}
<li>{{ v }}</li>
{% endfor %}
</ul>
</li>
{% endfor %}
</ul>
{% else %}
No hits
{% endif %}