|
|
|
@ -136,11 +136,11 @@ class Indexing(): |
|
|
|
|
|
|
|
pipeline.execute() |
|
|
|
|
|
|
|
def get_hash_uuids(self, body_hash: str) -> Tuple[str, str]: |
|
|
|
def get_hash_uuids(self, body_hash: str) -> Tuple[str, str, str]: |
|
|
|
capture_uuid = self.redis.srandmember(f'bh|{body_hash}|captures') |
|
|
|
entry = self.redis.zrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, 1)[0] |
|
|
|
urlnode_uuid, hostnode_uuid, url = entry.split('|', 2) |
|
|
|
return capture_uuid, urlnode_uuid |
|
|
|
return capture_uuid, urlnode_uuid, hostnode_uuid |
|
|
|
|
|
|
|
def get_body_hash_captures(self, body_hash: str, filter_url: Optional[str]=None, |
|
|
|
limit: int=20) -> Tuple[int, List[Tuple[str, str, str, bool]]]: |
|
|
|
@ -185,39 +185,40 @@ class Context(): |
|
|
|
return all_ressources_hashes |
|
|
|
|
|
|
|
def _cache_known_content(self) -> None: |
|
|
|
p = self.redis.pipeline() |
|
|
|
for filename, file_content in load_known_content().items(): |
|
|
|
if filename == 'generic': |
|
|
|
# 1px images, files with spaces, empty => non-relevant stuff |
|
|
|
for k, type_content in file_content.items(): |
|
|
|
p.hmset('known_content', {h: type_content['description'] for h in type_content['entries']}) |
|
|
|
elif filename == 'malicious': |
|
|
|
# User defined as malicious |
|
|
|
for h, details in file_content.items(): |
|
|
|
p.sadd('bh|malicious', h) |
|
|
|
if 'target' in details and details['target']: |
|
|
|
p.sadd(f'{h}|target', *details['target']) |
|
|
|
if 'tag' in details and details['tag']: |
|
|
|
p.sadd(f'{h}|tag', *details['tag']) |
|
|
|
elif filename == 'legitimate': |
|
|
|
# User defined as legitimate |
|
|
|
for h, details in file_content.items(): |
|
|
|
if 'domain' in details and details['domain']: |
|
|
|
p.sadd(f'bh|{h}|legitimate', *details['domain']) |
|
|
|
elif 'description' in details: |
|
|
|
p.hset('known_content', h, details['description']) |
|
|
|
else: |
|
|
|
# Full captures marked as legitimate |
|
|
|
for h, details in file_content.items(): |
|
|
|
p.sadd(f'bh|{h}|legitimate', *details['hostnames']) |
|
|
|
p.execute() |
|
|
|
for dirname in ['known_content', 'known_content_user']: |
|
|
|
for filename, file_content in load_known_content(dirname).items(): |
|
|
|
p = self.redis.pipeline() |
|
|
|
if filename == 'generic': |
|
|
|
# 1px images, files with spaces, empty => non-relevant stuff |
|
|
|
for k, type_content in file_content.items(): |
|
|
|
p.hmset('known_content', {h: type_content['description'] for h in type_content['entries']}) |
|
|
|
elif filename == 'malicious': |
|
|
|
# User defined as malicious |
|
|
|
for h, details in file_content.items(): |
|
|
|
p.sadd('bh|malicious', h) |
|
|
|
if 'target' in details and details['target']: |
|
|
|
p.sadd(f'{h}|target', *details['target']) |
|
|
|
if 'tag' in details and details['tag']: |
|
|
|
p.sadd(f'{h}|tag', *details['tag']) |
|
|
|
elif filename == 'legitimate': |
|
|
|
# User defined as legitimate |
|
|
|
for h, details in file_content.items(): |
|
|
|
if 'domain' in details and details['domain']: |
|
|
|
p.sadd(f'bh|{h}|legitimate', *details['domain']) |
|
|
|
elif 'description' in details: |
|
|
|
p.hset('known_content', h, details['description']) |
|
|
|
else: |
|
|
|
# Full captures marked as legitimate |
|
|
|
for h, details in file_content.items(): |
|
|
|
p.sadd(f'bh|{h}|legitimate', *details['hostnames']) |
|
|
|
p.execute() |
|
|
|
|
|
|
|
def find_known_content(self, har2tree_container: Union[CrawledTree, HostNode, URLNode, str]) -> Dict[str, Any]: |
|
|
|
"""Return a dictionary of content resources found in the local known_content database, or in SaneJS (if enabled)""" |
|
|
|
if isinstance(har2tree_container, str): |
|
|
|
to_lookup: Set[str] = {har2tree_container, } |
|
|
|
else: |
|
|
|
to_lookup: Set[str] = self._get_resources_hashes(har2tree_container) |
|
|
|
to_lookup = self._get_resources_hashes(har2tree_container) |
|
|
|
known_content_table: Dict[str, Any] = {} |
|
|
|
if not to_lookup: |
|
|
|
return known_content_table |
|
|
|
@ -268,7 +269,7 @@ class Context(): |
|
|
|
|
|
|
|
def store_known_legitimate_tree(self, tree: CrawledTree): |
|
|
|
known_content = self.find_known_content(tree) |
|
|
|
capture_file: Path = get_homedir() / 'known_content' / f'{urlsplit(tree.root_url).hostname}.json' |
|
|
|
capture_file: Path = get_homedir() / 'known_content_user' / f'{urlsplit(tree.root_url).hostname}.json' |
|
|
|
if capture_file.exists(): |
|
|
|
with open(capture_file) as f: |
|
|
|
to_store = json.load(f) |
|
|
|
@ -368,7 +369,7 @@ class Context(): |
|
|
|
self.redis.sadd(f'bh|{body_hash}|legitimate', legitimate_hostname) |
|
|
|
|
|
|
|
def store_known_malicious_ressource(self, ressource_hash: str, details: Dict[str, str]): |
|
|
|
known_malicious_ressource_file = get_homedir() / 'known_content' / 'malicious.json' |
|
|
|
known_malicious_ressource_file = get_homedir() / 'known_content_user' / 'malicious.json' |
|
|
|
if known_malicious_ressource_file.exists(): |
|
|
|
with open(known_malicious_ressource_file) as f: |
|
|
|
to_store = json.load(f) |
|
|
|
@ -400,7 +401,7 @@ class Context(): |
|
|
|
p.execute() |
|
|
|
|
|
|
|
def store_known_legitimate_ressource(self, ressource_hash: str, details: Dict[str, str]): |
|
|
|
known_legitimate_ressource_file = get_homedir() / 'known_content' / 'legitimate.json' |
|
|
|
known_legitimate_ressource_file = get_homedir() / 'known_content_user' / 'legitimate.json' |
|
|
|
if known_legitimate_ressource_file.exists(): |
|
|
|
with open(known_legitimate_ressource_file) as f: |
|
|
|
to_store = json.load(f) |
|
|
|
@ -1115,6 +1116,7 @@ class Lookyloo(): |
|
|
|
for ressource_h, blob in blobs: |
|
|
|
if ressource_h == h: |
|
|
|
return 'embedded_ressource.bin', blob |
|
|
|
return None |
|
|
|
|
|
|
|
def get_hostnode_investigator(self, capture_uuid: str, node_uuid: str) -> Tuple[HostNode, List[Dict[str, Any]]]: |
|
|
|
capture_dir = self.lookup_capture_dir(capture_uuid) |
|
|
|
|