# lookyloo/bin/archiver.py — 354 lines, 16 KiB, Python (exported 2021-08-20)
#!/usr/bin/env python3
import csv
import gzip
2021-08-20 17:46:22 +02:00
import logging
2022-11-23 15:54:22 +01:00
import logging.config
2023-08-06 21:34:20 +02:00
import os
import shutil
2021-09-07 12:59:31 +02:00
from collections import defaultdict
2022-05-23 00:15:52 +02:00
from collections.abc import Mapping
from datetime import datetime, timedelta, date
2021-08-20 17:46:22 +02:00
from pathlib import Path
from typing import Dict, List, Optional
2021-08-20 17:46:22 +02:00
from redis import Redis
from lookyloo.default import AbstractManager, get_config, get_homedir, get_socket_path, try_make_file
from lookyloo.helpers import get_captures_dir, is_locked
2021-08-20 17:46:22 +02:00
2022-11-23 15:54:22 +01:00
logging.config.dictConfig(get_config('logging'))
2021-08-20 17:46:22 +02:00
class Archiver(AbstractManager):
    """Background manager that moves old captures into the archive tree and
    keeps the on-disk and redis capture indexes up to date."""

    def __init__(self, loglevel: Optional[int]=None):
        super().__init__(loglevel)
        self.script_name = 'archiver'
        self.redis = Redis(unix_socket_path=get_socket_path('cache'))

        # Make sure the directory receiving archived captures exists.
        self.archived_captures_dir = get_homedir() / 'archived_captures'
        self.archived_captures_dir.mkdir(parents=True, exist_ok=True)

        self._load_indexes()

        # NOTE 2023-10-03: when the archived captures live on s3fs (as on the
        # CIRCL demo instance), listing directories through s3fs-fuse causes
        # I/O errors and makes the interface unusable. Accessing a single
        # capture is fine, only directory listings are a problem. On top of
        # that, the python s3fs module requires urllib < 2.0
        # (https://github.com/boto/botocore/issues/2926), so the index-creation
        # script cannot run in the same virtual environment as the project.
        # This flag only prevents triggering directory listings on a s3fs-fuse
        # mount; the index files are then created by tools/create_archive_indexes.
        self.archive_on_s3fs = bool(get_config('generic', 's3fs').get('archive_on_s3fs'))
2021-08-20 17:46:22 +02:00
def _to_run_forever(self):
    """One scheduler tick: archive everything eligible, then refresh indexes.

    Moving *a lot* of files (especially to MinIO) can take a very long time,
    so `_archive` works in chunks and reports whether it finished; we keep
    calling it instead of waiting an hour between chunks.
    """
    finished = False
    while not finished:
        if self.shutdown_requested():
            self.logger.warning('Shutdown requested, breaking.')
            break
        finished = self._archive()
        self._load_indexes()
    # The HARs are supposedly all compressed so this call shouldn't be required
    # unless you're processing old captures for the first time.
    # self._compress_hars()
    if not self.shutdown_requested():
        # This call takes a very long time on MinIO
        self._update_all_capture_indexes()
2021-08-30 12:48:13 +02:00
def _update_index(self, root_dir: Path) -> None:
    """(Re)build the `index` CSV (uuid,dirname) for one month directory.

    Empty month directories are removed; captures without a usable UUID are
    moved to `discarded_captures`. If no valid capture remains, the whole
    month directory is moved to `discarded_captures/<year>/<month>`.
    """
    current_index: Dict[str, str] = {}
    if not any(os.scandir(root_dir)):
        # the directory is empty, we can safely remove it
        root_dir.rmdir()
        return

    self.logger.debug(f'Updating index for {root_dir}')
    index_file = root_dir / 'index'
    if index_file.exists():
        # Load the existing index; already-indexed captures are skipped below.
        try:
            with index_file.open('r') as _f:
                current_index = {uuid: dirname for uuid, dirname in csv.reader(_f)
                                 if uuid and dirname}
        except Exception as e:
            # the index file is broken, it will be recreated.
            self.logger.warning(f'Index for {root_dir} broken, recreating it: {e}')
        if not current_index:
            index_file.unlink()

    current_index_dirs = set(current_index.values())
    with os.scandir(root_dir) as it:
        new_captures = {existing_capture.name for existing_capture in it
                        if (existing_capture.name not in current_index_dirs) and existing_capture.is_dir()}
    if not new_captures:
        # No new captures, quitting
        self.logger.debug(f'No new captures in {root_dir}.')
        return
    self.logger.info(f'{len(new_captures)} new captures in {root_dir}.')

    for capture_dir_name in new_captures:
        capture_dir = root_dir / capture_dir_name
        if not capture_dir.is_dir():
            self.logger.warning(f'{capture_dir} is not a directory')
            continue
        if not next(capture_dir.iterdir(), None):
            self.logger.warning(f'{capture_dir} is empty, removing.')
            capture_dir.rmdir()
            continue
        uuid_file = capture_dir / 'uuid'
        if not uuid_file.exists():
            self.logger.warning(f'No UUID file in {capture_dir}.')
            shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
            continue
        with uuid_file.open() as _f:
            uuid = _f.read().strip()
        try:
            if not uuid:
                self.logger.warning(f'{uuid_file} is empty')
                shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
                continue
            if uuid in current_index:
                self.logger.warning(f'Duplicate UUID ({uuid}) in {current_index[uuid]} and {uuid_file.parent.name}')
                shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
                continue
        except OSError as e:
            self.logger.warning(f'Error when discarding capture {capture_dir}: {e}')
            continue
        current_index[uuid] = uuid_file.parent.name

    if not current_index:
        # The directory has been archived. It is probably safe to unlink, but
        # if it's not, we would lose a whole bunch of captures. Moving instead for safety.
        # BUGFIX: use root_dir.parent.name (a relative component) rather than
        # root_dir.parent — joining an *absolute* path onto a PurePath discards
        # everything before it, which made the destination collapse back to
        # root_dir itself instead of a subdirectory of discarded_captures.
        discard_dest = get_homedir() / 'discarded_captures' / root_dir.parent.name
        discard_dest.mkdir(parents=True, exist_ok=True)
        shutil.move(str(root_dir), str(discard_dest / root_dir.name))
        return
    with index_file.open('w') as _f:
        index_writer = csv.writer(_f)
        for uuid, dirname in current_index.items():
            index_writer.writerow([uuid, dirname])
2023-08-05 20:47:08 +02:00
def _make_dirs_list(self, root_dir: Path) -> List[Path]:
directories = []
year_now = date.today().year
while True:
year_dir = root_dir / str(year_now)
if not year_dir.exists():
# if we do not have a directory with this year, quit the loop
break
for month in range(12, 0, -1):
month_dir = year_dir / f'{month:02}'
if month_dir.exists():
directories.append(month_dir)
year_now -= 1
return directories
2021-08-30 12:48:13 +02:00
def _update_all_capture_indexes(self):
    '''Run that after the captures are in the proper directories'''

    def refresh(base_dir) -> None:
        # Refresh the index of every month directory under base_dir,
        # bailing out as soon as a shutdown is requested.
        for month_dir in self._make_dirs_list(base_dir):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break
            self._update_index(month_dir)

    # Recent captures.
    # NOTE: globbing */*/*/uuid would find the same directories, but it is
    # extremely inefficient (many hundreds of thousands of paths) when we
    # only care about the root directories (ex: 2023/06), hence _make_dirs_list.
    self.logger.info('Update recent indexes')
    refresh(get_captures_dir())
    self.logger.info('Recent indexes updated')

    if self.archive_on_s3fs:
        # See __init__: directory listings on s3fs-fuse are unusable; the
        # archive indexes are created by tools/create_archive_indexes.
        self.logger.info('Not updating indexes as they are on a s3fs-fuse mount.')
        return

    # Archived captures.
    self.logger.info('Update archives indexes')
    refresh(self.archived_captures_dir)
    self.logger.info('Archived indexes updated')
2021-08-20 17:46:22 +02:00
def _archive(self):
    """Move captures older than the configured interval to the archive tree.

    Captures end up in ``archived_captures/<year>/<month>``. Work is chunked:
    at most 300 captures per month directory are moved in one call.

    Returns True when everything eligible was archived, False when the run
    stopped early and the caller should invoke this method again.
    """
    archive_interval = timedelta(days=get_config('generic', 'archive'))
    cut_time = (datetime.now() - archive_interval).date()
    # Align the cut-off on the first of the month: only full months are archived.
    cut_time = cut_time.replace(day=1)
    self.logger.info(f'Archiving all captures older than {cut_time.isoformat()}.')
    archiving_done = True
    # Format:
    # { 2020: { 12: [(directory, uuid)] } }
    to_archive: Dict[int, Dict[int, List[Path]]] = defaultdict(lambda: defaultdict(list))
    # In order to avoid scanning the complete directory on each run, we check if year and month are
    # older than the cut time.
    for index in get_captures_dir().glob('*/*/index'):
        if self.shutdown_requested():
            self.logger.warning('Shutdown requested, breaking.')
            break
        month = int(index.parent.name)
        year = int(index.parent.parent.name)
        if date(year, month, 1) >= cut_time:
            continue
        for capture_uuid in index.parent.glob('*/uuid'):
            # Capture directories are named after their timestamp; some older
            # ones lack the microseconds part, hence the two formats.
            try:
                timestamp = datetime.strptime(capture_uuid.parent.name, '%Y-%m-%dT%H:%M:%S.%f')
            except ValueError:
                timestamp = datetime.strptime(capture_uuid.parent.name, '%Y-%m-%dT%H:%M:%S')
            if timestamp.date() >= cut_time:
                continue
            to_archive[timestamp.year][timestamp.month].append(capture_uuid.parent)
            self.logger.debug(f'Archiving {capture_uuid.parent}.')
    if not to_archive:
        self.logger.info('Nothing to archive.')
        return archiving_done

    # NOTE(review): the for/else at the bottom breaks out of the year loop as
    # soon as one month loop completes without interruption — verify this
    # matches the intended chunking behaviour.
    for year, month_captures in to_archive.items():
        for month, captures in month_captures.items():
            dest_dir = self.archived_captures_dir / str(year) / f'{month:02}'
            dest_dir.mkdir(parents=True, exist_ok=True)
            # Chunk size: stop after 300 captures so a huge backlog cannot
            # block the process for hours (see _to_run_forever).
            capture_breakpoint = 300
            self.logger.info(f'{len(captures)} captures to archive in {year}-{month}.')
            for capture_path in captures:
                if capture_breakpoint <= 0:
                    # Break and restart later
                    self.logger.info(f'Archived many captures in {year}-{month}, will keep going later.')
                    archiving_done = False
                    break
                elif capture_breakpoint % 10:
                    # Just check if we requested a shutdown.
                    # NOTE(review): `% 10` is truthy on 9 iterations out of 10,
                    # so the check is *skipped* only on multiples of 10 — confirm intent.
                    if self.shutdown_requested():
                        self.logger.warning('Shutdown requested, breaking.')
                        break
                lock_file = capture_path / 'lock'
                if try_make_file(lock_file):
                    # Lock created, we can proceed; record who holds it and since when.
                    with lock_file.open('w') as f:
                        f.write(f"{datetime.now().isoformat()};{os.getpid()}")
                else:
                    # The directory is locked because a pickle is being created, try again later
                    if is_locked(capture_path):
                        # call this method to remove dead locks
                        continue
                capture_breakpoint -= 1
                # If the HAR isn't archived yet, archive it before copy
                for har in capture_path.glob('*.har'):
                    with har.open('rb') as f_in:
                        with gzip.open(f'{har}.gz', 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                    har.unlink()
                try:
                    # Pickled trees are caches regenerated on demand; no need to archive them.
                    (capture_path / 'tree.pickle').unlink(missing_ok=True)
                    (capture_path / 'tree.pickle.gz').unlink(missing_ok=True)
                    shutil.move(str(capture_path), str(dest_dir))
                    self.redis.delete(str(capture_path))
                except OSError as e:
                    self.logger.warning(f'Unable to archive capture: {e}')
                finally:
                    # Always release the lock, whether the move succeeded or not.
                    (dest_dir / capture_path.name / 'lock').unlink(missing_ok=True)
            # we archived some captures, update relevant index
            self._update_index(dest_dir)
            if not archiving_done:
                break
        else:
            break
    if archiving_done:
        self.logger.info('Archiving done.')
    return archiving_done
2021-08-20 17:46:22 +02:00
def _compress_hars(self):
"""This method is very slow (it checks every single capture for non-compressed HARs)
The new approach is to compress the har of every capture by default so this shouldn't be
needed anymore. Keeping it here just for reference, or to process old archives that contain
non-gziped HARs.
"""
2022-07-27 14:33:28 +02:00
self.logger.info('Compressing archived captures')
for index in self.archived_captures_dir.glob('*/*/index'):
if self.shutdown_requested():
self.logger.warning('Shutdown requested, breaking.')
break
with index.open('r') as _f:
for uuid, dirname in csv.reader(_f):
for har in (index.parent / dirname).glob('*.har'):
with har.open('rb') as f_in:
with gzip.open(f'{har}.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
har.unlink()
2022-07-27 14:33:28 +02:00
self.logger.info('Archived captures compressed')
2021-08-30 12:48:13 +02:00
def _load_indexes(self):
    """Populate the redis lookup hashes from the on-disk index files.

    Recent captures go into `lookup_dirs`, archived ones into
    `lookup_dirs_archived` (and are removed from `lookup_dirs`).
    Index files whose captures have all disappeared are deleted so they get
    recreated on the next `_update_index` pass.
    """
    # Recent captures: one index file per <year>/<month> directory.
    for index in get_captures_dir().glob('*/*/index'):
        if self.shutdown_requested():
            self.logger.warning('Shutdown requested, breaking.')
            break
        self.logger.info(f'Loading {index}')
        with index.open('r') as _f:
            # Keep only entries whose capture directory still exists.
            recent_uuids: Mapping = {uuid: str(index.parent / dirname)
                                     for uuid, dirname in csv.reader(_f)
                                     if (index.parent / dirname).exists()}
        if recent_uuids:
            self.logger.info(f'{len(recent_uuids)} captures in directory.')
            self.redis.hset('lookup_dirs', mapping=recent_uuids)
        else:
            # Every capture listed in the index is gone: drop the index,
            # it will be rebuilt.
            index.unlink()
    self.logger.info('Recent indexes loaded')

    # Archived captures: skip UUIDs already present in redis to avoid
    # re-checking directory existence for each of them.
    already_archived_uuids = {k.decode() for k in self.redis.hkeys('lookup_dirs_archived')}
    self.logger.info(f'Already have {len(already_archived_uuids)} UUIDs archived')
    # Newest month directories first.
    for index in sorted(self.archived_captures_dir.glob('*/*/index'), reverse=True):
        if self.shutdown_requested():
            self.logger.warning('Shutdown requested, breaking.')
            break
        self.logger.debug(f'Loading {index}')
        with index.open('r') as _f:
            archived_uuids: Mapping = {uuid: index.parent / dirname
                                       for uuid, dirname in csv.reader(_f)}
        if archived_uuids:
            self.logger.debug(f'{len(archived_uuids)} captures in directory.')
            new_uuids = set(archived_uuids.keys()) - already_archived_uuids
            if not new_uuids:
                self.logger.debug('No new archived UUID to check.')
                continue
            self.logger.info(f'Loading {index}, {len(archived_uuids)} captures in directory, {len(new_uuids)} archived UUID to check.')
            # NOTE: Only check if the directory exists if the UUID isn't in the cache.
            # NOTE(review): if every new capture directory is missing, the mapping
            # below is empty and redis hset may raise — confirm against redis-py.
            self.redis.hset('lookup_dirs_archived',
                            mapping={uuid: str(dirname)
                                     for uuid, dirname in archived_uuids.items()
                                     if uuid in new_uuids and dirname.exists()})
            # Archived captures must no longer be in the recent lookup.
            self.redis.hdel('lookup_dirs', *archived_uuids.keys())
        else:
            index.unlink()
    self.logger.info('Archived indexes loaded')
2021-08-23 15:14:08 +02:00
2021-08-20 17:46:22 +02:00
def main():
    # Run the archiver forever, waking up once per hour.
    archiver = Archiver()
    archiver.run(sleep_in_sec=3600)


if __name__ == '__main__':
    main()