mirror of https://github.com/CIRCL/lookyloo

chg: Improve url and hostnames search

parent b3dfffe2a0
commit 1f9f5f1a9a
@@ -311,6 +311,8 @@ def load_pickle_tree(capture_dir: Path) -> Optional[CrawledTree]:
                 return pickle.load(_p)
         except pickle.UnpicklingError:
             remove_pickle_tree(capture_dir)
+        except EOFError:
+            remove_pickle_tree(capture_dir)
     return None
 
 
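The new handler matters because `pickle.load` raises `EOFError` rather than `pickle.UnpicklingError` when the pickle stream is empty or truncated, so the existing except clause never fired for those files. A standalone sketch of the two failure modes (not part of the commit):

    import io
    import pickle

    try:
        pickle.load(io.BytesIO(b''))  # empty stream: "Ran out of input"
    except EOFError:
        print('empty/truncated pickle -> EOFError')

    try:
        pickle.load(io.BytesIO(b'not a pickle'))  # garbage bytes, invalid opcode
    except pickle.UnpicklingError:
        print('corrupted pickle -> UnpicklingError')

In both cases the tree pickle is unusable, so it is removed and the function falls through to `return None`.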
@@ -815,40 +815,64 @@ class Lookyloo():
             break
         return details, body_content
 
-    def get_url_occurrences(self, url: str):
-        '''Get all the captures and URL nodes the URL has been seen on the instance.'''
-        capture_uuids = self.indexing.get_captures_url(url)
-        to_return: Dict[str, Dict] = {cuuid: {} for cuuid in capture_uuids}
-        for capture_uuid in capture_uuids:
-            ct = self.get_crawled_tree(capture_uuid)
-            to_return[capture_uuid]['start_timestamp'] = ct.root_hartree.start_time.isoformat()
-            to_return[capture_uuid]['urlnodes'] = {}
+    def get_url_occurrences(self, url: str, limit: int=20) -> List[Dict]:
+        '''Get the most recent captures and URL nodes where the URL has been seen.'''
+        captures: List[CaptureCache] = []
+        for uuid in self.indexing.get_captures_url(url):
+            c = self.capture_cache(uuid)
+            if not c:
+                continue
+            if hasattr(c, 'timestamp'):
+                captures.append(c)
+        captures.sort(key=operator.attrgetter('timestamp'), reverse=True)
+
+        to_return: List[Dict] = []
+        for capture in captures[:limit]:
+            ct = self.get_crawled_tree(capture.uuid)
+            to_append: Dict[str, Union[str, Dict]] = {'capture_uuid': capture.uuid,
+                                                      'start_timestamp': capture.timestamp.isoformat()}
+            urlnodes: Dict[str, Dict[str, str]] = {}
             for urlnode in ct.root_hartree.url_tree.search_nodes(name=url):
-                to_return[capture_uuid]['urlnodes'][urlnode.uuid] = {'start_time': urlnode.start_time.isoformat(),
-                                                                     'hostnode_uuid': urlnode.hostnode_uuid}
+                urlnodes[urlnode.uuid] = {'start_time': urlnode.start_time.isoformat(),
+                                          'hostnode_uuid': urlnode.hostnode_uuid}
                 if hasattr(urlnode, 'body_hash'):
-                    to_return[capture_uuid]['urlnodes'][urlnode.uuid]['hash'] = urlnode.body_hash
+                    urlnodes[urlnode.uuid]['hash'] = urlnode.body_hash
+            to_append['urlnodes'] = urlnodes
+            to_return.append(to_append)
         return to_return
 
-    def get_hostname_occurrences(self, hostname: str, with_urls_occurrences: bool=False):
-        '''Get all the captures and URL nodes the hostname has been seen on the instance.'''
-        capture_uuids = self.indexing.get_captures_hostname(hostname)
-        to_return: Dict[str, Dict] = {cuuid: {} for cuuid in capture_uuids}
-        for capture_uuid in capture_uuids:
-            ct = self.get_crawled_tree(capture_uuid)
-            to_return[capture_uuid]['start_timestamp'] = ct.root_hartree.start_time.isoformat()
-            to_return[capture_uuid]['hostnodes'] = []
+    def get_hostname_occurrences(self, hostname: str, with_urls_occurrences: bool=False, limit: int=20) -> List[Dict]:
+        '''Get the most recent captures and URL nodes where the hostname has been seen.'''
+        captures: List[CaptureCache] = []
+        for uuid in self.indexing.get_captures_hostname(hostname):
+            c = self.capture_cache(uuid)
+            if not c:
+                continue
+            if hasattr(c, 'timestamp'):
+                captures.append(c)
+        captures.sort(key=operator.attrgetter('timestamp'), reverse=True)
+
+        to_return: List[Dict] = []
+        for capture in captures[:limit]:
+            ct = self.get_crawled_tree(capture.uuid)
+            to_append: Dict[str, Union[str, List, Dict]] = {'capture_uuid': capture.uuid,
+                                                            'start_timestamp': capture.timestamp.isoformat()}
+            hostnodes: List[str] = []
             if with_urls_occurrences:
-                to_return[capture_uuid]['urlnodes'] = {}
+                urlnodes: Dict[str, Dict[str, str]] = {}
             for hostnode in ct.root_hartree.hostname_tree.search_nodes(name=hostname):
-                to_return[capture_uuid]['hostnodes'].append(hostnode.uuid)
+                hostnodes.append(hostnode.uuid)
                 if with_urls_occurrences:
                     for urlnode in hostnode.urls:
-                        to_return[capture_uuid]['urlnodes'][urlnode.uuid] = {'start_time': urlnode.start_time.isoformat(),
-                                                                             'url': urlnode.name,
-                                                                             'hostnode_uuid': urlnode.hostnode_uuid}
+                        urlnodes[urlnode.uuid] = {'start_time': urlnode.start_time.isoformat(),
+                                                  'url': urlnode.name,
+                                                  'hostnode_uuid': urlnode.hostnode_uuid}
                         if hasattr(urlnode, 'body_hash'):
-                            to_return[capture_uuid]['urlnodes'][urlnode.uuid]['hash'] = urlnode.body_hash
+                            urlnodes[urlnode.uuid]['hash'] = urlnode.body_hash
+            to_append['hostnodes'] = hostnodes
+            if with_urls_occurrences:
+                to_append['urlnodes'] = urlnodes
+            to_return.append(to_append)
         return to_return
 
     def get_cookie_name_investigator(self, cookie_name: str):
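Both rewritten methods follow the same pattern: filter the indexed capture UUIDs down to usable cache entries, sort them newest first, and only pay the cost of loading a crawled tree for the first `limit` captures. A reduced sketch of that sort-then-slice step, using a stand-in class since `CaptureCache` itself isn't shown in the diff:

    import operator
    from dataclasses import dataclass
    from datetime import datetime

    @dataclass
    class FakeCache:          # stand-in for lookyloo's CaptureCache
        uuid: str
        timestamp: datetime   # the attribute the hasattr() guard checks for

    captures = [FakeCache('old', datetime(2020, 1, 1)),
                FakeCache('new', datetime(2021, 6, 1)),
                FakeCache('mid', datetime(2020, 8, 15))]
    # Same call as in the diff: sort in place on the timestamp attribute,
    # most recent capture first, then cap the expensive work that follows.
    captures.sort(key=operator.attrgetter('timestamp'), reverse=True)
    print([c.uuid for c in captures[:2]])  # ['new', 'mid']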
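The return type also changes, from a dict keyed by capture UUID to a list ordered by recency, so API consumers now iterate instead of looking up keys. A hypothetical caller, assuming an initialized Lookyloo instance (the variable name and URL are illustrative, not from the commit):

    # 'lookyloo' and the URL below are placeholders for this sketch.
    for entry in lookyloo.get_url_occurrences('http://example.com/', limit=5):
        print(entry['capture_uuid'], entry['start_timestamp'])
        for urlnode_uuid, details in entry['urlnodes'].items():
            # 'hash' is only present when the URL node carries a body_hash
            print('  ', urlnode_uuid, details['start_time'], details.get('hash'))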