chg: normalize output to get categories, properly re-trigger indexes

pull/929/head
Raphaël Vinot 2024-07-23 18:45:09 +02:00
parent 13651dcfdb
commit 11fcc9dd8b
4 changed files with 27 additions and 25 deletions

View File

@ -11,7 +11,7 @@ import re
import time
from datetime import datetime, timedelta, date
from functools import lru_cache
from functools import lru_cache, cache
from importlib.metadata import version
from io import BufferedIOBase
from pathlib import Path
@ -31,6 +31,7 @@ from werkzeug.user_agent import UserAgent
from werkzeug.utils import cached_property
from .default import get_homedir, safe_create_dir, get_config, LookylooException
from .indexing import Indexing
# from .exceptions import InvalidCaptureSetting
@ -436,3 +437,12 @@ def load_user_config(username: str) -> dict[str, Any] | None:
return None
with user_config_path.open() as _c:
return json.load(_c)
@cache
def get_indexing(full: bool=False) -> Indexing:
    '''Return the (memoized) indexing handle.

    The full index is only handed out when the caller asks for it (`full=True`)
    and the `index_everything` setting of the `generic` config is enabled.
    In every other case the default `Indexing()` is returned.
    Results are memoized per argument by `functools.cache`.
    '''
    # The config is always consulted first, whatever the value of `full`.
    index_everything = get_config('generic', 'index_everything')
    if index_everything and full:
        return Indexing(full_index=True)
    return Indexing()

View File

@ -59,7 +59,8 @@ from .helpers import (get_captures_dir, get_email_template,
get_resources_hashes, get_taxonomies,
uniq_domains, ParsedUserAgent, UserAgents,
get_useragent_for_requests, load_takedown_filters,
CaptureSettings, load_user_config
CaptureSettings, load_user_config,
get_indexing
)
from .modules import (MISPs, PhishingInitiative, UniversalWhois,
UrlScan, VirusTotal, Phishtank, Hashlookup,
@ -335,6 +336,9 @@ class Lookyloo():
current_categories.add(category)
with categ_file.open('w') as f:
f.writelines(f'{t}\n' for t in current_categories)
get_indexing().reindex_categories_capture(capture_uuid)
if get_config('generic', 'index_everything'):
get_indexing(full=True).reindex_categories_capture(capture_uuid)
def uncategorize_capture(self, capture_uuid: str, /, category: str) -> None:
'''Remove a category (MISP Taxonomy tag) from a capture.'''
@ -351,6 +355,9 @@ class Lookyloo():
current_categories.remove(category)
with categ_file.open('w') as f:
f.writelines(f'{t}\n' for t in current_categories)
get_indexing().reindex_categories_capture(capture_uuid)
if get_config('generic', 'index_everything'):
get_indexing(full=True).reindex_categories_capture(capture_uuid)
def trigger_modules(self, capture_uuid: str, /, force: bool=False, auto_trigger: bool=False) -> dict[str, Any]:
'''Launch the 3rd party modules on a capture.

View File

@ -694,7 +694,6 @@ def categories_capture(tree_uuid: str, query: str) -> str | WerkzeugResponse | R
categories.append(category)
for category in categories:
lookyloo.categorize_capture(tree_uuid, category)
get_indexing(flask_login.current_user).reindex_categories_capture(tree_uuid)
if 'query' in request.form and request.form.get('query', '').strip():
matching_categories = {}
t = get_taxonomies()
@ -714,7 +713,6 @@ def uncategorize_capture(tree_uuid: str, category: str) -> str | WerkzeugRespons
if not enable_categorization:
return jsonify({'response': 'Categorization not enabled.'})
lookyloo.uncategorize_capture(tree_uuid, category)
get_indexing(flask_login.current_user).reindex_categories_capture(tree_uuid)
return jsonify({'response': f'{category} successfully removed from {tree_uuid}'})
@ -725,7 +723,6 @@ def categorize_capture(tree_uuid: str, category: str) -> str | WerkzeugResponse
if not enable_categorization:
return jsonify({'response': 'Categorization not enabled.'})
lookyloo.categorize_capture(tree_uuid, category)
get_indexing(flask_login.current_user).reindex_categories_capture(tree_uuid)
return jsonify({'response': f'{category} successfully added to {tree_uuid}'})

View File

@ -802,23 +802,11 @@ class RecentCaptures(Resource): # type: ignore[misc]
params={'category': 'The category according to which the uuids are to be returned.'},
required=False)
class CategoriesCaptures(Resource): # type: ignore[misc]
def get(self, category: str | None=None) -> list[str] | dict[str, Any]:
    '''Return the captures UUIDs by category.

    :param category: The category to look up. When empty or omitted, every
                     known category is queried.
    :return: Without a category, a dict mapping each capture UUID to the list
             of categories it was found in. With a known category, the list of
             UUIDs in that category. With an unknown category, a dict with an
             `error` key.
    '''
    # NOTE(review): 'insti_usertution' looks like a corrupted 'institution';
    # confirm against the taxonomy before changing it, the value is used as a lookup key.
    categories = ['legitimate', 'parking-page', 'default-page', 'insti_usertution', 'captcha',
                  'authentication-form', 'adult-content', 'shop', 'malicious', 'clone', 'phishing', 'unclear']
    # Resolve the index once: it does not change during the request.
    indexing = get_indexing(flask_login.current_user)
    if not category:
        categories_by_uuid: dict[str, set[str]] = {}
        for c in categories:
            # A category without any capture yields nothing to iterate over.
            for uuid in indexing.get_captures_category(c) or ():
                categories_by_uuid.setdefault(uuid, set()).add(c)
        return {uuid: list(found) for uuid, found in categories_by_uuid.items()}
    if category not in categories:
        return {'error': f'Invalid category: {category}'}
    return list(indexing.get_captures_category(category))
def get(self, category: str | None=None) -> list[str] | dict[str, list[str]] | tuple[dict[str, str], int]:
    '''Return the captures UUIDs for one category, or for all of them.

    :param category: The category to look up. When empty or omitted, all the
                     categories exposed by the index are returned.
    :return: With a known category, the list of capture UUIDs in it. Without a
             category, a dict mapping each existing category to its list of
             UUIDs. With an unknown category, an error dict and the 400 status code.
    '''
    # Resolve the index once per request instead of once per category, so every
    # lookup below goes through the same handle.
    indexing = get_indexing(flask_login.current_user)
    existing_categories = indexing.categories
    if category:
        if category not in existing_categories:
            return {'error': f'Invalid category: {category}, must be in {", ".join(existing_categories)}.'}, 400
        return list(indexing.get_captures_category(category))
    return {c: list(indexing.get_captures_category(c)) for c in existing_categories}