mirror of https://github.com/CIRCL/lookyloo
chg: Improve MISP export
* IPs of redirects * default tags * auto publishpull/165/head
parent
54ba67bb87
commit
a7484e6cc4
|
@ -16,7 +16,9 @@
|
||||||
"verify_tls_cert": true,
|
"verify_tls_cert": true,
|
||||||
"timeout": 10,
|
"timeout": 10,
|
||||||
"enable_lookup": false,
|
"enable_lookup": false,
|
||||||
"enable_push": false
|
"enable_push": false,
|
||||||
|
"default_tags": [],
|
||||||
|
"auto_publish": false
|
||||||
},
|
},
|
||||||
"_notes": {
|
"_notes": {
|
||||||
"apikey": "null disables the module. Pass a string otherwise.",
|
"apikey": "null disables the module. Pass a string otherwise.",
|
||||||
|
|
|
@ -899,6 +899,13 @@ class Lookyloo():
|
||||||
return 'embedded_ressource.bin', blob, mimetype
|
return 'embedded_ressource.bin', blob, mimetype
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def __misp_add_ips_to_URLObject(self, obj: URLObject, hostname_tree: HostNode) -> None:
|
||||||
|
hosts = obj.get_attributes_by_relation('host')
|
||||||
|
if hosts:
|
||||||
|
hostnodes = hostname_tree.search_nodes(name=hosts[0].value)
|
||||||
|
if hostnodes and hasattr(hostnodes[0], 'resolved_ips'):
|
||||||
|
obj.add_attributes('ip', *hostnodes[0].resolved_ips)
|
||||||
|
|
||||||
def misp_export(self, capture_uuid: str) -> Union[MISPEvent, Dict[str, str]]:
|
def misp_export(self, capture_uuid: str) -> Union[MISPEvent, Dict[str, str]]:
|
||||||
'''Export a capture in MISP format. You can POST the return of this method
|
'''Export a capture in MISP format. You can POST the return of this method
|
||||||
directly to a MISP instance and it will create an event.'''
|
directly to a MISP instance and it will create an event.'''
|
||||||
|
@ -921,7 +928,14 @@ class Lookyloo():
|
||||||
lookyloo_link.distribution = 0
|
lookyloo_link.distribution = 0
|
||||||
|
|
||||||
initial_url = URLObject(cache.url)
|
initial_url = URLObject(cache.url)
|
||||||
redirects = [URLObject(url) for url in cache.redirects if url != cache.url]
|
self.__misp_add_ips_to_URLObject(initial_url, ct.root_hartree.hostname_tree)
|
||||||
|
redirects: List[URLObject] = []
|
||||||
|
for url in cache.redirects:
|
||||||
|
if url == cache.url:
|
||||||
|
continue
|
||||||
|
obj = URLObject(url)
|
||||||
|
self.__misp_add_ips_to_URLObject(obj, ct.root_hartree.hostname_tree)
|
||||||
|
redirects.append(obj)
|
||||||
|
|
||||||
if redirects:
|
if redirects:
|
||||||
prec_object = initial_url
|
prec_object = initial_url
|
||||||
|
|
|
@ -44,11 +44,17 @@ class MISP():
|
||||||
self.enable_lookup = True
|
self.enable_lookup = True
|
||||||
if config.get('enable_push'):
|
if config.get('enable_push'):
|
||||||
self.enable_push = True
|
self.enable_push = True
|
||||||
|
self.default_tags: List[str] = config.get('default_tags') # type: ignore
|
||||||
|
self.auto_publish = config.get('auto_publish')
|
||||||
self.storage_dir_misp = get_homedir() / 'misp'
|
self.storage_dir_misp = get_homedir() / 'misp'
|
||||||
self.storage_dir_misp.mkdir(parents=True, exist_ok=True)
|
self.storage_dir_misp.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
def push(self, event: MISPEvent) -> Union[MISPEvent, Dict]:
|
def push(self, event: MISPEvent) -> Union[MISPEvent, Dict]:
|
||||||
if self.available and self.enable_push:
|
if self.available and self.enable_push:
|
||||||
|
for tag in self.default_tags:
|
||||||
|
event.add_tag(tag)
|
||||||
|
if self.auto_publish:
|
||||||
|
event.publish()
|
||||||
return self.client.add_event(event, pythonify=True)
|
return self.client.add_event(event, pythonify=True)
|
||||||
else:
|
else:
|
||||||
return {'error': 'Module not available or push not enabled.'}
|
return {'error': 'Module not available or push not enabled.'}
|
||||||
|
|
Loading…
Reference in New Issue