diff --git a/lookyloo/lookyloo.py b/lookyloo/lookyloo.py index d5338dae..c8a56014 100644 --- a/lookyloo/lookyloo.py +++ b/lookyloo/lookyloo.py @@ -1045,7 +1045,7 @@ class Lookyloo(): obj.add_reference(vt_obj, 'analysed-with') return vt_obj - def misp_export(self, capture_uuid: str, with_parent: bool=False) -> Union[List[MISPEvent], MISPEvent, Dict[str, str]]: + def misp_export(self, capture_uuid: str, with_parent: bool=False) -> Union[List[MISPEvent], Dict[str, str]]: '''Export a capture in MISP format. You can POST the return of this method directly to a MISP instance and it will create an event.''' cache = self.capture_cache(capture_uuid) @@ -1092,6 +1092,20 @@ class Lookyloo(): for u_object in redirects: event.add_object(u_object) + final_redirect = event.objects[-1] + + screenshot: MISPAttribute = event.add_attribute('attachment', 'screenshot_landing_page.png', data=self.get_screenshot(capture_uuid), disable_correlation=True) # type: ignore + try: + fo = FileObject(pseudofile=ct.root_hartree.rendered_node.body, filename=ct.root_hartree.rendered_node.filename) + fo.comment = 'Content received for the final redirect (before rendering)' + fo.add_reference(final_redirect, 'loaded-by', 'URL loading that content') + fo.add_reference(screenshot, 'rendered-as', 'Screenshot of the page') + event.add_object(fo) + except Har2TreeError: + pass + except AttributeError: + # No `body` in rendered node + pass if self.vt.available: for e_obj in event.objects: @@ -1101,34 +1115,17 @@ class Lookyloo(): if vt_obj: event.add_object(vt_obj) - screenshot: MISPAttribute = event.add_attribute('attachment', 'screenshot_landing_page.png', data=self.get_screenshot(capture_uuid), disable_correlation=True) # type: ignore - try: - fo = FileObject(pseudofile=ct.root_hartree.rendered_node.body, filename=ct.root_hartree.rendered_node.filename) - fo.comment = 'Content received for the final redirect (before rendering)' - fo.add_reference(event.objects[-1], 'loaded-by', 'URL loading that content') - fo.add_reference(screenshot, 'rendered-as', 'Screenshot of the page') - event.add_object(fo) - except Har2TreeError: - pass - except AttributeError: - # No `body` in rendered node - pass - if with_parent and cache.parent: parent = self.misp_export(cache.parent, with_parent) if isinstance(parent, dict): # Something bad happened return parent - if isinstance(parent, list): - # The parent has a parent - event.extends_uuid = parent[-1].uuid - parent.append(event) - return parent - else: - event.extends_uuid = parent.uuid - return [parent, event] - return event + event.extends_uuid = parent[-1].uuid + parent.append(event) + return parent + + return [event] def get_hashes(self, tree_uuid: str, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]: """Return hashes of resources. diff --git a/lookyloo/modules.py b/lookyloo/modules.py index 298983ef..51c0a1ed 100644 --- a/lookyloo/modules.py +++ b/lookyloo/modules.py @@ -53,23 +53,67 @@ class MISP(): def get_fav_tags(self): return self.client.tags(pythonify=True, favouritesOnly=1) - def push(self, event: MISPEvent) -> Union[MISPEvent, Dict]: - if self.available and self.enable_push: + def _prepare_push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False) -> Union[List[MISPEvent], MISPEvent, Dict]: + '''Adds the pre-configured information as required by the instance. + If duplicates aren't allowed, they will be automatically skiped and the + extends_uuid key in the next element in the list updated''' + if isinstance(to_push, MISPEvent): + events = [to_push] + else: + events = to_push + events_to_push = [] + existing_uuid_to_extend = None + for event in events: + if not allow_duplicates: + existing_event = self.get_existing_event(event.attributes[0].value) + if existing_event: + existing_uuid_to_extend = existing_event.uuid + continue + if existing_uuid_to_extend: + event.extends_uuid = existing_uuid_to_extend + existing_uuid_to_extend = None + for tag in self.default_tags: event.add_tag(tag) if self.auto_publish: event.publish() - return self.client.add_event(event, pythonify=True) + events_to_push.append(event) + return events_to_push + + def push(self, to_push: Union[List[MISPEvent], MISPEvent], allow_duplicates: bool=False) -> Union[List[MISPEvent], Dict]: + if self.available and self.enable_push: + events = self._prepare_push(to_push, allow_duplicates) + if not events: + return {'error': 'All the events are already on the MISP instance.'} + if isinstance(events, Dict): + return {'error': events} + to_return = [] + for event in events: + new_event = self.client.add_event(event, pythonify=True) + if isinstance(new_event, MISPEvent): + to_return.append(new_event) + else: + return {'error': new_event} + return to_return else: return {'error': 'Module not available or push not enabled.'} - def get_existing_event(self, permaurl: str) -> Optional[str]: + def get_existing_event_url(self, permaurl: str) -> Optional[str]: attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True) if not attributes or not isinstance(attributes[0], MISPAttribute): return None url = f'{self.client.root_url}/events/{attributes[0].event_id}' return url + def get_existing_event(self, permaurl: str) -> Optional[MISPEvent]: + attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True) + if not attributes or not isinstance(attributes[0], MISPAttribute): + return None + event = self.client.get_event(attributes[0].event_id, pythonify=True) + if isinstance(event, MISPEvent): + return event + return None + class UniversalWhois(): diff --git a/poetry.lock b/poetry.lock index 938debfa..f5028817 100644 --- a/poetry.lock +++ b/poetry.lock @@ -307,7 +307,7 @@ hyperframe = ">=5.2.0,<6" [[package]] name = "har2tree" -version = "1.5.5" +version = "1.5.6" description = "HTTP Archive (HAR) to ETE Toolkit generator" category = "main" optional = false @@ -762,7 +762,7 @@ docs = ["Sphinx (>=3.5.3,<4.0.0)", "myst-parser (>=0.13.5,<0.14.0)"] [[package]] name = "pymisp" -version = "2.4.142" +version = "2.4.143" description = "Python API for MISP." category = "main" optional = false @@ -1365,8 +1365,8 @@ h2 = [ {file = "h2-3.2.0.tar.gz", hash = "sha256:875f41ebd6f2c44781259005b157faed1a5031df3ae5aa7bcb4628a6c0782f14"}, ] har2tree = [ - {file = "har2tree-1.5.5-py3-none-any.whl", hash = "sha256:df666eb01228e112f6fb88622a7bb7dcd31713a727a63ea357b1e0c1466aa31a"}, - {file = "har2tree-1.5.5.tar.gz", hash = "sha256:3c49d6e487d7e3d0683de9318cae66bf2ad4389508514bebda1d131cc673c06c"}, + {file = "har2tree-1.5.6-py3-none-any.whl", hash = "sha256:94dcb58890d1e76a595e965cda8e3dd841b169a69f6d4fc84b7f1d869e65b11a"}, + {file = "har2tree-1.5.6.tar.gz", hash = "sha256:a1c578552669688c3899d85ca3ac96c0b8e68860c161e49b99ac797a8e054bb2"}, ] hpack = [ {file = "hpack-3.0.0-py2.py3-none-any.whl", hash = "sha256:0edd79eda27a53ba5be2dfabf3b15780928a0dff6eb0c60a3d6767720e970c89"}, @@ -1742,8 +1742,8 @@ pylookyloo = [ {file = "pylookyloo-1.5.0.tar.gz", hash = "sha256:2f616e1f05206d84f4695b517b5c851775af69ac9df2e4f6fa880b6a2c6b6ec8"}, ] pymisp = [ - {file = "pymisp-2.4.142-py3-none-any.whl", hash = "sha256:677046cdb7b8f89b2cccd925377c136d6dfce63c8c44e6ca4ff6b342babe28a4"}, - {file = "pymisp-2.4.142.tar.gz", hash = "sha256:172c7bfa1bc907c37682bce7054c2147f132781a0019f082e71871f46186b23f"}, + {file = "pymisp-2.4.143-py3-none-any.whl", hash = "sha256:ceb6029045cfd2ab803902dc82e761909ca5337c2f6df484aef4e067e2f08d82"}, + {file = "pymisp-2.4.143.tar.gz", hash = "sha256:a51cddc4f274ec8daf7ccd0dd1fd599ee755ed77d168e72dfad01a41823877e3"}, ] pyopenssl = [ {file = "pyOpenSSL-20.0.1-py2.py3-none-any.whl", hash = "sha256:818ae18e06922c066f777a33f1fca45786d85edfe71cd043de6379337a7f274b"}, diff --git a/website/web/__init__.py b/website/web/__init__.py index 796b37da..126c9fed 100644 --- a/website/web/__init__.py +++ b/website/web/__init__.py @@ -9,7 +9,7 @@ from datetime import datetime, timedelta, timezone import json import http import calendar -from typing import Optional, Dict, Any, Union +from typing import Optional, Dict, Any, Union, List import logging import hashlib from urllib.parse import quote_plus, unquote_plus @@ -913,54 +913,50 @@ def web_misp_push_view(tree_uuid: str): error = False if not lookyloo.misp.available: flash('MISP module not available.', 'error') - error = True + return redirect(url_for('tree', tree_uuid=tree_uuid)) elif not lookyloo.misp.enable_push: flash('Push not enabled in MISP module.', 'error') - error = True + return redirect(url_for('tree', tree_uuid=tree_uuid)) else: event = lookyloo.misp_export(tree_uuid) if isinstance(event, dict): flash(f'Unable to generate the MISP export: {event}', 'error') - error = True - if error: - return redirect(url_for('tree', tree_uuid=tree_uuid)) - - # After this point, event is a MISPEvent + return redirect(url_for('tree', tree_uuid=tree_uuid)) if request.method == 'POST': # event is a MISPEvent at this point # Submit the event tags = request.form.getlist('tags') - for tag in tags: - event.add_tag(tag) # type: ignore - error = False + events: List[MISPEvent] = [] with_parents = request.form.get('with_parents') if with_parents: exports = lookyloo.misp_export(tree_uuid, True) if isinstance(exports, dict): flash(f'Unable to create event: {exports}', 'error') error = True - elif isinstance(exports, MISPEvent): - events = [exports] else: events = exports else: - events = [event] # type: ignore + events = event if error: return redirect(url_for('tree', tree_uuid=tree_uuid)) - for event in events: - event = lookyloo.misp.push(event) # type: ignore - if isinstance(event, MISPEvent): - flash(f'MISP event {event.id} created on {lookyloo.misp.client.root_url}', 'success') - else: - flash(f'Unable to create event: {event}', 'error') + for e in events: + for tag in tags: + e.add_tag(tag) + + new_events = lookyloo.misp.push(events) + if isinstance(new_events, dict): + flash(f'Unable to create event(s): {new_events}', 'error') + else: + for e in new_events: + flash(f'MISP event {e.id} created on {lookyloo.misp.client.root_url}', 'success') return redirect(url_for('tree', tree_uuid=tree_uuid)) else: # the 1st attribute in the event is the link to lookyloo - existing_misp_url = lookyloo.misp.get_existing_event(event.attributes[0].value) # type: ignore + existing_misp_url = lookyloo.misp.get_existing_event_url(event[-1].attributes[0].value) fav_tags = lookyloo.misp.get_fav_tags() cache = lookyloo.capture_cache(tree_uuid) @@ -1018,43 +1014,41 @@ def misp_export(tree_uuid: str): event = lookyloo.misp_export(tree_uuid, True if with_parents else False) if isinstance(event, dict): return jsonify(event) - elif isinstance(event, list): - to_return = [] - for e in event: - to_return.append(e.to_json(indent=2)) - return jsonify(to_return) - else: - return Response(event.to_json(indent=2), mimetype='application/json') + + to_return = [] + for e in event: + to_return.append(e.to_json(indent=2)) + return jsonify(to_return) -@app.route('/json//misp_push', methods=['GET']) +@app.route('/json//misp_push', methods=['GET', 'POST']) @flask_login.login_required def misp_push(tree_uuid: str): + if request.method == 'POST': + parameters: Dict = request.get_json(force=True) # type: ignore + with_parents = True if 'with_parents' in parameters else False + allow_duplicates = True if 'allow_duplicates' in parameters else False + else: + with_parents = False + allow_duplicates = False to_return: Dict = {} if not lookyloo.misp.available: to_return['error'] = 'MISP module not available.' elif not lookyloo.misp.enable_push: to_return['error'] = 'Push not enabled in MISP module.' else: - event = lookyloo.misp_export(tree_uuid) + event = lookyloo.misp_export(tree_uuid, with_parents) if isinstance(event, dict): to_return['error'] = event - elif isinstance(event, MISPEvent): - event = lookyloo.misp.push(event) - if isinstance(event, MISPEvent): - return Response(event.to_json(indent=2), mimetype='application/json') - else: - to_return['error'] = event else: - events_to_return = [] - for e in event: - me = lookyloo.misp.push(e) - if isinstance(me, MISPEvent): - events_to_return.append(me.to_json(indent=2)) - else: - to_return['error'] = me - break - jsonify(events_to_return) + new_events = lookyloo.misp.push(event, allow_duplicates) + if isinstance(new_events, dict): + to_return['error'] = new_events + else: + events_to_return = [] + for e in new_events: + events_to_return.append(e.to_json(indent=2)) + jsonify(events_to_return) return jsonify(to_return)