mirror of https://github.com/CIRCL/lookyloo
chg: Avoid setting the lock and quitting, cleanup
parent 1fcd4cfa3f
commit d4b9ca13af
@@ -202,26 +202,13 @@ class Archiver(AbstractManager):
             self.logger.info('Nothing to archive.')
             return
 
-        p = self.redis.pipeline()
         for year, month_captures in to_archive.items():
             for month, captures in month_captures.items():
                 dest_dir = self.archived_captures_dir / str(year) / f'{month:02}'
                 dest_dir.mkdir(parents=True, exist_ok=True)
-                capture_breakpoint = 100
+                capture_breakpoint = 300
                 self.logger.info(f'{len(captures)} captures to archive in {year}-{month}.')
-                locks_to_clear: List[Path] = []
                 for capture_path in captures:
-                    lock_file = capture_path / 'lock'
-                    if try_make_file(lock_file):
-                        # Lock created, we can process
-                        with lock_file.open('w') as f:
-                            f.write(f"{datetime.now().isoformat()};{os.getpid()}")
-                    else:
-                        # The directory is locked because a pickle is being created, try again later
-                        if is_locked(capture_path):
-                            # call this method to remove dead locks
-                            continue
-                    capture_breakpoint -= 1
                     if capture_breakpoint <= 0:
                         # Break and restart later
                         self.logger.info(f'Archived many captures in {year}-{month}, will keep going later.')
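Note on the lock helpers: try_make_file() and is_locked() are lookyloo utilities that do not appear in this diff. Below is a minimal sketch of the create-if-absent behaviour the call sites appear to rely on; it is an assumption about the helper, not the project's actual implementation.

    # Hedged sketch of a try_make_file()-style helper; not lookyloo's own code.
    import os
    from pathlib import Path

    def try_make_file(lock_file: Path) -> bool:
        """Atomically create lock_file: True if we now own it, False if it already exists."""
        try:
            # O_CREAT | O_EXCL fails when the file is already there, so two
            # archiver runs cannot both believe they acquired the lock.
            os.close(os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY))
            return True
        except FileExistsError:
            return False

The winner then rewrites the file with "<ISO timestamp>;<pid>" (the f.write() call in the diff), which gives a helper like is_locked() enough information to detect and clear a lock left behind by a dead process.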
@@ -232,30 +219,41 @@ class Archiver(AbstractManager):
                     if self.shutdown_requested():
                         self.logger.warning('Shutdown requested, breaking.')
                         break
+
+                    lock_file = capture_path / 'lock'
+                    if try_make_file(lock_file):
+                        # Lock created, we can proceede
+                        with lock_file.open('w') as f:
+                            f.write(f"{datetime.now().isoformat()};{os.getpid()}")
+                    else:
+                        # The directory is locked because a pickle is being created, try again later
+                        if is_locked(capture_path):
+                            # call this method to remove dead locks
+                            continue
+
+                    capture_breakpoint -= 1
                     # If the HAR isn't archived yet, archive it before copy
                     for har in capture_path.glob('*.har'):
                         with har.open('rb') as f_in:
                             with gzip.open(f'{har}.gz', 'wb') as f_out:
                                 shutil.copyfileobj(f_in, f_out)
                         har.unlink()
+
                     try:
                         (capture_path / 'tree.pickle').unlink(missing_ok=True)
                         (capture_path / 'tree.pickle.gz').unlink(missing_ok=True)
                         shutil.move(str(capture_path), str(dest_dir))
-                        p.delete(str(capture_path))
+                        self.redis.delete(str(capture_path))
                     except OSError as e:
                         self.logger.warning(f'Unable to archive capture: {e}')
                     finally:
-                        locks_to_clear.append(dest_dir / capture_path.name / 'lock')
+                        (dest_dir / capture_path.name / 'lock').unlink(missing_ok=True)
                 # we archived some captures, update relevant index
                 self._update_index(dest_dir)
-                for lock in locks_to_clear:
-                    lock.unlink(missing_ok=True)
                 if not archiving_done:
                     break
             else:
                 break
-        p.execute()
         if archiving_done:
             self.logger.info('Archiving done.')
         return archiving_done
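The cleanup half of the commit drops the Redis pipeline: the old code queued p.delete(...) calls and only flushed them with p.execute() after all months were processed, so an early break (capture_breakpoint reached, shutdown requested) could leave the deletions unsent; the new code deletes each key as soon as the capture directory has been moved. A minimal sketch of the two patterns with redis-py follows; connection details and keys are placeholders, not lookyloo's.

    # Hedged sketch contrasting batched vs. immediate deletion; not lookyloo code.
    import redis

    r = redis.Redis(host='localhost', port=6379)
    keys = ('capture_a', 'capture_b')  # placeholder keys

    # Before: deletions accumulate on a pipeline and are only sent by execute(),
    # so breaking out of the loop early strands them.
    p = r.pipeline()
    for key in keys:
        p.delete(key)
    p.execute()

    # After: each key is deleted right away, so nothing is left pending when the
    # archiver stops mid-run.
    for key in keys:
        r.delete(key)

The same idea applies to the lock files: instead of collecting them in locks_to_clear and unlinking them once the month is finished, the new finally block removes each capture's lock immediately.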