diff --git a/misp_modules/modules/expansion/urlscan.py b/misp_modules/modules/expansion/urlscan.py
index 8f4067c..a0adc25 100644
--- a/misp_modules/modules/expansion/urlscan.py
+++ b/misp_modules/modules/expansion/urlscan.py
@@ -3,8 +3,6 @@ import requests
 import logging
 import sys
 import time
-# Need base64 if encoding data for attachments, but disabled for now
-# import base64
 
 log = logging.getLogger('urlscan')
 log.setLevel(logging.DEBUG)
@@ -15,18 +13,19 @@ ch.setFormatter(formatter)
 log.addHandler(ch)
 
 moduleinfo = {
-    'version': '0.1',
-    'author': 'Dave Johnson',
-    'description': 'Module to query urlscan.io',
-    'module-type': ['expansion']
-    }
+    'version': '0.1',
+    'author': 'Dave Johnson',
+    'description': 'Module to query urlscan.io',
+    'module-type': ['expansion']
+}
 
 moduleconfig = ['apikey']
 misperrors = {'error': 'Error'}
 mispattributes = {
-    'input': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url'],
-    'output': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url', 'text', 'link']
-    }
+    'input': ['hostname', 'domain', 'url'],
+    'output': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url', 'text', 'link']
+}
+
 
 def handler(q=False):
     if q is False:
@@ -51,8 +50,15 @@ def handler(q=False):
     if 'url' in request:
         r['results'] += lookup_indicator(client, request['url'])
 
+    # Return any errors generated from lookup to the UI and remove duplicates
+
     uniq = []
+    log.debug(r['results'])
     for item in r['results']:
+        log.debug(item)
+        if 'error' in item:
+            misperrors['error'] = item['error']
+            return misperrors
         if item not in uniq:
             uniq.append(item)
     r['results'] = uniq
@@ -63,10 +69,19 @@ def lookup_indicator(client, query):
     result = client.search_url(query)
     log.debug('RESULTS: ' + json.dumps(result))
     r = []
+    misp_comment = "{}: Enriched via the urlscan module".format(query)
+
+    # Determine if the page is reachable
+    for request in result['data']['requests']:
+        if request['response'].get('failed'):
+            if request['response']['failed']['errorText']:
+                log.debug('The page could not load')
+                r.append(
+                    {'error': 'Domain could not be resolved: {}'.format(request['response']['failed']['errorText'])})
+
     if result.get('page'):
         if result['page'].get('domain'):
             misp_val = result['page']['domain']
-            misp_comment = "Domain associated with {} (source: urlscan.io)".format(query)
             r.append({'types': 'domain',
                       'categories': ['Network activity'],
                       'values': misp_val,
@@ -74,17 +89,15 @@ def lookup_indicator(client, query):
 
         if result['page'].get('ip'):
             misp_val = result['page']['ip']
-            misp_comment = "IP associated with {} (source: urlscan.io)".format(query)
             r.append({'types': 'ip-dst',
                       'categories': ['Network activity'],
                       'values': misp_val,
                       'comment': misp_comment})
 
         if result['page'].get('country'):
-            misp_val = 'Country: ' + result['page']['country']
+            misp_val = 'country: ' + result['page']['country']
             if result['page'].get('city'):
-                misp_val += ', City: ' + result['page']['city']
-            misp_comment = "Location associated with {} (source: urlscan.io)".format(query)
+                misp_val += ', city: ' + result['page']['city']
             r.append({'types': 'text',
                       'categories': ['External analysis'],
                       'values': misp_val,
@@ -92,12 +105,10 @@ def lookup_indicator(client, query):
 
         if result['page'].get('asn'):
             misp_val = result['page']['asn']
-            misp_comment = "ASN associated with {} (source: urlscan.io)".format(query)
-            r.append({'types': 'AS', 'categories': ['Network activity'], 'values': misp_val, 'comment': misp_comment})
+            r.append({'types': 'AS', 'categories': ['External analysis'], 'values': misp_val, 'comment': misp_comment})
 
         if result['page'].get('asnname'):
             misp_val = result['page']['asnname']
-            misp_comment = "ASN name associated with {} (source: urlscan.io)".format(query)
             r.append({'types': 'text',
                       'categories': ['External analysis'],
                       'values': misp_val,
@@ -118,8 +129,6 @@ def lookup_indicator(client, query):
 
     if threat_list:
         misp_val = '{} threat(s) detected'.format(threat_list)
-        misp_comment = '{} malicious indicator(s) were present on ' \
-                       '{} (source: urlscan.io)'.format(result['stats']['malicious'], query, threat_list)
         r.append({'types': 'text',
                   'categories': ['External analysis'],
                   'values': misp_val,
@@ -130,23 +139,17 @@ def lookup_indicator(client, query):
         for url in result['lists']['urls']:
             url = url.lower()
             if 'office' in url:
-                misp_val = 'Possible Microsoft Office themed phishing page'
-                misp_comment = 'There was resource containing an \'Office\' string in the URL.'
+                misp_val = "Possible Office-themed phishing"
             elif 'o365' in url or '0365' in url:
-                misp_val = 'Possible Microsoft O365 themed phishing page'
-                misp_comment = 'There was resource containing an \'O365\' string in the URL.'
+                misp_val = "Possible O365-themed phishing"
            elif 'microsoft' in url:
-                misp_val = 'Possible Microsoft themed phishing page'
-                misp_comment = 'There was resource containing an \'Office\' string in the URL.'
+                misp_val = "Possible Microsoft-themed phishing"
             elif 'paypal' in url:
-                misp_val = 'Possible PayPal themed phishing page'
-                misp_comment = 'There was resource containing a \'PayPal\' string in the URL.'
+                misp_val = "Possible PayPal-themed phishing"
             elif 'onedrive' in url:
-                misp_val = 'Possible OneDrive themed phishing page'
-                misp_comment = 'There was resource containing a \'OneDrive\' string in the URL.'
+                misp_val = "Possible OneDrive-themed phishing"
             elif 'docusign' in url:
-                misp_val = 'Possible DocuSign themed phishing page'
-                misp_comment = 'There was resource containing a \'DocuSign\' string in the URL'
+                misp_val = "Possible DocuSign-themed phishing"
             r.append({'types': 'text',
                       'categories': ['External analysis'],
                       'values': misp_val,
@@ -155,7 +158,6 @@ def lookup_indicator(client, query):
     if result.get('task'):
         if result['task'].get('reportURL'):
             misp_val = result['task']['reportURL']
-            misp_comment = 'Link to full report (source: urlscan.io)'
             r.append({'types': 'link',
                       'categories': ['External analysis'],
                       'values': misp_val,
@@ -163,7 +165,6 @@ def lookup_indicator(client, query):
 
         if result['task'].get('screenshotURL'):
             image_url = result['task']['screenshotURL']
-            misp_comment = 'Link to screenshot (source: urlscan.io)'
             r.append({'types': 'link',
                       'categories': ['External analysis'],
                       'values': image_url,
@@ -177,14 +178,6 @@ def lookup_indicator(client, query):
             #           'image': str(base64.b64encode(screenshot), 'utf-8'),
             #           'comment': 'Screenshot of website'})
 
-        if result['task'].get('domURL'):
-            misp_val = result['task']['domURL']
-            misp_comment = 'Link to DOM (source: urlscan.io)'
-            r.append({'types': 'link',
-                      'categories': ['External analysis'],
-                      'values': misp_val,
-                      'comment': misp_comment})
-
     return r
 
 
@@ -262,6 +255,7 @@ class urlscanAPI():
                 tries -= 1
             else:
                 return json.loads(results)
+        raise Exception('Results contained a 404 status error and could not be processed.')
 
     def search_url(self, query):