mirror of https://github.com/MISP/misp-modules
Merge branch 'master' of github.com:MISP/misp-modules
commit
18bad54603
|
@ -0,0 +1,263 @@
|
|||
import json
|
||||
import requests
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
|
||||
log = logging.getLogger('urlscan')
|
||||
log.setLevel(logging.DEBUG)
|
||||
ch = logging.StreamHandler(sys.stdout)
|
||||
ch.setLevel(logging.DEBUG)
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
ch.setFormatter(formatter)
|
||||
log.addHandler(ch)
|
||||
|
||||
moduleinfo = {
|
||||
'version': '0.1',
|
||||
'author': 'Dave Johnson',
|
||||
'description': 'Module to query urlscan.io',
|
||||
'module-type': ['expansion']
|
||||
}
|
||||
|
||||
moduleconfig = ['apikey']
|
||||
misperrors = {'error': 'Error'}
|
||||
mispattributes = {
|
||||
'input': ['hostname', 'domain', 'url'],
|
||||
'output': ['hostname', 'domain', 'ip-src', 'ip-dst', 'url', 'text', 'link']
|
||||
}
|
||||
|
||||
|
||||
def handler(q=False):
|
||||
if q is False:
|
||||
return False
|
||||
request = json.loads(q)
|
||||
if (request.get('config')):
|
||||
if (request['config'].get('apikey') is None):
|
||||
misperrors['error'] = 'urlscan apikey is missing'
|
||||
return misperrors
|
||||
client = urlscanAPI(request['config']['apikey'])
|
||||
|
||||
r = {'results': []}
|
||||
|
||||
if 'ip-src' in request:
|
||||
r['results'] += lookup_indicator(client, request['ip-src'])
|
||||
if 'ip-dst' in request:
|
||||
r['results'] += lookup_indicator(client, request['ip-dst'])
|
||||
if 'domain' in request:
|
||||
r['results'] += lookup_indicator(client, request['domain'])
|
||||
if 'hostname' in request:
|
||||
r['results'] += lookup_indicator(client, request['hostname'])
|
||||
if 'url' in request:
|
||||
r['results'] += lookup_indicator(client, request['url'])
|
||||
|
||||
# Return any errors generated from lookup to the UI and remove duplicates
|
||||
|
||||
uniq = []
|
||||
log.debug(r['results'])
|
||||
for item in r['results']:
|
||||
log.debug(item)
|
||||
if 'error' in item:
|
||||
misperrors['error'] = item['error']
|
||||
return misperrors
|
||||
if item not in uniq:
|
||||
uniq.append(item)
|
||||
r['results'] = uniq
|
||||
return r
|
||||
|
||||
|
||||
def lookup_indicator(client, query):
|
||||
result = client.search_url(query)
|
||||
log.debug('RESULTS: ' + json.dumps(result))
|
||||
r = []
|
||||
misp_comment = "{}: Enriched via the urlscan module".format(query)
|
||||
|
||||
# Determine if the page is reachable
|
||||
for request in result['data']['requests']:
|
||||
if request['response'].get('failed'):
|
||||
if request['response']['failed']['errorText']:
|
||||
log.debug('The page could not load')
|
||||
r.append(
|
||||
{'error': 'Domain could not be resolved: {}'.format(request['response']['failed']['errorText'])})
|
||||
|
||||
if result.get('page'):
|
||||
if result['page'].get('domain'):
|
||||
misp_val = result['page']['domain']
|
||||
r.append({'types': 'domain',
|
||||
'categories': ['Network activity'],
|
||||
'values': misp_val,
|
||||
'comment': misp_comment})
|
||||
|
||||
if result['page'].get('ip'):
|
||||
misp_val = result['page']['ip']
|
||||
r.append({'types': 'ip-dst',
|
||||
'categories': ['Network activity'],
|
||||
'values': misp_val,
|
||||
'comment': misp_comment})
|
||||
|
||||
if result['page'].get('country'):
|
||||
misp_val = 'country: ' + result['page']['country']
|
||||
if result['page'].get('city'):
|
||||
misp_val += ', city: ' + result['page']['city']
|
||||
r.append({'types': 'text',
|
||||
'categories': ['External analysis'],
|
||||
'values': misp_val,
|
||||
'comment': misp_comment})
|
||||
|
||||
if result['page'].get('asn'):
|
||||
misp_val = result['page']['asn']
|
||||
r.append({'types': 'AS', 'categories': ['External analysis'], 'values': misp_val, 'comment': misp_comment})
|
||||
|
||||
if result['page'].get('asnname'):
|
||||
misp_val = result['page']['asnname']
|
||||
r.append({'types': 'text',
|
||||
'categories': ['External analysis'],
|
||||
'values': misp_val,
|
||||
'comment': misp_comment})
|
||||
|
||||
if result.get('stats'):
|
||||
if result['stats'].get('malicious'):
|
||||
log.debug('There is something in results > stats > malicious')
|
||||
threat_list = set()
|
||||
|
||||
if 'matches' in result['meta']['processors']['gsb']['data']:
|
||||
for item in result['meta']['processors']['gsb']['data']['matches']:
|
||||
if item['threatType']:
|
||||
threat_list.add(item['threatType'])
|
||||
|
||||
threat_list = ', '.join(threat_list)
|
||||
log.debug('threat_list values are: \'' + threat_list + '\'')
|
||||
|
||||
if threat_list:
|
||||
misp_val = '{} threat(s) detected'.format(threat_list)
|
||||
r.append({'types': 'text',
|
||||
'categories': ['External analysis'],
|
||||
'values': misp_val,
|
||||
'comment': misp_comment})
|
||||
|
||||
if result.get('lists'):
|
||||
if result['lists'].get('urls'):
|
||||
for url in result['lists']['urls']:
|
||||
url = url.lower()
|
||||
if 'office' in url:
|
||||
misp_val = "Possible Office-themed phishing"
|
||||
elif 'o365' in url or '0365' in url:
|
||||
misp_val = "Possible O365-themed phishing"
|
||||
elif 'microsoft' in url:
|
||||
misp_val = "Possible Microsoft-themed phishing"
|
||||
elif 'paypal' in url:
|
||||
misp_val = "Possible PayPal-themed phishing"
|
||||
elif 'onedrive' in url:
|
||||
misp_val = "Possible OneDrive-themed phishing"
|
||||
elif 'docusign' in url:
|
||||
misp_val = "Possible DocuSign-themed phishing"
|
||||
r.append({'types': 'text',
|
||||
'categories': ['External analysis'],
|
||||
'values': misp_val,
|
||||
'comment': misp_comment})
|
||||
|
||||
if result.get('task'):
|
||||
if result['task'].get('reportURL'):
|
||||
misp_val = result['task']['reportURL']
|
||||
r.append({'types': 'link',
|
||||
'categories': ['External analysis'],
|
||||
'values': misp_val,
|
||||
'comment': misp_comment})
|
||||
|
||||
if result['task'].get('screenshotURL'):
|
||||
image_url = result['task']['screenshotURL']
|
||||
r.append({'types': 'link',
|
||||
'categories': ['External analysis'],
|
||||
'values': image_url,
|
||||
'comment': misp_comment})
|
||||
### TO DO ###
|
||||
### Add ability to add an in-line screenshot of the target website into an attribute
|
||||
# screenshot = requests.get(image_url).content
|
||||
# r.append({'types': ['attachment'],
|
||||
# 'categories': ['External analysis'],
|
||||
# 'values': image_url,
|
||||
# 'image': str(base64.b64encode(screenshot), 'utf-8'),
|
||||
# 'comment': 'Screenshot of website'})
|
||||
|
||||
return r
|
||||
|
||||
|
||||
def introspection():
|
||||
return mispattributes
|
||||
|
||||
|
||||
def version():
|
||||
moduleinfo['config'] = moduleconfig
|
||||
return moduleinfo
|
||||
|
||||
|
||||
class urlscanAPI():
|
||||
def __init__(self, apikey=None, uuid=None):
|
||||
self.key = apikey
|
||||
self.uuid = uuid
|
||||
|
||||
def request(self, query):
|
||||
log.debug('From request function with the parameter: ' + query)
|
||||
payload = {'url': query}
|
||||
headers = {'API-Key': self.key,
|
||||
'Content-Type': "application/json",
|
||||
'Cache-Control': "no-cache"}
|
||||
|
||||
# Troubleshooting problems with initial search request
|
||||
log.debug('PAYLOAD: ' + json.dumps(payload))
|
||||
log.debug('HEADERS: ' + json.dumps(headers))
|
||||
|
||||
search_url_string = "https://urlscan.io/api/v1/scan/"
|
||||
response = requests.request("POST",
|
||||
search_url_string,
|
||||
data=json.dumps(payload),
|
||||
headers=headers)
|
||||
|
||||
# HTTP 400 - Bad Request
|
||||
if response.status_code == 400:
|
||||
raise Exception('HTTP Error 400 - Bad Request')
|
||||
|
||||
# HTTP 404 - Not found
|
||||
if response.status_code == 404:
|
||||
raise Exception('HTTP Error 404 - These are not the droids you\'re looking for')
|
||||
|
||||
# Any other status code
|
||||
if response.status_code != 200:
|
||||
raise Exception('HTTP Error ' + str(response.status_code))
|
||||
|
||||
if response.text:
|
||||
response = json.loads(response.content.decode("utf-8"))
|
||||
time.sleep(3)
|
||||
self.uuid = response['uuid']
|
||||
|
||||
# Strings for to check for errors on the results page
|
||||
# Null response string for any unavailable resources
|
||||
null_response_string = '"status": 404'
|
||||
# Redirect string accounting for 301/302/303/307/308 status codes
|
||||
redirect_string = '"status": 30'
|
||||
# Normal response string with 200 status code
|
||||
normal_response_string = '"status": 200'
|
||||
|
||||
results_url_string = "https://urlscan.io/api/v1/result/" + self.uuid
|
||||
log.debug('Results URL: ' + results_url_string)
|
||||
|
||||
# Need to wait for results to process and check if they are valid
|
||||
tries = 10
|
||||
while tries >= 0:
|
||||
results = requests.request("GET", results_url_string)
|
||||
log.debug('Made a GET request')
|
||||
results = results.content.decode("utf-8")
|
||||
# checking if there is a 404 status code and no available resources
|
||||
if null_response_string in results and \
|
||||
redirect_string not in results and \
|
||||
normal_response_string not in results:
|
||||
log.debug('Results not processed. Please check again later.')
|
||||
time.sleep(3)
|
||||
tries -= 1
|
||||
else:
|
||||
return json.loads(results)
|
||||
|
||||
raise Exception('Results contained a 404 status error and could not be processed.')
|
||||
|
||||
def search_url(self, query):
|
||||
log.debug('From search_url with parameter: ' + query)
|
||||
return self.request(query)
|
|
@ -15,7 +15,7 @@ misperrors = {'error': 'Error'}
|
|||
userConfig = {}
|
||||
inputSource = ['file']
|
||||
|
||||
moduleinfo = {'version': '0.8', 'author': 'Christophe Vandeplas',
|
||||
moduleinfo = {'version': '0.9', 'author': 'Christophe Vandeplas',
|
||||
'description': 'Import for ThreatAnalyzer archive.zip/analysis.json files',
|
||||
'module-type': ['import']}
|
||||
|
||||
|
@ -45,7 +45,7 @@ def handler(q=False):
|
|||
if re.match(r"Analysis/proc_\d+/modified_files/mapping\.log", zip_file_name):
|
||||
with zf.open(zip_file_name, mode='r', pwd=None) as fp:
|
||||
file_data = fp.read()
|
||||
for line in file_data.decode().split('\n'):
|
||||
for line in file_data.decode("utf-8", 'ignore').split('\n'):
|
||||
if not line:
|
||||
continue
|
||||
if line.count('|') == 3:
|
||||
|
@ -55,7 +55,8 @@ def handler(q=False):
|
|||
l_fname = cleanup_filepath(l_fname)
|
||||
if l_fname:
|
||||
if l_size == 0:
|
||||
pass # FIXME create an attribute for the filename/path
|
||||
results.append({'values': l_fname, 'type': 'filename', 'to_ids': True,
|
||||
'categories': ['Artifacts dropped', 'Payload delivery'], 'comment': ''})
|
||||
else:
|
||||
# file is a non empty sample, upload the sample later
|
||||
modified_files_mapping[l_md5] = l_fname
|
||||
|
@ -144,13 +145,14 @@ def process_analysis_json(analysis_json):
|
|||
# )
|
||||
yield({'values': connection_section_connection['@remote_hostname'], 'type': 'hostname', 'to_ids': True, 'comment': ''})
|
||||
if 'http_command' in connection_section_connection:
|
||||
# print('connection_section_connection HTTP COMMAND: {}\t{}'.format(
|
||||
# connection_section_connection['http_command']['@method'], # comment
|
||||
# connection_section_connection['http_command']['@url']) # url
|
||||
# )
|
||||
val = cleanup_url(connection_section_connection['http_command']['@url'])
|
||||
if val:
|
||||
yield({'values': val, 'type': 'url', 'categories': ['Network activity'], 'to_ids': True, 'comment': connection_section_connection['http_command']['@method']})
|
||||
for http_command in connection_section_connection['http_command']:
|
||||
# print('connection_section_connection HTTP COMMAND: {}\t{}'.format(
|
||||
# connection_section_connection['http_command']['@method'], # comment
|
||||
# connection_section_connection['http_command']['@url']) # url
|
||||
# )
|
||||
val = cleanup_url(http_command['@url'])
|
||||
if val:
|
||||
yield({'values': val, 'type': 'url', 'categories': ['Network activity'], 'to_ids': True, 'comment': http_command['@method']})
|
||||
|
||||
if 'http_header' in connection_section_connection:
|
||||
for http_header in connection_section_connection['http_header']:
|
||||
|
@ -453,9 +455,9 @@ def cleanup_filepath(item):
|
|||
'\\AppData\\Roaming\\Adobe\\Acrobat\\9.0\\UserCache.bin',
|
||||
|
||||
'\\AppData\\Roaming\\Macromedia\\Flash Player\\macromedia.com\\support\\flashplayer\\sys\\settings.sol',
|
||||
'\\AppData\\Roaming\Adobe\\Flash Player\\NativeCache\\',
|
||||
'\\AppData\\Roaming\\Adobe\\Flash Player\\NativeCache\\',
|
||||
'C:\\Windows\\AppCompat\\Programs\\',
|
||||
'C:\~' # caused by temp file created by MS Office when opening malicious doc/xls/...
|
||||
'C:\\~' # caused by temp file created by MS Office when opening malicious doc/xls/...
|
||||
}
|
||||
if list_in_string(noise_substrings, item):
|
||||
return None
|
||||
|
@ -531,4 +533,3 @@ def introspection():
|
|||
def version():
|
||||
moduleinfo['config'] = moduleconfig
|
||||
return moduleinfo
|
||||
|
||||
|
|
Loading…
Reference in New Issue