Create urlscan.py

pull/215/head
David J 2018-08-10 16:00:01 -05:00 committed by GitHub
parent 73161ad153
commit bdbf538893
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 269 additions and 0 deletions

View File

@ -0,0 +1,269 @@
import json
import requests
import logging
import sys
import time
# Need base64 if encoding data for attachments, but disabled for now
# import base64
log = logging.getLogger('urlscan')
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
log.addHandler(ch)
moduleinfo = {
'version': '0.1',
'author': 'Dave Johnson',
'description': 'Module to query urlscan.io',
'module-type': ['expansion']
}
moduleconfig = ['apikey']
misperrors = {'error': 'Error'}
mispattributes = {
'input': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url'],
'output': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url', 'text', 'link']
}
def handler(q=False):
if q is False:
return False
request = json.loads(q)
if (request.get('config')):
if (request['config'].get('apikey') is None):
misperrors['error'] = 'urlscan apikey is missing'
return misperrors
client = urlscanAPI(request['config']['apikey'])
r = {'results': []}
if 'ip-src' in request:
r['results'] += lookup_indicator(client, request['ip-src'])
if 'ip-dst' in request:
r['results'] += lookup_indicator(client, request['ip-dst'])
if 'domain' in request:
r['results'] += lookup_indicator(client, request['domain'])
if 'hostname' in request:
r['results'] += lookup_indicator(client, request['hostname'])
if 'url' in request:
r['results'] += lookup_indicator(client, request['url'])
uniq = []
for item in r['results']:
if item not in uniq:
uniq.append(item)
r['results'] = uniq
return r
def lookup_indicator(client, query):
result = client.search_url(query)
log.debug('RESULTS: ' + json.dumps(result))
r = []
if result.get('page'):
if result['page'].get('domain'):
misp_val = result['page']['domain']
misp_comment = "Domain associated with {} (source: urlscan.io)".format(query)
r.append({'types': 'domain',
'categories': ['Network activity'],
'values': misp_val,
'comment': misp_comment})
if result['page'].get('ip'):
misp_val = result['page']['ip']
misp_comment = "IP associated with {} (source: urlscan.io)".format(query)
r.append({'types': 'ip-dst',
'categories': ['Network activity'],
'values': misp_val,
'comment': misp_comment})
if result['page'].get('country'):
misp_val = 'Country: ' + result['page']['country']
if result['page'].get('city'):
misp_val += ', City: ' + result['page']['city']
misp_comment = "Location associated with {} (source: urlscan.io)".format(query)
r.append({'types': 'text',
'categories': ['External analysis'],
'values': misp_val,
'comment': misp_comment})
if result['page'].get('asn'):
misp_val = result['page']['asn']
misp_comment = "ASN associated with {} (source: urlscan.io)".format(query)
r.append({'types': 'AS', 'categories': ['Network activity'], 'values': misp_val, 'comment': misp_comment})
if result['page'].get('asnname'):
misp_val = result['page']['asnname']
misp_comment = "ASN name associated with {} (source: urlscan.io)".format(query)
r.append({'types': 'text',
'categories': ['External analysis'],
'values': misp_val,
'comment': misp_comment})
if result.get('stats'):
if result['stats'].get('malicious'):
log.debug('There is something in results > stats > malicious')
threat_list = set()
if 'matches' in result['meta']['processors']['gsb']['data']:
for item in result['meta']['processors']['gsb']['data']['matches']:
if item['threatType']:
threat_list.add(item['threatType'])
threat_list = ', '.join(threat_list)
log.debug('threat_list values are: \'' + threat_list + '\'')
if threat_list:
misp_val = '{} threat(s) detected'.format(threat_list)
misp_comment = '{} malicious indicator(s) were present on ' \
'{} (source: urlscan.io)'.format(result['stats']['malicious'], query, threat_list)
r.append({'types': 'text',
'categories': ['External analysis'],
'values': misp_val,
'comment': misp_comment})
if result.get('lists'):
if result['lists'].get('urls'):
for url in result['lists']['urls']:
url = url.lower()
if 'office' in url:
misp_val = 'Possible Microsoft Office themed phishing page'
misp_comment = 'There was resource containing an \'Office\' string in the URL.'
elif 'o365' in url or '0365' in url:
misp_val = 'Possible Microsoft O365 themed phishing page'
misp_comment = 'There was resource containing an \'O365\' string in the URL.'
elif 'microsoft' in url:
misp_val = 'Possible Microsoft themed phishing page'
misp_comment = 'There was resource containing an \'Office\' string in the URL.'
elif 'paypal' in url:
misp_val = 'Possible PayPal themed phishing page'
misp_comment = 'There was resource containing a \'PayPal\' string in the URL.'
elif 'onedrive' in url:
misp_val = 'Possible OneDrive themed phishing page'
misp_comment = 'There was resource containing a \'OneDrive\' string in the URL.'
elif 'docusign' in url:
misp_val = 'Possible DocuSign themed phishing page'
misp_comment = 'There was resource containing a \'DocuSign\' string in the URL'
r.append({'types': 'text',
'categories': ['External analysis'],
'values': misp_val,
'comment': misp_comment})
if result.get('task'):
if result['task'].get('reportURL'):
misp_val = result['task']['reportURL']
misp_comment = 'Link to full report (source: urlscan.io)'
r.append({'types': 'link',
'categories': ['External analysis'],
'values': misp_val,
'comment': misp_comment})
if result['task'].get('screenshotURL'):
image_url = result['task']['screenshotURL']
misp_comment = 'Link to screenshot (source: urlscan.io)'
r.append({'types': 'link',
'categories': ['External analysis'],
'values': image_url,
'comment': misp_comment})
### TO DO ###
### Add ability to add an in-line screenshot of the target website into an attribute
# screenshot = requests.get(image_url).content
# r.append({'types': ['attachment'],
# 'categories': ['External analysis'],
# 'values': image_url,
# 'image': str(base64.b64encode(screenshot), 'utf-8'),
# 'comment': 'Screenshot of website'})
if result['task'].get('domURL'):
misp_val = result['task']['domURL']
misp_comment = 'Link to DOM (source: urlscan.io)'
r.append({'types': 'link',
'categories': ['External analysis'],
'values': misp_val,
'comment': misp_comment})
return r
def introspection():
return mispattributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo
class urlscanAPI():
def __init__(self, apikey=None, uuid=None):
self.key = apikey
self.uuid = uuid
def request(self, query):
log.debug('From request function with the parameter: ' + query)
payload = {'url': query}
headers = {'API-Key': self.key,
'Content-Type': "application/json",
'Cache-Control': "no-cache"}
# Troubleshooting problems with initial search request
log.debug('PAYLOAD: ' + json.dumps(payload))
log.debug('HEADERS: ' + json.dumps(headers))
search_url_string = "https://urlscan.io/api/v1/scan/"
response = requests.request("POST",
search_url_string,
data=json.dumps(payload),
headers=headers)
# HTTP 400 - Bad Request
if response.status_code == 400:
raise Exception('HTTP Error 400 - Bad Request')
# HTTP 404 - Not found
if response.status_code == 404:
raise Exception('HTTP Error 404 - These are not the droids you\'re looking for')
# Any other status code
if response.status_code != 200:
raise Exception('HTTP Error ' + str(response.status_code))
if response.text:
response = json.loads(response.content.decode("utf-8"))
time.sleep(3)
self.uuid = response['uuid']
# Strings for to check for errors on the results page
# Null response string for any unavailable resources
null_response_string = '"status": 404'
# Redirect string accounting for 301/302/303/307/308 status codes
redirect_string = '"status": 30'
# Normal response string with 200 status code
normal_response_string = '"status": 200'
results_url_string = "https://urlscan.io/api/v1/result/" + self.uuid
log.debug('Results URL: ' + results_url_string)
# Need to wait for results to process and check if they are valid
tries = 10
while tries >= 0:
results = requests.request("GET", results_url_string)
log.debug('Made a GET request')
results = results.content.decode("utf-8")
# checking if there is a 404 status code and no available resources
if null_response_string in results and \
redirect_string not in results and \
normal_response_string not in results:
log.debug('Results not processed. Please check again later.')
time.sleep(3)
tries -= 1
else:
return json.loads(results)
raise Exception('Results contained a 404 status error and could not be processed.')
def search_url(self, query):
log.debug('From search_url with parameter: ' + query)
return self.request(query)