mirror of https://github.com/CIRCL/lookyloo
chg: Refactoring and normalizing the known hashes lookups
parent
70b2bbe2b7
commit
b16a5768ea
|
@ -15,7 +15,7 @@ from pathlib import Path
|
||||||
import pickle
|
import pickle
|
||||||
import smtplib
|
import smtplib
|
||||||
import socket
|
import socket
|
||||||
from typing import Union, Dict, List, Tuple, Optional, Any, MutableMapping, Set, Iterable, Iterator
|
from typing import Union, Dict, List, Tuple, Optional, Any, MutableMapping, Set, Iterable
|
||||||
from urllib.parse import urlsplit
|
from urllib.parse import urlsplit
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from zipfile import ZipFile
|
from zipfile import ZipFile
|
||||||
|
@ -138,16 +138,18 @@ class Indexing():
|
||||||
|
|
||||||
pipeline.execute()
|
pipeline.execute()
|
||||||
|
|
||||||
def get_body_hash_captures(self, body_hash: str, filter_url: Optional[str]=None) -> List[Tuple[str, str, str, bool]]:
|
def get_body_hash_captures(self, body_hash: str, filter_url: Optional[str]=None,
|
||||||
|
limit: int=20) -> Tuple[int, List[Tuple[str, str, str, bool]]]:
|
||||||
to_return: List[Tuple[str, str, str, bool]] = []
|
to_return: List[Tuple[str, str, str, bool]] = []
|
||||||
for capture_uuid in self.redis.smembers(f'bh|{body_hash}|captures'):
|
all_captures = self.redis.smembers(f'bh|{body_hash}|captures')
|
||||||
|
for capture_uuid in list(all_captures)[:limit]:
|
||||||
for entry in self.redis.zrevrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, -1):
|
for entry in self.redis.zrevrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, -1):
|
||||||
url_uuid, hostnode_uuid, url = entry.split('|', 2)
|
url_uuid, hostnode_uuid, url = entry.split('|', 2)
|
||||||
if filter_url:
|
if filter_url:
|
||||||
to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, url == filter_url))
|
to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, url == filter_url))
|
||||||
else:
|
else:
|
||||||
to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, False))
|
to_return.append((capture_uuid, hostnode_uuid, urlsplit(url).hostname, False))
|
||||||
return to_return
|
return len(all_captures), to_return
|
||||||
|
|
||||||
def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]:
|
def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]:
|
||||||
return self.redis.zrevrange(f'bh|{body_hash}', 0, -1, withscores=True)
|
return self.redis.zrevrange(f'bh|{body_hash}', 0, -1, withscores=True)
|
||||||
|
@ -182,87 +184,122 @@ class Context():
|
||||||
p = self.redis.pipeline()
|
p = self.redis.pipeline()
|
||||||
for filename, file_content in load_known_content().items():
|
for filename, file_content in load_known_content().items():
|
||||||
if filename == 'generic':
|
if filename == 'generic':
|
||||||
|
# 1px images, files with spaces, empty => non-relevant stuff
|
||||||
for k, type_content in file_content.items():
|
for k, type_content in file_content.items():
|
||||||
p.hmset('known_content', {h: type_content['description'] for h in type_content['entries']})
|
p.hmset('known_content', {h: type_content['description'] for h in type_content['entries']})
|
||||||
elif filename == 'malicious':
|
elif filename == 'malicious':
|
||||||
|
# User defined as malicious
|
||||||
for h, details in file_content.items():
|
for h, details in file_content.items():
|
||||||
p.sadd('bh|malicious', h)
|
p.sadd('bh|malicious', h)
|
||||||
|
if 'target' in details:
|
||||||
|
p.sadd(f'{h}|target', *details['target'])
|
||||||
|
if 'tag' in details:
|
||||||
|
p.sadd(f'{h}|tag', *details['tag'])
|
||||||
elif filename == 'legitimate':
|
elif filename == 'legitimate':
|
||||||
|
# User defined as legitimate
|
||||||
for h, details in file_content.items():
|
for h, details in file_content.items():
|
||||||
if 'domain' in details and details['domain']:
|
if 'domain' in details and details['domain']:
|
||||||
p.sadd(f'bh|{h}|legitimate', *details['domain'])
|
p.sadd(f'bh|{h}|legitimate', *details['domain'])
|
||||||
elif 'description' in details:
|
elif 'description' in details:
|
||||||
p.hset('known_content', h, details['description'])
|
p.hset('known_content', h, details['description'])
|
||||||
else:
|
else:
|
||||||
|
# Full captures marked as legitimate
|
||||||
for h, details in file_content.items():
|
for h, details in file_content.items():
|
||||||
p.sadd(f'bh|{h}|legitimate', *details['hostnames'])
|
p.sadd(f'bh|{h}|legitimate', *details['hostnames'])
|
||||||
p.execute()
|
p.execute()
|
||||||
|
|
||||||
def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Dict[str, Union[str, List[str]]]:
|
def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, URLNode]) -> Dict[str, Any]:
|
||||||
"""Return a dictionary of content resources found in the local known_content database, or in SaneJS (if enabled)"""
|
"""Return a dictionary of content resources found in the local known_content database, or in SaneJS (if enabled)"""
|
||||||
all_ressources_hashes = self._get_resources_hashes(har2tree_container)
|
to_lookup: Set[str] = self._get_resources_hashes(har2tree_container)
|
||||||
# Get from local cache of known content all descriptions related to the ressources.
|
known_content_table: Dict[str, Any] = {}
|
||||||
if not all_ressources_hashes:
|
if not to_lookup:
|
||||||
return {}
|
return known_content_table
|
||||||
known_content_table = dict(zip(all_ressources_hashes,
|
# get generic known content
|
||||||
self.redis.hmget('known_content', all_ressources_hashes)))
|
known_in_generic = zip(to_lookup, self.redis.hmget('known_content', to_lookup))
|
||||||
|
for h, details in known_in_generic:
|
||||||
|
if not details:
|
||||||
|
continue
|
||||||
|
known_content_table[h] = {'type': 'generic', 'details': details}
|
||||||
|
|
||||||
if self.sanejs and self.sanejs.available:
|
to_lookup = to_lookup - set(known_content_table.keys())
|
||||||
|
if not to_lookup:
|
||||||
|
return known_content_table
|
||||||
|
|
||||||
|
# get known malicious
|
||||||
|
for h in to_lookup:
|
||||||
|
if self.redis.sismember('bh|malicious', h):
|
||||||
|
known_content_table[h] = {'type': 'malicious', 'details': {}}
|
||||||
|
targets = self.redis.smembers(f'{h}|target')
|
||||||
|
tags = self.redis.smembers(f'{h}|tag')
|
||||||
|
if targets:
|
||||||
|
known_content_table[h]['details']['target'] = targets
|
||||||
|
if tags:
|
||||||
|
known_content_table[h]['details']['tag'] = tags
|
||||||
|
|
||||||
|
to_lookup = to_lookup - set(known_content_table.keys())
|
||||||
|
if not to_lookup:
|
||||||
|
return known_content_table
|
||||||
|
|
||||||
|
# get known legitimate with domain
|
||||||
|
for h in to_lookup:
|
||||||
|
domains = self.redis.smembers(f'bh|{h}|legitimate')
|
||||||
|
if not domains:
|
||||||
|
continue
|
||||||
|
known_content_table[h] = {'type': 'legitimate_on_domain', 'details': domains}
|
||||||
|
|
||||||
|
to_lookup = to_lookup - set(known_content_table.keys())
|
||||||
|
if not to_lookup:
|
||||||
|
return known_content_table
|
||||||
|
|
||||||
|
if to_lookup and self.sanejs and self.sanejs.available:
|
||||||
# Query sanejs on the remaining ones
|
# Query sanejs on the remaining ones
|
||||||
to_lookup = [h for h, description in known_content_table.items() if not description]
|
|
||||||
for h, entry in self.sanejs.hashes_lookup(to_lookup).items():
|
for h, entry in self.sanejs.hashes_lookup(to_lookup).items():
|
||||||
libname, version, path = entry[0].split("|")
|
libname, version, path = entry[0].split("|")
|
||||||
known_content_table[h] = (libname, version, path, len(entry))
|
known_content_table[h] = {'type': 'sanejs',
|
||||||
return {h: details for h, details in known_content_table.items() if details}
|
'details': (libname, version, path, len(entry))}
|
||||||
|
return known_content_table
|
||||||
def _filter(self, urlnodes: Union[URLNode, List[URLNode]], known_hashes: Iterable[str]) -> Iterator[Tuple[URLNode, str]]:
|
|
||||||
if isinstance(urlnodes, URLNode):
|
|
||||||
_urlnodes = [urlnodes]
|
|
||||||
else:
|
|
||||||
_urlnodes = urlnodes
|
|
||||||
for urlnode in _urlnodes:
|
|
||||||
for h in urlnode.resources_hashes:
|
|
||||||
if h not in known_hashes:
|
|
||||||
yield urlnode, h
|
|
||||||
|
|
||||||
def store_known_legitimate_tree(self, tree: CrawledTree):
|
def store_known_legitimate_tree(self, tree: CrawledTree):
|
||||||
known_content = self.find_known_content(tree)
|
known_content = self.find_known_content(tree)
|
||||||
urlnodes = tree.root_hartree.url_tree.traverse()
|
capture_file: Path = get_homedir() / 'known_content' / f'{urlsplit(tree.root_url).hostname}.json'
|
||||||
root_hostname = urlsplit(tree.root_url).hostname
|
if capture_file.exists():
|
||||||
known_content_file: Path = get_homedir() / 'known_content' / f'{root_hostname}.json'
|
with open(capture_file) as f:
|
||||||
if known_content_file.exists():
|
|
||||||
with open(known_content_file) as f:
|
|
||||||
to_store = json.load(f)
|
to_store = json.load(f)
|
||||||
else:
|
else:
|
||||||
to_store = {}
|
to_store = {}
|
||||||
for urlnode, h in self._filter(urlnodes, known_content):
|
for urlnode in tree.root_hartree.url_tree.traverse():
|
||||||
mimetype = ''
|
for h in urlnode.resources_hashes:
|
||||||
if h != urlnode.body_hash:
|
if h in known_content and known_content[h]['type'] != 'malicious':
|
||||||
# this is the hash of an embeded content so it won't have a filename but has a different mimetype
|
# when we mark a tree as legitimate, we may get a hash that was marked
|
||||||
# FIXME: this is ugly.
|
# as malicious beforehand but turn out legitimate on that specific domain.
|
||||||
for ressource_mimetype, blobs in urlnode.embedded_ressources.items():
|
continue
|
||||||
for ressource_h, b in blobs:
|
mimetype = ''
|
||||||
if ressource_h == h:
|
if h != urlnode.body_hash:
|
||||||
mimetype = ressource_mimetype.split(';')[0]
|
# this is the hash of an embeded content so it won't have a filename but has a different mimetype
|
||||||
|
# FIXME: this is ugly.
|
||||||
|
for ressource_mimetype, blobs in urlnode.embedded_ressources.items():
|
||||||
|
for ressource_h, b in blobs:
|
||||||
|
if ressource_h == h:
|
||||||
|
mimetype = ressource_mimetype.split(';')[0]
|
||||||
|
break
|
||||||
|
if mimetype:
|
||||||
break
|
break
|
||||||
if mimetype:
|
else:
|
||||||
break
|
if urlnode.mimetype:
|
||||||
else:
|
mimetype = urlnode.mimetype.split(';')[0]
|
||||||
if urlnode.mimetype:
|
if h not in to_store:
|
||||||
mimetype = urlnode.mimetype.split(';')[0]
|
to_store[h] = {'filenames': set(), 'description': '', 'hostnames': set(), 'mimetype': mimetype}
|
||||||
if h not in to_store:
|
else:
|
||||||
to_store[h] = {'filenames': set(), 'description': '', 'hostnames': set(), 'mimetype': mimetype}
|
to_store[h]['filenames'] = set(to_store[h]['filenames'])
|
||||||
else:
|
to_store[h]['hostnames'] = set(to_store[h]['hostnames'])
|
||||||
to_store[h]['filenames'] = set(to_store[h]['filenames'])
|
|
||||||
to_store[h]['hostnames'] = set(to_store[h]['hostnames'])
|
|
||||||
|
|
||||||
to_store[h]['hostnames'].add(urlnode.hostname)
|
to_store[h]['hostnames'].add(urlnode.hostname)
|
||||||
if urlnode.url_split.path:
|
if urlnode.url_split.path:
|
||||||
filename = Path(urlnode.url_split.path).name
|
filename = Path(urlnode.url_split.path).name
|
||||||
if filename:
|
if filename:
|
||||||
to_store[h]['filenames'].add(filename)
|
to_store[h]['filenames'].add(filename)
|
||||||
|
|
||||||
with open(known_content_file, 'w') as f:
|
with open(capture_file, 'w') as f:
|
||||||
json.dump(to_store, f, indent=2, default=dump_to_json)
|
json.dump(to_store, f, indent=2, default=dump_to_json)
|
||||||
|
|
||||||
def mark_as_legitimate(self, tree: CrawledTree, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> None:
|
def mark_as_legitimate(self, tree: CrawledTree, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> None:
|
||||||
|
@ -275,13 +312,23 @@ class Context():
|
||||||
self.store_known_legitimate_tree(tree)
|
self.store_known_legitimate_tree(tree)
|
||||||
known_content = self.find_known_content(tree)
|
known_content = self.find_known_content(tree)
|
||||||
pipeline = self.redis.pipeline()
|
pipeline = self.redis.pipeline()
|
||||||
for urlnode, h in self._filter(urlnodes, known_content):
|
for urlnode in urlnodes:
|
||||||
# Note: we can have multiple hahes on the same urlnode (see embedded resources).
|
# Note: we can have multiple hahes on the same urlnode (see embedded resources).
|
||||||
# They are expected to be on the same domain as urlnode. This code work as expected.
|
# They are expected to be on the same domain as urlnode. This code work as expected.
|
||||||
pipeline.sadd(f'bh|{h}|legitimate', urlnode.hostname)
|
for h in urlnode.resources_hashes:
|
||||||
|
if h in known_content and known_content[h]['type'] != 'malicious':
|
||||||
|
# when we mark a tree as legitimate, we may get a hash that was marked
|
||||||
|
# as malicious beforehand but turn out legitimate on that specific domain.
|
||||||
|
continue
|
||||||
|
pipeline.sadd(f'bh|{h}|legitimate', urlnode.hostname)
|
||||||
pipeline.execute()
|
pipeline.execute()
|
||||||
|
|
||||||
def contextualize_tree(self, tree: CrawledTree) -> CrawledTree:
|
def contextualize_tree(self, tree: CrawledTree) -> CrawledTree:
|
||||||
|
"""Iterate through all the URL nodes in the tree, add context to Host nodes accordingly
|
||||||
|
* malicious: At least one URLnode in the Hostnode is marked as malicious
|
||||||
|
* legitimate: All the URLnodes in the Hostnode are marked as legitimate
|
||||||
|
* empty: All the the URLnodes in the Hostnode have an empty body in their response
|
||||||
|
"""
|
||||||
hostnodes_with_malicious_content = set()
|
hostnodes_with_malicious_content = set()
|
||||||
known_content = self.find_known_content(tree)
|
known_content = self.find_known_content(tree)
|
||||||
for urlnode in tree.root_hartree.url_tree.traverse():
|
for urlnode in tree.root_hartree.url_tree.traverse():
|
||||||
|
@ -290,28 +337,24 @@ class Context():
|
||||||
|
|
||||||
malicious = self.is_malicious(urlnode, known_content)
|
malicious = self.is_malicious(urlnode, known_content)
|
||||||
if malicious is True:
|
if malicious is True:
|
||||||
urlnode.add_feature('malicious', malicious)
|
urlnode.add_feature('malicious', True)
|
||||||
hostnodes_with_malicious_content.add(urlnode.hostnode_uuid)
|
hostnodes_with_malicious_content.add(urlnode.hostnode_uuid)
|
||||||
elif malicious is False:
|
elif malicious is False:
|
||||||
# Marked as legitimate
|
# Marked as legitimate
|
||||||
urlnode.add_feature('legitimate', True)
|
urlnode.add_feature('legitimate', True)
|
||||||
elif not urlnode.empty_response and urlnode.body_hash in known_content:
|
else:
|
||||||
urlnode.add_feature('legitimate', True)
|
# malicious is None => we cannot say.
|
||||||
|
pass
|
||||||
for hostnode_with_malicious_content in hostnodes_with_malicious_content:
|
|
||||||
hostnode = tree.root_hartree.get_host_node_by_uuid(hostnode_with_malicious_content)
|
|
||||||
hostnode.add_feature('malicious', malicious)
|
|
||||||
|
|
||||||
for hostnode in tree.root_hartree.hostname_tree.traverse():
|
for hostnode in tree.root_hartree.hostname_tree.traverse():
|
||||||
if 'malicious' not in hostnode.features:
|
if hostnode.uuid in hostnodes_with_malicious_content:
|
||||||
if all(urlnode.empty_response for urlnode in hostnode.urls):
|
hostnode.add_feature('malicious', True)
|
||||||
hostnode.add_feature('all_empty', True)
|
elif all(urlnode.empty_response for urlnode in hostnode.urls):
|
||||||
continue
|
hostnode.add_feature('all_empty', True)
|
||||||
|
else:
|
||||||
legit = [urlnode.legitimate for urlnode in hostnode.urls if hasattr(urlnode, 'legitimate')]
|
legit = [True for urlnode in hostnode.urls if hasattr(urlnode, 'legitimate')]
|
||||||
if len(legit) == len(hostnode.urls) and all(legit):
|
if len(legit) == len(hostnode.urls):
|
||||||
hostnode.add_feature('legitimate', True)
|
hostnode.add_feature('legitimate', True)
|
||||||
|
|
||||||
return tree
|
return tree
|
||||||
|
|
||||||
def legitimate_body(self, body_hash: str, legitimate_hostname: str) -> None:
|
def legitimate_body(self, body_hash: str, legitimate_hostname: str) -> None:
|
||||||
|
@ -381,28 +424,29 @@ class Context():
|
||||||
# Query DB
|
# Query DB
|
||||||
|
|
||||||
def is_legitimate(self, urlnode: URLNode, known_hashes: Iterable[str]) -> Optional[bool]:
|
def is_legitimate(self, urlnode: URLNode, known_hashes: Iterable[str]) -> Optional[bool]:
|
||||||
"""3 cases:
|
"""
|
||||||
|
If legitimate if generic, marked as legitimate or known on sanejs, loaded from the right domain
|
||||||
|
3 cases:
|
||||||
* True if *all* the contents are known legitimate
|
* True if *all* the contents are known legitimate
|
||||||
* False if *any* content is malicious
|
* False if *any* content is malicious
|
||||||
* None in all other cases
|
* None in all other cases
|
||||||
"""
|
"""
|
||||||
status: List[Optional[bool]] = []
|
status: List[Optional[bool]] = []
|
||||||
for urlnode, h in self._filter(urlnode, known_hashes):
|
for h in urlnode.resources_hashes:
|
||||||
# Note: we can have multiple hahes on the same urlnode (see embedded resources).
|
# Note: we can have multiple hashes on the same urlnode (see embedded resources).
|
||||||
# They are expected to be on the same domain as urlnode. This code work as expected.
|
if h not in known_hashes:
|
||||||
if self.redis.sismember('bh|malicious', h):
|
# We do not return here, because we want to return False if
|
||||||
# Malicious, no need to go any further
|
# *any* of the contents is malicious
|
||||||
return False
|
|
||||||
hostnames = self.redis.smembers(f'bh|{h}|legitimate')
|
|
||||||
if hostnames:
|
|
||||||
if urlnode.hostname in hostnames:
|
|
||||||
status.append(True) # legitimate
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
return False # Malicious
|
|
||||||
else:
|
|
||||||
# NOTE: we do not return here, because we want to return False if *any* of the contents is malicious
|
|
||||||
status.append(None) # Unknown
|
status.append(None) # Unknown
|
||||||
|
elif known_hashes[h]['type'] == 'malicious':
|
||||||
|
return False
|
||||||
|
elif known_hashes[h]['type'] in ['generic', 'sanejs']:
|
||||||
|
status.append(True)
|
||||||
|
elif known_hashes[h]['type'] == 'legitimate_on_domain':
|
||||||
|
if urlnode.hostname in known_hashes[h]['details']:
|
||||||
|
status.append(True)
|
||||||
|
else:
|
||||||
|
return False
|
||||||
if status and all(status):
|
if status and all(status):
|
||||||
return True # All the contents are known legitimate
|
return True # All the contents are known legitimate
|
||||||
return None
|
return None
|
||||||
|
@ -420,21 +464,6 @@ class Context():
|
||||||
return True
|
return True
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def legitimacy_details(self, urlnode: URLNode, known_hashes: Iterable[str]) -> Dict[str, Tuple[bool, Optional[List[str]]]]:
|
|
||||||
to_return = {}
|
|
||||||
for urlnode, h in self._filter(urlnode, known_hashes):
|
|
||||||
# Note: we can have multiple hahes on the same urlnode (see embedded resources).
|
|
||||||
# They are expected to be on the same domain as urlnode. This code work as expected.
|
|
||||||
hostnames = self.redis.smembers(f'bh|{h}|legitimate')
|
|
||||||
if hostnames:
|
|
||||||
if urlnode.hostname in hostnames:
|
|
||||||
to_return[h] = (True, hostnames)
|
|
||||||
else:
|
|
||||||
to_return[h] = (False, hostnames)
|
|
||||||
elif self.redis.sismember('bh|malicious', urlnode.body_hash):
|
|
||||||
to_return[h] = (False, None)
|
|
||||||
return to_return
|
|
||||||
|
|
||||||
|
|
||||||
class Lookyloo():
|
class Lookyloo():
|
||||||
|
|
||||||
|
@ -1013,7 +1042,8 @@ class Lookyloo():
|
||||||
|
|
||||||
def get_body_hash_investigator(self, body_hash: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]:
|
def get_body_hash_investigator(self, body_hash: str) -> Tuple[List[Tuple[str, str]], List[Tuple[str, float]]]:
|
||||||
captures = []
|
captures = []
|
||||||
for capture_uuid, url_uuid, url_hostname, _ in self.indexing.get_body_hash_captures(body_hash):
|
total_captures, details = self.indexing.get_body_hash_captures(body_hash, limit=-1)
|
||||||
|
for capture_uuid, url_uuid, url_hostname, _ in details:
|
||||||
cache = self.capture_cache(capture_uuid)
|
cache = self.capture_cache(capture_uuid)
|
||||||
if cache:
|
if cache:
|
||||||
captures.append((capture_uuid, cache['title']))
|
captures.append((capture_uuid, cache['title']))
|
||||||
|
@ -1032,7 +1062,8 @@ class Lookyloo():
|
||||||
|
|
||||||
def hash_lookup(self, blob_hash: str, url: str, capture_uuid: str) -> Dict[str, List[Tuple[str, str, str, str, str]]]:
|
def hash_lookup(self, blob_hash: str, url: str, capture_uuid: str) -> Dict[str, List[Tuple[str, str, str, str, str]]]:
|
||||||
captures_list: Dict[str, List[Tuple[str, str, str, str, str]]] = {'same_url': [], 'different_url': []}
|
captures_list: Dict[str, List[Tuple[str, str, str, str, str]]] = {'same_url': [], 'different_url': []}
|
||||||
for h_capture_uuid, url_uuid, url_hostname, same_url in self.indexing.get_body_hash_captures(blob_hash, url):
|
total_captures, details = self.indexing.get_body_hash_captures(blob_hash, url)
|
||||||
|
for h_capture_uuid, url_uuid, url_hostname, same_url in details:
|
||||||
if h_capture_uuid == capture_uuid:
|
if h_capture_uuid == capture_uuid:
|
||||||
# Skip self.
|
# Skip self.
|
||||||
continue
|
continue
|
||||||
|
@ -1042,7 +1073,25 @@ class Lookyloo():
|
||||||
captures_list['same_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
|
captures_list['same_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
|
||||||
else:
|
else:
|
||||||
captures_list['different_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
|
captures_list['different_url'].append((h_capture_uuid, url_uuid, cache['title'], cache['timestamp'], url_hostname))
|
||||||
return captures_list
|
return total_captures, captures_list
|
||||||
|
|
||||||
|
def _normalize_known_content(self, h: str, known_content: Dict[str, Any], url: URLNode):
|
||||||
|
known: Optional[Union[str, List[Any]]] = None
|
||||||
|
legitimate: Optional[Tuple[bool, Any]] = None
|
||||||
|
if h not in known_content:
|
||||||
|
return known, legitimate
|
||||||
|
|
||||||
|
if known_content[h]['type'] in ['generic', 'sanejs']:
|
||||||
|
known = known_content[h]['details']
|
||||||
|
elif known_content[h]['type'] == 'legitimate_on_domain':
|
||||||
|
legit = False
|
||||||
|
if url.hostname in known_content[h]['details']:
|
||||||
|
legit = True
|
||||||
|
legitimate = (legit, known_content[h]['details'])
|
||||||
|
elif known_content[h]['type'] == 'malicious':
|
||||||
|
legitimate = (False, known_content[h]['details'])
|
||||||
|
|
||||||
|
return known, legitimate
|
||||||
|
|
||||||
def get_hostnode_investigator(self, capture_uuid: str, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]:
|
def get_hostnode_investigator(self, capture_uuid: str, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]:
|
||||||
capture_dir = self.lookup_capture_dir(capture_uuid)
|
capture_dir = self.lookup_capture_dir(capture_uuid)
|
||||||
|
@ -1064,7 +1113,6 @@ class Lookyloo():
|
||||||
# * https vs http
|
# * https vs http
|
||||||
# * everything after the domain
|
# * everything after the domain
|
||||||
# * the full URL
|
# * the full URL
|
||||||
legit_details = self.context.legitimacy_details(url, known_content)
|
|
||||||
to_append: Dict[str, Any] = {
|
to_append: Dict[str, Any] = {
|
||||||
'encrypted': url.name.startswith('https'),
|
'encrypted': url.name.startswith('https'),
|
||||||
'url_path': url.name.split('/', 3)[-1],
|
'url_path': url.name.split('/', 3)[-1],
|
||||||
|
@ -1094,11 +1142,17 @@ class Lookyloo():
|
||||||
if freq_embedded['hash_freq'] > 1:
|
if freq_embedded['hash_freq'] > 1:
|
||||||
to_append['embedded_ressources'][h]['other_captures'] = self.hash_lookup(h, url.name, capture_uuid)
|
to_append['embedded_ressources'][h]['other_captures'] = self.hash_lookup(h, url.name, capture_uuid)
|
||||||
for h in to_append['embedded_ressources'].keys():
|
for h in to_append['embedded_ressources'].keys():
|
||||||
to_append['embedded_ressources'][h]['known_content'] = known_content.get(h)
|
known, legitimate = self._normalize_known_content(h, known_content, url)
|
||||||
to_append['embedded_ressources'][h]['legitimacy'] = legit_details.get(h)
|
if known:
|
||||||
|
to_append['embedded_ressources'][h]['known_content'] = known
|
||||||
|
elif legitimate:
|
||||||
|
to_append['embedded_ressources'][h]['legitimacy'] = legitimate
|
||||||
|
|
||||||
to_append['known_content'] = known_content.get(url.body_hash)
|
known, legitimate = self._normalize_known_content(url.body_hash, known_content, url)
|
||||||
to_append['legitimacy'] = legit_details.get(url.body_hash)
|
if known:
|
||||||
|
to_append['known_content'] = known
|
||||||
|
elif legitimate:
|
||||||
|
to_append['legitimacy'] = legitimate
|
||||||
|
|
||||||
# Optional: Cookies sent to server in request -> map to nodes who set the cookie in response
|
# Optional: Cookies sent to server in request -> map to nodes who set the cookie in response
|
||||||
if hasattr(url, 'cookies_sent'):
|
if hasattr(url, 'cookies_sent'):
|
||||||
|
|
|
@ -40,10 +40,10 @@ class SaneJavaScript():
|
||||||
today_dir = self.storage_dir / date.today().isoformat()
|
today_dir = self.storage_dir / date.today().isoformat()
|
||||||
today_dir.mkdir(parents=True, exist_ok=True)
|
today_dir.mkdir(parents=True, exist_ok=True)
|
||||||
sanejs_unknowns = today_dir / 'unknown'
|
sanejs_unknowns = today_dir / 'unknown'
|
||||||
unknown_hashes = []
|
unknown_hashes = set()
|
||||||
if sanejs_unknowns.exists():
|
if sanejs_unknowns.exists():
|
||||||
with sanejs_unknowns.open() as f:
|
with sanejs_unknowns.open() as f:
|
||||||
unknown_hashes = [line.strip() for line in f.readlines()]
|
unknown_hashes = set(line.strip() for line in f.readlines())
|
||||||
|
|
||||||
to_return: Dict[str, List[str]] = {}
|
to_return: Dict[str, List[str]] = {}
|
||||||
|
|
||||||
|
@ -52,6 +52,7 @@ class SaneJavaScript():
|
||||||
else:
|
else:
|
||||||
to_lookup = [h for h in hashes if (h not in unknown_hashes
|
to_lookup = [h for h in hashes if (h not in unknown_hashes
|
||||||
and not (today_dir / h).exists())]
|
and not (today_dir / h).exists())]
|
||||||
|
has_new_unknown = False
|
||||||
for h in to_lookup:
|
for h in to_lookup:
|
||||||
response = self.client.sha512(h)
|
response = self.client.sha512(h)
|
||||||
if 'error' in response:
|
if 'error' in response:
|
||||||
|
@ -63,7 +64,8 @@ class SaneJavaScript():
|
||||||
json.dump(response['response'], f)
|
json.dump(response['response'], f)
|
||||||
to_return[h] = response['response']
|
to_return[h] = response['response']
|
||||||
else:
|
else:
|
||||||
unknown_hashes.append(h)
|
has_new_unknown = True
|
||||||
|
unknown_hashes.add(h)
|
||||||
|
|
||||||
for h in hashes:
|
for h in hashes:
|
||||||
cached_path = today_dir / h
|
cached_path = today_dir / h
|
||||||
|
@ -73,8 +75,10 @@ class SaneJavaScript():
|
||||||
with cached_path.open() as f:
|
with cached_path.open() as f:
|
||||||
to_return[h] = json.load(f)
|
to_return[h] = json.load(f)
|
||||||
|
|
||||||
with sanejs_unknowns.open('w') as f:
|
if has_new_unknown:
|
||||||
f.writelines(f'{h}\n' for h in unknown_hashes)
|
with sanejs_unknowns.open('w') as f:
|
||||||
|
f.writelines(f'{h}\n' for h in unknown_hashes)
|
||||||
|
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -19,21 +19,28 @@
|
||||||
Body size: {{ sizeof_fmt(ressource_size) }}
|
Body size: {{ sizeof_fmt(ressource_size) }}
|
||||||
{% if details %}
|
{% if details %}
|
||||||
{% if details[0] %}
|
{% if details[0] %}
|
||||||
- This file is known <b>legitimate</b>.
|
- This file is known <b>legitimate</b> on the following domains: {{ ', '.join(details[1]) }}.
|
||||||
{% elif details[0] == False %}
|
{% elif details[0] == False %}
|
||||||
{% if details[1] is iterable %}
|
</br>
|
||||||
</br>
|
The response sould be considered as
|
||||||
The response sould be considered as <b>phishing</b> unless it is served by <b>the following domain(s)</b>: {{ ', '.join(details[1]) }}
|
{% if details[1] is mapping and details[1].get('tag') %}
|
||||||
</br>
|
<b>{{ ', '.join(details[1]['tag']) }}</b>
|
||||||
{% else %}
|
{% else %}
|
||||||
- The response is known <b>malicious</b>.
|
<b>phishing</b>
|
||||||
{%endif%}
|
{%endif%}
|
||||||
|
{% if details[1] is mapping and details[1].get('target') %}
|
||||||
|
unless it is served by <b>the following domain(s)</b>: {{ ', '.join(details[1]['target']) }}
|
||||||
|
{% else %}
|
||||||
|
unless it is served by <b>the following domain(s)</b>: {{ ', '.join(details[1]) }}
|
||||||
|
{%endif%}
|
||||||
|
</br>
|
||||||
{%endif%}
|
{%endif%}
|
||||||
{%endif%}
|
{%endif%}
|
||||||
{% endmacro %}
|
{% endmacro %}
|
||||||
|
|
||||||
{% macro indexed_hash(details, identifier_for_toggle) %}
|
{% macro indexed_hash(details, identifier_for_toggle) %}
|
||||||
{% set total_captures = details['different_url']|length + details['same_url']|length %}
|
{% set total_captures = details[0] %}
|
||||||
|
{% set other_captures = details[1] %}
|
||||||
{# Only show details if the hits are in an other capture #}
|
{# Only show details if the hits are in an other capture #}
|
||||||
{% if total_captures > 0 %}
|
{% if total_captures > 0 %}
|
||||||
<p>
|
<p>
|
||||||
|
@ -46,19 +53,19 @@ Body size: {{ sizeof_fmt(ressource_size) }}
|
||||||
{# Lists of other captures loading the same content... #}
|
{# Lists of other captures loading the same content... #}
|
||||||
<div class="collapse" id="captureslist_{{ identifier_for_toggle }}">
|
<div class="collapse" id="captureslist_{{ identifier_for_toggle }}">
|
||||||
<div class="card card-body">
|
<div class="card card-body">
|
||||||
{% if details['different_url']|length > 0 %}
|
{% if other_captures['different_url']|length > 0 %}
|
||||||
{# ... on other URLs #}
|
{# ... on other URLs #}
|
||||||
<div>
|
<div>
|
||||||
<p>The following captures get the same file from a <b>different URL</b></p>
|
<p>The following captures get the same file from a <b>different URL</b></p>
|
||||||
{{ other_captures_table(details['different_url']) }}
|
{{ other_captures_table(other_captures['different_url']) }}
|
||||||
</div>
|
</div>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</br>
|
</br>
|
||||||
{% if details['same_url']|length > 0 %}
|
{% if other_captures['same_url']|length > 0 %}
|
||||||
{# ... on the same URL #}
|
{# ... on the same URL #}
|
||||||
<div>
|
<div>
|
||||||
<p>The following captures get the same file from the <b>same URL</b></p>
|
<p>The following captures get the same file from the <b>same URL</b></p>
|
||||||
{{ other_captures_table(details['same_url']) }}
|
{{ other_captures_table(other_captures['same_url']) }}
|
||||||
</div>
|
</div>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
|
|
Loading…
Reference in New Issue