lookyloo/bin/archiver.py

441 lines
21 KiB
Python
Raw Normal View History

2021-08-20 17:46:22 +02:00
#!/usr/bin/env python3
2024-01-12 17:15:41 +01:00
from __future__ import annotations
2021-08-20 17:46:22 +02:00
import csv
import gzip
2021-08-20 17:46:22 +02:00
import logging
2022-11-23 15:54:22 +01:00
import logging.config
2023-08-06 21:34:20 +02:00
import os
2023-10-23 15:59:14 +02:00
import random
import shutil
from datetime import datetime, timedelta
2021-08-20 17:46:22 +02:00
from pathlib import Path
from redis import Redis
2024-01-26 15:03:36 +01:00
import s3fs # type: ignore[import-untyped]
from lookyloo.default import AbstractManager, get_config, get_homedir, get_socket_path, try_make_file
from lookyloo.helpers import get_captures_dir, is_locked, make_ts_from_dirname, make_dirs_list
2021-08-20 17:46:22 +02:00
2022-11-23 15:54:22 +01:00
# Apply the logging configuration returned by get_config('logging') when this module is loaded.
logging.config.dictConfig(get_config('logging'))
2021-08-20 17:46:22 +02:00
class Archiver(AbstractManager):
    '''Manager that archives old captures and keeps the capture index files up to date.

    Captures older than the number of days set in the 'archive' entry of the generic config
    are moved from the captures directory to `archived_captures/<year>/<month>/<day>/`.
    Every directory holding captures has a CSV file named `index` whose rows are either
    `<uuid>,<capture directory name>` or `sub_index,<sub directory name>`.
    The redis hashes `lookup_dirs` (recent captures) and `lookup_dirs_archived`
    (archived captures) are populated from these index files.
    '''

    def __init__(self, loglevel: int | None=None) -> None:
        '''Connect to the cache, make sure the archive directory exists, load the indexes in redis
        and, if enabled in the config, initialize the s3fs client.

        :param loglevel: forwarded as-is to the parent class constructor.
        '''
        super().__init__(loglevel)
        self.script_name = 'archiver'
        self.redis = Redis(unix_socket_path=get_socket_path('cache'))

        # make sure archived captures dir exists
        self.archived_captures_dir = get_homedir() / 'archived_captures'
        self.archived_captures_dir.mkdir(parents=True, exist_ok=True)

        self._load_indexes()

        # NOTE 2023-10-03: if we store the archived captures in s3fs (as it is the case in the CIRCL demo instance),
        # listing the directories directly with s3fs-fuse causes I/O errors and is making the interface unusable.
        # It is only a problem on directory listing and not when accessing a capture, so we only need to change the way
        # we generate the index files.
        # Other issue: the python module s3fs requires urllib < 2.0 (https://github.com/boto/botocore/issues/2926) so
        # we cannot run the script creating the indexes in the same virtual environment as the rest of the project.
        # The variable below will only be used to make sure we don't try to trigger a directory listing on a s3fs-fuse mount
        # and we're going to create the index files from another script, in tools/create_archive_indexes.
        self.archive_on_s3fs = False
        s3fs_config = get_config('generic', 's3fs')
        if s3fs_config.get('archive_on_s3fs'):
            self.archive_on_s3fs = True
            self.s3fs_client = s3fs.S3FileSystem(key=s3fs_config['config']['key'],
                                                 secret=s3fs_config['config']['secret'],
                                                 endpoint_url=s3fs_config['config']['endpoint_url'],
                                                 config_kwargs={'connect_timeout': 10,
                                                                'read_timeout': 900})
            self.s3fs_bucket = s3fs_config['config']['bucket_name']
            self.s3fs_client.clear_multipart_uploads(self.s3fs_bucket)

    def _to_run_forever(self) -> None:
        '''Archive in chunks until `_archive` reports it is done (or a shutdown is requested),
        then update all the indexes, unless a shutdown was requested.'''
        archiving_done = False
        # NOTE: When we archive a big directory, moving *a lot* of files, especially to MinIO
        # can take a very long time. In order to avoid being stuck on the archiving, we break that in chunks
        # but we also want to keep archiving without waiting 1h between each run.
        while not archiving_done:
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break
            archiving_done = self._archive()
            self._load_indexes()
            if not archiving_done:
                # More captures are waiting to be archived: only refresh the recent indexes
                # between two chunks, the full update happens after the loop.
                self._update_all_capture_indexes(recent_only=True)
        if not self.shutdown_requested():
            # This call takes a very long time on MinIO
            self._update_all_capture_indexes()

    def _update_index(self, root_dir: Path, *, s3fs_parent_dir: str | None=None) -> Path | None:
        '''Synchronize the index file of `root_dir` with the directories it actually contains.

        Sub directories with an all-digits name are processed recursively and referenced
        as `sub_index` rows; every other directory is considered a capture.
        Capture directories without a usable `uuid` file, or with a UUID already present
        in the index, are moved to `discarded_captures`.

        :param root_dir: directory to index.
        :param s3fs_parent_dir: when set, the listing is done with the s3fs client on
                                `<s3fs_parent_dir>/<root_dir.name>` instead of the local filesystem.
        :return: the path to the index file, or None if there is nothing to index in `root_dir`.
        '''
        logmsg = f'Updating index for {root_dir}'
        if s3fs_parent_dir:
            logmsg = f'{logmsg} (s3fs)'
        self.logger.info(logmsg)

        # Flip that variable if we need to write the index
        rewrite_index: bool = False

        current_index: dict[str, str] = {}
        current_sub_index: set[str] = set()
        index_file = root_dir / 'index'
        if index_file.exists():
            try:
                current_index = self.__load_index(index_file, ignore_sub=True)
            except Exception as e:
                # the index file is broken, it will be recreated.
                self.logger.warning(f'Index for {root_dir} broken, recreating it: {e}')
                rewrite_index = True
            # Check if we have sub_index entries, they're skipped from the call above.
            with index_file.open() as _i:
                for row in csv.reader(_i):
                    # Malformed rows are ignored here: unpacking them blindly would raise
                    # on the exact same broken index we just promised to recreate.
                    if len(row) == 2 and row[0] == 'sub_index':
                        current_sub_index.add(row[1])

            if not current_index and not current_sub_index:
                # The file is empty
                index_file.unlink()

        # NOTE: the values returned by __load_index are full paths (as strings)
        current_index_dirs: set[str] = set(current_index.values())
        new_captures: set[Path] = set()
        # Directories that are actually in the listing.
        # Holds both the directory names and the full paths.
        current_dirs: set[str] = set()

        if s3fs_parent_dir:
            s3fs_dir = '/'.join([s3fs_parent_dir, root_dir.name])
            # the call below will spit out a mix of directories:
            # * <datetime>
            # * <day> (which contains a <datetime> directory)
            for entry in self.s3fs_client.ls(s3fs_dir, detail=False, refresh=False):
                if entry.endswith('/'):
                    # root directory
                    continue
                if not self.s3fs_client.isdir(entry):
                    # index
                    continue
                dir_on_disk = root_dir / entry.rsplit('/', 1)[-1]
                if dir_on_disk.name.isdigit():
                    if self._update_index(dir_on_disk, s3fs_parent_dir=s3fs_dir):
                        # got a day directory that contains captures
                        if dir_on_disk.name not in current_sub_index:
                            # ... and it's not in the index
                            rewrite_index = True
                            current_sub_index.add(dir_on_disk.name)
                            self.logger.info(f'Adding sub index {dir_on_disk.name} to {index_file}')
                else:
                    # got a capture
                    if len(self.s3fs_client.ls(entry, detail=False)) == 1:
                        # empty capture directory
                        self.s3fs_client.rm(entry)
                        continue
                    if str(dir_on_disk) not in current_index_dirs:
                        new_captures.add(dir_on_disk)
                current_dirs.add(dir_on_disk.name)
                current_dirs.add(str(dir_on_disk))
        else:
            with os.scandir(root_dir) as it:
                for entry in it:
                    # can be index, sub directory (digit), or isoformat
                    if not entry.is_dir():
                        # index
                        continue
                    dir_on_disk = Path(entry)
                    if dir_on_disk.name.isdigit():
                        if self._update_index(dir_on_disk):
                            # got a day directory that contains captures
                            if dir_on_disk.name not in current_sub_index:
                                # ... and it's not in the index
                                rewrite_index = True
                                current_sub_index.add(dir_on_disk.name)
                                self.logger.info(f'Adding sub index {dir_on_disk.name} to {index_file}')
                    else:
                        # isoformat
                        if str(dir_on_disk) not in current_index_dirs:
                            new_captures.add(dir_on_disk)
                    current_dirs.add(dir_on_disk.name)
                    current_dirs.add(str(dir_on_disk))

        # Check if all the directories in current_dirs (that we got by listing the directory)
        # are the same as the one in the index. If they're not, we pop the UUID before writing the index
        if non_existing_dirs := current_index_dirs - current_dirs:
            self.logger.info(f'Got {len(non_existing_dirs)} non existing dirs in {root_dir}, removing them from the index.')
            current_index = {uuid: Path(path).name for uuid, path in current_index.items() if path not in non_existing_dirs}
            rewrite_index = True

        # Make sure all the sub_index directories exist on the disk
        if old_subindexes := {sub_index for sub_index in current_sub_index if sub_index not in current_dirs}:
            self.logger.warning(f'Sub index {", ".join(old_subindexes)} do not exist, removing them from the index.')
            rewrite_index = True
            current_sub_index -= old_subindexes

        if not current_index and not new_captures and not current_sub_index:
            # No captures at all in the directory and subdirectories, quitting
            logmsg = f'No captures in {root_dir}'
            if s3fs_parent_dir:
                logmsg = f'{logmsg} (s3fs directory)'
            self.logger.info(logmsg)
            index_file.unlink(missing_ok=True)
            root_dir.rmdir()
            return None

        if new_captures:
            self.logger.info(f'{len(new_captures)} new captures in {root_dir}.')

        for capture_dir in new_captures:
            # capture_dir.name is *only* the isoformat of the capture.
            # This directory will either be directly in the month directory (old format)
            # or in the day directory (new format)
            try:
                if not next(capture_dir.iterdir(), None):
                    self.logger.warning(f'{capture_dir} is empty, removing.')
                    capture_dir.rmdir()
                    continue
            except FileNotFoundError:
                self.logger.warning(f'{capture_dir} does not exists.')
                continue

            try:
                uuid_file = capture_dir / 'uuid'
                if not uuid_file.exists():
                    self.logger.warning(f'No UUID file in {capture_dir}.')
                    shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
                    continue

                with uuid_file.open() as _f:
                    uuid = _f.read().strip()
                if not uuid:
                    self.logger.warning(f'{uuid_file} is empty')
                    shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
                    continue
                if uuid in current_index:
                    self.logger.warning(f'Duplicate UUID ({uuid}) in {current_index[uuid]} and {uuid_file.parent.name}')
                    shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
                    continue
            except OSError as e:
                self.logger.warning(f'Error when discarding capture {capture_dir}: {e}')
                continue
            rewrite_index = True
            current_index[uuid] = capture_dir.name

        if not current_index and not current_sub_index:
            # The directory has been archived. It is probably safe to unlink, but
            # if it's not, we will lose a whole bunch of captures. Moving instead for safety.
            # NOTE: only the *name* of the parent is used. Joining the parent path itself
            # discards everything before it when it is absolute, and the destination
            # ends up being root_dir: nothing is moved.
            discarded_dir = get_homedir() / 'discarded_captures' / root_dir.parent.name / root_dir.name
            try:
                discarded_dir.parent.mkdir(parents=True, exist_ok=True)
                shutil.move(str(root_dir), str(discarded_dir))
            except OSError as e:
                # Best effort: failing to move the directory must not stop the indexing.
                self.logger.warning(f'Unable to move {root_dir} to {discarded_dir}: {e}')
            self.logger.warning(f'Nothing to index in {root_dir}')
            return None

        if rewrite_index:
            self.logger.info(f'Writing index {index_file}.')
            with index_file.open('w') as _f:
                index_writer = csv.writer(_f)
                for uuid, dirname in current_index.items():
                    # The values can be full paths or names, only keep the dir name
                    index_writer.writerow([uuid, Path(dirname).name])
                for sub_path in sorted(current_sub_index):
                    # Only keep the dir name
                    index_writer.writerow(['sub_index', sub_path])

        return index_file

    def _update_all_capture_indexes(self, *, recent_only: bool=False) -> None:
        '''Run that after the captures are in the proper directories

        :param recent_only: if True, the indexes of the archived captures are not updated.
        '''
        # Recent captures
        self.logger.info('Update recent indexes')
        # NOTE: the call below will check the existence of every path ending with `uuid`,
        #       it is extremely inefficient as we have many hundred of thousands of them
        #       and we only care about the root directory (ex: 2023/06)
        # directories_to_index = {capture_dir.parent.parent
        #                         for capture_dir in get_captures_dir().glob('*/*/*/uuid')}
        for directory_to_index in make_dirs_list(get_captures_dir()):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break
            self._update_index(directory_to_index)
        self.logger.info('Recent indexes updated')
        if recent_only:
            self.logger.info('Only updating recent indexes.')
            return

        # Archived captures
        self.logger.info('Update archives indexes')
        for directory_to_index in make_dirs_list(self.archived_captures_dir):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break
            year = directory_to_index.parent.name
            if self.archive_on_s3fs:
                # Updating the indexes can take a while, just run this call once in N calls
                if random.randrange(20) == 0:
                    self._update_index(directory_to_index,
                                       s3fs_parent_dir='/'.join([self.s3fs_bucket, year]))
            else:
                self._update_index(directory_to_index)
        self.logger.info('Archived indexes updated')

    def __archive_single_capture(self, capture_path: Path) -> Path:
        '''Move one capture directory to `archived_captures/<year>/<month>/<day>/`.

        The HAR files are gzipped and the tree pickles removed before the move.
        The capture is then appended to the index of the destination directory and
        the redis lookup hashes are updated.

        :param capture_path: directory of the capture, its name is parsed by make_ts_from_dirname.
        :return: the new path of the capture directory.
        '''
        capture_timestamp = make_ts_from_dirname(capture_path.name)
        dest_dir = self.archived_captures_dir / str(capture_timestamp.year) / f'{capture_timestamp.month:02}' / f'{capture_timestamp.day:02}'
        dest_dir.mkdir(parents=True, exist_ok=True)
        # If the HAR isn't archived yet, archive it before copy
        for har in capture_path.glob('*.har'):
            with har.open('rb') as f_in:
                with gzip.open(f'{har}.gz', 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            har.unlink()

        # read uuid before copying over to (maybe) S3
        with (capture_path / 'uuid').open() as _uuid:
            uuid = _uuid.read().strip()

        (capture_path / 'tree.pickle').unlink(missing_ok=True)
        (capture_path / 'tree.pickle.gz').unlink(missing_ok=True)
        shutil.move(str(capture_path), str(dest_dir))
        # Update index in parent
        with (dest_dir / 'index').open('a') as _index:
            index_writer = csv.writer(_index)
            index_writer.writerow([uuid, capture_path.name])
        # Update redis cache all at once.
        p = self.redis.pipeline()
        p.delete(str(capture_path))
        p.hset('lookup_dirs_archived', mapping={uuid: str(dest_dir / capture_path.name)})
        p.hdel('lookup_dirs', uuid)
        p.execute()

        return dest_dir / capture_path.name

    def _archive(self) -> bool:
        '''Archive the captures referenced in the `lookup_dirs` redis hash that are older than
        the amount of days set in the 'archive' entry of the generic config.

        At most 300 captures are archived in one call.

        :return: False if the call stopped because that limit was reached (there may be more
                 captures to archive), True otherwise, including when the scan was
                 interrupted by a shutdown request.
        '''
        archive_interval = timedelta(days=get_config('generic', 'archive'))
        cut_time = (datetime.now() - archive_interval)
        self.logger.info(f'Archiving all captures older than {cut_time.isoformat()}.')
        archiving_done = True

        # Let's use the indexes instead of listing directories to find what we want to archive.
        capture_breakpoint = 300
        for u, p in self.redis.hscan_iter('lookup_dirs'):
            uuid = u.decode()
            path = p.decode()
            if capture_breakpoint <= 0:
                # Break and restart later
                self.logger.info('Archived many captures will keep going later.')
                archiving_done = False
                break
            elif capture_breakpoint % 10:
                # Just check if we requested a shutdown.
                # NOTE(review): as written, the check is skipped when the remaining budget is
                # a multiple of 10 (which includes the initial value) - confirm it is intended.
                if self.shutdown_requested():
                    self.logger.warning('Shutdown requested, breaking.')
                    break

            capture_time_isoformat = os.path.basename(path)
            if not capture_time_isoformat:
                continue
            try:
                capture_time = make_ts_from_dirname(capture_time_isoformat)
            except ValueError:
                self.logger.warning(f'Invalid capture time for {uuid}: {capture_time_isoformat}')
                self.redis.hdel('lookup_dirs', uuid)
                continue
            if capture_time >= cut_time:
                continue
            # archive the capture.
            capture_path = Path(path)
            if not capture_path.exists():
                self.redis.hdel('lookup_dirs', uuid)
                if not self.redis.hexists('lookup_dirs_archived', uuid):
                    self.logger.warning(f'Missing capture directory for {uuid}, unable to archive {capture_path}')
                continue
            lock_file = capture_path / 'lock'
            if try_make_file(lock_file):
                # Lock created, we can proceed
                with lock_file.open('w') as f:
                    f.write(f"{datetime.now().isoformat()};{os.getpid()}")
            else:
                # The directory is locked because a pickle is being created, try again later
                if is_locked(capture_path):
                    # call this method to remove dead locks
                    continue

            try:
                new_capture_path = self.__archive_single_capture(capture_path)
                capture_breakpoint -= 1
            except OSError:
                self.logger.exception(f'Unable to archive capture {capture_path}')
                (capture_path / 'lock').unlink(missing_ok=True)
            except Exception:
                self.logger.exception(f'Critical exception while archiving {capture_path}')
                (capture_path / 'lock').unlink(missing_ok=True)
            else:
                # The lock file was moved along with the capture directory.
                (new_capture_path / 'lock').unlink(missing_ok=True)

        if archiving_done:
            self.logger.info('Archiving done.')
        return archiving_done

    def __load_index(self, index_path: Path, ignore_sub: bool=False) -> dict[str, str]:
        '''Loads the given index file and all the subsequent ones if they exist

        :param index_path: path to the index file to load.
        :param ignore_sub: if True, the `sub_index` rows are skipped instead of being followed.
        :return: a mapping UUID -> path of the capture directory (as a string).
        :raises ValueError: if a row of the file doesn't have exactly two fields.
        '''
        # NOTE: this method is used on recent and archived captures, it must never trigger a dir listing
        indexed_captures: dict[str, str] = {}
        with index_path.open() as _i:
            for key, path_name in csv.reader(_i):
                if key == 'sub_index' and ignore_sub:
                    # We're not interested in the sub indexes and don't want them to land in indexed_captures
                    continue
                elif key == 'sub_index' and not ignore_sub:
                    sub_index_file = index_path.parent / path_name / 'index'
                    if sub_index_file.exists():
                        indexed_captures.update(self.__load_index(sub_index_file))
                    else:
                        self.logger.warning(f'Missing sub index file: {sub_index_file}')
                else:
                    # NOTE: we were initially checking if that path exists,
                    #       but that's something we can do when we update the indexes instead.
                    #       And a missing capture directory is already handled at rendering
                    indexed_captures[key] = str(index_path.parent / path_name)
        return indexed_captures

    def _load_indexes(self) -> None:
        '''Load the index files of the recent and archived captures in the redis hashes
        `lookup_dirs` and `lookup_dirs_archived`. Index files without any capture are removed.'''
        # capture_dir / Year / Month / index <- should always exists. If not, created by _update_index
        # Initialize recent index
        for index in sorted(get_captures_dir().glob('*/*/index'), reverse=True):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break

            self.logger.debug(f'Loading {index}')
            if recent_uuids := self.__load_index(index):
                self.logger.debug(f'{len(recent_uuids)} captures in directory {index.parent}.')
                self.redis.hset('lookup_dirs', mapping=recent_uuids)  # type: ignore[arg-type]
            else:
                index.unlink()
        total_recent_captures = self.redis.hlen('lookup_dirs')
        self.logger.info(f'Recent indexes loaded: {total_recent_captures} entries.')

        # Initialize archives index
        for index in sorted(self.archived_captures_dir.glob('*/*/index'), reverse=True):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break
            self.logger.debug(f'Loading {index}')
            if archived_uuids := self.__load_index(index):
                self.logger.debug(f'{len(archived_uuids)} captures in directory {index.parent}.')
                self.redis.hset('lookup_dirs_archived', mapping=archived_uuids)  # type: ignore[arg-type]
            else:
                index.unlink()
        total_archived_captures = self.redis.hlen('lookup_dirs_archived')
        self.logger.info(f'Archived indexes loaded: {total_archived_captures} entries.')
2021-08-23 15:14:08 +02:00
2021-08-20 17:46:22 +02:00
2024-01-12 17:15:41 +01:00
def main() -> None:
    '''Create the archiver and start its run loop, passing 3600 as `sleep_in_sec`.'''
    archiver = Archiver()
    archiver.run(sleep_in_sec=3600)


# Only launch the archiver when this file is executed as a script.
if __name__ == '__main__':
    main()