mirror of https://github.com/CIRCL/lookyloo
447 lines
21 KiB
Python
Executable File
447 lines
21 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import gzip
|
|
import logging
|
|
import logging.config
|
|
import os
|
|
import random
|
|
import shutil
|
|
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
from redis import Redis
|
|
import s3fs # type: ignore[import-untyped]
|
|
|
|
from lookyloo.default import AbstractManager, get_config, get_homedir, get_socket_path, try_make_file
|
|
from lookyloo.helpers import get_captures_dir, is_locked, make_ts_from_dirname, make_dirs_list
|
|
|
|
logging.config.dictConfig(get_config('logging'))
|
|
|
|
|
|
class Archiver(AbstractManager):
|
|
|
|
def __init__(self, loglevel: int | None=None) -> None:
|
|
super().__init__(loglevel)
|
|
self.script_name = 'archiver'
|
|
self.redis = Redis(unix_socket_path=get_socket_path('cache'))
|
|
|
|
# make sure archived captures dir exists
|
|
self.archived_captures_dir = get_homedir() / 'archived_captures'
|
|
self.archived_captures_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
self._load_indexes()
|
|
|
|
# NOTE 2023-10-03: if we store the archived captures in s3fs (as it is the case in the CIRCL demo instance),
|
|
# listing the directories directly with s3fs-fuse causes I/O errors and is making the interface unusable.
|
|
# It is only a problem on directory listing and not when accessing a capture, so we only need to change the way
|
|
# we generate the index files.
|
|
# Other issue: the python module s3fs requires urllib < 2.0 (https://github.com/boto/botocore/issues/2926) so
|
|
# we cannot run the script creating the indexes in the same virtual environment as the rest of the project.
|
|
# The variable below will only be used to make sure we don't try to trigger a directory listing on a s3fs-fuse mount
|
|
# and we're going to create the index files from another script, in tools/create_archive_indexes.
|
|
self.archive_on_s3fs = False
|
|
s3fs_config = get_config('generic', 's3fs')
|
|
if s3fs_config.get('archive_on_s3fs'):
|
|
self.archive_on_s3fs = True
|
|
self.s3fs_client = s3fs.S3FileSystem(key=s3fs_config['config']['key'],
|
|
secret=s3fs_config['config']['secret'],
|
|
endpoint_url=s3fs_config['config']['endpoint_url'],
|
|
config_kwargs={'connect_timeout': 10,
|
|
'read_timeout': 900})
|
|
self.s3fs_bucket = s3fs_config['config']['bucket_name']
|
|
self.s3fs_client.clear_multipart_uploads(self.s3fs_bucket)
|
|
|
|
def _to_run_forever(self) -> None:
|
|
archiving_done = False
|
|
# NOTE: When we archive a big directory, moving *a lot* of files, expecially to MinIO
|
|
# can take a very long time. In order to avoid being stuck on the archiving, we break that in chunks
|
|
# but we also want to keep archiving without waiting 1h between each run.
|
|
while not archiving_done:
|
|
if self.shutdown_requested():
|
|
self.logger.warning('Shutdown requested, breaking.')
|
|
break
|
|
archiving_done = self._archive()
|
|
self._load_indexes()
|
|
if not archiving_done:
|
|
self._update_all_capture_indexes(recent_only=True)
|
|
if not self.shutdown_requested():
|
|
# This call takes a very long time on MinIO
|
|
self._update_all_capture_indexes()
|
|
|
|
def _update_index(self, root_dir: Path, *, s3fs_parent_dir: str | None=None) -> Path | None:
|
|
# returns a path to the index for the given directory
|
|
logmsg = f'Updating index for {root_dir}'
|
|
if s3fs_parent_dir:
|
|
logmsg = f'{logmsg} (s3fs)'
|
|
self.logger.info(logmsg)
|
|
|
|
# Flip that variable is we need to write the index
|
|
rewrite_index: bool = False
|
|
|
|
current_index: dict[str, str] = {}
|
|
current_sub_index: set[str] = set()
|
|
index_file = root_dir / 'index'
|
|
if index_file.exists():
|
|
try:
|
|
current_index = self.__load_index(index_file, ignore_sub=True)
|
|
except Exception as e:
|
|
# the index file is broken, it will be recreated.
|
|
self.logger.warning(f'Index for {root_dir} broken, recreating it: {e}')
|
|
|
|
# Check if we have sub_index entries, they're skipped from the call above.
|
|
with index_file.open() as _i:
|
|
for key, path_name in csv.reader(_i):
|
|
if key == 'sub_index':
|
|
current_sub_index.add(path_name)
|
|
|
|
if not current_index and not current_sub_index:
|
|
# The file is empty
|
|
index_file.unlink()
|
|
|
|
current_index_dirs: set[str] = set(current_index.values())
|
|
new_captures: set[Path] = set()
|
|
# Directories that are actually in the listing.
|
|
current_dirs: set[str] = set()
|
|
|
|
if s3fs_parent_dir:
|
|
s3fs_dir = '/'.join([s3fs_parent_dir, root_dir.name])
|
|
# the call below will spit out a mix of directories:
|
|
# * <datetime>
|
|
# * <day> (which contains a <datetime> directory)
|
|
for entry in self.s3fs_client.ls(s3fs_dir, detail=False, refresh=False):
|
|
if entry.endswith('/'):
|
|
# root directory
|
|
continue
|
|
if not self.s3fs_client.isdir(entry):
|
|
# index
|
|
continue
|
|
dir_on_disk = root_dir / entry.rsplit('/', 1)[-1]
|
|
if dir_on_disk.name.isdigit():
|
|
if self._update_index(dir_on_disk, s3fs_parent_dir=s3fs_dir):
|
|
# got a day directory that contains captures
|
|
if dir_on_disk.name not in current_sub_index:
|
|
# ... and it's not in the index
|
|
rewrite_index = True
|
|
current_sub_index.add(dir_on_disk.name)
|
|
self.logger.info(f'Adding sub index {dir_on_disk.name} to {index_file}')
|
|
if self.shutdown_requested():
|
|
self.logger.warning('Shutdown requested, breaking.')
|
|
break
|
|
else:
|
|
# got a capture
|
|
if len(self.s3fs_client.ls(entry, detail=False)) == 1:
|
|
# empty capture directory
|
|
self.s3fs_client.rm(entry)
|
|
continue
|
|
if str(dir_on_disk) not in current_index_dirs:
|
|
new_captures.add(dir_on_disk)
|
|
current_dirs.add(dir_on_disk.name)
|
|
current_dirs.add(str(dir_on_disk))
|
|
|
|
else:
|
|
with os.scandir(root_dir) as it:
|
|
for entry in it:
|
|
# can be index, sub directory (digit), or isoformat
|
|
if not entry.is_dir():
|
|
# index
|
|
continue
|
|
dir_on_disk = Path(entry)
|
|
if dir_on_disk.name.isdigit():
|
|
if self._update_index(dir_on_disk):
|
|
# got a day directory that contains captures
|
|
if dir_on_disk.name not in current_sub_index:
|
|
# ... and it's not in the index
|
|
rewrite_index = True
|
|
current_sub_index.add(dir_on_disk.name)
|
|
self.logger.info(f'Adding sub index {dir_on_disk.name} to {index_file}')
|
|
if self.shutdown_requested():
|
|
self.logger.warning('Shutdown requested, breaking.')
|
|
break
|
|
else:
|
|
# isoformat
|
|
if str(dir_on_disk) not in current_index_dirs:
|
|
new_captures.add(dir_on_disk)
|
|
current_dirs.add(dir_on_disk.name)
|
|
current_dirs.add(str(dir_on_disk))
|
|
|
|
# Check if all the directories in current_dirs (that we got by listing the directory)
|
|
# are the same as the one in the index. If they're not, we pop the UUID before writing the index
|
|
if non_existing_dirs := current_index_dirs - current_dirs:
|
|
self.logger.info(f'Got {len(non_existing_dirs)} non existing dirs in {root_dir}, removing them from the index.')
|
|
current_index = {uuid: Path(path).name for uuid, path in current_index.items() if path not in non_existing_dirs}
|
|
rewrite_index = True
|
|
|
|
# Make sure all the sub_index directories exist on the disk
|
|
if old_subindexes := {sub_index for sub_index in current_sub_index if sub_index not in current_dirs}:
|
|
self.logger.warning(f'Sub index {", ".join(old_subindexes)} do not exist, removing them from the index.')
|
|
rewrite_index = True
|
|
current_sub_index -= old_subindexes
|
|
|
|
if not current_index and not new_captures and not current_sub_index:
|
|
# No captures at all in the directory and subdirectories, quitting
|
|
logmsg = f'No captures in {root_dir}'
|
|
if s3fs_parent_dir:
|
|
logmsg = f'{logmsg} (s3fs directory)'
|
|
self.logger.info(logmsg)
|
|
index_file.unlink(missing_ok=True)
|
|
root_dir.rmdir()
|
|
return None
|
|
|
|
if new_captures:
|
|
self.logger.info(f'{len(new_captures)} new captures in {root_dir}.')
|
|
|
|
for capture_dir in new_captures:
|
|
# capture_dir_name is *only* the isoformat of the capture.
|
|
# This directory will either be directly in the month directory (old format)
|
|
# or in the day directory (new format)
|
|
try:
|
|
if not next(capture_dir.iterdir(), None):
|
|
self.logger.warning(f'{capture_dir} is empty, removing.')
|
|
capture_dir.rmdir()
|
|
continue
|
|
except FileNotFoundError:
|
|
self.logger.warning(f'{capture_dir} does not exists.')
|
|
continue
|
|
|
|
try:
|
|
uuid_file = capture_dir / 'uuid'
|
|
if not uuid_file.exists():
|
|
self.logger.warning(f'No UUID file in {capture_dir}.')
|
|
shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
|
|
continue
|
|
|
|
with uuid_file.open() as _f:
|
|
uuid = _f.read().strip()
|
|
if not uuid:
|
|
self.logger.warning(f'{uuid_file} is empty')
|
|
shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
|
|
continue
|
|
|
|
if uuid in current_index:
|
|
self.logger.warning(f'Duplicate UUID ({uuid}) in {current_index[uuid]} and {uuid_file.parent.name}')
|
|
shutil.move(str(capture_dir), str(get_homedir() / 'discarded_captures'))
|
|
continue
|
|
except OSError as e:
|
|
self.logger.warning(f'Error when discarding capture {capture_dir}: {e}')
|
|
continue
|
|
rewrite_index = True
|
|
current_index[uuid] = capture_dir.name
|
|
|
|
if not current_index and not current_sub_index:
|
|
# The directory has been archived. It is probably safe to unlink, but
|
|
# if it's not, we will lose a whole buch of captures. Moving instead for safety.
|
|
shutil.move(str(root_dir), str(get_homedir() / 'discarded_captures' / root_dir.parent / root_dir.name))
|
|
self.logger.warning(f'Nothing to index in {root_dir}')
|
|
return None
|
|
|
|
if rewrite_index:
|
|
self.logger.info(f'Writing index {index_file}.')
|
|
with index_file.open('w') as _f:
|
|
index_writer = csv.writer(_f)
|
|
for uuid, dirname in current_index.items():
|
|
index_writer.writerow([uuid, Path(dirname).name])
|
|
for sub_path in sorted(current_sub_index):
|
|
# Only keep the dir name
|
|
index_writer.writerow(['sub_index', sub_path])
|
|
|
|
return index_file
|
|
|
|
def _update_all_capture_indexes(self, *, recent_only: bool=False) -> None:
|
|
'''Run that after the captures are in the proper directories'''
|
|
# Recent captures
|
|
self.logger.info('Update recent indexes')
|
|
# NOTE: the call below will check the existence of every path ending with `uuid`,
|
|
# it is extremely ineficient as we have many hundred of thusands of them
|
|
# and we only care about the root directory (ex: 2023/06)
|
|
# directories_to_index = {capture_dir.parent.parent
|
|
# for capture_dir in get_captures_dir().glob('*/*/*/uuid')}
|
|
for directory_to_index in make_dirs_list(get_captures_dir()):
|
|
if self.shutdown_requested():
|
|
self.logger.warning('Shutdown requested, breaking.')
|
|
break
|
|
self._update_index(directory_to_index)
|
|
self.logger.info('Recent indexes updated')
|
|
if recent_only:
|
|
self.logger.info('Only updating recent indexes.')
|
|
return
|
|
|
|
# Archived captures
|
|
self.logger.info('Update archives indexes')
|
|
for directory_to_index in make_dirs_list(self.archived_captures_dir):
|
|
if self.shutdown_requested():
|
|
self.logger.warning('Shutdown requested, breaking.')
|
|
break
|
|
year = directory_to_index.parent.name
|
|
if self.archive_on_s3fs:
|
|
# Updating the indexes can take a while, just run this call once in N calls
|
|
if random.randrange(20) == 0:
|
|
self._update_index(directory_to_index,
|
|
s3fs_parent_dir='/'.join([self.s3fs_bucket, year]))
|
|
else:
|
|
self._update_index(directory_to_index)
|
|
self.logger.info('Archived indexes updated')
|
|
|
|
def __archive_single_capture(self, capture_path: Path) -> Path:
|
|
capture_timestamp = make_ts_from_dirname(capture_path.name)
|
|
dest_dir = self.archived_captures_dir / str(capture_timestamp.year) / f'{capture_timestamp.month:02}' / f'{capture_timestamp.day:02}'
|
|
dest_dir.mkdir(parents=True, exist_ok=True)
|
|
# If the HAR isn't archived yet, archive it before copy
|
|
for har in capture_path.glob('*.har'):
|
|
with har.open('rb') as f_in:
|
|
with gzip.open(f'{har}.gz', 'wb') as f_out:
|
|
shutil.copyfileobj(f_in, f_out)
|
|
har.unlink()
|
|
|
|
# read uuid before copying over to (maybe) S3
|
|
with (capture_path / 'uuid').open() as _uuid:
|
|
uuid = _uuid.read().strip()
|
|
|
|
(capture_path / 'tree.pickle').unlink(missing_ok=True)
|
|
(capture_path / 'tree.pickle.gz').unlink(missing_ok=True)
|
|
shutil.move(str(capture_path), str(dest_dir))
|
|
# Update index in parent
|
|
with (dest_dir / 'index').open('a') as _index:
|
|
index_writer = csv.writer(_index)
|
|
index_writer.writerow([uuid, capture_path.name])
|
|
# Update redis cache all at once.
|
|
p = self.redis.pipeline()
|
|
p.delete(str(capture_path))
|
|
p.hset('lookup_dirs_archived', mapping={uuid: str(dest_dir / capture_path.name)})
|
|
p.hdel('lookup_dirs', uuid)
|
|
p.execute()
|
|
|
|
return dest_dir / capture_path.name
|
|
|
|
def _archive(self) -> bool:
|
|
archive_interval = timedelta(days=get_config('generic', 'archive'))
|
|
cut_time = (datetime.now() - archive_interval)
|
|
self.logger.info(f'Archiving all captures older than {cut_time.isoformat()}.')
|
|
archiving_done = True
|
|
|
|
# Let's use the indexes instead of listing directories to find what we want to archive.
|
|
capture_breakpoint = 300
|
|
for u, p in self.redis.hscan_iter('lookup_dirs'):
|
|
uuid = u.decode()
|
|
path = p.decode()
|
|
if capture_breakpoint <= 0:
|
|
# Break and restart later
|
|
self.logger.info('Archived many captures will keep going later.')
|
|
archiving_done = False
|
|
break
|
|
elif capture_breakpoint % 10:
|
|
# Just check if we requested a shutdown.
|
|
if self.shutdown_requested():
|
|
self.logger.warning('Shutdown requested, breaking.')
|
|
break
|
|
|
|
capture_time_isoformat = os.path.basename(path)
|
|
if not capture_time_isoformat:
|
|
continue
|
|
try:
|
|
capture_time = make_ts_from_dirname(capture_time_isoformat)
|
|
except ValueError:
|
|
self.logger.warning(f'Invalid capture time for {uuid}: {capture_time_isoformat}')
|
|
self.redis.hdel('lookup_dirs', uuid)
|
|
continue
|
|
if capture_time >= cut_time:
|
|
continue
|
|
# archive the capture.
|
|
capture_path = Path(path)
|
|
if not capture_path.exists():
|
|
self.redis.hdel('lookup_dirs', uuid)
|
|
if not self.redis.hexists('lookup_dirs_archived', uuid):
|
|
self.logger.warning(f'Missing capture directory for {uuid}, unable to archive {capture_path}')
|
|
continue
|
|
lock_file = capture_path / 'lock'
|
|
if try_make_file(lock_file):
|
|
# Lock created, we can proceede
|
|
with lock_file.open('w') as f:
|
|
f.write(f"{datetime.now().isoformat()};{os.getpid()}")
|
|
else:
|
|
# The directory is locked because a pickle is being created, try again later
|
|
if is_locked(capture_path):
|
|
# call this method to remove dead locks
|
|
continue
|
|
|
|
try:
|
|
new_capture_path = self.__archive_single_capture(capture_path)
|
|
capture_breakpoint -= 1
|
|
except OSError:
|
|
self.logger.exception(f'Unable to archive capture {capture_path}')
|
|
(capture_path / 'lock').unlink(missing_ok=True)
|
|
except Exception:
|
|
self.logger.exception(f'Critical exception while archiving {capture_path}')
|
|
(capture_path / 'lock').unlink(missing_ok=True)
|
|
else:
|
|
(new_capture_path / 'lock').unlink(missing_ok=True)
|
|
|
|
if archiving_done:
|
|
self.logger.info('Archiving done.')
|
|
return archiving_done
|
|
|
|
def __load_index(self, index_path: Path, ignore_sub: bool=False) -> dict[str, str]:
|
|
'''Loads the given index file and all the subsequent ones if they exist'''
|
|
# NOTE: this method is used on recent and archived captures, it must never trigger a dir listing
|
|
indexed_captures = {}
|
|
with index_path.open() as _i:
|
|
for key, path_name in csv.reader(_i):
|
|
if key == 'sub_index' and ignore_sub:
|
|
# We're not interested in the sub indexes and don't want them to land in indexed_captures
|
|
continue
|
|
elif key == 'sub_index' and not ignore_sub:
|
|
sub_index_file = index_path.parent / path_name / 'index'
|
|
if sub_index_file.exists():
|
|
indexed_captures.update(self.__load_index(sub_index_file))
|
|
else:
|
|
self.logger.warning(f'Missing sub index file: {sub_index_file}')
|
|
else:
|
|
# NOTE: we were initially checking if that path exists,
|
|
# but that's something we can do when we update the indexes instead.
|
|
# And a missing capture directory is already handled at rendering
|
|
indexed_captures[key] = str(index_path.parent / path_name)
|
|
return indexed_captures
|
|
|
|
def _load_indexes(self) -> None:
|
|
# capture_dir / Year / Month / index <- should always exists. If not, created by _update_index
|
|
# Initialize recent index
|
|
for index in sorted(get_captures_dir().glob('*/*/index'), reverse=True):
|
|
if self.shutdown_requested():
|
|
self.logger.warning('Shutdown requested, breaking.')
|
|
break
|
|
|
|
self.logger.debug(f'Loading {index}')
|
|
if recent_uuids := self.__load_index(index):
|
|
self.logger.debug(f'{len(recent_uuids)} captures in directory {index.parent}.')
|
|
self.redis.hset('lookup_dirs', mapping=recent_uuids) # type: ignore[arg-type]
|
|
else:
|
|
index.unlink()
|
|
total_recent_captures = self.redis.hlen('lookup_dirs')
|
|
self.logger.info(f'Recent indexes loaded: {total_recent_captures} entries.')
|
|
|
|
# Initialize archives index
|
|
for index in sorted(self.archived_captures_dir.glob('*/*/index'), reverse=True):
|
|
if self.shutdown_requested():
|
|
self.logger.warning('Shutdown requested, breaking.')
|
|
break
|
|
self.logger.debug(f'Loading {index}')
|
|
if archived_uuids := self.__load_index(index):
|
|
self.logger.debug(f'{len(archived_uuids)} captures in directory {index.parent}.')
|
|
self.redis.hset('lookup_dirs_archived', mapping=archived_uuids) # type: ignore[arg-type]
|
|
else:
|
|
index.unlink()
|
|
total_archived_captures = self.redis.hlen('lookup_dirs_archived')
|
|
self.logger.info(f'Archived indexes loaded: {total_archived_captures} entries.')
|
|
|
|
|
|
def main() -> None:
|
|
a = Archiver()
|
|
a.run(sleep_in_sec=3600)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|