diff --git a/misp_modules/modules/expansion/urlscan.py b/misp_modules/modules/expansion/urlscan.py new file mode 100644 index 0000000..8f4067c --- /dev/null +++ b/misp_modules/modules/expansion/urlscan.py @@ -0,0 +1,269 @@ +import json +import requests +import logging +import sys +import time +# Need base64 if encoding data for attachments, but disabled for now +# import base64 + +log = logging.getLogger('urlscan') +log.setLevel(logging.DEBUG) +ch = logging.StreamHandler(sys.stdout) +ch.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +ch.setFormatter(formatter) +log.addHandler(ch) + +moduleinfo = { + 'version': '0.1', + 'author': 'Dave Johnson', + 'description': 'Module to query urlscan.io', + 'module-type': ['expansion'] + } + +moduleconfig = ['apikey'] +misperrors = {'error': 'Error'} +mispattributes = { + 'input': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url'], + 'output': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url', 'text', 'link'] + } + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + if (request.get('config')): + if (request['config'].get('apikey') is None): + misperrors['error'] = 'urlscan apikey is missing' + return misperrors + client = urlscanAPI(request['config']['apikey']) + + r = {'results': []} + + if 'ip-src' in request: + r['results'] += lookup_indicator(client, request['ip-src']) + if 'ip-dst' in request: + r['results'] += lookup_indicator(client, request['ip-dst']) + if 'domain' in request: + r['results'] += lookup_indicator(client, request['domain']) + if 'hostname' in request: + r['results'] += lookup_indicator(client, request['hostname']) + if 'url' in request: + r['results'] += lookup_indicator(client, request['url']) + + uniq = [] + for item in r['results']: + if item not in uniq: + uniq.append(item) + r['results'] = uniq + return r + + +def lookup_indicator(client, query): + result = client.search_url(query) + log.debug('RESULTS: ' + json.dumps(result)) + r = [] + if result.get('page'): + if result['page'].get('domain'): + misp_val = result['page']['domain'] + misp_comment = "Domain associated with {} (source: urlscan.io)".format(query) + r.append({'types': 'domain', + 'categories': ['Network activity'], + 'values': misp_val, + 'comment': misp_comment}) + + if result['page'].get('ip'): + misp_val = result['page']['ip'] + misp_comment = "IP associated with {} (source: urlscan.io)".format(query) + r.append({'types': 'ip-dst', + 'categories': ['Network activity'], + 'values': misp_val, + 'comment': misp_comment}) + + if result['page'].get('country'): + misp_val = 'Country: ' + result['page']['country'] + if result['page'].get('city'): + misp_val += ', City: ' + result['page']['city'] + misp_comment = "Location associated with {} (source: urlscan.io)".format(query) + r.append({'types': 'text', + 'categories': ['External analysis'], + 'values': misp_val, + 'comment': misp_comment}) + + if result['page'].get('asn'): + misp_val = result['page']['asn'] + misp_comment = "ASN associated with {} (source: urlscan.io)".format(query) + r.append({'types': 'AS', 'categories': ['Network activity'], 'values': misp_val, 'comment': misp_comment}) + + if result['page'].get('asnname'): + misp_val = result['page']['asnname'] + misp_comment = "ASN name associated with {} (source: urlscan.io)".format(query) + r.append({'types': 'text', + 'categories': ['External analysis'], + 'values': misp_val, + 'comment': misp_comment}) + + if result.get('stats'): + if result['stats'].get('malicious'): + log.debug('There is something in results > stats > malicious') + threat_list = set() + + if 'matches' in result['meta']['processors']['gsb']['data']: + for item in result['meta']['processors']['gsb']['data']['matches']: + if item['threatType']: + threat_list.add(item['threatType']) + + threat_list = ', '.join(threat_list) + log.debug('threat_list values are: \'' + threat_list + '\'') + + if threat_list: + misp_val = '{} threat(s) detected'.format(threat_list) + misp_comment = '{} malicious indicator(s) were present on ' \ + '{} (source: urlscan.io)'.format(result['stats']['malicious'], query, threat_list) + r.append({'types': 'text', + 'categories': ['External analysis'], + 'values': misp_val, + 'comment': misp_comment}) + + if result.get('lists'): + if result['lists'].get('urls'): + for url in result['lists']['urls']: + url = url.lower() + if 'office' in url: + misp_val = 'Possible Microsoft Office themed phishing page' + misp_comment = 'There was resource containing an \'Office\' string in the URL.' + elif 'o365' in url or '0365' in url: + misp_val = 'Possible Microsoft O365 themed phishing page' + misp_comment = 'There was resource containing an \'O365\' string in the URL.' + elif 'microsoft' in url: + misp_val = 'Possible Microsoft themed phishing page' + misp_comment = 'There was resource containing an \'Office\' string in the URL.' + elif 'paypal' in url: + misp_val = 'Possible PayPal themed phishing page' + misp_comment = 'There was resource containing a \'PayPal\' string in the URL.' + elif 'onedrive' in url: + misp_val = 'Possible OneDrive themed phishing page' + misp_comment = 'There was resource containing a \'OneDrive\' string in the URL.' + elif 'docusign' in url: + misp_val = 'Possible DocuSign themed phishing page' + misp_comment = 'There was resource containing a \'DocuSign\' string in the URL' + r.append({'types': 'text', + 'categories': ['External analysis'], + 'values': misp_val, + 'comment': misp_comment}) + + if result.get('task'): + if result['task'].get('reportURL'): + misp_val = result['task']['reportURL'] + misp_comment = 'Link to full report (source: urlscan.io)' + r.append({'types': 'link', + 'categories': ['External analysis'], + 'values': misp_val, + 'comment': misp_comment}) + + if result['task'].get('screenshotURL'): + image_url = result['task']['screenshotURL'] + misp_comment = 'Link to screenshot (source: urlscan.io)' + r.append({'types': 'link', + 'categories': ['External analysis'], + 'values': image_url, + 'comment': misp_comment}) + ### TO DO ### + ### Add ability to add an in-line screenshot of the target website into an attribute + # screenshot = requests.get(image_url).content + # r.append({'types': ['attachment'], + # 'categories': ['External analysis'], + # 'values': image_url, + # 'image': str(base64.b64encode(screenshot), 'utf-8'), + # 'comment': 'Screenshot of website'}) + + if result['task'].get('domURL'): + misp_val = result['task']['domURL'] + misp_comment = 'Link to DOM (source: urlscan.io)' + r.append({'types': 'link', + 'categories': ['External analysis'], + 'values': misp_val, + 'comment': misp_comment}) + + return r + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo['config'] = moduleconfig + return moduleinfo + + +class urlscanAPI(): + def __init__(self, apikey=None, uuid=None): + self.key = apikey + self.uuid = uuid + + def request(self, query): + log.debug('From request function with the parameter: ' + query) + payload = {'url': query} + headers = {'API-Key': self.key, + 'Content-Type': "application/json", + 'Cache-Control': "no-cache"} + + # Troubleshooting problems with initial search request + log.debug('PAYLOAD: ' + json.dumps(payload)) + log.debug('HEADERS: ' + json.dumps(headers)) + + search_url_string = "https://urlscan.io/api/v1/scan/" + response = requests.request("POST", + search_url_string, + data=json.dumps(payload), + headers=headers) + + # HTTP 400 - Bad Request + if response.status_code == 400: + raise Exception('HTTP Error 400 - Bad Request') + + # HTTP 404 - Not found + if response.status_code == 404: + raise Exception('HTTP Error 404 - These are not the droids you\'re looking for') + + # Any other status code + if response.status_code != 200: + raise Exception('HTTP Error ' + str(response.status_code)) + + if response.text: + response = json.loads(response.content.decode("utf-8")) + time.sleep(3) + self.uuid = response['uuid'] + + # Strings for to check for errors on the results page + # Null response string for any unavailable resources + null_response_string = '"status": 404' + # Redirect string accounting for 301/302/303/307/308 status codes + redirect_string = '"status": 30' + # Normal response string with 200 status code + normal_response_string = '"status": 200' + + results_url_string = "https://urlscan.io/api/v1/result/" + self.uuid + log.debug('Results URL: ' + results_url_string) + + # Need to wait for results to process and check if they are valid + tries = 10 + while tries >= 0: + results = requests.request("GET", results_url_string) + log.debug('Made a GET request') + results = results.content.decode("utf-8") + # checking if there is a 404 status code and no available resources + if null_response_string in results and \ + redirect_string not in results and \ + normal_response_string not in results: + log.debug('Results not processed. Please check again later.') + time.sleep(3) + tries -= 1 + else: + return json.loads(results) + raise Exception('Results contained a 404 status error and could not be processed.') + + def search_url(self, query): + log.debug('From search_url with parameter: ' + query) + return self.request(query)