mirror of https://github.com/CIRCL/lookyloo
new: Keep captures parent, use extends in MISP export
parent
a47615fb0a
commit
1d5925d755
lookyloo
website/web
templates
|
@ -33,3 +33,4 @@ class CaptureCache():
|
|||
self.incomplete_redirects: bool = True if cache_entry.get('incomplete_redirects') in [1, '1'] else False
|
||||
self.no_index: bool = True if cache_entry.get('no_index') in [1, '1'] else False
|
||||
self.categories: List[str] = json.loads(cache_entry['categories']) if cache_entry.get('categories') else []
|
||||
self.parent: Optional[str] = cache_entry.get('parent')
|
||||
|
|
|
@ -462,6 +462,10 @@ class Lookyloo():
|
|||
if (capture_dir / 'no_index').exists(): # If the folders claims anonymity
|
||||
cache['no_index'] = 1
|
||||
|
||||
if (capture_dir / 'parent').exists(): # The capture was initiated from an other one
|
||||
with (capture_dir / 'parent').open() as f:
|
||||
cache['parent'] = f.read().strip()
|
||||
|
||||
p.hmset(str(capture_dir), cache) # type: ignore
|
||||
if not redis_pipeline:
|
||||
p.execute()
|
||||
|
@ -575,7 +579,7 @@ class Lookyloo():
|
|||
return CaptureStatus.UNKNOWN
|
||||
|
||||
def enqueue_capture(self, query: MutableMapping[str, Any]) -> str:
|
||||
'''Enqueue a query in the capture queue (used by the API for asynchronous processing)'''
|
||||
'''Enqueue a query in the capture queue (used by the UI and the API for asynchronous processing)'''
|
||||
perma_uuid = str(uuid4())
|
||||
p = self.redis.pipeline()
|
||||
for key, value in query.items():
|
||||
|
@ -609,7 +613,7 @@ class Lookyloo():
|
|||
if 'cookies' in to_capture:
|
||||
to_capture['cookies_pseudofile'] = to_capture.pop('cookies')
|
||||
|
||||
status = self.capture(**to_capture) # type: ignore
|
||||
status = self._capture(**to_capture) # type: ignore
|
||||
self.redis.srem('ongoing', uuid)
|
||||
self.redis.delete(uuid)
|
||||
if status:
|
||||
|
@ -764,10 +768,10 @@ class Lookyloo():
|
|||
except Exception as err:
|
||||
return False, f'Other error occurred: {err}'
|
||||
|
||||
def capture(self, url: str, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
|
||||
depth: int=1, listing: bool=True, user_agent: Optional[str]=None,
|
||||
referer: str='', perma_uuid: Optional[str]=None, os: Optional[str]=None,
|
||||
browser: Optional[str]=None) -> Union[bool, str]:
|
||||
def _capture(self, url: str, *, cookies_pseudofile: Optional[Union[BufferedIOBase, str]]=None,
|
||||
depth: int=1, listing: bool=True, user_agent: Optional[str]=None,
|
||||
referer: str='', perma_uuid: Optional[str]=None, os: Optional[str]=None,
|
||||
browser: Optional[str]=None, parent: Optional[str]=None) -> Union[bool, str]:
|
||||
'''Launch a capture'''
|
||||
url = url.strip()
|
||||
url = refang(url)
|
||||
|
@ -824,12 +828,20 @@ class Lookyloo():
|
|||
with (dirpath / 'meta').open('w') as _meta:
|
||||
json.dump(meta, _meta)
|
||||
|
||||
for i, item in enumerate(items):
|
||||
if not listing: # Write no_index marker
|
||||
(dirpath / 'no_index').touch()
|
||||
with (dirpath / 'uuid').open('w') as _uuid:
|
||||
_uuid.write(perma_uuid)
|
||||
# Write UUID
|
||||
with (dirpath / 'uuid').open('w') as _uuid:
|
||||
_uuid.write(perma_uuid)
|
||||
|
||||
# Write no_index marker (optional)
|
||||
if not listing:
|
||||
(dirpath / 'no_index').touch()
|
||||
|
||||
# Write parent UUID (optional)
|
||||
if parent:
|
||||
with (dirpath / 'parent').open('w') as _parent:
|
||||
_parent.write(parent)
|
||||
|
||||
for i, item in enumerate(items):
|
||||
if 'error' in item:
|
||||
with (dirpath / 'error.txt').open('w') as _error:
|
||||
json.dump(item['error'], _error)
|
||||
|
@ -1033,7 +1045,7 @@ class Lookyloo():
|
|||
obj.add_reference(vt_obj, 'analysed-with')
|
||||
return vt_obj
|
||||
|
||||
def misp_export(self, capture_uuid: str) -> Union[MISPEvent, Dict[str, str]]:
|
||||
def misp_export(self, capture_uuid: str, with_parent: bool=False) -> Union[List[MISPEvent], MISPEvent, Dict[str, str]]:
|
||||
'''Export a capture in MISP format. You can POST the return of this method
|
||||
directly to a MISP instance and it will create an event.'''
|
||||
cache = self.capture_cache(capture_uuid)
|
||||
|
@ -1101,6 +1113,21 @@ class Lookyloo():
|
|||
except AttributeError:
|
||||
# No `body` in rendered node
|
||||
pass
|
||||
|
||||
if with_parent and cache.parent:
|
||||
parent = self.misp_export(cache.parent, with_parent)
|
||||
if isinstance(parent, dict):
|
||||
# Something bad happened
|
||||
return parent
|
||||
if isinstance(parent, list):
|
||||
# The parent has a parent
|
||||
event.extends_uuid = parent[-1].uuid
|
||||
parent.append(event)
|
||||
return parent
|
||||
else:
|
||||
event.extends_uuid = parent.uuid
|
||||
return [parent, event]
|
||||
|
||||
return event
|
||||
|
||||
def get_hashes(self, tree_uuid: str, hostnode_uuid: Optional[str]=None, urlnode_uuid: Optional[str]=None) -> Set[str]:
|
||||
|
|
|
@ -442,7 +442,8 @@ def bulk_captures(base_tree_uuid: str):
|
|||
capture = {'url': url,
|
||||
'cookies': cookies,
|
||||
'referer': ct.root_url,
|
||||
'user_agent': ct.user_agent
|
||||
'user_agent': ct.user_agent,
|
||||
'parent': base_tree_uuid
|
||||
}
|
||||
new_capture_uuid = lookyloo.enqueue_capture(capture)
|
||||
bulk_captures.append((new_capture_uuid, url))
|
||||
|
@ -932,22 +933,43 @@ def web_misp_push_view(tree_uuid: str):
|
|||
tags = request.form.getlist('tags')
|
||||
for tag in tags:
|
||||
event.add_tag(tag) # type: ignore
|
||||
event = lookyloo.misp.push(event) # type: ignore
|
||||
if isinstance(event, MISPEvent):
|
||||
flash(f'MISP event {event.id} created on {lookyloo.misp.client.root_url}', 'success')
|
||||
|
||||
error = False
|
||||
with_parents = request.form.get('with_parents')
|
||||
if with_parents:
|
||||
exports = lookyloo.misp_export(tree_uuid, True)
|
||||
if isinstance(exports, dict):
|
||||
flash(f'Unable to create event: {exports}', 'error')
|
||||
error = True
|
||||
elif isinstance(exports, MISPEvent):
|
||||
events = [exports]
|
||||
else:
|
||||
events = exports
|
||||
else:
|
||||
flash(f'Unable to create event: {event}', 'error')
|
||||
events = [event] # type: ignore
|
||||
|
||||
if error:
|
||||
return redirect(url_for('tree', tree_uuid=tree_uuid))
|
||||
|
||||
for event in events:
|
||||
event = lookyloo.misp.push(event) # type: ignore
|
||||
if isinstance(event, MISPEvent):
|
||||
flash(f'MISP event {event.id} created on {lookyloo.misp.client.root_url}', 'success')
|
||||
else:
|
||||
flash(f'Unable to create event: {event}', 'error')
|
||||
return redirect(url_for('tree', tree_uuid=tree_uuid))
|
||||
else:
|
||||
# the 1st attribute in the event is the link to lookyloo
|
||||
existing_misp_url = lookyloo.misp.get_existing_event(event.attributes[0].value) # type: ignore
|
||||
|
||||
fav_tags = lookyloo.misp.get_fav_tags()
|
||||
cache = lookyloo.capture_cache(tree_uuid)
|
||||
|
||||
return render_template('misp_push_view.html', tree_uuid=tree_uuid,
|
||||
event=event, fav_tags=fav_tags,
|
||||
existing_event=existing_misp_url,
|
||||
auto_publish=lookyloo.misp.auto_publish,
|
||||
has_parent=True if cache and cache.parent else False,
|
||||
default_tags=lookyloo.misp.default_tags)
|
||||
|
||||
|
||||
|
@ -992,10 +1014,17 @@ def json_redirects(tree_uuid: str):
|
|||
|
||||
@app.route('/json/<string:tree_uuid>/misp_export', methods=['GET'])
|
||||
def misp_export(tree_uuid: str):
|
||||
event = lookyloo.misp_export(tree_uuid)
|
||||
with_parents = request.args.get('with_parents')
|
||||
event = lookyloo.misp_export(tree_uuid, True if with_parents else False)
|
||||
if isinstance(event, dict):
|
||||
return jsonify(event)
|
||||
return Response(event.to_json(indent=2), mimetype='application/json')
|
||||
elif isinstance(event, list):
|
||||
to_return = []
|
||||
for e in event:
|
||||
to_return.append(e.to_json(indent=2))
|
||||
return jsonify(to_return)
|
||||
else:
|
||||
return Response(event.to_json(indent=2), mimetype='application/json')
|
||||
|
||||
|
||||
@app.route('/json/<string:tree_uuid>/misp_push', methods=['GET'])
|
||||
|
@ -1010,12 +1039,22 @@ def misp_push(tree_uuid: str):
|
|||
event = lookyloo.misp_export(tree_uuid)
|
||||
if isinstance(event, dict):
|
||||
to_return['error'] = event
|
||||
else:
|
||||
elif isinstance(event, MISPEvent):
|
||||
event = lookyloo.misp.push(event)
|
||||
if isinstance(event, MISPEvent):
|
||||
return Response(event.to_json(indent=2), mimetype='application/json')
|
||||
else:
|
||||
to_return['error'] = event
|
||||
else:
|
||||
events_to_return = []
|
||||
for e in event:
|
||||
me = lookyloo.misp.push(e)
|
||||
if isinstance(me, MISPEvent):
|
||||
events_to_return.append(me.to_json(indent=2))
|
||||
else:
|
||||
to_return['error'] = me
|
||||
break
|
||||
jsonify(events_to_return)
|
||||
|
||||
return jsonify(to_return)
|
||||
|
||||
|
|
|
@ -20,6 +20,12 @@
|
|||
<label for="force_push" class="form-check-label">Tick this box if you want to push anyway</label>
|
||||
</div>
|
||||
{% endif %}
|
||||
{% if has_parent %}
|
||||
<div class="form-check">
|
||||
<input class="form-check-input" type="checkbox" name="with_parents"></input>
|
||||
<label for="with_parents" class="form-check-label">Also push the parents</label>
|
||||
</div>
|
||||
{% endif %}
|
||||
<button type="submit" class="btn btn-info" id="btn-misp-push" {% if existing_event %}disabled=true{% endif %}>Push to MISP</button>
|
||||
</form>
|
||||
</div>
|
||||
|
|
Loading…
Reference in New Issue