lookyloo/bin/archiver.py

269 lines
12 KiB
Python
Raw Normal View History

2021-08-20 17:46:22 +02:00
#!/usr/bin/env python3
import csv
import gzip
2021-08-20 17:46:22 +02:00
import logging
2022-11-23 15:54:22 +01:00
import logging.config
2023-08-06 21:34:20 +02:00
import os
import shutil
2021-09-07 12:59:31 +02:00
from collections import defaultdict
2022-05-23 00:15:52 +02:00
from collections.abc import Mapping
from datetime import datetime, timedelta, date
2021-08-20 17:46:22 +02:00
from pathlib import Path
from typing import Dict, List, Optional
2021-08-20 17:46:22 +02:00
from redis import Redis
2021-10-18 13:06:43 +02:00
from lookyloo.default import AbstractManager, get_config, get_homedir, get_socket_path
from lookyloo.helpers import get_captures_dir
2021-08-20 17:46:22 +02:00
2022-11-23 15:54:22 +01:00
# Configure the logging module at import time from the project's 'logging' config entry.
logging.config.dictConfig(get_config('logging'))
2021-08-20 17:46:22 +02:00
class Archiver(AbstractManager):
    '''Move old captures to the archive directory and maintain the capture indexes.

    Each <year>/<month> directory (in the captures directory and in the archive
    directory) contains an `index` CSV file with one `uuid,dirname` row per capture.
    The indexes are mirrored in the redis hashes `lookup_dirs` (recent captures)
    and `lookup_dirs_archived` (archived captures).
    '''

    def __init__(self, loglevel: Optional[int]=None):
        '''Connect to the cache, make sure the archive directory exists and load the indexes.

        :param loglevel: passed as-is to AbstractManager.
        '''
        super().__init__(loglevel)
        self.script_name = 'archiver'
        self.redis = Redis(unix_socket_path=get_socket_path('cache'))

        # make sure archived captures dir exists
        self.archived_captures_dir = get_homedir() / 'archived_captures'
        self.archived_captures_dir.mkdir(parents=True, exist_ok=True)

        self._load_indexes()

    def _to_run_forever(self):
        '''One archiving cycle: archive, re-index, reload the cache, compress.

        NOTE(review): presumably invoked by AbstractManager.run() on every
        iteration - confirm against the base class.
        '''
        self._archive()
        self._update_all_capture_indexes()
        self._load_indexes()
        self._compress_hars()

    def _read_index(self, index: Path) -> Dict[str, str]:
        '''Return the {uuid: dirname} entries of an index file.

        Rows that do not have exactly two non-empty fields (blank lines,
        truncated writes, ...) are ignored and reported, so one bad row
        does not prevent the rest of the index from being used.
        '''
        entries: Dict[str, str] = {}
        skipped = 0
        with index.open('r') as _f:
            for row in csv.reader(_f):
                if len(row) == 2 and all(row):
                    entries[row[0]] = row[1]
                else:
                    skipped += 1
        if skipped:
            self.logger.warning(f'{skipped} malformed row(s) ignored in {index}')
        return entries

    def _discard(self, path: Path, *subdirs: str) -> None:
        '''Move `path` into <homedir>/discarded_captures[/<subdirs>...].

        The destination directory is created first: shutil.move renames the
        source *to* the destination when the destination does not exist,
        instead of moving the source *into* it.
        '''
        destination = get_homedir().joinpath('discarded_captures', *subdirs)
        destination.mkdir(parents=True, exist_ok=True)
        shutil.move(str(path), str(destination))

    @staticmethod
    def _capture_timestamp(dirname: str) -> Optional[datetime]:
        '''Parse a capture directory name (ISO timestamp, with or without
        microseconds). Return None if the name is not a timestamp.'''
        for timestamp_format in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
            try:
                return datetime.strptime(dirname, timestamp_format)
            except ValueError:
                continue
        return None

    def _update_index(self, root_dir: Path) -> None:
        '''(Re)build the `index` file of a <year>/<month> directory.

        Entries of the existing index that still point to an existing entry of
        the directory are kept, every other entry of the directory is checked
        and added. Captures without a usable or unique UUID are moved to the
        discarded captures directory. An empty directory is removed.

        :param root_dir: the <year>/<month> directory to index.
        '''
        current_index: Dict[str, str] = {}
        if not os.listdir(root_dir):
            # the directory is empty, we can safely remove it
            root_dir.rmdir()
            return

        self.logger.debug(f'Updating index for {root_dir}')
        index_file = root_dir / 'index'
        existing_captures_names = {existing_capture.name for existing_capture in root_dir.iterdir()
                                   if existing_capture.name != 'index'}
        if index_file.exists():
            try:
                with index_file.open('r') as _f:
                    # Only keep the entries that still exist on disk.
                    current_index = {uuid: dirname for uuid, dirname in csv.reader(_f)
                                     if uuid
                                     and dirname
                                     and dirname in existing_captures_names}
            except Exception as e:
                # the index file is broken, it will be recreated.
                self.logger.warning(f'Index for {root_dir} broken, recreating it: {e}')
            if not current_index:
                # Nothing usable in there (broken, or the captures were archived).
                index_file.unlink()

        if set(current_index.values()) == existing_captures_names:
            # No new captures, quitting
            self.logger.debug(f'No new captures in {root_dir}.')
            return

        new_captures = sorted(existing_captures_names - set(current_index.values()), reverse=True)
        self.logger.info(f'{len(new_captures)} new captures in {root_dir}.')

        for capture_dir_name in new_captures:
            capture_dir = root_dir / capture_dir_name
            if not capture_dir.is_dir():
                self.logger.warning(f'{capture_dir} is not a directory')
                continue
            uuid_file = capture_dir / 'uuid'
            if not uuid_file.exists():
                self.logger.warning(f'No UUID file in {capture_dir}.')
                self._discard(capture_dir)
                continue
            # Read and close the file before (potentially) moving the directory.
            uuid = uuid_file.read_text().strip()
            if not uuid:
                self.logger.warning(f'{uuid_file} is empty')
                self._discard(capture_dir)
                continue
            if uuid in current_index:
                self.logger.warning(f'Duplicate UUID ({uuid}) in {current_index[uuid]} and {uuid_file.parent.name}')
                self._discard(capture_dir)
                continue
            current_index[uuid] = capture_dir_name

        if not current_index:
            # The directory has been archived. It is probably safe to unlink, but
            # if it's not, we will lose a whole bunch of captures. Moving instead for safety.
            # NOTE: use the *name* of the parent (the year): joining the full parent path
            # would discard everything on its left if it is absolute.
            self._discard(root_dir, root_dir.parent.name)
            return

        # newline='' as recommended by the csv module for files passed to csv.writer
        with index_file.open('w', newline='') as _f:
            csv.writer(_f).writerows(current_index.items())

    def _make_dirs_list(self, root_dir: Path) -> List[Path]:
        '''List the existing <year>/<month> directories of `root_dir`, most recent first.

        The years are taken from the directories that exist on disk instead of
        counting down from the current year: a missing year (typically the current
        one in the archive directory) must not hide the older ones.

        :param root_dir: directory containing the <year> directories.
        '''
        if not root_dir.is_dir():
            return []
        year_dirs = sorted((entry for entry in root_dir.iterdir()
                            if entry.is_dir() and entry.name.isdecimal()),
                           key=lambda entry: int(entry.name), reverse=True)
        directories = []
        for year_dir in year_dirs:
            for month in range(12, 0, -1):
                month_dir = year_dir / f'{month:02}'
                if month_dir.exists():
                    directories.append(month_dir)
        return directories

    def _update_all_capture_indexes(self):
        '''Run that after the captures are in the proper directories'''
        # Recent captures
        self.logger.info('Update recent indexes')
        # NOTE: globbing for '*/*/*/uuid' to find the directories to index checks the
        #       existence of every path ending with `uuid`. It is extremely inefficient
        #       as we have many hundreds of thousands of them and we only care about the
        #       root directory (ex: 2023/06), so we list these directories instead.
        for directory_to_index in self._make_dirs_list(get_captures_dir()):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break
            self._update_index(directory_to_index)
        self.logger.info('Recent indexes updated')

        # Archived captures
        self.logger.info('Update archives indexes')
        for directory_to_index in self._make_dirs_list(self.archived_captures_dir):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break
            self._update_index(directory_to_index)
        self.logger.info('Archived indexes updated')

    def _archive(self):
        '''Move the captures older than the configured interval to the archive directory.

        The cut time is the first day of the month of (now - `archive` days, from
        the generic config). The pickled trees of the archived captures are removed
        and the redis keys named after the capture paths are deleted.
        '''
        archive_interval = timedelta(days=get_config('generic', 'archive'))
        cut_time = (datetime.now() - archive_interval).date()
        cut_time = cut_time.replace(day=1)

        # Format:
        # { 2020: { 12: [directory] } }
        to_archive: Dict[int, Dict[int, List[Path]]] = defaultdict(lambda: defaultdict(list))
        # In order to avoid scanning the complete directory on each run, we check if year and month are
        # older than the cut time.
        for index in get_captures_dir().glob('*/*/index'):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break
            try:
                month_start = date(int(index.parent.parent.name), int(index.parent.name), 1)
            except ValueError:
                # Not a <year>/<month> directory, nothing we know how to archive.
                self.logger.warning(f'Unexpected directory name, skipping: {index.parent}')
                continue
            if month_start >= cut_time:
                continue

            for capture_uuid in index.parent.glob('*/uuid'):
                timestamp = self._capture_timestamp(capture_uuid.parent.name)
                if timestamp is None:
                    self.logger.warning(f'Unable to get a timestamp from the directory name, skipping: {capture_uuid.parent}')
                    continue
                if timestamp.date() >= cut_time:
                    continue
                to_archive[timestamp.year][timestamp.month].append(capture_uuid.parent)
                self.logger.info(f'Archiving {capture_uuid.parent}.')

        if not to_archive:
            self.logger.info('Nothing to archive.')
            return

        p = self.redis.pipeline()
        for year, month_captures in to_archive.items():
            for month, captures in month_captures.items():
                dest_dir = self.archived_captures_dir / str(year) / f'{month:02}'
                dest_dir.mkdir(parents=True, exist_ok=True)
                for capture_path in captures:
                    p.delete(str(capture_path))
                    (capture_path / 'tree.pickle').unlink(missing_ok=True)
                    (capture_path / 'tree.pickle.gz').unlink(missing_ok=True)
                    shutil.move(str(capture_path), str(dest_dir))
        p.execute()
        self.logger.info('Archiving done.')

    def _compress_hars(self):
        '''Gzip every `*.har` file of the indexed archived captures and remove the original.'''
        self.logger.info('Compressing archived captures')
        for index in self.archived_captures_dir.glob('*/*/index'):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break
            for dirname in self._read_index(index).values():
                for har in (index.parent / dirname).rglob('*.har'):
                    if not har.exists():
                        continue
                    with har.open('rb') as f_in, gzip.open(f'{har}.gz', 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                    # Only remove the source once the compressed copy is fully written.
                    har.unlink()
        self.logger.info('Archived captures compressed')

    def _load_indexes(self):
        '''Load the content of the index files in the redis cache.

        Recent captures go in the `lookup_dirs` hash, archived captures go in
        `lookup_dirs_archived` (and are removed from `lookup_dirs`). An index
        without any usable entry is removed.
        '''
        # Initialize recent captures
        for index in get_captures_dir().glob('*/*/index'):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break

            self.logger.info(f'Loading {index}')
            recent_uuids: Mapping = {uuid: str(index.parent / dirname)
                                     for uuid, dirname in self._read_index(index).items()
                                     if (index.parent / dirname).exists()}
            if recent_uuids:
                self.logger.info(f'{len(recent_uuids)} captures in directory.')
                self.redis.hset('lookup_dirs', mapping=recent_uuids)
            else:
                index.unlink()
        self.logger.info('Recent indexes loaded')

        already_archived_uuids = {k.decode() for k in self.redis.hkeys('lookup_dirs_archived')}
        self.logger.info(f'Already have {len(already_archived_uuids)} UUIDs archived')
        # Initialize archives
        for index in sorted(self.archived_captures_dir.glob('*/*/index'), reverse=True):
            if self.shutdown_requested():
                self.logger.warning('Shutdown requested, breaking.')
                break

            self.logger.debug(f'Loading {index}')
            archived_uuids: Mapping = {uuid: index.parent / dirname
                                       for uuid, dirname in self._read_index(index).items()}
            if not archived_uuids:
                index.unlink()
                continue

            self.logger.debug(f'{len(archived_uuids)} captures in directory.')
            new_uuids = set(archived_uuids.keys()) - already_archived_uuids
            if not new_uuids:
                self.logger.debug('No new archived UUID to check.')
                continue

            self.logger.info(f'Loading {index}, {len(archived_uuids)} captures in directory, {len(new_uuids)} archived UUID to check.')
            # NOTE: Only check if the directory exists if the UUID isn't in the cache.
            to_cache = {uuid: str(archived_uuids[uuid])
                        for uuid in new_uuids
                        if archived_uuids[uuid].exists()}
            if to_cache:
                # hset refuses an empty mapping, only call it if we have something to add.
                self.redis.hset('lookup_dirs_archived', mapping=to_cache)
            self.redis.hdel('lookup_dirs', *archived_uuids.keys())
        self.logger.info('Archived indexes loaded')
2021-08-23 15:14:08 +02:00
2021-08-20 17:46:22 +02:00
def main():
    '''Script entry point: start an Archiver and run it with a one hour sleep.'''
    archiver = Archiver()
    archiver.run(sleep_in_sec=3600)


if __name__ == '__main__':
    main()