From f67a24f5c2c7b45be4f24512782157e71c5b1511 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Wed, 15 Nov 2023 15:31:11 +0100 Subject: [PATCH] new: Store directories by day, refactor indexing (WiP) --- bin/archiver.py | 388 +++++++++++++++++++------------------- bin/background_indexer.py | 42 +++-- lookyloo/helpers.py | 42 ++++- lookyloo/lookyloo.py | 2 +- 4 files changed, 260 insertions(+), 214 deletions(-) diff --git a/bin/archiver.py b/bin/archiver.py index 1a18301..b3352df 100755 --- a/bin/archiver.py +++ b/bin/archiver.py @@ -8,17 +8,15 @@ import os import random import shutil -from collections import defaultdict -from collections.abc import Mapping from datetime import datetime, timedelta, date from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Set from redis import Redis import s3fs # type: ignore from lookyloo.default import AbstractManager, get_config, get_homedir, get_socket_path, try_make_file -from lookyloo.helpers import get_captures_dir, is_locked +from lookyloo.helpers import get_captures_dir, is_locked, make_ts_from_dirname logging.config.dictConfig(get_config('logging')) @@ -67,88 +65,105 @@ class Archiver(AbstractManager): break archiving_done = self._archive() self._load_indexes() - # The HARs are supposedly all compressed so this call shouldn't be required - # unless you're processing old captures for the first time. - # self._compress_hars() if not self.shutdown_requested(): # This call takes a very long time on MinIO self._update_all_capture_indexes() - def _update_index(self, root_dir: Path, *, s3fs: bool=False) -> None: - current_index: Dict[str, str] = {} - if s3fs: - self.logger.info(f'Updating index for {root_dir} (s3fs)') - self.s3fs_client.invalidate_cache(self.s3fs_bucket) - # On s3fs, the path is bucket_name/year/month - # root_dir is /full/local/path/to/archived_captures/year/month - s3fs_dir = '/'.join([self.s3fs_bucket, root_dir.parent.name, root_dir.name]) - all_s3fs_captures = self.s3fs_client.ls(s3fs_dir, detail=False, refresh=True) - if not all_s3fs_captures: - self.logger.warning(f'{root_dir} is empty on s3fs ({s3fs_dir}).') - return - else: - self.logger.debug(f'Updating index for {root_dir}') - if not any(os.scandir(root_dir)): - # the directory is empty, we can safely remove it - root_dir.rmdir() - return + def _update_index(self, root_dir: Path, *, s3fs_parent_dir: Optional[str]=None) -> Optional[Path]: + # returns a path to the index for the given directory + logmsg = f'Updating index for {root_dir}' + if s3fs_parent_dir: + logmsg = f'{logmsg} (s3fs)' + self.logger.info(logmsg) + current_index: Dict[str, str] = {} index_file = root_dir / 'index' if index_file.exists(): - # Skip index if the directory has been archived. try: - with index_file.open('r') as _f: - current_index = {uuid: dirname for uuid, dirname in csv.reader(_f) - if uuid and dirname} + current_index = self.__load_index(index_file, ignore_sub=True) except Exception as e: # the index file is broken, it will be recreated. self.logger.warning(f'Index for {root_dir} broken, recreating it: {e}') - pass if not current_index: + # The file is either empty or only contains subs + # NOTE: should we remove if it has subs? 
index_file.unlink() - curent_index_dirs = set(current_index.values()) + sub_indexes: List[Path] = [] + current_index_dirs: Set[str] = set(current_index.values()) + new_captures: Set[Path] = set() + + if s3fs_parent_dir: + s3fs_dir = '/'.join([s3fs_parent_dir, root_dir.name]) + # the call below will spit out a mix of directories: + # * + # * (which contains a directory) + for entry in self.s3fs_client.ls(s3fs_dir, detail=False, refresh=False): + if not self.s3fs_client.isdir(entry): + # index + continue + dir_on_disk = root_dir / entry.rsplit('/', 1)[-1] + if dir_on_disk.name.isdigit(): + # got a day directory that contains captures + sub_index = self._update_index(dir_on_disk, s3fs_parent_dir=s3fs_dir) + if sub_index: + sub_indexes.append(sub_index) + else: + # got a capture + if str(dir_on_disk) not in current_index_dirs: + new_captures.add(dir_on_disk) - if s3fs: - new_captures = {existing_capture.rsplit('/', 1)[-1] for existing_capture in all_s3fs_captures - if existing_capture.rsplit('/', 1)[-1] - and (existing_capture.rsplit('/', 1)[-1] not in curent_index_dirs) - and self.s3fs_client.isdir(existing_capture)} else: with os.scandir(root_dir) as it: - new_captures = {existing_capture.name for existing_capture in it - if (existing_capture.name not in curent_index_dirs) - and existing_capture.is_dir()} + for entry in it: + # can be index, sub directory (digit), or isoformat + if not entry.is_dir(): + # index + continue + dir_on_disk = Path(entry) + if dir_on_disk.name.isdigit(): + sub_index = self._update_index(dir_on_disk) + if sub_index: + sub_indexes.append(sub_index) + else: + # isoformat + if str(dir_on_disk) not in current_index_dirs: + new_captures.add(dir_on_disk) - if not new_captures: - if s3fs: - self.logger.info(f'No new captures in {root_dir} (s3fs directory)') - else: - self.logger.debug(f'No new captures in {root_dir}') - # No new captures, quitting - return + if not current_index and not new_captures and not sub_indexes: + # No captures at all in the directory and subdirectories, quitting + logmsg = f'No captures in {root_dir}' + if s3fs_parent_dir: + logmsg = f'{logmsg} (s3fs directory)' + self.logger.info(logmsg) + return None - self.logger.info(f'{len(new_captures)} new captures in {root_dir}.') + if new_captures: + self.logger.info(f'{len(new_captures)} new captures in {root_dir}.') - for capture_dir_name in new_captures: - capture_dir = root_dir / capture_dir_name + for capture_dir in new_captures: + # capture_dir_name is *only* the isoformat of the capture. 
+ # This directory will either be directly in the month directory (old format) + # or in the day directory (new format) if not next(capture_dir.iterdir(), None): self.logger.warning(f'{capture_dir} is empty, removing.') capture_dir.rmdir() continue - uuid_file = capture_dir / 'uuid' - if not uuid_file.exists(): - self.logger.warning(f'No UUID file in {capture_dir}.') - shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures')) - continue - with uuid_file.open() as _f: - uuid = _f.read().strip() try: + uuid_file = capture_dir / 'uuid' + if not uuid_file.exists(): + self.logger.warning(f'No UUID file in {capture_dir}.') + shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures')) + continue + + with uuid_file.open() as _f: + uuid = _f.read().strip() if not uuid: self.logger.warning(f'{uuid_file} is empty') shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures')) continue + if uuid in current_index: self.logger.warning(f'Duplicate UUID ({uuid}) in {current_index[uuid]} and {uuid_file.parent.name}') shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures')) @@ -156,19 +171,24 @@ class Archiver(AbstractManager): except OSError as e: self.logger.warning(f'Error when discarding capture {capture_dir}: {e}') continue + current_index[uuid] = capture_dir.name - current_index[uuid] = uuid_file.parent.name - - if not current_index: + if not current_index and not sub_indexes: # The directory has been archived. It is probably safe to unlink, but # if it's not, we will lose a whole buch of captures. Moving instead for safety. shutil.move(str(root_dir), str(get_homedir() / 'discarded_captures' / root_dir.parent / root_dir.name)) - return + self.logger.warning(f'Nothing to index in {root_dir}') + return None with index_file.open('w') as _f: index_writer = csv.writer(_f) for uuid, dirname in current_index.items(): index_writer.writerow([uuid, dirname]) + for sub_path in sub_indexes: + # Only keep the dir name + index_writer.writerow(['sub_index', sub_path.parent.name]) + + return index_file def _make_dirs_list(self, root_dir: Path) -> List[Path]: directories = [] @@ -206,169 +226,153 @@ class Archiver(AbstractManager): if self.shutdown_requested(): self.logger.warning('Shutdown requested, breaking.') break - # Updating the indexes can take a while, just run this call once in N calls - if random.randrange(20) == 0: - self._update_index(directory_to_index, s3fs=self.archive_on_s3fs) + year = directory_to_index.parent.name + if self.archive_on_s3fs: + # Updating the indexes can take a while, just run this call once in N calls + if random.randrange(20) == 0: + self._update_index(directory_to_index, + s3fs_parent_dir='/'.join([self.s3fs_bucket, year])) + else: + self._update_index(directory_to_index) self.logger.info('Archived indexes updated') + def __archive_single_capture(self, capture_path: Path) -> Path: + capture_timestamp = make_ts_from_dirname(capture_path.name) + dest_dir = self.archived_captures_dir / str(capture_timestamp.year) / f'{capture_timestamp.month:02}' / f'{capture_timestamp.day:02}' + dest_dir.mkdir(parents=True, exist_ok=True) + # If the HAR isn't archived yet, archive it before copy + for har in capture_path.glob('*.har'): + with har.open('rb') as f_in: + with gzip.open(f'{har}.gz', 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + har.unlink() + + # read uuid before copying over to (maybe) S3 + with (capture_path / 'uuid').open() as _uuid: + uuid = _uuid.read().strip() + + (capture_path / 'tree.pickle').unlink(missing_ok=True) + 
(capture_path / 'tree.pickle.gz').unlink(missing_ok=True) + shutil.move(str(capture_path), str(dest_dir)) + # Update index in parent + with (dest_dir / 'index').open('a') as _index: + index_writer = csv.writer(_index) + index_writer.writerow([uuid, capture_path.name]) + # Update redis cache all at once. + p = self.redis.pipeline() + p.delete(str(capture_path)) + p.hset('lookup_dirs_archived', mapping={uuid: str(dest_dir / capture_path.name)}) + p.hdel('lookup_dirs', uuid) + p.execute() + + return dest_dir / capture_path.name + def _archive(self): archive_interval = timedelta(days=get_config('generic', 'archive')) - cut_time = (datetime.now() - archive_interval).date() - cut_time = cut_time.replace(day=1) + cut_time = (datetime.now() - archive_interval) self.logger.info(f'Archiving all captures older than {cut_time.isoformat()}.') archiving_done = True - # Format: - # { 2020: { 12: [(directory, uuid)] } } - to_archive: Dict[int, Dict[int, List[Path]]] = defaultdict(lambda: defaultdict(list)) - # In order to avoid scanning the complete directory on each run, we check if year and month are - # older than the cut time. - for index in get_captures_dir().glob('*/*/index'): - if self.shutdown_requested(): - self.logger.warning('Shutdown requested, breaking.') + # Let's use the indexes instead of listing directories to find what we want to archive. + capture_breakpoint = 300 + for u, p in self.redis.hscan_iter('lookup_dirs'): + uuid = u.decode() + path = p.decode() + if capture_breakpoint <= 0: + # Break and restart later + self.logger.info('Archived many captures will keep going later.') + archiving_done = False break - month = int(index.parent.name) - year = int(index.parent.parent.name) - if date(year, month, 1) >= cut_time: - continue - - for capture_uuid in index.parent.glob('*/uuid'): - try: - timestamp = datetime.strptime(capture_uuid.parent.name, '%Y-%m-%dT%H:%M:%S.%f') - except ValueError: - timestamp = datetime.strptime(capture_uuid.parent.name, '%Y-%m-%dT%H:%M:%S') - if timestamp.date() >= cut_time: - continue - to_archive[timestamp.year][timestamp.month].append(capture_uuid.parent) - self.logger.debug(f'Archiving {capture_uuid.parent}.') - - if not to_archive: - self.logger.info('Nothing to archive.') - return archiving_done - - for year, month_captures in to_archive.items(): - for month, captures in month_captures.items(): - dest_dir = self.archived_captures_dir / str(year) / f'{month:02}' - dest_dir.mkdir(parents=True, exist_ok=True) - capture_breakpoint = 300 - self.logger.info(f'{len(captures)} captures to archive in {year}-{month}.') - for capture_path in captures: - if capture_breakpoint <= 0: - # Break and restart later - self.logger.info(f'Archived many captures in {year}-{month}, will keep going later.') - archiving_done = False - break - elif capture_breakpoint % 10: - # Just check if we requested a shutdown. 
- if self.shutdown_requested(): - self.logger.warning('Shutdown requested, breaking.') - break - - lock_file = capture_path / 'lock' - if try_make_file(lock_file): - # Lock created, we can proceede - with lock_file.open('w') as f: - f.write(f"{datetime.now().isoformat()};{os.getpid()}") - else: - # The directory is locked because a pickle is being created, try again later - if is_locked(capture_path): - # call this method to remove dead locks - continue - - capture_breakpoint -= 1 - # If the HAR isn't archived yet, archive it before copy - for har in capture_path.glob('*.har'): - with har.open('rb') as f_in: - with gzip.open(f'{har}.gz', 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - har.unlink() - - try: - (capture_path / 'tree.pickle').unlink(missing_ok=True) - (capture_path / 'tree.pickle.gz').unlink(missing_ok=True) - shutil.move(str(capture_path), str(dest_dir)) - self.redis.delete(str(capture_path)) - except OSError as e: - self.logger.warning(f'Unable to archive capture: {e}') - finally: - (dest_dir / capture_path.name / 'lock').unlink(missing_ok=True) - # we archived some captures, update relevant index - self._update_index(dest_dir, s3fs=self.archive_on_s3fs) - if not archiving_done: + elif capture_breakpoint % 10: + # Just check if we requested a shutdown. + if self.shutdown_requested(): + self.logger.warning('Shutdown requested, breaking.') break + + capture_time_isoformat = os.path.basename(path) + if not capture_time_isoformat: + continue + capture_time = make_ts_from_dirname(capture_time_isoformat) + if capture_time >= cut_time: + continue + # archive the capture. + capture_path = Path(path) + if not capture_path.exists(): + if not self.redis.hexists('lookup_dirs_archived', uuid): + self.logger.warning(f'Missing capture directory for {uuid}, unable to archive {capture_path}') + continue + lock_file = capture_path / 'lock' + if try_make_file(lock_file): + # Lock created, we can proceede + with lock_file.open('w') as f: + f.write(f"{datetime.now().isoformat()};{os.getpid()}") else: - break + # The directory is locked because a pickle is being created, try again later + if is_locked(capture_path): + # call this method to remove dead locks + continue + + try: + new_capture_path = self.__archive_single_capture(capture_path) + capture_breakpoint -= 1 + except OSError as e: + self.logger.warning(f'Unable to archive capture: {e}') + finally: + (new_capture_path / 'lock').unlink(missing_ok=True) + if archiving_done: self.logger.info('Archiving done.') return archiving_done - def _compress_hars(self): - """This method is very slow (it checks every single capture for non-compressed HARs) - The new approach is to compress the har of every capture by default so this shouldn't be - needed anymore. Keeping it here just for reference, or to process old archives that contain - non-gziped HARs. 
- """ - self.logger.info('Compressing archived captures') - for index in self.archived_captures_dir.glob('*/*/index'): - if self.shutdown_requested(): - self.logger.warning('Shutdown requested, breaking.') - break - with index.open('r') as _f: - for uuid, dirname in csv.reader(_f): - for har in (index.parent / dirname).glob('*.har'): - with har.open('rb') as f_in: - with gzip.open(f'{har}.gz', 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - har.unlink() - self.logger.info('Archived captures compressed') + def __load_index(self, index_path: Path, ignore_sub: bool=False) -> Dict[str, str]: + '''Loads the given index file and all the subsequent ones if they exist''' + # NOTE: this method is used on recent and archived captures, it must never trigger a dir listing + indexed_captures = {} + with index_path.open() as _i: + for key, path_name in csv.reader(_i): + if key == 'sub_index' and not ignore_sub: + sub_index_file = index_path.parent / path_name / 'index' + if sub_index_file.exists(): + indexed_captures.update(self.__load_index(sub_index_file)) + else: + self.logger.warning(f'Missing sub index file: {sub_index_file}') + else: + # NOTE: we were initially checking if that path exists, + # but that's something we can do when we update the indexes instead. + # And a missing capture directory is already handled at rendering + indexed_captures[key] = str(index_path.parent / path_name) + return indexed_captures def _load_indexes(self): - # Initialize archives - for index in get_captures_dir().glob('*/*/index'): + # capture_dir / Year / Month / index <- should always exists. If not, created by _update_index + # Initialize recent index + for index in sorted(get_captures_dir().glob('*/*/index'), reverse=True): if self.shutdown_requested(): self.logger.warning('Shutdown requested, breaking.') break self.logger.info(f'Loading {index}') - with index.open('r') as _f: - recent_uuids: Mapping = {uuid: str(index.parent / dirname) - for uuid, dirname in csv.reader(_f) - if (index.parent / dirname).exists()} - if recent_uuids: - self.logger.info(f'{len(recent_uuids)} captures in directory.') - self.redis.hset('lookup_dirs', mapping=recent_uuids) + if recent_uuids := self.__load_index(index): + self.logger.debug(f'{len(recent_uuids)} captures in directory {index.parent}.') + self.redis.hset('lookup_dirs', mapping=recent_uuids) # type: ignore else: index.unlink() - self.logger.info('Recent indexes loaded') + total_recent_captures = self.redis.hlen('lookup_dirs') + self.logger.info(f'Recent indexes loaded: {total_recent_captures} entries.') - already_archived_uuids = {k.decode() for k in self.redis.hkeys('lookup_dirs_archived')} - self.logger.info(f'Already have {len(already_archived_uuids)} UUIDs archived') - # Initialize archives + # Initialize archives index for index in sorted(self.archived_captures_dir.glob('*/*/index'), reverse=True): if self.shutdown_requested(): self.logger.warning('Shutdown requested, breaking.') break self.logger.debug(f'Loading {index}') - with index.open('r') as _f: - archived_uuids: Mapping = {uuid: index.parent / dirname - for uuid, dirname in csv.reader(_f)} - if archived_uuids: - self.logger.debug(f'{len(archived_uuids)} captures in directory.') - new_uuids = set(archived_uuids.keys()) - already_archived_uuids - if not new_uuids: - self.logger.debug('No new archived UUID to check.') - continue - - self.logger.info(f'Loading {index}, {len(archived_uuids)} captures in directory, {len(new_uuids)} archived UUID to check.') - # NOTE: Only check if the directory exists if the UUID 
isn't in the cache. - self.redis.hset('lookup_dirs_archived', - mapping={uuid: str(dirname) - for uuid, dirname in archived_uuids.items() - if uuid in new_uuids and dirname.exists()}) - self.redis.hdel('lookup_dirs', *archived_uuids.keys()) + if archived_uuids := self.__load_index(index): + self.logger.debug(f'{len(archived_uuids)} captures in directory {index.parent}.') + self.redis.hset('lookup_dirs_archived', mapping=archived_uuids) # type: ignore else: index.unlink() - self.logger.info('Archived indexes loaded') + total_archived_captures = self.redis.hlen('lookup_dirs_archived') + self.logger.info(f'Archived indexes loaded: {total_archived_captures} entries.') def main(): diff --git a/bin/background_indexer.py b/bin/background_indexer.py index 441c27d..c85d281 100755 --- a/bin/background_indexer.py +++ b/bin/background_indexer.py @@ -5,14 +5,14 @@ import logging.config import os import shutil -from datetime import date +from datetime import date, datetime, timedelta from pathlib import Path from typing import Optional, List from lookyloo.default import AbstractManager, get_config from lookyloo.exceptions import MissingUUID, NoValidHarFile from lookyloo.lookyloo import Lookyloo -from lookyloo.helpers import is_locked +from lookyloo.helpers import is_locked, get_sorted_captures_from_disk logging.config.dictConfig(get_config('logging')) @@ -55,46 +55,50 @@ class BackgroundIndexer(AbstractManager): # This value makes sure we break out of the loop and build pickles of the most recent captures max_captures = 50 got_new_captures = False + + # Initialize time where we do not want to build the pickles anymore. + archive_interval = timedelta(days=get_config('generic', 'archive')) + cut_time = (datetime.now() - archive_interval).date() for month_dir in self._make_dirs_list(self.lookyloo.capture_dir): - for uuid_path in sorted(month_dir.glob('*/uuid'), reverse=True): - if ((uuid_path.parent / 'tree.pickle.gz').exists() or (uuid_path.parent / 'tree.pickle').exists()): + for capture_time, path in get_sorted_captures_from_disk(month_dir, cut_time=cut_time, keep_more_recent=True): + if ((path / 'tree.pickle.gz').exists() or (path / 'tree.pickle').exists()): # We already have a pickle file - self.logger.debug(f'{uuid_path.parent} has a pickle.') + self.logger.debug(f'{path} has a pickle.') continue - if not list(uuid_path.parent.rglob('*.har.gz')) and not list(uuid_path.parent.rglob('*.har')): + if not list(path.rglob('*.har.gz')) and not list(path.rglob('*.har')): # No HAR file - self.logger.debug(f'{uuid_path.parent} has no HAR file.') + self.logger.debug(f'{path} has no HAR file.') continue - if is_locked(uuid_path.parent): + if is_locked(path): # it is really locked - self.logger.debug(f'{uuid_path.parent} is locked, pickle generated by another process.') + self.logger.debug(f'{path} is locked, pickle generated by another process.') continue - with uuid_path.open() as f: + with (path / 'uuid').open() as f: uuid = f.read() if not self.lookyloo.redis.hexists('lookup_dirs', uuid): # The capture with this UUID exists, but it is for some reason missing in lookup_dirs - self.lookyloo.redis.hset('lookup_dirs', uuid, str(uuid_path.parent)) + self.lookyloo.redis.hset('lookup_dirs', uuid, str(path)) else: cached_path = Path(self.lookyloo.redis.hget('lookup_dirs', uuid)) - if cached_path != uuid_path.parent: + if cached_path != path: # we have a duplicate UUID, it is proably related to some bad copy/paste if cached_path.exists(): # Both paths exist, move the one that isn't in lookup_dirs - 
self.logger.critical(f'Duplicate UUID for {uuid} in {cached_path} and {uuid_path.parent}, discarding the latest') + self.logger.critical(f'Duplicate UUID for {uuid} in {cached_path} and {path}, discarding the latest') try: - shutil.move(str(uuid_path.parent), str(self.discarded_captures_dir / uuid_path.parent.name)) + shutil.move(str(path), str(self.discarded_captures_dir / path.name)) except FileNotFoundError as e: self.logger.warning(f'Unable to move capture: {e}') continue else: # The path in lookup_dirs for that UUID doesn't exists, just update it. - self.lookyloo.redis.hset('lookup_dirs', uuid, str(uuid_path.parent)) + self.lookyloo.redis.hset('lookup_dirs', uuid, str(path)) try: - self.logger.info(f'Build pickle for {uuid}: {uuid_path.parent.name}') + self.logger.info(f'Build pickle for {uuid}: {path.name}') self.lookyloo.get_crawled_tree(uuid) self.lookyloo.trigger_modules(uuid, auto_trigger=True) self.logger.info(f'Pickle for {uuid} build.') @@ -103,14 +107,14 @@ class BackgroundIndexer(AbstractManager): except MissingUUID: self.logger.warning(f'Unable to find {uuid}. That should not happen.') except NoValidHarFile as e: - self.logger.critical(f'There are no HAR files in the capture {uuid}: {uuid_path.parent.name} - {e}') + self.logger.critical(f'There are no HAR files in the capture {uuid}: {path.name} - {e}') except FileNotFoundError: self.logger.warning(f'Capture {uuid} disappeared during processing, probably archived.') except Exception: - self.logger.exception(f'Unable to build pickle for {uuid}: {uuid_path.parent.name}') + self.logger.exception(f'Unable to build pickle for {uuid}: {path.name}') # The capture is not working, moving it away. try: - shutil.move(str(uuid_path.parent), str(self.discarded_captures_dir / uuid_path.parent.name)) + shutil.move(str(path), str(self.discarded_captures_dir / path.name)) self.lookyloo.redis.hdel('lookup_dirs', uuid) except FileNotFoundError as e: self.logger.warning(f'Unable to move capture: {e}') diff --git a/lookyloo/helpers.py b/lookyloo/helpers.py index 1c68ed4..b47cccd 100644 --- a/lookyloo/helpers.py +++ b/lookyloo/helpers.py @@ -5,12 +5,12 @@ import logging import os import time -from datetime import datetime, timedelta +from datetime import datetime, timedelta, date from functools import lru_cache from importlib.metadata import version from io import BufferedIOBase from pathlib import Path -from typing import Any, Dict, List, Optional, Set, Union +from typing import Any, Dict, List, Optional, Set, Union, Tuple from urllib.parse import urlparse @@ -76,6 +76,44 @@ def get_email_template() -> str: return f.read() +@lru_cache +def make_ts_from_dirname(dirname: str) -> datetime: + try: + return datetime.strptime(dirname, '%Y-%m-%dT%H:%M:%S.%f') + except ValueError: + return datetime.strptime(dirname, '%Y-%m-%dT%H:%M:%S') + + +def get_sorted_captures_from_disk(captures_dir: Path, /, *, + cut_time: Optional[Union[datetime, date]]=None, + keep_more_recent: bool=True) -> List[Tuple[datetime, Path]]: + '''Recursively gets all the captures present in a specific directory, doesn't use the indexes. 
+ + NOTE: this method should never be used on archived captures as it's going to take forever on S3 + ''' + + all_paths: List[Tuple[datetime, Path]] = [] + for entry in captures_dir.iterdir(): + if not entry.is_dir(): + # index file + continue + if entry.name.isdigit(): + # sub directory + all_paths += get_sorted_captures_from_disk(entry, cut_time=cut_time, keep_more_recent=keep_more_recent) + else: + # capture directory + capture_time = make_ts_from_dirname(entry.name) + if cut_time: + if keep_more_recent and capture_time >= cut_time: + all_paths.append((capture_time, entry)) + elif capture_time < cut_time: + # keep only older + all_paths.append((capture_time, entry)) + else: + all_paths.append((capture_time, entry)) + return sorted(all_paths) + + class UserAgents: def __init__(self): diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index f0e5e02..b0eaa22 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -1497,7 +1497,7 @@ class Lookyloo(): ) -> None: now = datetime.now() - dirpath = self.capture_dir / str(now.year) / f'{now.month:02}' / now.isoformat() + dirpath = self.capture_dir / str(now.year) / f'{now.month:02}' / f'{now.day:02}' / now.isoformat() safe_create_dir(dirpath) if os or browser:
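
The refactored _update_index writes one index per day directory and points the month-level index at it with 'sub_index' rows; Archiver.__load_index then resolves those rows recursively (ignore_sub=True skips them when a month index is being rebuilt). Below is a minimal standalone sketch of that resolution logic; the free-standing load_index function and the sample archived_captures layout are illustrative assumptions, not part of the patch.

import csv
from pathlib import Path
from typing import Dict


def load_index(index_path: Path) -> Dict[str, str]:
    # Resolve an index CSV into {uuid: capture_path}, following 'sub_index'
    # rows, mirroring the behaviour of Archiver.__load_index in this patch.
    captures: Dict[str, str] = {}
    with index_path.open() as f:
        for key, dirname in csv.reader(f):
            if key == 'sub_index':
                # A day-level index referenced from the month-level index.
                sub_index = index_path.parent / dirname / 'index'
                if sub_index.exists():
                    captures.update(load_index(sub_index))
            else:
                # A capture stored next to this index (old layout) or inside
                # a day directory (new layout).
                captures[key] = str(index_path.parent / dirname)
    return captures


if __name__ == '__main__':
    # Assumed layout after archiving with this patch:
    #   archived_captures/2023/11/index                          -> 'sub_index,15'
    #   archived_captures/2023/11/15/index                       -> '<uuid>,2023-11-15T10:20:30.123456'
    #   archived_captures/2023/11/15/2023-11-15T10:20:30.123456/ -> capture files
    month_index = Path('archived_captures') / '2023' / '11' / 'index'
    if month_index.exists():
        for uuid, capture_dir in load_index(month_index).items():
            print(uuid, capture_dir)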
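
With this patch, captures land one level deeper, under a day directory: lookyloo.py builds year/month/day/isoformat paths for new captures and __archive_single_capture derives the same shape when moving a capture into the archive. The sketch below only computes that destination mapping; archived_captures is an assumed root used for illustration.

from datetime import datetime
from pathlib import Path


def archive_destination(archived_root: Path, capture_dirname: str) -> Path:
    # __archive_single_capture moves a capture to
    # <archived_root>/<year>/<month>/<day>/<capture_dirname>; this helper
    # only derives that path from the capture directory name.
    try:
        ts = datetime.strptime(capture_dirname, '%Y-%m-%dT%H:%M:%S.%f')
    except ValueError:
        ts = datetime.strptime(capture_dirname, '%Y-%m-%dT%H:%M:%S')
    return archived_root / str(ts.year) / f'{ts.month:02}' / f'{ts.day:02}' / capture_dirname


print(archive_destination(Path('archived_captures'), '2023-11-15T10:20:30.123456'))
# archived_captures/2023/11/15/2023-11-15T10:20:30.123456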
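
make_ts_from_dirname accepts capture directory names with or without microseconds, and get_sorted_captures_from_disk takes cut_time as either a datetime or a date. Python refuses to compare a datetime against a plain date, so the sketch below normalises the boundary to a datetime before filtering; as_datetime is a hypothetical helper added for illustration, not part of the patch.

from datetime import date, datetime
from functools import lru_cache
from typing import Union


@lru_cache
def make_ts_from_dirname(dirname: str) -> datetime:
    # Same parsing as lookyloo.helpers.make_ts_from_dirname: capture
    # directory names may or may not carry microseconds.
    try:
        return datetime.strptime(dirname, '%Y-%m-%dT%H:%M:%S.%f')
    except ValueError:
        return datetime.strptime(dirname, '%Y-%m-%dT%H:%M:%S')


def as_datetime(cut_time: Union[datetime, date]) -> datetime:
    # Hypothetical helper: widen a plain date to midnight so it can be
    # compared against the datetimes returned above (comparing a datetime
    # with a date raises TypeError).
    if isinstance(cut_time, datetime):
        return cut_time
    return datetime.combine(cut_time, datetime.min.time())


print(make_ts_from_dirname('2023-11-15T10:20:30.123456'))  # with microseconds
print(make_ts_from_dirname('2023-11-15T10:20:30'))         # without
print(make_ts_from_dirname('2023-11-16T08:00:00') >= as_datetime(date(2023, 11, 15)))  # True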