mirror of https://github.com/CIRCL/lookyloo
commit 60b767d08f (parent 11598fc3ba)
chg: Normalize ressources details display, refactoring
Changed paths: lookyloo, website/web/templates
@@ -150,13 +150,52 @@ class Indexing():

 class Context():

-    def __init__(self):
+    def __init__(self, sanejs: Optional[SaneJavaScript] = None):
         self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True)
+        self.sanejs = sanejs
+        self._cache_known_content()

     def clear_context(self):
         self.redis.flushdb()

-    def _filter(self, urlnodes: Union[URLNode, List[URLNode]], known_hashes: Set[str]) -> Iterator[Tuple[URLNode, str]]:
+    def _get_resources_hashes(self, har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Set[str]:
+        if isinstance(har2tree_container, CrawledTree):
+            urlnodes = har2tree_container.root_hartree.url_tree.traverse()
+        elif isinstance(har2tree_container, HostNode):
+            urlnodes = har2tree_container.urls
+        elif isinstance(har2tree_container, URLNode):
+            urlnodes = [har2tree_container]
+        else:
+            raise Exception(f'har2tree_container cannot be {type(har2tree_container)}')
+        all_ressources_hashes: Set[str] = set()
+        for urlnode in urlnodes:
+            if hasattr(urlnode, 'resources_hashes'):
+                all_ressources_hashes.update(urlnode.resources_hashes)
+        return all_ressources_hashes
+
+    def _cache_known_content(self) -> None:
+        p = self.redis.pipeline()
+        for filename, file_content in load_known_content().items():
+            for k, type_content in file_content.items():
+                p.hmset('known_content', {h: type_content['description'] for h in type_content['entries']})
+        p.execute()
+
+    def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Dict[str, Union[str, List[str]]]:
+        """Return a dictionary of content resources found in the local known_content database, or in SaneJS (if enabled)"""
+        all_ressources_hashes = self._get_resources_hashes(har2tree_container)
+        # Get from local cache of known content all descriptions related to the ressources.
+        known_content_table = dict(zip(all_ressources_hashes,
+                                       self.redis.hmget('known_content', all_ressources_hashes)))
+
+        if self.sanejs and self.sanejs.available:
+            # Query sanejs on the remaining ones
+            to_lookup = [h for h, description in known_content_table.items() if not description]
+            for h, entry in self.sanejs.hashes_lookup(to_lookup).items():
+                libname, version, path = entry[0].split("|")
+                known_content_table[h] = (libname, version, path, len(entry))
+        return {h: details for h, details in known_content_table.items() if details}
+
+    def _filter(self, urlnodes: Union[URLNode, List[URLNode]], known_hashes: Iterable[str]) -> Iterator[Tuple[URLNode, str]]:
         if isinstance(urlnodes, URLNode):
             _urlnodes = [urlnodes]
         else:
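The new `_cache_known_content()` / `find_known_content()` pair boils down to one Redis hash keyed by body hash. A minimal sketch of that round-trip, assuming a locally reachable Redis (standalone here rather than lookyloo's unix socket) and an invented hash; `hmset` mirrors the diff but is deprecated in newer redis-py in favour of `hset(name, mapping=...)`:

```python
from redis import Redis

r = Redis(db=1, decode_responses=True)  # lookyloo passes unix_socket_path=get_socket_path('indexing')

# _cache_known_content(): batch every description into one hash via a pipeline.
p = r.pipeline()
p.hmset('known_content', {'ad' * 64: '1x1 transparent tracking pixel'})  # hypothetical entry
p.execute()

# find_known_content(): HMGET returns None for unknown hashes, so zipping the
# queried hashes with the results and dropping falsy values keeps only the hits.
hashes = ['ad' * 64, '00' * 64]
table = dict(zip(hashes, r.hmget('known_content', hashes)))
print({h: d for h, d in table.items() if d})  # only the known hash remains
```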
@@ -166,22 +205,42 @@ class Context():
             if h not in known_hashes:
                 yield urlnode, h

-    def mark_as_legitimate(self, urlnodes: List[URLNode], known_hashes: Set[str]) -> None:
+    def mark_as_legitimate(self, tree: CrawledTree, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> None:
+        if hostnode_uuid:
+            urlnodes = tree.root_hartree.get_host_node_by_uuid(hostnode_uuid).urls
+        elif urlnode_uuid:
+            urlnodes = [tree.root_hartree.get_url_node_by_uuid(urlnode_uuid)]
+        else:
+            urlnodes = tree.root_hartree.url_tree.traverse()
+        known_content = self.find_known_content(tree)
         pipeline = self.redis.pipeline()
-        for urlnode, h in self._filter(urlnodes, known_hashes):
+        for urlnode, h in self._filter(urlnodes, known_content):
             pipeline.sadd(f'bh|{h}|legitimate', urlnode.hostname)
         pipeline.execute()

+    def contextualize_tree(self, tree: CrawledTree) -> CrawledTree:
+        hostnodes_with_malicious_content = set()
+        known_content = self.find_known_content(tree)
+        for urlnode in tree.root_hartree.url_tree.traverse():
+            malicious = self.is_malicious(urlnode, known_content)
+            if malicious is True:
+                urlnode.add_feature('malicious', malicious)
+                hostnodes_with_malicious_content.add(urlnode.hostnode_uuid)
+        for hostnode_with_malicious_content in hostnodes_with_malicious_content:
+            hostnode = tree.root_hartree.get_host_node_by_uuid(hostnode_with_malicious_content)
+            # Only UUIDs of nodes with malicious content end up in the set, so flag the
+            # host outright; reusing the loop variable `malicious` here would be stale.
+            hostnode.add_feature('malicious', True)
+        return tree
+
     def legitimate_body(self, body_hash: str, legitimate_hostname: str) -> None:
         self.redis.sadd(f'bh|{body_hash}|legitimate', legitimate_hostname)

-    def malicious_node(self, urlnode: URLNode, known_hashes: Set[str]) -> None:
+    def malicious_node(self, urlnode: URLNode, known_hashes: Iterable[str]) -> None:
         for _, h in self._filter(urlnode, known_hashes):
             self.redis.sadd('bh|malicious', h)

     # Query DB

-    def is_legitimate(self, urlnode: URLNode, known_hashes: Set[str]) -> Optional[bool]:
+    def is_legitimate(self, urlnode: URLNode, known_hashes: Iterable[str]) -> Optional[bool]:
         """3 cases:
         * True if *all* the contents are known legitimate
         * False if *any* content is malicious
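Behind `mark_as_legitimate()` and `malicious_node()` the storage is plain Redis sets. A sketch of the key layout on the same db=1 instance, with a hypothetical hash and hostname:

```python
from redis import Redis

r = Redis(db=1, decode_responses=True)
body_hash = 'ad' * 64  # hypothetical body hash

# mark_as_legitimate(): one set per body hash, listing hostnames allowed to serve it.
r.sadd(f'bh|{body_hash}|legitimate', 'cdn.example.com')

# malicious_node(): a single flat set of known-bad body hashes.
r.sadd('bh|malicious', body_hash)

# is_legitimate()/is_malicious() then reduce to membership checks on those keys.
print(r.smembers(f'bh|{body_hash}|legitimate'))  # {'cdn.example.com'}
print(r.sismember('bh|malicious', body_hash))    # True
```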
@@ -205,7 +264,7 @@ class Context():
                 return True  # All the contents are known legitimate
         return None

-    def is_malicious(self, urlnode: URLNode, known_hashes: Set[str]) -> Optional[bool]:
+    def is_malicious(self, urlnode: URLNode, known_hashes: Iterable[str]) -> Optional[bool]:
         """3 cases:
         * True if *any* content is malicious
         * False if *all* the contents are known legitimate
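The Set[str] → Iterable[str] loosening across these signatures is what lets callers pass the `find_known_content()` dict straight through: the `h not in known_hashes` test inside `_filter()` is a key-membership check, so a dict works without the old `set(known_content.keys())` copy. A tiny illustration (invented entry):

```python
known_content = {'ad' * 64: '1x1 transparent tracking pixel'}  # hypothetical
print('00' * 64 not in known_content)  # True: dict membership tests keys, no set() copy needed
```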
@@ -218,7 +277,7 @@ class Context():
             return True
         return None

-    def legitimacy_details(self, urlnode: URLNode, known_hashes: Set[str]) -> Dict[str, Tuple[bool, Optional[List[str]]]]:
+    def legitimacy_details(self, urlnode: URLNode, known_hashes: Iterable[str]) -> Dict[str, Tuple[bool, Optional[List[str]]]]:
         to_return = {}
         for urlnode, h in self._filter(urlnode, known_hashes):
             hostnames = self.redis.smembers(f'bh|{h}|legitimate')
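Per the annotation above, `legitimacy_details()` maps each body hash to a `(bool, hostnames)` pair. An illustrative shape only, with invented values:

```python
legit_details = {
    'ad' * 64: (False, ['cdn.example.com']),  # suspicious unless served by these hostnames
    '00' * 64: (True, None),                  # known legitimate
}
```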
@@ -239,7 +298,6 @@ class Lookyloo():
         self.configs: Dict[str, Dict[str, Any]] = load_configs()
         self.logger.setLevel(self.get_config('loglevel'))
         self.indexing = Indexing()
-        self.context = Context()
         self.is_public_instance = self.get_config('public_instance')

         self.redis: Redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)
@@ -270,18 +328,14 @@ class Lookyloo():
             if not self.sanejs.available:
                 self.logger.warning('Unable to setup the SaneJS module')

-        # TODO: reorganize startup cache.
-        self.cache_known_content()
+        if hasattr(self, 'sanejs') and self.sanejs.available:
+            self.context = Context(self.sanejs)
+        else:
+            self.context = Context()
+
         if not self.redis.exists('cache_loaded'):
             self._init_existing_dumps()

-    def cache_known_content(self) -> None:
-        p = self.redis.pipeline()
-        for filename, file_content in load_known_content().items():
-            for k, type_content in file_content.items():
-                p.hmset('known_content', {h: type_content['description'] for h in type_content['entries']})
-        p.execute()
-
     def cache_user_agents(self, user_agent: str, remote_ip: str) -> None:
         today = date.today().isoformat()
         self.redis.zincrby(f'user_agents|{today}', 1, f'{remote_ip}|{user_agent}')
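Since `Context.__init__` now defaults `sanejs` to None, the four-line conditional above could collapse to a one-liner inside `__init__`; a sketch, not part of the commit:

```python
sanejs = getattr(self, 'sanejs', None)
self.context = Context(sanejs if sanejs and sanejs.available else None)
```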
@@ -356,27 +410,7 @@ class Lookyloo():

     def add_to_legitimate(self, capture_uuid: str, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None):
         ct = self.get_crawled_tree(capture_uuid)
-        known_content = self.find_known_content(ct)
-        if hostnode_uuid:
-            urlnodes = ct.root_hartree.get_host_node_by_uuid(hostnode_uuid).urls
-        elif urlnode_uuid:
-            urlnodes = [ct.root_hartree.get_url_node_by_uuid(urlnode_uuid)]
-        else:
-            urlnodes = ct.root_hartree.url_tree.traverse()
-        self.context.mark_as_legitimate(urlnodes, set(known_content.keys()))
-
-    def bodies_legitimacy_check(self, tree: CrawledTree) -> CrawledTree:
-        hostnodes_with_malicious_content = set()
-        known_content = self.find_known_content(tree)
-        for urlnode in tree.root_hartree.url_tree.traverse():
-            malicious = self.context.is_malicious(urlnode, set(known_content.keys()))
-            if malicious is True:
-                urlnode.add_feature('malicious', malicious)
-                hostnodes_with_malicious_content.add(urlnode.hostnode_uuid)
-        for hostnode_with_malicious_content in hostnodes_with_malicious_content:
-            hostnode = tree.root_hartree.get_host_node_by_uuid(hostnode_with_malicious_content)
-            hostnode.add_feature('malicious', malicious)
-        return tree
+        self.context.mark_as_legitimate(ct, hostnode_uuid, urlnode_uuid)

     def load_tree(self, capture_uuid: str) -> Tuple[str, str, str, str, Dict[str, str]]:
         capture_dir = self.lookup_capture_dir(capture_uuid)
@@ -387,7 +421,7 @@ class Lookyloo():
         with open((capture_dir / 'meta'), 'r') as f:
             meta = json.load(f)
         ct = self.get_crawled_tree(capture_uuid)
-        ct = self.bodies_legitimacy_check(ct)
+        ct = self.context.contextualize_tree(ct)
         return ct.to_json(), ct.start_time.isoformat(), ct.user_agent, ct.root_url, meta

     def remove_pickle(self, capture_uuid: str) -> None:
@@ -859,35 +893,6 @@ class Lookyloo():
             captures_list['different_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
         return captures_list

-    def _get_resources_hashes(self, har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Set[str]:
-        if isinstance(har2tree_container, CrawledTree):
-            urlnodes = har2tree_container.root_hartree.url_tree.traverse()
-        elif isinstance(har2tree_container, HostNode):
-            urlnodes = har2tree_container.urls
-        elif isinstance(har2tree_container, URLNode):
-            urlnodes = [har2tree_container]
-        else:
-            raise Exception(f'har2tree_container cannot be {type(har2tree_container)}')
-        all_ressources_hashes: Set[str] = set()
-        for urlnode in urlnodes:
-            if hasattr(urlnode, 'resources_hashes'):
-                all_ressources_hashes.update(urlnode.resources_hashes)
-        return all_ressources_hashes
-
-    def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Dict[str, Union[str, List[str]]]:
-        all_ressources_hashes = self._get_resources_hashes(har2tree_container)
-        # Get from local cache of known content all descriptions related to the ressources.
-        known_content_table = dict(zip(all_ressources_hashes,
-                                       self.redis.hmget('known_content', all_ressources_hashes)))
-
-        if hasattr(self, 'sanejs') and self.sanejs.available:
-            # Query sanejs on the remaining ones
-            to_lookup = [h for h, description in known_content_table.items() if not description]
-            for h, entry in self.sanejs.hashes_lookup(to_lookup).items():
-                libname, version, path = entry[0].split("|")
-                known_content_table[h] = (libname, version, path, len(entry))
-        return {h: details for h, details in known_content_table.items() if details}
-
     def get_hostnode_investigator(self, capture_uuid: str, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]:
         capture_dir = self.lookup_capture_dir(capture_uuid)
         if not capture_dir:
@@ -900,7 +905,7 @@ class Lookyloo():
         if not hostnode:
             raise MissingUUID(f'Unable to find UUID {node_uuid} in {capture_dir}')

-        known_content_table = self.find_known_content(hostnode)
+        known_content = self.context.find_known_content(hostnode)

         urls: List[Dict[str, Any]] = []
         for url in hostnode.urls:
@@ -908,7 +913,7 @@ class Lookyloo():
             # * https vs http
             # * everything after the domain
             # * the full URL
-            legit_details = self.context.legitimacy_details(url, set(known_content_table.keys()))
+            legit_details = self.context.legitimacy_details(url, known_content)
             to_append: Dict[str, Any] = {
                 'encrypted': url.name.startswith('https'),
                 'url_path': url.name.split('/', 3)[-1],
@@ -916,7 +921,6 @@ class Lookyloo():
             }

             if not url.empty_response:
-                to_append['legitimacy'] = legit_details.get(url.body_hash)
                 # Index lookup
                 # %%% Full body %%%
                 freq = self.indexing.body_hash_fequency(url.body_hash)
@@ -926,7 +930,6 @@ class Lookyloo():

                 # %%% Embedded ressources %%%
                 if hasattr(url, 'embedded_ressources') and url.embedded_ressources:
-                    # TODO: get entries from legit_details
                     to_append['embedded_ressources'] = {}
                     for mimetype, blobs in url.embedded_ressources.items():
                         for h, blob in blobs:
@@ -935,15 +938,16 @@ class Lookyloo():
                             continue
                         freq_embedded = self.indexing.body_hash_fequency(h)
                         to_append['embedded_ressources'][h] = freq_embedded
                         to_append['embedded_ressources'][h]['body_size'] = blob.getbuffer().nbytes
                         to_append['embedded_ressources'][h]['type'] = mimetype
                         if freq_embedded['hash_freq'] > 1:
                             to_append['embedded_ressources'][h]['other_captures'] = self.hash_lookup(h, url.name, capture_uuid)
                     for h in to_append['embedded_ressources'].keys():
-                        if h in known_content_table:
-                            to_append['embedded_ressources'][h]['known_content'] = known_content_table[h]
+                        to_append['embedded_ressources'][h]['known_content'] = known_content.get(h)
+                        to_append['embedded_ressources'][h]['legitimacy'] = legit_details.get(h)

-            if url.body_hash in known_content_table:
-                to_append['known_content'] = known_content_table[url.body_hash]
+                to_append['known_content'] = known_content.get(url.body_hash)
+                to_append['legitimacy'] = legit_details.get(url.body_hash)

             # Optional: Cookies sent to server in request -> map to nodes who set the cookie in response
             if hasattr(url, 'cookies_sent'):
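After this hunk, every entry appended to `urls` carries both the known-content lookup and the legitimacy verdict, which the templates below consume. An illustrative, trimmed entry (all values invented; real entries also carry the frequency fields from the indexing lookups):

```python
to_append = {
    'encrypted': True,                   # url.name starts with 'https'
    'url_path': 'static/app.js',
    'known_content': ('jquery', '3.4.1', '/dist/jquery.min.js', 1),  # SaneJS tuple, or a description string
    'legitimacy': (False, ['cdn.example.com']),  # fed to ressource_legitimacy_details
    'embedded_ressources': {
        'ad' * 64: {'body_size': 42, 'type': 'image/png',
                    'known_content': None, 'legitimacy': (True, None)},
    },
}
```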
website/web/templates (popup template)
@@ -1,5 +1,6 @@
 {% extends "main.html" %}
 {% from "macros.html" import known_content_details %}
+{% from "macros.html" import ressource_legitimacy_details %}
 {% from "macros.html" import indexed_hash %}
 {% from "macros.html" import indexed_cookies %}
 {% from "macros.html" import popup_icons %}
@@ -130,23 +131,7 @@
     {% if url['url_object'].empty_response %}
       Empty body.
     {% else %}
-      {% if url['legitimacy'] and url['legitimacy'][0] == False %}
-        <img src="/static/bomb.svg" title="Known malicious content in the response." width="21" height="21"/>
-      {%endif%}
-      Body size: {{ sizeof_fmt(url['url_object'].body.getbuffer().nbytes) }}
-      {% if url['legitimacy'] %}
-        {% if url['legitimacy'][0] %}
-          - This file is known <b>legitimate</b>.
-        {% elif url['legitimacy'][0] == False %}
-          {% if url['legitimacy'][1] is iterable %}
-            </br>
-            The response sould be considered as <b>phishing</b> unless it is served by <b>the following domain(s)</b>: {{ ', '.join(url['legitimacy'][1]) }}
-            </br>
-          {% else %}
-            - The response is known <b>malicious</b>.
-          {%endif%}
-        {%endif%}
-      {%endif%}
+      {{ ressource_legitimacy_details(url['legitimacy'], url['url_object'].body.getbuffer().nbytes) }}
     {%endif%}
     </div>
@@ -227,6 +212,7 @@
       {% if details['known_content'] %}
         {{ known_content_details(details['known_content']) }}
       {% endif %}
+      {{ ressource_legitimacy_details(details['legitimacy'], details['body_size']) }}
       <div>
         This file (<b>{{ details['type'] }}</b>) can be found <b>{{ details['hash_freq'] }}</b> times
         across all the captures on this lookyloo instance, in <b>{{ details['hash_domains_freq'] }}</b> unique domains.
website/web/templates/macros.html
@@ -12,6 +12,26 @@
   </div>
 {% endmacro %}

+{% macro ressource_legitimacy_details(details, ressource_size) %}
+  {% if details and details[0] == False %}
+    <img src="/static/bomb.svg" title="Known malicious content in the response." width="21" height="21"/>
+  {%endif%}
+  Body size: {{ sizeof_fmt(ressource_size) }}
+  {% if details %}
+    {% if details[0] %}
+      - This file is known <b>legitimate</b>.
+    {% elif details[0] == False %}
+      {% if details[1] is iterable %}
+        <br/>
+        The response should be considered as <b>phishing</b> unless it is served by <b>the following domain(s)</b>: {{ ', '.join(details[1]) }}
+        <br/>
+      {% else %}
+        - The response is known <b>malicious</b>.
+      {%endif%}
+    {%endif%}
+  {%endif%}
+{% endmacro %}
+
 {% macro indexed_hash(details, identifier_for_toggle) %}
   {% set total_captures = details['different_url']|length + details['same_url']|length %}
   {# Only show details if the hits are in an other capture #}
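The macro's branching on the `(bool, hostnames)` tuple, restated as Python for clarity; a sketch mirroring the template logic, not code from the commit:

```python
from typing import List, Optional, Tuple

def legitimacy_text(details: Optional[Tuple[bool, Optional[List[str]]]]) -> str:
    if not details:
        return ''
    known_legitimate, hostnames = details
    if known_legitimate:
        return 'This file is known legitimate.'
    if hostnames:  # the macro tests `details[1] is iterable`; None falls through
        return ('The response should be considered as phishing unless it is '
                'served by the following domain(s): ' + ', '.join(hostnames))
    return 'The response is known malicious.'
```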