mirror of https://github.com/CIRCL/lookyloo

commit 11598fc3ba
parent 1b0cdde84e

chg: Major refactoring of the contextualization of the body contents
@@ -15,7 +15,7 @@ from pathlib import Path
 import pickle
 import smtplib
 import socket
-from typing import Union, Dict, List, Tuple, Optional, Any, MutableMapping, Set, Iterable
+from typing import Union, Dict, List, Tuple, Optional, Any, MutableMapping, Set, Iterable, Iterator
 from urllib.parse import urlsplit
 from uuid import uuid4
 from zipfile import ZipFile
@@ -147,74 +147,90 @@ class Indexing():
     def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]:
         return self.redis.zrevrange(f'bh|{body_hash}', 0, -1, withscores=True)
 
-    def legitimate_capture(self, crawled_tree: CrawledTree) -> None:
-        pipeline = self.redis.pipeline()
-        for urlnode in crawled_tree.root_hartree.url_tree.traverse():
-            if urlnode.empty_response:
-                continue
-            pipeline.sadd(f'bh|{urlnode.body_hash}|legitimate', urlnode.hostname)
-        pipeline.execute()
-
-    def legitimate_hostnode(self, hostnode: HostNode) -> None:
-        pipeline = self.redis.pipeline()
-        for urlnode in hostnode.urls:
-            if urlnode.empty_response:
-                continue
-            pipeline.sadd(f'bh|{urlnode.body_hash}|legitimate', urlnode.hostname)
-        pipeline.execute()
-
-    def legitimate_urlnode(self, urlnode: URLNode) -> None:
-        if urlnode.empty_response:
-            return
-        self.redis.sadd(f'bh|{urlnode.body_hash}|legitimate', urlnode.hostname)
+
+class Context():
+
+    def __init__(self):
+        self.redis: Redis = Redis(unix_socket_path=get_socket_path('indexing'), db=1, decode_responses=True)
+
+    def clear_context(self):
+        self.redis.flushdb()
+
+    def _filter(self, urlnodes: Union[URLNode, List[URLNode]], known_hashes: Set[str]) -> Iterator[Tuple[URLNode, str]]:
+        if isinstance(urlnodes, URLNode):
+            _urlnodes = [urlnodes]
+        else:
+            _urlnodes = urlnodes
+        for urlnode in _urlnodes:
+            for h in urlnode.resources_hashes:
+                if h not in known_hashes:
+                    yield urlnode, h
+
+    def mark_as_legitimate(self, urlnodes: List[URLNode], known_hashes: Set[str]) -> None:
+        pipeline = self.redis.pipeline()
+        for urlnode, h in self._filter(urlnodes, known_hashes):
+            pipeline.sadd(f'bh|{h}|legitimate', urlnode.hostname)
+        pipeline.execute()
 
     def legitimate_body(self, body_hash: str, legitimate_hostname: str) -> None:
         self.redis.sadd(f'bh|{body_hash}|legitimate', legitimate_hostname)
 
-    def malicious_node(self, urlnode: URLNode) -> None:
-        if urlnode.empty_response:
-            return
-        self.redis.sadd('bh|malicious', urlnode.body_hash)
+    def malicious_node(self, urlnode: URLNode, known_hashes: Set[str]) -> None:
+        for _, h in self._filter(urlnode, known_hashes):
+            self.redis.sadd('bh|malicious', h)
 
     # Query DB
 
-    def is_legitimate(self, urlnode: URLNode) -> Optional[bool]:
-        if urlnode.empty_response:
-            return None
-        hostnames = self.redis.smembers(f'bh|{urlnode.body_hash}|legitimate')
-        if hostnames:
-            if urlnode.hostname in hostnames:
-                return True  # Legitimate
-            return False  # Malicious
-        elif self.redis.sismember('bh|malicious', urlnode.body_hash):
-            return False
-        return None  # Unknown
-
-    def is_malicious(self, urlnode: URLNode) -> Optional[bool]:
-        if urlnode.empty_response:
-            return None
-        if self.redis.sismember('bh|malicious', urlnode.body_hash):
-            return True
-        legitimate = self.is_legitimate(urlnode)
-        if legitimate is True:
-            return False
-        if legitimate is False:
-            return True
-        return None
-
-    def legitimacy_details(self, urlnode: URLNode) -> Optional[Tuple[bool, Optional[List[str]]]]:
-        if urlnode.empty_response:
-            return None
-        hostnames = self.redis.smembers(f'bh|{urlnode.body_hash}|legitimate')
-        if hostnames:
-            if urlnode.hostname in hostnames:
-                return True, hostnames
-            else:
-                return False, hostnames
-        elif self.redis.sismember('bh|malicious', urlnode.body_hash):
-            return False, None
+    def is_legitimate(self, urlnode: URLNode, known_hashes: Set[str]) -> Optional[bool]:
+        """3 cases:
+        * True if *all* the contents are known legitimate
+        * False if *any* content is malicious
+        * None in all other cases
+        """
+        status: List[Optional[bool]] = []
+        for urlnode, h in self._filter(urlnode, known_hashes):
+            hostnames = self.redis.smembers(f'bh|{h}|legitimate')
+            if hostnames:
+                if urlnode.hostname in hostnames:
+                    status.append(True)  # legitimate
+                    continue
+                else:
+                    return False  # Malicious
+            elif self.redis.sismember('bh|malicious', h):
+                return False  # Malicious
+            else:
+                # NOTE: we do not return here, because we want to return False if *any* of the contents is malicious
+                status.append(None)  # Unknown
+        if status and all(status):
+            return True  # All the contents are known legitimate
         return None
+
+    def is_malicious(self, urlnode: URLNode, known_hashes: Set[str]) -> Optional[bool]:
+        """3 cases:
+        * True if *any* content is malicious
+        * False if *all* the contents are known legitimate
+        * None in all other cases
+        """
+        legitimate = self.is_legitimate(urlnode, known_hashes)
+        if legitimate:
+            return False
+        elif legitimate is False:
+            return True
+        return None
+
+    def legitimacy_details(self, urlnode: URLNode, known_hashes: Set[str]) -> Dict[str, Tuple[bool, Optional[List[str]]]]:
+        to_return = {}
+        for urlnode, h in self._filter(urlnode, known_hashes):
+            hostnames = self.redis.smembers(f'bh|{h}|legitimate')
+            if hostnames:
+                if urlnode.hostname in hostnames:
+                    to_return[h] = (True, hostnames)
+                else:
+                    to_return[h] = (False, hostnames)
+            elif self.redis.sismember('bh|malicious', urlnode.body_hash):
+                to_return[h] = (False, None)
+        return to_return
 
 
 class Lookyloo():
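
A minimal usage sketch of the new Context API from the hunk above. None of this setup is part of the commit: it assumes lookyloo is installed, the 'indexing' Redis socket is reachable, and `urlnode` is a har2tree URLNode with at least one non-empty response.

    # Sketch only: exercises Context as defined in the diff above.
    context = Context()
    known_hashes = set()  # pretend find_known_content() resolved nothing

    # Record every resource hash of the node as legitimate for its hostname.
    context.mark_as_legitimate([urlnode], known_hashes)

    # All of the node's contents are now known legitimate for that hostname.
    assert context.is_legitimate(urlnode, known_hashes) is True
    assert context.is_malicious(urlnode, known_hashes) is False

    # Per-hash breakdown: {hash: (verdict, hostnames or None)}
    details = context.legitimacy_details(urlnode, known_hashes)

Note that hashes already resolved by find_known_content() are skipped by _filter(), so only genuinely unknown contents are ever written to the legitimate/malicious sets.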
@@ -223,6 +239,7 @@ class Lookyloo():
         self.configs: Dict[str, Dict[str, Any]] = load_configs()
         self.logger.setLevel(self.get_config('loglevel'))
         self.indexing = Indexing()
+        self.context = Context()
         self.is_public_instance = self.get_config('public_instance')
 
         self.redis: Redis = Redis(unix_socket_path=get_socket_path('cache'), decode_responses=True)
@@ -339,22 +356,21 @@ class Lookyloo():
 
     def add_to_legitimate(self, capture_uuid: str, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None):
         ct = self.get_crawled_tree(capture_uuid)
-        if not hostnode_uuid and not urlnode_uuid:
-            self.indexing.legitimate_capture(ct)
-            return
-
+        known_content = self.find_known_content(ct)
         if hostnode_uuid:
-            hostnode = ct.root_hartree.get_host_node_by_uuid(hostnode_uuid)
-            self.indexing.legitimate_hostnode(hostnode)
-        if urlnode_uuid:
-            urlnode = ct.root_hartree.get_url_node_by_uuid(urlnode_uuid)
-            self.indexing.legitimate_urlnode(urlnode)
+            urlnodes = ct.root_hartree.get_host_node_by_uuid(hostnode_uuid).urls
+        elif urlnode_uuid:
+            urlnodes = [ct.root_hartree.get_url_node_by_uuid(urlnode_uuid)]
+        else:
+            urlnodes = ct.root_hartree.url_tree.traverse()
+        self.context.mark_as_legitimate(urlnodes, set(known_content.keys()))
 
     def bodies_legitimacy_check(self, tree: CrawledTree) -> CrawledTree:
         hostnodes_with_malicious_content = set()
+        known_content = self.find_known_content(tree)
         for urlnode in tree.root_hartree.url_tree.traverse():
-            malicious = self.indexing.is_malicious(urlnode)
-            if malicious is not None:
+            malicious = self.context.is_malicious(urlnode, set(known_content.keys()))
+            if malicious is True:
                 urlnode.add_feature('malicious', malicious)
                 hostnodes_with_malicious_content.add(urlnode.hostnode_uuid)
         for hostnode_with_malicious_content in hostnodes_with_malicious_content:
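
The switch from `malicious is not None` to `malicious is True` in bodies_legitimacy_check matters because verdicts are now three-state. A self-contained sketch of the aggregation Context.is_legitimate performs over per-hash verdicts, with `verdicts` standing in for the Redis lookups:

    from typing import List, Optional

    def aggregate(verdicts: List[Optional[bool]]) -> Optional[bool]:
        # Mirrors Context.is_legitimate: any malicious content taints the
        # whole node, True requires unanimity, everything else is unknown.
        if False in verdicts:
            return False
        if verdicts and all(verdicts):
            return True
        return None

    assert aggregate([True, True]) is True          # all contents known legitimate
    assert aggregate([True, None, False]) is False  # one malicious content
    assert aggregate([True, None]) is None          # partially unknown
    assert aggregate([]) is None                    # nothing left after filtering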
@@ -843,6 +859,35 @@ class Lookyloo():
                 captures_list['different_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
         return captures_list
 
+    def _get_resources_hashes(self, har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Set[str]:
+        if isinstance(har2tree_container, CrawledTree):
+            urlnodes = har2tree_container.root_hartree.url_tree.traverse()
+        elif isinstance(har2tree_container, HostNode):
+            urlnodes = har2tree_container.urls
+        elif isinstance(har2tree_container, URLNode):
+            urlnodes = [har2tree_container]
+        else:
+            raise Exception(f'har2tree_container cannot be {type(har2tree_container)}')
+        all_ressources_hashes: Set[str] = set()
+        for urlnode in urlnodes:
+            if hasattr(urlnode, 'resources_hashes'):
+                all_ressources_hashes.update(urlnode.resources_hashes)
+        return all_ressources_hashes
+
+    def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Dict[str, Union[str, List[str]]]:
+        all_ressources_hashes = self._get_resources_hashes(har2tree_container)
+        # Get from local cache of known content all descriptions related to the ressources.
+        known_content_table = dict(zip(all_ressources_hashes,
+                                       self.redis.hmget('known_content', all_ressources_hashes)))
+
+        if hasattr(self, 'sanejs') and self.sanejs.available:
+            # Query sanejs on the remaining ones
+            to_lookup = [h for h, description in known_content_table.items() if not description]
+            for h, entry in self.sanejs.hashes_lookup(to_lookup).items():
+                libname, version, path = entry[0].split("|")
+                known_content_table[h] = (libname, version, path, len(entry))
+        return {h: details for h, details in known_content_table.items() if details}
+
     def get_hostnode_investigator(self, capture_uuid: str, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]:
         capture_dir = self.lookup_capture_dir(capture_uuid)
         if not capture_dir:
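
For reference, a fabricated example of the table find_known_content returns (hashes shortened and invented): entries resolved from the local 'known_content' Redis hash keep their stored description, sanejs hits become (libname, version, path, number_of_matches) tuples, and unresolved hashes are dropped. Callers such as Context only consume the keys:

    known_content = {
        '9be2587d…': 'Tracking pixel',                              # local cache
        '1c1b6b28…': ('jquery', '3.4.1', 'dist/jquery.min.js', 3),  # sanejs lookup
    }
    known_hashes = set(known_content.keys())  # passed to the Context methods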
@@ -855,25 +900,7 @@ class Lookyloo():
         if not hostnode:
             raise MissingUUID(f'Unable to find UUID {node_uuid} in {capture_dir}')
 
-        # Gather all the ressources in the hostnode.
-        all_ressources_hashes = set()
-        for url in hostnode.urls:
-            if hasattr(url, 'body_hash'):
-                all_ressources_hashes.add(url.body_hash)
-            if hasattr(url, 'embedded_ressources') and url.embedded_ressources:
-                for mimetype, blobs in url.embedded_ressources.items():
-                    all_ressources_hashes.update([h for h, b in blobs])
-
-        # Get from local cache of known content all descriptions related to the ressources.
-        known_content_table = dict(zip(all_ressources_hashes,
-                                       self.redis.hmget('known_content', all_ressources_hashes)))
-
-        if hasattr(self, 'sanejs') and self.sanejs.available:
-            # Query sanejs on the remaining ones
-            to_lookup = [h for h, description in known_content_table.items() if not description]
-            for h, entry in self.sanejs.hashes_lookup(to_lookup).items():
-                libname, version, path = entry[0].split("|")
-                known_content_table[h] = (libname, version, path, len(entry))
+        known_content_table = self.find_known_content(hostnode)
 
         urls: List[Dict[str, Any]] = []
         for url in hostnode.urls:
@@ -881,14 +908,15 @@ class Lookyloo():
             # * https vs http
             # * everything after the domain
             # * the full URL
+            legit_details = self.context.legitimacy_details(url, set(known_content_table.keys()))
             to_append: Dict[str, Any] = {
                 'encrypted': url.name.startswith('https'),
                 'url_path': url.name.split('/', 3)[-1],
                 'url_object': url,
-                'legitimacy': self.indexing.legitimacy_details(url)
             }
 
             if not url.empty_response:
+                to_append['legitimacy'] = legit_details.get(url.body_hash)
                 # Index lookup
                 # %%% Full body %%%
                 freq = self.indexing.body_hash_fequency(url.body_hash)
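
Since legitimacy_details now returns a dict keyed by content hash, the per-URL verdict is fetched with .get() and a hash absent from the dict simply leaves 'legitimacy' as None. A fabricated illustration of that lookup:

    legit_details = {'9be2587d…': (True, {'example.com'})}    # fabricated verdict
    to_append['legitimacy'] = legit_details.get('9be2587d…')  # (True, {'example.com'})
    legit_details.get('feedc0de…')                            # None: no verdict recorded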
@@ -898,6 +926,7 @@ class Lookyloo():
 
                 # %%% Embedded ressources %%%
                 if hasattr(url, 'embedded_ressources') and url.embedded_ressources:
+                    # TODO: get entries from legit_details
                    to_append['embedded_ressources'] = {}
                    for mimetype, blobs in url.embedded_ressources.items():
                        for h, blob in blobs: