mirror of https://github.com/CIRCL/lookyloo
new: Store directories by day, refactor indexing
parent
1b987c38b7
commit
7791eff842
407
bin/archiver.py
407
bin/archiver.py
|
@ -8,17 +8,15 @@ import os
|
|||
import random
|
||||
import shutil
|
||||
|
||||
from collections import defaultdict
|
||||
from collections.abc import Mapping
|
||||
from datetime import datetime, timedelta, date
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
from typing import Dict, List, Optional, Set
|
||||
|
||||
from redis import Redis
|
||||
import s3fs # type: ignore
|
||||
|
||||
from lookyloo.default import AbstractManager, get_config, get_homedir, get_socket_path, try_make_file
|
||||
from lookyloo.helpers import get_captures_dir, is_locked
|
||||
from lookyloo.helpers import get_captures_dir, is_locked, make_ts_from_dirname, make_dirs_list
|
||||
|
||||
logging.config.dictConfig(get_config('logging'))
|
||||
|
||||
|
@ -67,88 +65,105 @@ class Archiver(AbstractManager):
|
|||
break
|
||||
archiving_done = self._archive()
|
||||
self._load_indexes()
|
||||
# The HARs are supposedly all compressed so this call shouldn't be required
|
||||
# unless you're processing old captures for the first time.
|
||||
# self._compress_hars()
|
||||
if not self.shutdown_requested():
|
||||
# This call takes a very long time on MinIO
|
||||
self._update_all_capture_indexes()
|
||||
|
||||
def _update_index(self, root_dir: Path, *, s3fs: bool=False) -> None:
|
||||
current_index: Dict[str, str] = {}
|
||||
if s3fs:
|
||||
self.logger.info(f'Updating index for {root_dir} (s3fs)')
|
||||
self.s3fs_client.invalidate_cache(self.s3fs_bucket)
|
||||
# On s3fs, the path is bucket_name/year/month
|
||||
# root_dir is /full/local/path/to/archived_captures/year/month
|
||||
s3fs_dir = '/'.join([self.s3fs_bucket, root_dir.parent.name, root_dir.name])
|
||||
all_s3fs_captures = self.s3fs_client.ls(s3fs_dir, detail=False, refresh=True)
|
||||
if not all_s3fs_captures:
|
||||
self.logger.warning(f'{root_dir} is empty on s3fs ({s3fs_dir}).')
|
||||
return
|
||||
else:
|
||||
self.logger.debug(f'Updating index for {root_dir}')
|
||||
if not any(os.scandir(root_dir)):
|
||||
# the directory is empty, we can safely remove it
|
||||
root_dir.rmdir()
|
||||
return
|
||||
def _update_index(self, root_dir: Path, *, s3fs_parent_dir: Optional[str]=None) -> Optional[Path]:
|
||||
# returns a path to the index for the given directory
|
||||
logmsg = f'Updating index for {root_dir}'
|
||||
if s3fs_parent_dir:
|
||||
logmsg = f'{logmsg} (s3fs)'
|
||||
self.logger.info(logmsg)
|
||||
|
||||
current_index: Dict[str, str] = {}
|
||||
index_file = root_dir / 'index'
|
||||
if index_file.exists():
|
||||
# Skip index if the directory has been archived.
|
||||
try:
|
||||
with index_file.open('r') as _f:
|
||||
current_index = {uuid: dirname for uuid, dirname in csv.reader(_f)
|
||||
if uuid and dirname}
|
||||
current_index = self.__load_index(index_file, ignore_sub=True)
|
||||
except Exception as e:
|
||||
# the index file is broken, it will be recreated.
|
||||
self.logger.warning(f'Index for {root_dir} broken, recreating it: {e}')
|
||||
pass
|
||||
if not current_index:
|
||||
# The file is either empty or only contains subs
|
||||
# NOTE: should we remove if it has subs?
|
||||
index_file.unlink()
|
||||
|
||||
curent_index_dirs = set(current_index.values())
|
||||
sub_indexes: List[Path] = []
|
||||
current_index_dirs: Set[str] = set(current_index.values())
|
||||
new_captures: Set[Path] = set()
|
||||
|
||||
if s3fs_parent_dir:
|
||||
s3fs_dir = '/'.join([s3fs_parent_dir, root_dir.name])
|
||||
# the call below will spit out a mix of directories:
|
||||
# * <datetime>
|
||||
# * <day> (which contains a <datetime> directory)
|
||||
for entry in self.s3fs_client.ls(s3fs_dir, detail=False, refresh=False):
|
||||
if not self.s3fs_client.isdir(entry):
|
||||
# index
|
||||
continue
|
||||
dir_on_disk = root_dir / entry.rsplit('/', 1)[-1]
|
||||
if dir_on_disk.name.isdigit():
|
||||
# got a day directory that contains captures
|
||||
sub_index = self._update_index(dir_on_disk, s3fs_parent_dir=s3fs_dir)
|
||||
if sub_index:
|
||||
sub_indexes.append(sub_index)
|
||||
else:
|
||||
# got a capture
|
||||
if str(dir_on_disk) not in current_index_dirs:
|
||||
new_captures.add(dir_on_disk)
|
||||
|
||||
if s3fs:
|
||||
new_captures = {existing_capture.rsplit('/', 1)[-1] for existing_capture in all_s3fs_captures
|
||||
if existing_capture.rsplit('/', 1)[-1]
|
||||
and (existing_capture.rsplit('/', 1)[-1] not in curent_index_dirs)
|
||||
and self.s3fs_client.isdir(existing_capture)}
|
||||
else:
|
||||
with os.scandir(root_dir) as it:
|
||||
new_captures = {existing_capture.name for existing_capture in it
|
||||
if (existing_capture.name not in curent_index_dirs)
|
||||
and existing_capture.is_dir()}
|
||||
for entry in it:
|
||||
# can be index, sub directory (digit), or isoformat
|
||||
if not entry.is_dir():
|
||||
# index
|
||||
continue
|
||||
dir_on_disk = Path(entry)
|
||||
if dir_on_disk.name.isdigit():
|
||||
sub_index = self._update_index(dir_on_disk)
|
||||
if sub_index:
|
||||
sub_indexes.append(sub_index)
|
||||
else:
|
||||
# isoformat
|
||||
if str(dir_on_disk) not in current_index_dirs:
|
||||
new_captures.add(dir_on_disk)
|
||||
|
||||
if not new_captures:
|
||||
if s3fs:
|
||||
self.logger.info(f'No new captures in {root_dir} (s3fs directory)')
|
||||
else:
|
||||
self.logger.debug(f'No new captures in {root_dir}')
|
||||
# No new captures, quitting
|
||||
return
|
||||
if not current_index and not new_captures and not sub_indexes:
|
||||
# No captures at all in the directory and subdirectories, quitting
|
||||
logmsg = f'No captures in {root_dir}'
|
||||
if s3fs_parent_dir:
|
||||
logmsg = f'{logmsg} (s3fs directory)'
|
||||
self.logger.info(logmsg)
|
||||
return None
|
||||
|
||||
self.logger.info(f'{len(new_captures)} new captures in {root_dir}.')
|
||||
if new_captures:
|
||||
self.logger.info(f'{len(new_captures)} new captures in {root_dir}.')
|
||||
|
||||
for capture_dir_name in new_captures:
|
||||
capture_dir = root_dir / capture_dir_name
|
||||
for capture_dir in new_captures:
|
||||
# capture_dir_name is *only* the isoformat of the capture.
|
||||
# This directory will either be directly in the month directory (old format)
|
||||
# or in the day directory (new format)
|
||||
if not next(capture_dir.iterdir(), None):
|
||||
self.logger.warning(f'{capture_dir} is empty, removing.')
|
||||
capture_dir.rmdir()
|
||||
continue
|
||||
uuid_file = capture_dir / 'uuid'
|
||||
if not uuid_file.exists():
|
||||
self.logger.warning(f'No UUID file in {capture_dir}.')
|
||||
shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
|
||||
continue
|
||||
with uuid_file.open() as _f:
|
||||
uuid = _f.read().strip()
|
||||
|
||||
try:
|
||||
uuid_file = capture_dir / 'uuid'
|
||||
if not uuid_file.exists():
|
||||
self.logger.warning(f'No UUID file in {capture_dir}.')
|
||||
shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
|
||||
continue
|
||||
|
||||
with uuid_file.open() as _f:
|
||||
uuid = _f.read().strip()
|
||||
if not uuid:
|
||||
self.logger.warning(f'{uuid_file} is empty')
|
||||
shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
|
||||
continue
|
||||
|
||||
if uuid in current_index:
|
||||
self.logger.warning(f'Duplicate UUID ({uuid}) in {current_index[uuid]} and {uuid_file.parent.name}')
|
||||
shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
|
||||
|
@ -156,34 +171,24 @@ class Archiver(AbstractManager):
|
|||
except OSError as e:
|
||||
self.logger.warning(f'Error when discarding capture {capture_dir}: {e}')
|
||||
continue
|
||||
current_index[uuid] = capture_dir.name
|
||||
|
||||
current_index[uuid] = uuid_file.parent.name
|
||||
|
||||
if not current_index:
|
||||
if not current_index and not sub_indexes:
|
||||
# The directory has been archived. It is probably safe to unlink, but
|
||||
# if it's not, we will lose a whole buch of captures. Moving instead for safety.
|
||||
shutil.move(str(root_dir), str(get_homedir() / 'discarded_captures' / root_dir.parent / root_dir.name))
|
||||
return
|
||||
self.logger.warning(f'Nothing to index in {root_dir}')
|
||||
return None
|
||||
|
||||
with index_file.open('w') as _f:
|
||||
index_writer = csv.writer(_f)
|
||||
for uuid, dirname in current_index.items():
|
||||
index_writer.writerow([uuid, dirname])
|
||||
for sub_path in sub_indexes:
|
||||
# Only keep the dir name
|
||||
index_writer.writerow(['sub_index', sub_path.parent.name])
|
||||
|
||||
def _make_dirs_list(self, root_dir: Path) -> List[Path]:
|
||||
directories = []
|
||||
year_now = date.today().year
|
||||
while True:
|
||||
year_dir = root_dir / str(year_now)
|
||||
if not year_dir.exists():
|
||||
# if we do not have a directory with this year, quit the loop
|
||||
break
|
||||
for month in range(12, 0, -1):
|
||||
month_dir = year_dir / f'{month:02}'
|
||||
if month_dir.exists():
|
||||
directories.append(month_dir)
|
||||
year_now -= 1
|
||||
return directories
|
||||
return index_file
|
||||
|
||||
def _update_all_capture_indexes(self):
|
||||
'''Run that after the captures are in the proper directories'''
|
||||
|
@ -194,7 +199,7 @@ class Archiver(AbstractManager):
|
|||
# and we only care about the root directory (ex: 2023/06)
|
||||
# directories_to_index = {capture_dir.parent.parent
|
||||
# for capture_dir in get_captures_dir().glob('*/*/*/uuid')}
|
||||
for directory_to_index in self._make_dirs_list(get_captures_dir()):
|
||||
for directory_to_index in make_dirs_list(get_captures_dir()):
|
||||
if self.shutdown_requested():
|
||||
self.logger.warning('Shutdown requested, breaking.')
|
||||
break
|
||||
|
@ -202,173 +207,157 @@ class Archiver(AbstractManager):
|
|||
self.logger.info('Recent indexes updated')
|
||||
# Archived captures
|
||||
self.logger.info('Update archives indexes')
|
||||
for directory_to_index in self._make_dirs_list(self.archived_captures_dir):
|
||||
for directory_to_index in make_dirs_list(self.archived_captures_dir):
|
||||
if self.shutdown_requested():
|
||||
self.logger.warning('Shutdown requested, breaking.')
|
||||
break
|
||||
# Updating the indexes can take a while, just run this call once in N calls
|
||||
if random.randrange(20) == 0:
|
||||
self._update_index(directory_to_index, s3fs=self.archive_on_s3fs)
|
||||
year = directory_to_index.parent.name
|
||||
if self.archive_on_s3fs:
|
||||
# Updating the indexes can take a while, just run this call once in N calls
|
||||
if random.randrange(20) == 0:
|
||||
self._update_index(directory_to_index,
|
||||
s3fs_parent_dir='/'.join([self.s3fs_bucket, year]))
|
||||
else:
|
||||
self._update_index(directory_to_index)
|
||||
self.logger.info('Archived indexes updated')
|
||||
|
||||
def __archive_single_capture(self, capture_path: Path) -> Path:
|
||||
capture_timestamp = make_ts_from_dirname(capture_path.name)
|
||||
dest_dir = self.archived_captures_dir / str(capture_timestamp.year) / f'{capture_timestamp.month:02}' / f'{capture_timestamp.day:02}'
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
# If the HAR isn't archived yet, archive it before copy
|
||||
for har in capture_path.glob('*.har'):
|
||||
with har.open('rb') as f_in:
|
||||
with gzip.open(f'{har}.gz', 'wb') as f_out:
|
||||
shutil.copyfileobj(f_in, f_out)
|
||||
har.unlink()
|
||||
|
||||
# read uuid before copying over to (maybe) S3
|
||||
with (capture_path / 'uuid').open() as _uuid:
|
||||
uuid = _uuid.read().strip()
|
||||
|
||||
(capture_path / 'tree.pickle').unlink(missing_ok=True)
|
||||
(capture_path / 'tree.pickle.gz').unlink(missing_ok=True)
|
||||
shutil.move(str(capture_path), str(dest_dir))
|
||||
# Update index in parent
|
||||
with (dest_dir / 'index').open('a') as _index:
|
||||
index_writer = csv.writer(_index)
|
||||
index_writer.writerow([uuid, capture_path.name])
|
||||
# Update redis cache all at once.
|
||||
p = self.redis.pipeline()
|
||||
p.delete(str(capture_path))
|
||||
p.hset('lookup_dirs_archived', mapping={uuid: str(dest_dir / capture_path.name)})
|
||||
p.hdel('lookup_dirs', uuid)
|
||||
p.execute()
|
||||
|
||||
return dest_dir / capture_path.name
|
||||
|
||||
def _archive(self):
|
||||
archive_interval = timedelta(days=get_config('generic', 'archive'))
|
||||
cut_time = (datetime.now() - archive_interval).date()
|
||||
cut_time = cut_time.replace(day=1)
|
||||
cut_time = (datetime.now() - archive_interval)
|
||||
self.logger.info(f'Archiving all captures older than {cut_time.isoformat()}.')
|
||||
archiving_done = True
|
||||
|
||||
# Format:
|
||||
# { 2020: { 12: [(directory, uuid)] } }
|
||||
to_archive: Dict[int, Dict[int, List[Path]]] = defaultdict(lambda: defaultdict(list))
|
||||
# In order to avoid scanning the complete directory on each run, we check if year and month are
|
||||
# older than the cut time.
|
||||
for index in get_captures_dir().glob('*/*/index'):
|
||||
if self.shutdown_requested():
|
||||
self.logger.warning('Shutdown requested, breaking.')
|
||||
# Let's use the indexes instead of listing directories to find what we want to archive.
|
||||
capture_breakpoint = 300
|
||||
for u, p in self.redis.hscan_iter('lookup_dirs'):
|
||||
uuid = u.decode()
|
||||
path = p.decode()
|
||||
if capture_breakpoint <= 0:
|
||||
# Break and restart later
|
||||
self.logger.info('Archived many captures will keep going later.')
|
||||
archiving_done = False
|
||||
break
|
||||
month = int(index.parent.name)
|
||||
year = int(index.parent.parent.name)
|
||||
if date(year, month, 1) >= cut_time:
|
||||
continue
|
||||
|
||||
for capture_uuid in index.parent.glob('*/uuid'):
|
||||
try:
|
||||
timestamp = datetime.strptime(capture_uuid.parent.name, '%Y-%m-%dT%H:%M:%S.%f')
|
||||
except ValueError:
|
||||
timestamp = datetime.strptime(capture_uuid.parent.name, '%Y-%m-%dT%H:%M:%S')
|
||||
if timestamp.date() >= cut_time:
|
||||
continue
|
||||
to_archive[timestamp.year][timestamp.month].append(capture_uuid.parent)
|
||||
self.logger.debug(f'Archiving {capture_uuid.parent}.')
|
||||
|
||||
if not to_archive:
|
||||
self.logger.info('Nothing to archive.')
|
||||
return archiving_done
|
||||
|
||||
for year, month_captures in to_archive.items():
|
||||
for month, captures in month_captures.items():
|
||||
dest_dir = self.archived_captures_dir / str(year) / f'{month:02}'
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
capture_breakpoint = 300
|
||||
self.logger.info(f'{len(captures)} captures to archive in {year}-{month}.')
|
||||
for capture_path in captures:
|
||||
if capture_breakpoint <= 0:
|
||||
# Break and restart later
|
||||
self.logger.info(f'Archived many captures in {year}-{month}, will keep going later.')
|
||||
archiving_done = False
|
||||
break
|
||||
elif capture_breakpoint % 10:
|
||||
# Just check if we requested a shutdown.
|
||||
if self.shutdown_requested():
|
||||
self.logger.warning('Shutdown requested, breaking.')
|
||||
break
|
||||
|
||||
lock_file = capture_path / 'lock'
|
||||
if try_make_file(lock_file):
|
||||
# Lock created, we can proceede
|
||||
with lock_file.open('w') as f:
|
||||
f.write(f"{datetime.now().isoformat()};{os.getpid()}")
|
||||
else:
|
||||
# The directory is locked because a pickle is being created, try again later
|
||||
if is_locked(capture_path):
|
||||
# call this method to remove dead locks
|
||||
continue
|
||||
|
||||
capture_breakpoint -= 1
|
||||
# If the HAR isn't archived yet, archive it before copy
|
||||
for har in capture_path.glob('*.har'):
|
||||
with har.open('rb') as f_in:
|
||||
with gzip.open(f'{har}.gz', 'wb') as f_out:
|
||||
shutil.copyfileobj(f_in, f_out)
|
||||
har.unlink()
|
||||
|
||||
try:
|
||||
(capture_path / 'tree.pickle').unlink(missing_ok=True)
|
||||
(capture_path / 'tree.pickle.gz').unlink(missing_ok=True)
|
||||
shutil.move(str(capture_path), str(dest_dir))
|
||||
self.redis.delete(str(capture_path))
|
||||
except OSError as e:
|
||||
self.logger.warning(f'Unable to archive capture: {e}')
|
||||
finally:
|
||||
(dest_dir / capture_path.name / 'lock').unlink(missing_ok=True)
|
||||
# we archived some captures, update relevant index
|
||||
self._update_index(dest_dir, s3fs=self.archive_on_s3fs)
|
||||
if not archiving_done:
|
||||
elif capture_breakpoint % 10:
|
||||
# Just check if we requested a shutdown.
|
||||
if self.shutdown_requested():
|
||||
self.logger.warning('Shutdown requested, breaking.')
|
||||
break
|
||||
|
||||
capture_time_isoformat = os.path.basename(path)
|
||||
if not capture_time_isoformat:
|
||||
continue
|
||||
capture_time = make_ts_from_dirname(capture_time_isoformat)
|
||||
if capture_time >= cut_time:
|
||||
continue
|
||||
# archive the capture.
|
||||
capture_path = Path(path)
|
||||
if not capture_path.exists():
|
||||
if not self.redis.hexists('lookup_dirs_archived', uuid):
|
||||
self.logger.warning(f'Missing capture directory for {uuid}, unable to archive {capture_path}')
|
||||
continue
|
||||
lock_file = capture_path / 'lock'
|
||||
if try_make_file(lock_file):
|
||||
# Lock created, we can proceede
|
||||
with lock_file.open('w') as f:
|
||||
f.write(f"{datetime.now().isoformat()};{os.getpid()}")
|
||||
else:
|
||||
break
|
||||
# The directory is locked because a pickle is being created, try again later
|
||||
if is_locked(capture_path):
|
||||
# call this method to remove dead locks
|
||||
continue
|
||||
|
||||
try:
|
||||
new_capture_path = self.__archive_single_capture(capture_path)
|
||||
capture_breakpoint -= 1
|
||||
except OSError as e:
|
||||
self.logger.warning(f'Unable to archive capture: {e}')
|
||||
finally:
|
||||
(new_capture_path / 'lock').unlink(missing_ok=True)
|
||||
|
||||
if archiving_done:
|
||||
self.logger.info('Archiving done.')
|
||||
return archiving_done
|
||||
|
||||
def _compress_hars(self):
|
||||
"""This method is very slow (it checks every single capture for non-compressed HARs)
|
||||
The new approach is to compress the har of every capture by default so this shouldn't be
|
||||
needed anymore. Keeping it here just for reference, or to process old archives that contain
|
||||
non-gziped HARs.
|
||||
"""
|
||||
self.logger.info('Compressing archived captures')
|
||||
for index in self.archived_captures_dir.glob('*/*/index'):
|
||||
if self.shutdown_requested():
|
||||
self.logger.warning('Shutdown requested, breaking.')
|
||||
break
|
||||
with index.open('r') as _f:
|
||||
for uuid, dirname in csv.reader(_f):
|
||||
for har in (index.parent / dirname).glob('*.har'):
|
||||
with har.open('rb') as f_in:
|
||||
with gzip.open(f'{har}.gz', 'wb') as f_out:
|
||||
shutil.copyfileobj(f_in, f_out)
|
||||
har.unlink()
|
||||
self.logger.info('Archived captures compressed')
|
||||
def __load_index(self, index_path: Path, ignore_sub: bool=False) -> Dict[str, str]:
|
||||
'''Loads the given index file and all the subsequent ones if they exist'''
|
||||
# NOTE: this method is used on recent and archived captures, it must never trigger a dir listing
|
||||
indexed_captures = {}
|
||||
with index_path.open() as _i:
|
||||
for key, path_name in csv.reader(_i):
|
||||
if key == 'sub_index' and not ignore_sub:
|
||||
sub_index_file = index_path.parent / path_name / 'index'
|
||||
if sub_index_file.exists():
|
||||
indexed_captures.update(self.__load_index(sub_index_file))
|
||||
else:
|
||||
self.logger.warning(f'Missing sub index file: {sub_index_file}')
|
||||
else:
|
||||
# NOTE: we were initially checking if that path exists,
|
||||
# but that's something we can do when we update the indexes instead.
|
||||
# And a missing capture directory is already handled at rendering
|
||||
indexed_captures[key] = str(index_path.parent / path_name)
|
||||
return indexed_captures
|
||||
|
||||
def _load_indexes(self):
|
||||
# Initialize archives
|
||||
for index in get_captures_dir().glob('*/*/index'):
|
||||
# capture_dir / Year / Month / index <- should always exists. If not, created by _update_index
|
||||
# Initialize recent index
|
||||
for index in sorted(get_captures_dir().glob('*/*/index'), reverse=True):
|
||||
if self.shutdown_requested():
|
||||
self.logger.warning('Shutdown requested, breaking.')
|
||||
break
|
||||
|
||||
self.logger.info(f'Loading {index}')
|
||||
with index.open('r') as _f:
|
||||
recent_uuids: Mapping = {uuid: str(index.parent / dirname)
|
||||
for uuid, dirname in csv.reader(_f)
|
||||
if (index.parent / dirname).exists()}
|
||||
if recent_uuids:
|
||||
self.logger.info(f'{len(recent_uuids)} captures in directory.')
|
||||
self.redis.hset('lookup_dirs', mapping=recent_uuids)
|
||||
if recent_uuids := self.__load_index(index):
|
||||
self.logger.debug(f'{len(recent_uuids)} captures in directory {index.parent}.')
|
||||
self.redis.hset('lookup_dirs', mapping=recent_uuids) # type: ignore
|
||||
else:
|
||||
index.unlink()
|
||||
self.logger.info('Recent indexes loaded')
|
||||
total_recent_captures = self.redis.hlen('lookup_dirs')
|
||||
self.logger.info(f'Recent indexes loaded: {total_recent_captures} entries.')
|
||||
|
||||
already_archived_uuids = {k.decode() for k in self.redis.hkeys('lookup_dirs_archived')}
|
||||
self.logger.info(f'Already have {len(already_archived_uuids)} UUIDs archived')
|
||||
# Initialize archives
|
||||
# Initialize archives index
|
||||
for index in sorted(self.archived_captures_dir.glob('*/*/index'), reverse=True):
|
||||
if self.shutdown_requested():
|
||||
self.logger.warning('Shutdown requested, breaking.')
|
||||
break
|
||||
self.logger.debug(f'Loading {index}')
|
||||
with index.open('r') as _f:
|
||||
archived_uuids: Mapping = {uuid: index.parent / dirname
|
||||
for uuid, dirname in csv.reader(_f)}
|
||||
if archived_uuids:
|
||||
self.logger.debug(f'{len(archived_uuids)} captures in directory.')
|
||||
new_uuids = set(archived_uuids.keys()) - already_archived_uuids
|
||||
if not new_uuids:
|
||||
self.logger.debug('No new archived UUID to check.')
|
||||
continue
|
||||
|
||||
self.logger.info(f'Loading {index}, {len(archived_uuids)} captures in directory, {len(new_uuids)} archived UUID to check.')
|
||||
# NOTE: Only check if the directory exists if the UUID isn't in the cache.
|
||||
self.redis.hset('lookup_dirs_archived',
|
||||
mapping={uuid: str(dirname)
|
||||
for uuid, dirname in archived_uuids.items()
|
||||
if uuid in new_uuids and dirname.exists()})
|
||||
self.redis.hdel('lookup_dirs', *archived_uuids.keys())
|
||||
if archived_uuids := self.__load_index(index):
|
||||
self.logger.debug(f'{len(archived_uuids)} captures in directory {index.parent}.')
|
||||
self.redis.hset('lookup_dirs_archived', mapping=archived_uuids) # type: ignore
|
||||
else:
|
||||
index.unlink()
|
||||
self.logger.info('Archived indexes loaded')
|
||||
total_archived_captures = self.redis.hlen('lookup_dirs_archived')
|
||||
self.logger.info(f'Archived indexes loaded: {total_archived_captures} entries.')
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
@ -5,14 +5,14 @@ import logging.config
|
|||
import os
|
||||
import shutil
|
||||
|
||||
from datetime import date
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Optional, List
|
||||
from typing import Optional
|
||||
|
||||
from lookyloo.default import AbstractManager, get_config
|
||||
from lookyloo.exceptions import MissingUUID, NoValidHarFile
|
||||
from lookyloo.lookyloo import Lookyloo
|
||||
from lookyloo.helpers import is_locked
|
||||
from lookyloo.helpers import is_locked, get_sorted_captures_from_disk, make_dirs_list
|
||||
|
||||
|
||||
logging.config.dictConfig(get_config('logging'))
|
||||
|
@ -34,67 +34,56 @@ class BackgroundIndexer(AbstractManager):
|
|||
self._check_indexes()
|
||||
self.lookyloo.update_tree_cache_info(os.getpid(), self.script_name)
|
||||
|
||||
def _make_dirs_list(self, root_dir: Path) -> List[Path]:
|
||||
directories = []
|
||||
year_now = date.today().year
|
||||
while True:
|
||||
year_dir = root_dir / str(year_now)
|
||||
if not year_dir.exists():
|
||||
# if we do not have a directory with this year, quit the loop
|
||||
break
|
||||
for month in range(12, 0, -1):
|
||||
month_dir = year_dir / f'{month:02}'
|
||||
if month_dir.exists():
|
||||
directories.append(month_dir)
|
||||
year_now -= 1
|
||||
return directories
|
||||
|
||||
def _build_missing_pickles(self) -> bool:
|
||||
self.logger.debug('Build missing pickles...')
|
||||
# Sometimes, we have a huge backlog and the process might get stuck on old captures for a very long time
|
||||
# This value makes sure we break out of the loop and build pickles of the most recent captures
|
||||
max_captures = 50
|
||||
got_new_captures = False
|
||||
for month_dir in self._make_dirs_list(self.lookyloo.capture_dir):
|
||||
for uuid_path in sorted(month_dir.glob('*/uuid'), reverse=True):
|
||||
if ((uuid_path.parent / 'tree.pickle.gz').exists() or (uuid_path.parent / 'tree.pickle').exists()):
|
||||
|
||||
# Initialize time where we do not want to build the pickles anymore.
|
||||
archive_interval = timedelta(days=get_config('generic', 'archive'))
|
||||
cut_time = (datetime.now() - archive_interval)
|
||||
for month_dir in make_dirs_list(self.lookyloo.capture_dir):
|
||||
for capture_time, path in get_sorted_captures_from_disk(month_dir, cut_time=cut_time, keep_more_recent=True):
|
||||
if ((path / 'tree.pickle.gz').exists() or (path / 'tree.pickle').exists()):
|
||||
# We already have a pickle file
|
||||
self.logger.debug(f'{uuid_path.parent} has a pickle.')
|
||||
self.logger.debug(f'{path} has a pickle.')
|
||||
continue
|
||||
if not list(uuid_path.parent.rglob('*.har.gz')) and not list(uuid_path.parent.rglob('*.har')):
|
||||
if not list(path.rglob('*.har.gz')) and not list(path.rglob('*.har')):
|
||||
# No HAR file
|
||||
self.logger.debug(f'{uuid_path.parent} has no HAR file.')
|
||||
self.logger.debug(f'{path} has no HAR file.')
|
||||
continue
|
||||
|
||||
if is_locked(uuid_path.parent):
|
||||
if is_locked(path):
|
||||
# it is really locked
|
||||
self.logger.debug(f'{uuid_path.parent} is locked, pickle generated by another process.')
|
||||
self.logger.debug(f'{path} is locked, pickle generated by another process.')
|
||||
continue
|
||||
|
||||
with uuid_path.open() as f:
|
||||
with (path / 'uuid').open() as f:
|
||||
uuid = f.read()
|
||||
|
||||
if not self.lookyloo.redis.hexists('lookup_dirs', uuid):
|
||||
# The capture with this UUID exists, but it is for some reason missing in lookup_dirs
|
||||
self.lookyloo.redis.hset('lookup_dirs', uuid, str(uuid_path.parent))
|
||||
self.lookyloo.redis.hset('lookup_dirs', uuid, str(path))
|
||||
else:
|
||||
cached_path = Path(self.lookyloo.redis.hget('lookup_dirs', uuid))
|
||||
if cached_path != uuid_path.parent:
|
||||
if cached_path != path:
|
||||
# we have a duplicate UUID, it is proably related to some bad copy/paste
|
||||
if cached_path.exists():
|
||||
# Both paths exist, move the one that isn't in lookup_dirs
|
||||
self.logger.critical(f'Duplicate UUID for {uuid} in {cached_path} and {uuid_path.parent}, discarding the latest')
|
||||
self.logger.critical(f'Duplicate UUID for {uuid} in {cached_path} and {path}, discarding the latest')
|
||||
try:
|
||||
shutil.move(str(uuid_path.parent), str(self.discarded_captures_dir / uuid_path.parent.name))
|
||||
shutil.move(str(path), str(self.discarded_captures_dir / path.name))
|
||||
except FileNotFoundError as e:
|
||||
self.logger.warning(f'Unable to move capture: {e}')
|
||||
continue
|
||||
else:
|
||||
# The path in lookup_dirs for that UUID doesn't exists, just update it.
|
||||
self.lookyloo.redis.hset('lookup_dirs', uuid, str(uuid_path.parent))
|
||||
self.lookyloo.redis.hset('lookup_dirs', uuid, str(path))
|
||||
|
||||
try:
|
||||
self.logger.info(f'Build pickle for {uuid}: {uuid_path.parent.name}')
|
||||
self.logger.info(f'Build pickle for {uuid}: {path.name}')
|
||||
self.lookyloo.get_crawled_tree(uuid)
|
||||
self.lookyloo.trigger_modules(uuid, auto_trigger=True)
|
||||
self.logger.info(f'Pickle for {uuid} build.')
|
||||
|
@ -103,14 +92,14 @@ class BackgroundIndexer(AbstractManager):
|
|||
except MissingUUID:
|
||||
self.logger.warning(f'Unable to find {uuid}. That should not happen.')
|
||||
except NoValidHarFile as e:
|
||||
self.logger.critical(f'There are no HAR files in the capture {uuid}: {uuid_path.parent.name} - {e}')
|
||||
self.logger.critical(f'There are no HAR files in the capture {uuid}: {path.name} - {e}')
|
||||
except FileNotFoundError:
|
||||
self.logger.warning(f'Capture {uuid} disappeared during processing, probably archived.')
|
||||
except Exception:
|
||||
self.logger.exception(f'Unable to build pickle for {uuid}: {uuid_path.parent.name}')
|
||||
self.logger.exception(f'Unable to build pickle for {uuid}: {path.name}')
|
||||
# The capture is not working, moving it away.
|
||||
try:
|
||||
shutil.move(str(uuid_path.parent), str(self.discarded_captures_dir / uuid_path.parent.name))
|
||||
shutil.move(str(path), str(self.discarded_captures_dir / path.name))
|
||||
self.lookyloo.redis.hdel('lookup_dirs', uuid)
|
||||
except FileNotFoundError as e:
|
||||
self.logger.warning(f'Unable to move capture: {e}')
|
||||
|
|
|
@ -5,12 +5,12 @@ import logging
|
|||
import os
|
||||
import time
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime, timedelta, date
|
||||
from functools import lru_cache
|
||||
from importlib.metadata import version
|
||||
from io import BufferedIOBase
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Set, Union
|
||||
from typing import Any, Dict, List, Optional, Set, Union, Tuple
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
||||
|
@ -76,6 +76,60 @@ def get_email_template() -> str:
|
|||
return f.read()
|
||||
|
||||
|
||||
def make_dirs_list(root_dir: Path) -> List[Path]:
|
||||
directories = []
|
||||
year_now = date.today().year
|
||||
while True:
|
||||
year_dir = root_dir / str(year_now)
|
||||
if not year_dir.exists():
|
||||
# if we do not have a directory with this year, quit the loop
|
||||
break
|
||||
for month in range(12, 0, -1):
|
||||
month_dir = year_dir / f'{month:02}'
|
||||
if month_dir.exists():
|
||||
directories.append(month_dir)
|
||||
year_now -= 1
|
||||
return directories
|
||||
|
||||
|
||||
@lru_cache
|
||||
def make_ts_from_dirname(dirname: str) -> datetime:
|
||||
try:
|
||||
return datetime.strptime(dirname, '%Y-%m-%dT%H:%M:%S.%f')
|
||||
except ValueError:
|
||||
return datetime.strptime(dirname, '%Y-%m-%dT%H:%M:%S')
|
||||
|
||||
|
||||
def get_sorted_captures_from_disk(captures_dir: Path, /, *,
|
||||
cut_time: Optional[Union[datetime, date]]=None,
|
||||
keep_more_recent: bool=True) -> List[Tuple[datetime, Path]]:
|
||||
'''Recursively gets all the captures present in a specific directory, doesn't use the indexes.
|
||||
|
||||
NOTE: this method should never be used on archived captures as it's going to take forever on S3
|
||||
'''
|
||||
|
||||
all_paths: List[Tuple[datetime, Path]] = []
|
||||
for entry in captures_dir.iterdir():
|
||||
if not entry.is_dir():
|
||||
# index file
|
||||
continue
|
||||
if entry.name.isdigit():
|
||||
# sub directory
|
||||
all_paths += get_sorted_captures_from_disk(entry, cut_time=cut_time, keep_more_recent=keep_more_recent)
|
||||
else:
|
||||
# capture directory
|
||||
capture_time = make_ts_from_dirname(entry.name)
|
||||
if cut_time:
|
||||
if keep_more_recent and capture_time >= cut_time:
|
||||
all_paths.append((capture_time, entry))
|
||||
elif capture_time < cut_time:
|
||||
# keep only older
|
||||
all_paths.append((capture_time, entry))
|
||||
else:
|
||||
all_paths.append((capture_time, entry))
|
||||
return sorted(all_paths)
|
||||
|
||||
|
||||
class UserAgents:
|
||||
|
||||
def __init__(self):
|
||||
|
|
|
@ -1497,7 +1497,7 @@ class Lookyloo():
|
|||
) -> None:
|
||||
|
||||
now = datetime.now()
|
||||
dirpath = self.capture_dir / str(now.year) / f'{now.month:02}' / now.isoformat()
|
||||
dirpath = self.capture_dir / str(now.year) / f'{now.month:02}' / f'{now.day:02}' / now.isoformat()
|
||||
safe_create_dir(dirpath)
|
||||
|
||||
if os or browser:
|
||||
|
|
Loading…
Reference in New Issue