|
|
|
@ -35,6 +35,11 @@ class URLhaus(): |
|
|
|
|
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])} |
|
|
|
|
return {'results': results} |
|
|
|
|
|
|
|
|
|
def parse_error(self, query_status): |
|
|
|
|
if query_status == 'no_results': |
|
|
|
|
return {'error': f'No results found on URLhaus for this {self.attribute.type} attribute'} |
|
|
|
|
return {'error': f'Error encountered during the query of URLhaus: {query_status}'} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HostQuery(URLhaus): |
|
|
|
|
def __init__(self, attribute): |
|
|
|
@ -45,9 +50,12 @@ class HostQuery(URLhaus): |
|
|
|
|
|
|
|
|
|
def query_api(self): |
|
|
|
|
response = requests.post(self.url, data={'host': self.attribute.value}).json() |
|
|
|
|
if response['query_status'] != 'ok': |
|
|
|
|
return self.parse_error(response['query_status']) |
|
|
|
|
if 'urls' in response and response['urls']: |
|
|
|
|
for url in response['urls']: |
|
|
|
|
self.misp_event.add_attribute(type='url', value=url['url']) |
|
|
|
|
return self.get_result() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PayloadQuery(URLhaus): |
|
|
|
@ -63,6 +71,8 @@ class PayloadQuery(URLhaus): |
|
|
|
|
if hasattr(self.attribute, 'object_id') and hasattr(self.attribute, 'event_id') and self.attribute.event_id != '0': |
|
|
|
|
file_object.id = self.attribute.object_id |
|
|
|
|
response = requests.post(self.url, data={'{}_hash'.format(hash_type): self.attribute.value}).json() |
|
|
|
|
if response['query_status'] != 'ok': |
|
|
|
|
return self.parse_error(response['query_status']) |
|
|
|
|
other_hash_type = 'md5' if hash_type == 'sha256' else 'sha256' |
|
|
|
|
for key, relation in zip(('{}_hash'.format(other_hash_type), 'file_size'), (other_hash_type, 'size-in-bytes')): |
|
|
|
|
if response[key]: |
|
|
|
@ -81,6 +91,7 @@ class PayloadQuery(URLhaus): |
|
|
|
|
file_object.add_attribute(_filename_, **{'type': _filename_, 'value': url[_filename_]}) |
|
|
|
|
if any((file_object.attributes, file_object.references)): |
|
|
|
|
self.misp_event.add_object(**file_object) |
|
|
|
|
return self.get_result() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class UrlQuery(URLhaus): |
|
|
|
@ -100,6 +111,8 @@ class UrlQuery(URLhaus): |
|
|
|
|
|
|
|
|
|
def query_api(self): |
|
|
|
|
response = requests.post(self.url, data={'url': self.attribute.value}).json() |
|
|
|
|
if response['query_status'] != 'ok': |
|
|
|
|
return self.parse_error(response['query_status']) |
|
|
|
|
if 'payloads' in response and response['payloads']: |
|
|
|
|
for payload in response['payloads']: |
|
|
|
|
file_object = self._create_file_object(payload) |
|
|
|
@ -109,6 +122,7 @@ class UrlQuery(URLhaus): |
|
|
|
|
self.misp_event.add_object(**vt_object) |
|
|
|
|
if any((file_object.attributes, file_object.references)): |
|
|
|
|
self.misp_event.add_object(**file_object) |
|
|
|
|
return self.get_result() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_misp_type_mapping = {'url': UrlQuery, 'md5': PayloadQuery, 'sha256': PayloadQuery, |
|
|
|
@ -122,8 +136,7 @@ def handler(q=False): |
|
|
|
|
request = json.loads(q) |
|
|
|
|
attribute = request['attribute'] |
|
|
|
|
urlhaus_parser = _misp_type_mapping[attribute['type']](attribute) |
|
|
|
|
urlhaus_parser.query_api() |
|
|
|
|
return urlhaus_parser.get_result() |
|
|
|
|
return urlhaus_parser.query_api() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def introspection(): |
|
|
|
|