new: Hash lookup method

pull/105/head
Raphaël Vinot 2020-10-23 20:51:15 +02:00
parent c6c4da981c
commit bdc0488e38
3 changed files with 49 additions and 2 deletions

View File

@ -3,6 +3,7 @@
from urllib.parse import urlsplit from urllib.parse import urlsplit
from typing import List, Tuple, Set, Dict, Optional from typing import List, Tuple, Set, Dict, Optional
from collections import defaultdict
from redis import Redis from redis import Redis
from har2tree import CrawledTree from har2tree import CrawledTree
@ -104,8 +105,8 @@ class Indexing():
# set of all captures with this hash # set of all captures with this hash
pipeline.sadd(f'bh|{h}|captures', crawled_tree.uuid) pipeline.sadd(f'bh|{h}|captures', crawled_tree.uuid)
# ZSet of all urlnode_UUIDs|full_url # ZSet of all urlnode_UUIDs|full_url
pipeline.zincrby(f'bh|{h}|captures|{crawled_tree.uuid}', 1, f'{urlnode.uuid}|{urlnode.hostnode_uuid}|{urlnode.name}') pipeline.zincrby(f'bh|{h}|captures|{crawled_tree.uuid}', 1,
f'{urlnode.uuid}|{urlnode.hostnode_uuid}|{urlnode.name}')
pipeline.execute() pipeline.execute()
def get_hash_uuids(self, body_hash: str) -> Tuple[str, str, str]: def get_hash_uuids(self, body_hash: str) -> Tuple[str, str, str]:
@ -136,3 +137,12 @@ class Indexing():
def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]: def get_body_hash_domains(self, body_hash: str) -> List[Tuple[str, float]]:
return self.redis.zrevrange(f'bh|{body_hash}', 0, -1, withscores=True) return self.redis.zrevrange(f'bh|{body_hash}', 0, -1, withscores=True)
def get_body_hash_urls(self, body_hash: str) -> Dict[str, List[Dict[str, str]]]:
    """Group every indexed hit for *body_hash* by the URL it was seen on.

    Reads the set ``bh|<hash>|captures`` to find the captures containing
    the hash, then the per-capture sorted set
    ``bh|<hash>|captures|<capture_uuid>`` whose members are formatted as
    ``<urlnode_uuid>|<hostnode_uuid>|<url>``.

    Returns a mapping ``url -> [{'capture': ..., 'hostnode': ..., 'urlnode': ...}, ...]``;
    the mapping is empty when the hash is unknown.
    """
    all_captures: Set[str] = self.redis.smembers(f'bh|{body_hash}|captures')  # type: ignore
    urls: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    # Iterate the set directly: there is no need to copy it into a list first.
    for capture_uuid in all_captures:
        for entry in self.redis.zrevrange(f'bh|{body_hash}|captures|{capture_uuid}', 0, -1):
            # maxsplit=2 keeps any '|' contained in the URL itself intact.
            url_uuid, hostnode_uuid, url = entry.split('|', 2)
            urls[url].append({'capture': capture_uuid, 'hostnode': hostnode_uuid, 'urlnode': url_uuid})
    return urls

View File

@ -657,6 +657,32 @@ class Lookyloo():
domains = self.indexing.get_body_hash_domains(body_hash) domains = self.indexing.get_body_hash_domains(body_hash)
return captures, domains return captures, domains
def get_body_hash_full(self, body_hash: str) -> Tuple[Dict[str, List[Dict[str, str]]], BytesIO]:
    """Return every indexed hit for *body_hash* together with the matching body.

    The hits come from ``self.indexing.get_body_hash_urls``. The body is
    recovered from the first hit of the first URL only: either the whole
    response body of that URL node, or one of its embedded resources.

    Returns ``(details, body)``; ``body`` is an empty ``BytesIO`` when the
    hash is unknown or no matching blob is found in the node.

    Raises ``MissingUUID`` when the capture directory or its pickled tree
    cannot be found.
    """
    details = self.indexing.get_body_hash_urls(body_hash)
    body_content = BytesIO()

    # Every hit shares this hash, so a single hit is enough to recover the body.
    entries = next(iter(details.values()), None)
    if not entries:
        return details, body_content
    first_hit = entries[0]

    capture_dir = self.lookup_capture_dir(first_hit['capture'])
    if not capture_dir:
        raise MissingUUID(f"Unable to find {first_hit['capture']}")
    ct = load_pickle_tree(capture_dir)
    if not ct:
        raise MissingUUID(f'Unable to find {capture_dir}')

    urlnode = ct.root_hartree.get_url_node_by_uuid(first_hit['urlnode'])
    if urlnode.body_hash == body_hash:
        # The hash we're looking for is the whole file.
        return details, urlnode.body

    # The hash is an embedded resource. The resources live on the node itself;
    # `body_hash` is a plain string and has no such attribute.
    # NOTE(review): assumes embedded_ressources maps mimetype -> iterable of
    # (hash, blob) pairs, as the original loop implied -- confirm against har2tree.
    for blobs in urlnode.embedded_ressources.values():
        for h, b in blobs:
            if h == body_hash:
                # Returning here stops the scan across all mimetypes at once.
                return details, b
    return details, body_content
def get_cookie_name_investigator(self, cookie_name: str): def get_cookie_name_investigator(self, cookie_name: str):
captures = [] captures = []
for capture_uuid, url_uuid in self.indexing.get_cookies_names_captures(cookie_name): for capture_uuid, url_uuid in self.indexing.get_cookies_names_captures(cookie_name):

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import base64
from zipfile import ZipFile, ZIP_DEFLATED from zipfile import ZipFile, ZIP_DEFLATED
from io import BytesIO from io import BytesIO
import os import os
@ -599,3 +600,13 @@ def json_redirects(tree_uuid: str):
to_return['response']['redirects'] = cache['redirects'] to_return['response']['redirects'] = cache['redirects']
return jsonify(to_return) return jsonify(to_return)
@app.route('/json/hash_info/<h>', methods=['GET'])
def json_hash_info(h: str):
    """JSON endpoint: describe the body hash *h* and return the matching body.

    Responds with ``{'error': 'Unknown Hash.'}`` when the lookup yields no
    details; otherwise with the hash, the lookup details and the
    base64-encoded body under the ``response`` key.
    """
    details, body = lookyloo.get_body_hash_full(h)
    if details:
        # Raw bytes are not JSON-serialisable, hence the base64 text form.
        encoded_body = base64.b64encode(body.getvalue()).decode()
        payload: Dict[str, Any] = {'response': {'hash': h, 'details': details,
                                                'body': encoded_body}}
        return jsonify(payload)
    return {'error': 'Unknown Hash.'}