misp-modules/misp_modules/modules/expansion/threatminer.py

153 lines
6.8 KiB
Python
Raw Permalink Normal View History

2017-03-07 03:36:00 +01:00
import json
import requests
from collections import defaultdict
2017-03-07 03:36:00 +01:00
# Generic error payload of this module.
misperrors = {
    'error': 'Error',
}

# MISP attribute types declared by this module: the ones accepted as input
# and the ones that may be produced as output. Returned by introspection().
# NOTE(review): ThreatMiner._add_x509 stores values under the type
# 'x509-fingerprint-sha1', which is not listed under 'output' - confirm
# whether it should be declared here.
mispattributes = {
    'input': [
        'hostname', 'domain',
        'ip-src', 'ip-dst',
        'md5', 'sha1', 'sha256', 'sha512',
    ],
    'output': [
        'domain',
        'ip-src', 'ip-dst',
        'text',
        'md5', 'sha1', 'sha256', 'sha512', 'ssdeep', 'authentihash',
        'filename',
        'whois-registrant-email',
        'url', 'link',
    ],
}
# possible module-types: 'expansion', 'hover' or both
2024-08-12 11:23:10 +02:00
# Descriptive metadata of this module; returned unchanged by version().
moduleinfo = {
    'version': '1',
    'author': 'KX499',
    'description': 'Module to get information from ThreatMiner.',
    'module-type': ['expansion'],
    'name': 'ThreatMiner Lookup',
    'logo': 'threatminer.png',
    'requirements': [],
    'features': (
        'This module takes a MISP attribute as input and queries ThreatMiner with it.\n\n'
        'The result of this query is then parsed and some data is mapped into MISP attributes '
        'in order to enrich the input attribute.'
    ),
    'references': ['https://www.threatminer.org/'],
    'input': (
        'A MISP attribute included in the following list:\n'
        '- hostname\n'
        '- domain\n'
        '- ip-src\n'
        '- ip-dst\n'
        '- md5\n'
        '- sha1\n'
        '- sha256\n'
        '- sha512'
    ),
    'output': (
        'MISP attributes mapped from the result of the query on ThreatMiner, '
        'included in the following list:\n'
        '- domain\n'
        '- ip-src\n'
        '- ip-dst\n'
        '- text\n'
        '- md5\n'
        '- sha1\n'
        '- sha256\n'
        '- sha512\n'
        '- ssdeep\n'
        '- authentihash\n'
        '- filename\n'
        '- whois-registrant-email\n'
        '- url\n'
        '- link'
    ),
}
2017-03-07 03:36:00 +01:00
2019-10-17 10:45:51 +02:00
class ThreatMiner():
    """Query the ThreatMiner API for one MISP attribute and collect the answers.

    Findings are accumulated in ``self.results``, a mapping whose keys are
    ``(queried value, comment label, *MISP attribute types)`` tuples and whose
    values are sets. ``parsed_results`` exposes them as a list of dicts.

    Every HTTP query is best effort: a query that fails (transport error,
    timeout, non-200 status, body that is not the expected JSON) is skipped
    and the remaining queries still run.
    """

    # Seconds allowed for each HTTP request, so that an unresponsive server
    # cannot block the caller indefinitely.
    request_timeout = 10

    # For each endpoint: value of the 'rt' request parameter ->
    # (name of the _add_* method storing the results, comment label).
    _domain_queries = {1: ('_add_whois', 'whois'), 2: ('_add_ip', 'pdns'),
                       3: ('_add_uri', 'uri'), 4: ('_add_hash', 'samples'),
                       5: ('_add_domain', 'subdomain'), 6: ('_add_link', 'report')}
    # 'report' is stored with _add_link, as for domains and IPs. It used to be
    # routed to _add_hash, which keeps only plain-string entries.
    # NOTE(review): assumes sample report entries are dicts carrying a 'URL'
    # key like the other report queries - confirm against the API.
    _hash_queries = {1: ('_add_filename', 'file'), 3: ('_add_network', 'network'),
                     6: ('_add_text', 'detection'), 7: ('_add_link', 'report')}
    _ip_queries = {1: ('_add_whois', 'whois'), 2: ('_add_ip', 'pdns'),
                   3: ('_add_uri', 'uri'), 4: ('_add_hash', 'samples'),
                   5: ('_add_x509', 'ssl'), 6: ('_add_link', 'report')}

    def __init__(self):
        self.results = defaultdict(set)
        # Filled with (queried value, comment label) for each result group.
        self.comment = '{}: Threatminer - {}'
        # MISP input attribute type -> name of the method that queries it.
        self.types_mapping = {'domain': '_get_domain', 'hostname': '_get_domain',
                              'ip-dst': '_get_ip', 'ip-src': '_get_ip',
                              'md5': '_get_hash', 'sha1': '_get_hash',
                              'sha256': '_get_hash', 'sha512': '_get_hash'}

    @property
    def parsed_results(self):
        """Return the non-empty result groups as a list of dicts.

        Each dict has the keys 'types' (list of MISP attribute types),
        'values' (list of collected values) and 'comment'.
        """
        to_return = []
        for key, values in self.results.items():
            if not values:
                continue
            input_value, comment = key[:2]
            to_return.append({'types': list(key[2:]), 'values': list(values),
                              'comment': self.comment.format(input_value, comment)})
        return to_return

    def parse_query(self, request):
        """Run the matching lookup for every supported attribute in *request*.

        *request* is a mapping of MISP attribute type to value; types missing
        from ``self.types_mapping`` or holding a falsy value are ignored.
        """
        for input_type, to_call in self.types_mapping.items():
            value = request.get(input_type)
            if value:
                getattr(self, to_call)(value)

    def _query(self, url, queries_mapping, q):
        """Send one request per entry of *queries_mapping* and store the results.

        *url* is the endpoint, *q* the value looked up. Each response's
        'results' list is handed to the ``_add_*`` method named in the mapping.
        """
        for flag, (to_call, comment) in queries_mapping.items():
            try:
                req = requests.get(url, params={'q': q, 'api': 'True', 'rt': flag},
                                   timeout=self.request_timeout)
            except requests.RequestException:
                # Same best-effort policy as for a non-200 status below.
                continue
            if req.status_code != 200:
                continue
            try:
                payload = req.json()
            except ValueError:
                # The body is not JSON.
                continue
            results = payload.get('results') if isinstance(payload, dict) else None
            if not results or not isinstance(results, list):
                continue
            getattr(self, to_call)(results, q, comment)

    def _get_domain(self, q):
        """Query the domain endpoint for the domain or hostname *q*."""
        self._query('https://www.threatminer.org/domain.php', self._domain_queries, q)

    def _get_hash(self, q):
        """Query the sample endpoint for the file hash *q*."""
        self._query('https://www.threatminer.org/sample.php', self._hash_queries, q)

    def _get_ip(self, q):
        """Query the host endpoint for the IP address *q*."""
        self._query('https://www.threatminer.org/host.php', self._ip_queries, q)

    @staticmethod
    def _only(entries, kind):
        """Return the items of *entries* that are instances of *kind*.

        Anything other than a list or tuple gives an empty list, so a missing
        or malformed field in a response is ignored instead of raising.
        """
        if not isinstance(entries, (list, tuple)):
            return []
        return [entry for entry in entries if isinstance(entry, kind)]

    def _add_domain(self, results, q, comment):
        """Store the string entries of *results* as 'domain' values."""
        self.results[(q, comment, 'domain')].update(self._only(results, str))

    def _add_filename(self, results, q, comment):
        """Store the non-empty 'file_name' field of each entry as 'filename'."""
        self.results[(q, comment, 'filename')].update(
            result['file_name'] for result in self._only(results, dict) if result.get('file_name'))

    def _add_hash(self, results, q, comment):
        """Store the string entries of *results* as 'sha256' values."""
        self.results[(q, comment, 'sha256')].update(self._only(results, str))

    def _add_ip(self, results, q, comment):
        """Store the non-empty 'ip' field of each entry as 'ip-src'/'ip-dst'."""
        self.results[(q, comment, 'ip-src', 'ip-dst')].update(
            result['ip'] for result in self._only(results, dict) if result.get('ip'))

    def _add_link(self, results, q, comment):
        """Store the non-empty 'URL' field of each entry as 'link'."""
        self.results[(q, comment, 'link')].update(
            result['URL'] for result in self._only(results, dict) if result.get('URL'))

    def _add_network(self, results, q, comment):
        """Store the domains and hosts listed in each entry of *results*.

        The 'domain' field of every item under 'domains' is stored as
        'domain'; the string items under 'hosts' as 'ip-src'/'ip-dst'.
        """
        for result in self._only(results, dict):
            domains = self._only(result.get('domains'), dict)
            if domains:
                self.results[(q, comment, 'domain')].update(
                    domain['domain'] for domain in domains if domain.get('domain'))
            hosts = self._only(result.get('hosts'), str)
            if hosts:
                self.results[(q, comment, 'ip-src', 'ip-dst')].update(hosts)

    def _add_text(self, results, q, comment):
        """Store the 'detection' field of every 'av_detections' item as 'text'."""
        for result in self._only(results, dict):
            detections = self._only(result.get('av_detections'), dict)
            if detections:
                self.results[(q, comment, 'text')].update(
                    d['detection'] for d in detections if d.get('detection'))

    def _add_uri(self, results, q, comment):
        """Store the non-empty 'uri' field of each entry as 'url'."""
        self.results[(q, comment, 'url')].update(
            result['uri'] for result in self._only(results, dict) if result.get('uri'))

    def _add_whois(self, results, q, comment):
        """Store the registrant e-mail of each entry as 'whois-registrant-email'.

        The address is read from ``entry['whois']['emails']['registrant']``;
        entries where any level is missing, null or not a dict are skipped.
        """
        for result in self._only(results, dict):
            whois = result.get('whois')
            emails = whois.get('emails') if isinstance(whois, dict) else None
            if not isinstance(emails, dict):
                continue
            registrant = emails.get('registrant')
            if registrant:
                self.results[(q, comment, 'whois-registrant-email')].add(registrant)

    def _add_x509(self, results, q, comment):
        """Store the string entries of *results* as 'x509-fingerprint-sha1'."""
        self.results[(q, comment, 'x509-fingerprint-sha1')].update(self._only(results, str))
2017-03-07 03:36:00 +01:00
def handler(q=False):
    """Enrich the attribute described by the JSON request *q*.

    Returns False when no request is given. Otherwise the request is decoded,
    passed to a fresh ThreatMiner instance, and the collected data is returned
    as ``{'results': [...]}``.
    """
    if q is False:
        return False
    request = json.loads(q)
    threatminer = ThreatMiner()
    threatminer.parse_query(request)
    return {'results': threatminer.parsed_results}
2017-03-07 03:36:00 +01:00
def introspection():
    """Return the attribute types this module declares (``mispattributes``)."""
    return mispattributes
def version():
    """Return the descriptive metadata of this module (``moduleinfo``)."""
    return moduleinfo