chg: Properly handle capture parents, avoid duplicates

pull/203/head
Raphaël Vinot 2021-05-14 14:25:37 -07:00
parent b735cbeae3
commit 5ee62d157f
4 changed files with 113 additions and 78 deletions

View File

@ -1045,7 +1045,7 @@ class Lookyloo():
obj.add_reference(vt_obj, 'analysed-with') obj.add_reference(vt_obj, 'analysed-with')
return vt_obj return vt_obj
def misp_export(self, capture_uuid: str, with_parent: bool=False) -> Union[List[MISPEvent], MISPEvent, Dict[str, str]]: def misp_export(self, capture_uuid: str, with_parent: bool=False) -> Union[List[MISPEvent], Dict[str, str]]:
'''Export a capture in MISP format. You can POST the return of this method '''Export a capture in MISP format. You can POST the return of this method
directly to a MISP instance and it will create an event.''' directly to a MISP instance and it will create an event.'''
cache = self.capture_cache(capture_uuid) cache = self.capture_cache(capture_uuid)
@ -1092,6 +1092,20 @@ class Lookyloo():
for u_object in redirects: for u_object in redirects:
event.add_object(u_object) event.add_object(u_object)
final_redirect = event.objects[-1]
screenshot: MISPAttribute = event.add_attribute('attachment', 'screenshot_landing_page.png', data=self.get_screenshot(capture_uuid), disable_correlation=True) # type: ignore
try:
fo = FileObject(pseudofile=ct.root_hartree.rendered_node.body, filename=ct.root_hartree.rendered_node.filename)
fo.comment = 'Content received for the final redirect (before rendering)'
fo.add_reference(final_redirect, 'loaded-by', 'URL loading that content')
fo.add_reference(screenshot, 'rendered-as', 'Screenshot of the page')
event.add_object(fo)
except Har2TreeError:
pass
except AttributeError:
# No `body` in rendered node
pass
if self.vt.available: if self.vt.available:
for e_obj in event.objects: for e_obj in event.objects:
@ -1101,34 +1115,17 @@ class Lookyloo():
if vt_obj: if vt_obj:
event.add_object(vt_obj) event.add_object(vt_obj)
screenshot: MISPAttribute = event.add_attribute('attachment', 'screenshot_landing_page.png', data=self.get_screenshot(capture_uuid), disable_correlation=True) # type: ignore
try:
fo = FileObject(pseudofile=ct.root_hartree.rendered_node.body, filename=ct.root_hartree.rendered_node.filename)
fo.comment = 'Content received for the final redirect (before rendering)'
fo.add_reference(event.objects[-1], 'loaded-by', 'URL loading that content')
fo.add_reference(screenshot, 'rendered-as', 'Screenshot of the page')
event.add_object(fo)
except Har2TreeError:
pass
except AttributeError:
# No `body` in rendered node
pass
if with_parent and cache.parent: if with_parent and cache.parent:
parent = self.misp_export(cache.parent, with_parent) parent = self.misp_export(cache.parent, with_parent)
if isinstance(parent, dict): if isinstance(parent, dict):
# Something bad happened # Something bad happened
return parent return parent
if isinstance(parent, list):
# The parent has a parent
event.extends_uuid = parent[-1].uuid event.extends_uuid = parent[-1].uuid
parent.append(event) parent.append(event)
return parent return parent
else:
event.extends_uuid = parent.uuid
return [parent, event]
return event return [event]
def get_hashes(self, tree_uuid: str, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]: def get_hashes(self, tree_uuid: str, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]:
"""Return hashes of resources. """Return hashes of resources.

View File

@ -53,23 +53,67 @@ class MISP():
def get_fav_tags(self): def get_fav_tags(self):
return self.client.tags(pythonify=True, favouritesOnly=1) return self.client.tags(pythonify=True, favouritesOnly=1)
def push(self, event: MISPEvent) -> Union[MISPEvent, Dict]: def _prepare_push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False) -> Union[List[MISPEvent], MISPEvent, Dict]:
if self.available and self.enable_push: '''Adds the pre-configured information as required by the instance.
If duplicates aren't allowed, they will be automatically skiped and the
extends_uuid key in the next element in the list updated'''
if isinstance(to_push, MISPEvent):
events = [to_push]
else:
events = to_push
events_to_push = []
existing_uuid_to_extend = None
for event in events:
if not allow_duplicates:
existing_event = self.get_existing_event(event.attributes[0].value)
if existing_event:
existing_uuid_to_extend = existing_event.uuid
continue
if existing_uuid_to_extend:
event.extends_uuid = existing_uuid_to_extend
existing_uuid_to_extend = None
for tag in self.default_tags: for tag in self.default_tags:
event.add_tag(tag) event.add_tag(tag)
if self.auto_publish: if self.auto_publish:
event.publish() event.publish()
return self.client.add_event(event, pythonify=True) events_to_push.append(event)
return events_to_push
def push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False) -> Union[List[MISPEvent], Dict]:
if self.available and self.enable_push:
events = self._prepare_push(to_push, allow_duplicates)
if not events:
return {'error': 'All the events are already on the MISP instance.'}
if isinstance(events, Dict):
return {'error': events}
to_return = []
for event in events:
new_event = self.client.add_event(event, pythonify=True)
if isinstance(new_event, MISPEvent):
to_return.append(new_event)
else:
return {'error': new_event}
return to_return
else: else:
return {'error': 'Module not available or push not enabled.'} return {'error': 'Module not available or push not enabled.'}
def get_existing_event(self, permaurl: str) -> Optional[str]: def get_existing_event_url(self, permaurl: str) -> Optional[str]:
attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True) attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True)
if not attributes or not isinstance(attributes[0], MISPAttribute): if not attributes or not isinstance(attributes[0], MISPAttribute):
return None return None
url = f'{self.client.root_url}/events/{attributes[0].event_id}' url = f'{self.client.root_url}/events/{attributes[0].event_id}'
return url return url
def get_existing_event(self, permaurl: str) -> Optional[MISPEvent]:
attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True)
if not attributes or not isinstance(attributes[0], MISPAttribute):
return None
event = self.client.get_event(attributes[0].event_id, pythonify=True)
if isinstance(event, MISPEvent):
return event
return None
class UniversalWhois(): class UniversalWhois():

12
poetry.lock generated
View File

@ -307,7 +307,7 @@ hyperframe = ">=5.2.0,<6"
[[package]] [[package]]
name = "har2tree" name = "har2tree"
version = "1.5.5" version = "1.5.6"
description = "HTTP Archive (HAR) to ETE Toolkit generator" description = "HTTP Archive (HAR) to ETE Toolkit generator"
category = "main" category = "main"
optional = false optional = false
@ -762,7 +762,7 @@ docs = ["Sphinx (>=3.5.3,<4.0.0)", "myst-parser (>=0.13.5,<0.14.0)"]
[[package]] [[package]]
name = "pymisp" name = "pymisp"
version = "2.4.142" version = "2.4.143"
description = "Python API for MISP." description = "Python API for MISP."
category = "main" category = "main"
optional = false optional = false
@ -1365,8 +1365,8 @@ h2 = [
{file = "h2-3.2.0.tar.gz", hash = "sha256:875f41ebd6f2c44781259005b157faed1a5031df3ae5aa7bcb4628a6c0782f14"}, {file = "h2-3.2.0.tar.gz", hash = "sha256:875f41ebd6f2c44781259005b157faed1a5031df3ae5aa7bcb4628a6c0782f14"},
] ]
har2tree = [ har2tree = [
{file = "har2tree-1.5.5-py3-none-any.whl", hash = "sha256:df666eb01228e112f6fb88622a7bb7dcd31713a727a63ea357b1e0c1466aa31a"}, {file = "har2tree-1.5.6-py3-none-any.whl", hash = "sha256:94dcb58890d1e76a595e965cda8e3dd841b169a69f6d4fc84b7f1d869e65b11a"},
{file = "har2tree-1.5.5.tar.gz", hash = "sha256:3c49d6e487d7e3d0683de9318cae66bf2ad4389508514bebda1d131cc673c06c"}, {file = "har2tree-1.5.6.tar.gz", hash = "sha256:a1c578552669688c3899d85ca3ac96c0b8e68860c161e49b99ac797a8e054bb2"},
] ]
hpack = [ hpack = [
{file = "hpack-3.0.0-py2.py3-none-any.whl", hash = "sha256:0edd79eda27a53ba5be2dfabf3b15780928a0dff6eb0c60a3d6767720e970c89"}, {file = "hpack-3.0.0-py2.py3-none-any.whl", hash = "sha256:0edd79eda27a53ba5be2dfabf3b15780928a0dff6eb0c60a3d6767720e970c89"},
@ -1742,8 +1742,8 @@ pylookyloo = [
{file = "pylookyloo-1.5.0.tar.gz", hash = "sha256:2f616e1f05206d84f4695b517b5c851775af69ac9df2e4f6fa880b6a2c6b6ec8"}, {file = "pylookyloo-1.5.0.tar.gz", hash = "sha256:2f616e1f05206d84f4695b517b5c851775af69ac9df2e4f6fa880b6a2c6b6ec8"},
] ]
pymisp = [ pymisp = [
{file = "pymisp-2.4.142-py3-none-any.whl", hash = "sha256:677046cdb7b8f89b2cccd925377c136d6dfce63c8c44e6ca4ff6b342babe28a4"}, {file = "pymisp-2.4.143-py3-none-any.whl", hash = "sha256:ceb6029045cfd2ab803902dc82e761909ca5337c2f6df484aef4e067e2f08d82"},
{file = "pymisp-2.4.142.tar.gz", hash = "sha256:172c7bfa1bc907c37682bce7054c2147f132781a0019f082e71871f46186b23f"}, {file = "pymisp-2.4.143.tar.gz", hash = "sha256:a51cddc4f274ec8daf7ccd0dd1fd599ee755ed77d168e72dfad01a41823877e3"},
] ]
pyopenssl = [ pyopenssl = [
{file = "pyOpenSSL-20.0.1-py2.py3-none-any.whl", hash = "sha256:818ae18e06922c066f777a33f1fca45786d85edfe71cd043de6379337a7f274b"}, {file = "pyOpenSSL-20.0.1-py2.py3-none-any.whl", hash = "sha256:818ae18e06922c066f777a33f1fca45786d85edfe71cd043de6379337a7f274b"},

View File

@ -9,7 +9,7 @@ from datetime import datetime, timedelta, timezone
import json import json
import http import http
import calendar import calendar
from typing import Optional, Dict, Any, Union from typing import Optional, Dict, Any, Union, List
import logging import logging
import hashlib import hashlib
from urllib.parse import quote_plus, unquote_plus from urllib.parse import quote_plus, unquote_plus
@ -913,54 +913,50 @@ def web_misp_push_view(tree_uuid: str):
error = False error = False
if not lookyloo.misp.available: if not lookyloo.misp.available:
flash('MISP module not available.', 'error') flash('MISP module not available.', 'error')
error = True return redirect(url_for('tree', tree_uuid=tree_uuid))
elif not lookyloo.misp.enable_push: elif not lookyloo.misp.enable_push:
flash('Push not enabled in MISP module.', 'error') flash('Push not enabled in MISP module.', 'error')
error = True return redirect(url_for('tree', tree_uuid=tree_uuid))
else: else:
event = lookyloo.misp_export(tree_uuid) event = lookyloo.misp_export(tree_uuid)
if isinstance(event, dict): if isinstance(event, dict):
flash(f'Unable to generate the MISP export: {event}', 'error') flash(f'Unable to generate the MISP export: {event}', 'error')
error = True
if error:
return redirect(url_for('tree', tree_uuid=tree_uuid)) return redirect(url_for('tree', tree_uuid=tree_uuid))
# After this point, event is a MISPEvent
if request.method == 'POST': if request.method == 'POST':
# event is a MISPEvent at this point # event is a MISPEvent at this point
# Submit the event # Submit the event
tags = request.form.getlist('tags') tags = request.form.getlist('tags')
for tag in tags:
event.add_tag(tag) # type: ignore
error = False error = False
events: List[MISPEvent] = []
with_parents = request.form.get('with_parents') with_parents = request.form.get('with_parents')
if with_parents: if with_parents:
exports = lookyloo.misp_export(tree_uuid, True) exports = lookyloo.misp_export(tree_uuid, True)
if isinstance(exports, dict): if isinstance(exports, dict):
flash(f'Unable to create event: {exports}', 'error') flash(f'Unable to create event: {exports}', 'error')
error = True error = True
elif isinstance(exports, MISPEvent):
events = [exports]
else: else:
events = exports events = exports
else: else:
events = [event] # type: ignore events = event
if error: if error:
return redirect(url_for('tree', tree_uuid=tree_uuid)) return redirect(url_for('tree', tree_uuid=tree_uuid))
for event in events: for e in events:
event = lookyloo.misp.push(event) # type: ignore for tag in tags:
if isinstance(event, MISPEvent): e.add_tag(tag)
flash(f'MISP event {event.id} created on {lookyloo.misp.client.root_url}', 'success')
new_events = lookyloo.misp.push(events)
if isinstance(new_events, dict):
flash(f'Unable to create event(s): {new_events}', 'error')
else: else:
flash(f'Unable to create event: {event}', 'error') for e in new_events:
flash(f'MISP event {e.id} created on {lookyloo.misp.client.root_url}', 'success')
return redirect(url_for('tree', tree_uuid=tree_uuid)) return redirect(url_for('tree', tree_uuid=tree_uuid))
else: else:
# the 1st attribute in the event is the link to lookyloo # the 1st attribute in the event is the link to lookyloo
existing_misp_url = lookyloo.misp.get_existing_event(event.attributes[0].value) # type: ignore existing_misp_url = lookyloo.misp.get_existing_event_url(event[-1].attributes[0].value)
fav_tags = lookyloo.misp.get_fav_tags() fav_tags = lookyloo.misp.get_fav_tags()
cache = lookyloo.capture_cache(tree_uuid) cache = lookyloo.capture_cache(tree_uuid)
@ -1018,42 +1014,40 @@ def misp_export(tree_uuid: str):
event = lookyloo.misp_export(tree_uuid, True if with_parents else False) event = lookyloo.misp_export(tree_uuid, True if with_parents else False)
if isinstance(event, dict): if isinstance(event, dict):
return jsonify(event) return jsonify(event)
elif isinstance(event, list):
to_return = [] to_return = []
for e in event: for e in event:
to_return.append(e.to_json(indent=2)) to_return.append(e.to_json(indent=2))
return jsonify(to_return) return jsonify(to_return)
else:
return Response(event.to_json(indent=2), mimetype='application/json')
@app.route('/json/<string:tree_uuid>/misp_push', methods=['GET']) @app.route('/json/<string:tree_uuid>/misp_push', methods=['GET', 'POST'])
@flask_login.login_required @flask_login.login_required
def misp_push(tree_uuid: str): def misp_push(tree_uuid: str):
if request.method == 'POST':
parameters: Dict = request.get_json(force=True) # type: ignore
with_parents = True if 'with_parents' in parameters else False
allow_duplicates = True if 'allow_duplicates' in parameters else False
else:
with_parents = False
allow_duplicates = False
to_return: Dict = {} to_return: Dict = {}
if not lookyloo.misp.available: if not lookyloo.misp.available:
to_return['error'] = 'MISP module not available.' to_return['error'] = 'MISP module not available.'
elif not lookyloo.misp.enable_push: elif not lookyloo.misp.enable_push:
to_return['error'] = 'Push not enabled in MISP module.' to_return['error'] = 'Push not enabled in MISP module.'
else: else:
event = lookyloo.misp_export(tree_uuid) event = lookyloo.misp_export(tree_uuid, with_parents)
if isinstance(event, dict): if isinstance(event, dict):
to_return['error'] = event to_return['error'] = event
elif isinstance(event, MISPEvent):
event = lookyloo.misp.push(event)
if isinstance(event, MISPEvent):
return Response(event.to_json(indent=2), mimetype='application/json')
else: else:
to_return['error'] = event new_events = lookyloo.misp.push(event, allow_duplicates)
if isinstance(new_events, dict):
to_return['error'] = new_events
else: else:
events_to_return = [] events_to_return = []
for e in event: for e in new_events:
me = lookyloo.misp.push(e) events_to_return.append(e.to_json(indent=2))
if isinstance(me, MISPEvent):
events_to_return.append(me.to_json(indent=2))
else:
to_return['error'] = me
break
jsonify(events_to_return) jsonify(events_to_return)
return jsonify(to_return) return jsonify(to_return)