new: Allow user accessible MISP servers

Related  #1021
dependabot/pip/publicsuffixlist-1.0.2.20241224
Raphaël Vinot 2024-12-23 18:47:29 +01:00
parent e189d6f679
commit 416eba5beb
No known key found for this signature in database
GPG Key ID: 32E4E1C133B3792F
7 changed files with 158 additions and 91 deletions

View File

@ -45,7 +45,7 @@ class BackgroundBuildCaptures(AbstractManager):
# could be an empty file.
settings = json.loads(ar)
try:
self.lookyloo.send_mail(capture_uuid, email=settings.get('email', ''),
self.lookyloo.send_mail(capture_uuid, as_admin=True, email=settings.get('email', ''),
comment=settings.get('comment'))
(path / 'auto_report').unlink()
except Exception as e:

View File

@ -882,7 +882,7 @@ class Lookyloo():
return f"Malicious capture according to {len(modules)} module(s): {', '.join(modules)}"
def send_mail(self, capture_uuid: str, /, email: str | None=None, comment: str | None=None) -> bool | dict[str, Any]:
def send_mail(self, capture_uuid: str, /, as_admin: bool, email: str | None=None, comment: str | None=None) -> bool | dict[str, Any]:
'''Send an email notification regarding a specific capture'''
if not get_config('generic', 'enable_mail_notification'):
return {"error": "Unable to send mail: mail notification disabled"}
@ -916,7 +916,9 @@ class Lookyloo():
self.logger.info('There are no MISP instances available for a lookup.')
else:
for instance_name in self.misps.keys():
if occurrences := self.get_misp_occurrences(capture_uuid, instance_name=instance_name):
if occurrences := self.get_misp_occurrences(capture_uuid,
as_admin=as_admin,
instance_name=instance_name):
elements, misp_url = occurrences
for event_id, attributes in elements.items():
for value, ts in attributes:
@ -1225,7 +1227,8 @@ class Lookyloo():
return [event]
def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | None=None) -> tuple[dict[int, set[tuple[str, datetime]]], str] | None:
def get_misp_occurrences(self, capture_uuid: str, /, as_admin: bool,
*, instance_name: str | None=None) -> tuple[dict[int, set[tuple[str, datetime]]], str] | None:
if instance_name is None:
misp = self.misps.default_misp
elif self.misps.get(instance_name) is not None:
@ -1244,7 +1247,7 @@ class Lookyloo():
nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node]
to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set)
for node in nodes_to_lookup:
hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid))
hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid), as_admin=as_admin)
for event_id, values in hits.items():
if not isinstance(event_id, int) or not isinstance(values, set):
continue

View File

@ -57,6 +57,20 @@ class MISPs(Mapping, AbstractModule): # type: ignore[type-arg]
return True
@property
def has_public_misp(self) -> bool:
return not all(misp.admin_only for misp in self.__misps.values())
def has_lookup(self, as_admin: bool) -> bool:
if as_admin:
return any(misp.enable_lookup for misp in self.__misps.values())
return any(misp.enable_lookup and not misp.admin_only for misp in self.__misps.values())
def has_push(self, as_admin: bool) -> bool:
if as_admin:
return any(misp.enable_push for misp in self.__misps.values())
return any(misp.enable_push and not misp.admin_only for misp in self.__misps.values())
def __getitem__(self, name: str) -> MISP:
return self.__misps[name]
@ -200,7 +214,7 @@ class MISP(AbstractModule):
existing_uuid_to_extend = None
for event in events:
if not allow_duplicates:
existing_event = self.get_existing_event(event.attributes[0].value)
existing_event = self.__get_existing_event(event.attributes[0].value)
if existing_event:
existing_uuid_to_extend = existing_event.uuid
continue
@ -215,38 +229,44 @@ class MISP(AbstractModule):
events_to_push.append(event)
return events_to_push
def push(self, to_push: list[MISPEvent] | MISPEvent, allow_duplicates: bool=False, auto_publish: bool | None=None) -> list[MISPEvent] | dict[Any, Any]:
def push(self, to_push: list[MISPEvent] | MISPEvent, as_admin: bool, *, allow_duplicates: bool=False,
auto_publish: bool | None=None) -> list[MISPEvent] | dict[Any, Any]:
if not self.available:
return {'error': 'Module not available.'}
if not self.enable_push:
return {'error': 'Push not enabled.'}
if self.admin_only and not as_admin:
return {'error': 'Admin only module, cannot push.'}
if auto_publish is None:
auto_publish = self.auto_publish
if self.available and self.enable_push:
events = self._prepare_push(to_push, allow_duplicates, auto_publish)
if not events:
return {'error': 'All the events are already on the MISP instance.'}
if isinstance(events, dict):
return {'error': events}
to_return = []
for event in events:
try:
# NOTE: POST the event as published publishes inline, which can tak a long time.
# Here, we POST as not published, and trigger the publishing in a second call.
if hasattr(event, 'published'):
background_publish = event.published
else:
background_publish = False
if background_publish:
event.published = False
new_event = self.client.add_event(event, pythonify=True)
if background_publish and isinstance(new_event, MISPEvent):
self.client.publish(new_event)
except requests.exceptions.ReadTimeout:
return {'error': 'The connection to MISP timed out, try increasing the timeout in the config.'}
if isinstance(new_event, MISPEvent):
to_return.append(new_event)
events = self._prepare_push(to_push, allow_duplicates, auto_publish)
if not events:
return {'error': 'All the events are already on the MISP instance.'}
if isinstance(events, dict):
return {'error': events}
to_return = []
for event in events:
try:
# NOTE: POST the event as published publishes inline, which can tak a long time.
# Here, we POST as not published, and trigger the publishing in a second call.
if hasattr(event, 'published'):
background_publish = event.published
else:
return {'error': new_event}
return to_return
else:
return {'error': 'Module not available or push not enabled.'}
background_publish = False
if background_publish:
event.published = False
new_event = self.client.add_event(event, pythonify=True)
if background_publish and isinstance(new_event, MISPEvent):
self.client.publish(new_event)
except requests.exceptions.ReadTimeout:
return {'error': 'The connection to MISP timed out, try increasing the timeout in the config.'}
if isinstance(new_event, MISPEvent):
to_return.append(new_event)
else:
return {'error': new_event}
return to_return
def get_existing_event_url(self, permaurl: str) -> str | None:
attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True)
@ -255,7 +275,7 @@ class MISP(AbstractModule):
url = f'{self.client.root_url}/events/{attributes[0].event_id}'
return url
def get_existing_event(self, permaurl: str) -> MISPEvent | None:
def __get_existing_event(self, permaurl: str) -> MISPEvent | None:
attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True)
if not attributes or not isinstance(attributes, list) or not isinstance(attributes[0], MISPAttribute):
return None
@ -264,36 +284,40 @@ class MISP(AbstractModule):
return event
return None
def lookup(self, node: URLNode, hostnode: HostNode) -> dict[int | str, str | set[tuple[str, datetime]]]:
if self.available and self.enable_lookup:
tld = self.psl.publicsuffix(hostnode.name)
domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1]
to_lookup = [node.name, hostnode.name, f'{domain}.{tld}']
if hasattr(hostnode, 'resolved_ips'):
if 'v4' in hostnode.resolved_ips:
to_lookup += hostnode.resolved_ips['v4']
if 'v6' in hostnode.resolved_ips:
to_lookup += hostnode.resolved_ips['v6']
if hasattr(hostnode, 'cnames'):
to_lookup += hostnode.cnames
if not node.empty_response:
to_lookup.append(node.body_hash)
if attributes := self.client.search(controller='attributes', value=to_lookup,
enforce_warninglist=True, pythonify=True):
if isinstance(attributes, list):
to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set)
a: MISPAttribute
for a in attributes: # type: ignore[assignment]
if isinstance(a.value, str):
# a.timestamp is always a datetime in this situation
to_return[a.event_id].add((a.value, a.timestamp)) # type: ignore[arg-type]
else:
# This shouldn't happen (?)
self.logger.warning(f'Unexpected value type in MISP lookup: {type(a.value)}')
return to_return # type: ignore[return-value]
else:
# The request returned an error
return attributes # type: ignore[return-value]
return {'info': 'No hits.'}
else:
return {'error': 'Module not available or lookup not enabled.'}
def lookup(self, node: URLNode, hostnode: HostNode, as_admin: bool) -> dict[int | str, str | set[tuple[str, datetime]]]:
if not self.available:
return {'error': 'Module not available.'}
if not self.enable_lookup:
return {'error': 'Lookup not enabled.'}
if self.admin_only and not as_admin:
return {'error': 'Admin only module, cannot lookup.'}
tld = self.psl.publicsuffix(hostnode.name)
domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1]
to_lookup = [node.name, hostnode.name, f'{domain}.{tld}']
if hasattr(hostnode, 'resolved_ips'):
if 'v4' in hostnode.resolved_ips:
to_lookup += hostnode.resolved_ips['v4']
if 'v6' in hostnode.resolved_ips:
to_lookup += hostnode.resolved_ips['v6']
if hasattr(hostnode, 'cnames'):
to_lookup += hostnode.cnames
if not node.empty_response:
to_lookup.append(node.body_hash)
if attributes := self.client.search(controller='attributes', value=to_lookup,
enforce_warninglist=True, pythonify=True):
if isinstance(attributes, list):
to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set)
a: MISPAttribute
for a in attributes: # type: ignore[assignment]
if isinstance(a.value, str):
# a.timestamp is always a datetime in this situation
to_return[a.event_id].add((a.value, a.timestamp)) # type: ignore[arg-type]
else:
# This shouldn't happen (?)
self.logger.warning(f'Unexpected value type in MISP lookup: {type(a.value)}')
return to_return # type: ignore[return-value]
else:
# The request returned an error
return attributes # type: ignore[return-value]
return {'info': 'No hits.'}

View File

@ -725,27 +725,47 @@ def stats(tree_uuid: str) -> str:
@app.route('/tree/<string:tree_uuid>/misp_lookup', methods=['GET'])
@flask_login.login_required # type: ignore[misc]
def web_misp_lookup_view(tree_uuid: str) -> str | WerkzeugResponse | Response:
if not lookyloo.misps.available:
flash('There are no MISP instances available.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
as_admin = flask_login.current_user.is_authenticated
if not as_admin and not lookyloo.misps.has_public_misp:
flash('You need to be authenticated to search on MISP.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
if not as_admin and lookyloo.misps.default_misp.admin_only:
current_misp = None
else:
current_misp = lookyloo.misps.default_instance
misps_occurrences = {}
for instance_name in lookyloo.misps.keys():
if occurrences := lookyloo.get_misp_occurrences(tree_uuid, instance_name=instance_name):
for instance_name, instance in lookyloo.misps.items():
if instance.admin_only and not as_admin:
continue
if not current_misp:
# Pick the first one we can
current_misp = instance_name
if occurrences := lookyloo.get_misp_occurrences(tree_uuid,
as_admin=as_admin,
instance_name=instance_name):
misps_occurrences[instance_name] = occurrences
return render_template('misp_lookup.html', uuid=tree_uuid,
current_misp=lookyloo.misps.default_instance,
current_misp=current_misp,
misps_occurrences=misps_occurrences)
@app.route('/tree/<string:tree_uuid>/misp_push', methods=['GET', 'POST'])
@flask_login.login_required # type: ignore[misc]
def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | None:
def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response:
if not lookyloo.misps.available:
flash('There are no MISP instances available.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
as_admin = flask_login.current_user.is_authenticated
if not as_admin and not lookyloo.misps.has_public_misp:
flash('You need to be authenticated to push to MISP.', 'error')
return redirect(url_for('tree', tree_uuid=tree_uuid))
event = lookyloo.misp_export(tree_uuid)
if isinstance(event, dict):
flash(f'Unable to generate the MISP export: {event}', 'error')
@ -754,7 +774,18 @@ def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | No
if request.method == 'GET':
# Initialize settings that will be displayed on the template
misp_instances_settings = {}
if not as_admin and lookyloo.misps.default_misp.admin_only:
current_misp = None
else:
current_misp = lookyloo.misps.default_instance
for name, instance in lookyloo.misps.items():
if instance.admin_only and not as_admin:
continue
if not current_misp:
# Pick the first one we can
current_misp = name
# the 1st attribute in the event is the link to lookyloo
misp_instances_settings[name] = {
'default_tags': instance.default_tags,
@ -766,13 +797,13 @@ def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | No
cache = lookyloo.capture_cache(tree_uuid)
return render_template('misp_push_view.html',
current_misp=lookyloo.misps.default_instance,
current_misp=current_misp,
tree_uuid=tree_uuid,
event=event[0],
misp_instances_settings=misp_instances_settings,
has_parent=True if cache and cache.parent else False)
elif request.method == 'POST':
else:
# event is a MISPEvent at this point
misp_instance_name = request.form.get('misp_instance_name')
if not misp_instance_name or misp_instance_name not in lookyloo.misps:
@ -808,8 +839,10 @@ def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | No
events[-1].info = request.form.get('event_info')
try:
new_events = misp.push(events, True if request.form.get('force_push') else False,
True if request.form.get('auto_publish') else False)
new_events = misp.push(events, as_admin=as_admin,
allow_duplicates=True if request.form.get('force_push') else False,
auto_publish=True if request.form.get('auto_publish') else False,
)
except MISPServerError:
flash(f'MISP returned an error, the event(s) might still have been created on {misp.client.root_url}', 'error')
else:
@ -819,7 +852,6 @@ def web_misp_push_view(tree_uuid: str) -> str | WerkzeugResponse | Response | No
for e in new_events:
flash(f'MISP event {e.id} created on {misp.client.root_url}', 'success')
return redirect(url_for('tree', tree_uuid=tree_uuid))
return None
@app.route('/tree/<string:tree_uuid>/modules', methods=['GET'])
@ -1130,7 +1162,7 @@ def send_mail(tree_uuid: str) -> WerkzeugResponse:
# skip clearly incorrect emails
email = ''
comment: str = request.form['comment'] if request.form.get('comment') else ''
lookyloo.send_mail(tree_uuid, email, comment)
lookyloo.send_mail(tree_uuid, as_admin=flask_login.current_user.is_authenticated, email=email, comment=comment)
flash("Email notification sent", 'success')
return redirect(url_for('tree', tree_uuid=tree_uuid))
@ -1219,8 +1251,8 @@ def tree(tree_uuid: str, node_uuid: str | None=None) -> Response | str | Werkzeu
enable_context_by_users=enable_context_by_users,
enable_categorization=enable_categorization,
enable_bookmark=enable_bookmark,
misp_push=lookyloo.misps.available and lookyloo.misps.default_misp.enable_push,
misp_lookup=lookyloo.misps.available and lookyloo.misps.default_misp.enable_lookup,
misp_push=lookyloo.misps.available and lookyloo.misps.has_push(flask_login.current_user.is_authenticated),
misp_lookup=lookyloo.misps.available and lookyloo.misps.has_lookup(flask_login.current_user.is_authenticated),
blur_screenshot=blur_screenshot, urlnode_uuid=hostnode_to_highlight,
auto_trigger_modules=auto_trigger_modules,
confirm_message=confirm_message if confirm_message else 'Tick to confirm.',

View File

@ -16,6 +16,7 @@
</center>
<div id="allInstances">
{% if misps_occurrences %}
{% for name, occurrences in misps_occurrences.items() %}
<div id="lookup_{{name.replace(' ', '_')}}" {%if name != current_misp%}style="display:none"{%endif%}>
{% set hits, root_url = occurrences %}
@ -36,4 +37,7 @@
{% endif %}
</div>
{% endfor %}
{%else%}
No hits in any of the instances available.
{%endif%}
</div>

View File

@ -12,6 +12,7 @@
{%endif%}
<div id="allInstances">
{%if misp_instances_settings %}
{%for name, misp_settings in misp_instances_settings.items() %}
<div id="push_{{name.replace(' ', '_')}}" {%if name != current_misp%}style="display:none"{%endif%}>
<form role="form" action="{{ url_for('web_misp_push_view', tree_uuid=tree_uuid) }}"
@ -60,4 +61,7 @@
</form>
</div>
{%endfor%}
{%else%}
None of the instances are available, please login.
{%endif%}
</div>

View File

@ -211,6 +211,10 @@
<a href="{{ url_for('trigger_indexing', tree_uuid=tree_uuid) }}" role="button" class="btn btn-warning"
title="The capture isn't (fully) indexed, index now.">Index capture</a>
{% endif %}
{% if misp_lookup%}
<a href="#mispLookupModal" data-remote="{{ url_for('web_misp_lookup_view', tree_uuid=tree_uuid) }}"
data-bs-toggle="modal" data-bs-target="#mispLookupModal" role="button">Search events on MISP</a>
{% endif %}
<a href="#modulesModal" data-remote="{{ url_for('trigger_modules', tree_uuid=tree_uuid, force=False) }}"
data-bs-toggle="modal" data-bs-target="#modulesModal" role="button"
title="Lookups from supported 3rd party services">Third Party Reports</a>
@ -261,6 +265,10 @@
<a href="#downloadModal" data-bs-toggle="modal" data-bs-target="#downloadModal" role="button"
title="Download specific elements of the capture">Download elements</a>
{% if misp_push%}
<a href="#mispPushModal" data-remote="{{ url_for('web_misp_push_view', tree_uuid=tree_uuid) }}"
data-bs-toggle="modal" data-bs-target="#mispPushModal" role="button">Prepare push to MISP</a>
{% endif %}
</div>
</div>
@ -271,14 +279,6 @@
<a href="{{ url_for('rebuild_tree', tree_uuid=tree_uuid) }}" role="button">Rebuild capture</a>
<a href="{{ url_for('hide_capture', tree_uuid=tree_uuid) }}" role="button">Hide capture</a>
<a href="{{ url_for('remove_capture', tree_uuid=tree_uuid) }}" role="button" id="removeCapture">Remove capture</a>
{% if misp_push%}
<a href="#mispPushModal" data-remote="{{ url_for('web_misp_push_view', tree_uuid=tree_uuid) }}"
data-bs-toggle="modal" data-bs-target="#mispPushModal" role="button">Prepare push to MISP</a>
{% endif %}
{% if misp_lookup%}
<a href="#mispLookupModal" data-remote="{{ url_for('web_misp_lookup_view', tree_uuid=tree_uuid) }}"
data-bs-toggle="modal" data-bs-target="#mispLookupModal" role="button">Search events on MISP</a>
{% endif %}
<a href="{{ url_for('logout') }}" role="button" style="color: red">Logout</a>
</div>
</div>