fix: Improve getting MISP instances

multipleMISPS
Raphaël Vinot 2023-08-29 17:30:45 +02:00
parent 5ed6a18b40
commit 00bf8fb551
2 changed files with 24 additions and 14 deletions

View File

@ -52,7 +52,7 @@ from .helpers import (get_captures_dir, get_email_template,
uniq_domains, ParsedUserAgent, load_cookies, UserAgents, uniq_domains, ParsedUserAgent, load_cookies, UserAgents,
get_useragent_for_requests) get_useragent_for_requests)
from .indexing import Indexing from .indexing import Indexing
from .modules import (MISPs, MISP, PhishingInitiative, UniversalWhois, from .modules import (MISPs, PhishingInitiative, UniversalWhois,
UrlScan, VirusTotal, Phishtank, Hashlookup, UrlScan, VirusTotal, Phishtank, Hashlookup,
RiskIQ, RiskIQError, Pandora, URLhaus) RiskIQ, RiskIQError, Pandora, URLhaus)
@ -1203,16 +1203,14 @@ class Lookyloo():
return [event] return [event]
def get_misp_instance(self, instance_name: Optional[str]=None) -> MISP:
if instance_name:
if misp := self.misps.get(instance_name):
return misp
self.logger.warning(f'Unable to connect to MISP Instance {instance_name}, falling back to default.')
return self.misps.default_misp
def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: Optional[str]=None) -> Optional[Tuple[Dict[str, Set[str]], str]]: def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: Optional[str]=None) -> Optional[Tuple[Dict[str, Set[str]], str]]:
misp = self.get_misp_instance(instance_name) if instance_name is None:
misp = self.misps.default_misp
elif self.misps.get(instance_name) is not None:
misp = self.misps[instance_name]
else:
self.logger.warning(f'MISP instance "{instance_name}" does not exists.')
return None
if not misp.available: if not misp.available:
return None return None

View File

@ -192,8 +192,15 @@ class MISPPush(Resource):
def get(self, capture_uuid: str, instance_name: Optional[str]=None): def get(self, capture_uuid: str, instance_name: Optional[str]=None):
with_parents = True if request.args.get('with_parents') else False with_parents = True if request.args.get('with_parents') else False
allow_duplicates = True if request.args.get('allow_duplicates') else False allow_duplicates = True if request.args.get('allow_duplicates') else False
if instance_name is None:
misp = lookyloo.misps.default_misp
elif lookyloo.misps.get(instance_name) is not None:
misp = lookyloo.misps[instance_name]
else:
return {'error': f'MISP instance "{instance_name}" does not exists.'}
to_return: Dict = {} to_return: Dict = {}
misp = self.get_misp_instance(instance_name)
if not misp.available: if not misp.available:
to_return['error'] = 'MISP module not available.' to_return['error'] = 'MISP module not available.'
elif not misp.enable_push: elif not misp.enable_push:
@ -209,7 +216,7 @@ class MISPPush(Resource):
else: else:
events_to_return = [] events_to_return = []
for e in new_events: for e in new_events:
events_to_return.append(e.to_json(indent=2)) events_to_return.append(json.loads(e.to_json()))
return events_to_return return events_to_return
return to_return return to_return
@ -219,9 +226,14 @@ class MISPPush(Resource):
parameters: Dict = request.get_json(force=True) parameters: Dict = request.get_json(force=True)
with_parents = True if parameters.get('with_parents') else False with_parents = True if parameters.get('with_parents') else False
allow_duplicates = True if parameters.get('allow_duplicates') else False allow_duplicates = True if parameters.get('allow_duplicates') else False
if instance_name is None:
misp = lookyloo.misps.default_misp
elif lookyloo.misps.get(instance_name) is not None:
misp = lookyloo.misps[instance_name]
else:
return {'error': f'MISP instance "{instance_name}" does not exists.'}
to_return: Dict = {} to_return: Dict = {}
misp = self.get_misp_instance(instance_name)
if not misp.available: if not misp.available:
to_return['error'] = 'MISP module not available.' to_return['error'] = 'MISP module not available.'
elif not misp.enable_push: elif not misp.enable_push:
@ -237,7 +249,7 @@ class MISPPush(Resource):
else: else:
events_to_return = [] events_to_return = []
for e in new_events: for e in new_events:
events_to_return.append(e.to_json(indent=2)) events_to_return.append(json.loads(e.to_json()))
return events_to_return return events_to_return
return to_return return to_return