mirror of https://github.com/MISP/misp-modules
153 lines
6.8 KiB
Python
Executable File
153 lines
6.8 KiB
Python
Executable File
import json
|
|
import requests
|
|
from collections import defaultdict
|
|
|
|
# Error payload template for this module.
misperrors = {'error': 'Error'}

# Attribute types accepted as input and attribute types the module may emit.
# 'x509-fingerprint-sha1' is listed because the IP "ssl" query stores its
# results under that type.
mispattributes = {
    'input': ['hostname', 'domain', 'ip-src', 'ip-dst', 'md5', 'sha1', 'sha256', 'sha512'],
    'output': ['domain', 'ip-src', 'ip-dst', 'text', 'md5', 'sha1', 'sha256', 'sha512', 'ssdeep',
               'authentihash', 'filename', 'whois-registrant-email', 'url', 'link',
               'x509-fingerprint-sha1'],
}

# possible module-types: 'expansion', 'hover' or both
moduleinfo = {
    'version': '1',
    'author': 'KX499',
    'description': 'Module to get information from ThreatMiner.',
    'module-type': ['expansion'],
    'name': 'ThreatMiner Lookup',
    'logo': 'threatminer.png',
    'requirements': [],
    'features': 'This module takes a MISP attribute as input and queries ThreatMiner with it.\n\nThe result of this query is then parsed and some data is mapped into MISP attributes in order to enrich the input attribute.',
    'references': ['https://www.threatminer.org/'],
    'input': 'A MISP attribute included in the following list:\n- hostname\n- domain\n- ip-src\n- ip-dst\n- md5\n- sha1\n- sha256\n- sha512',
    'output': 'MISP attributes mapped from the result of the query on ThreatMiner, included in the following list:\n- domain\n- ip-src\n- ip-dst\n- text\n- md5\n- sha1\n- sha256\n- sha512\n- ssdeep\n- authentihash\n- filename\n- whois-registrant-email\n- url\n- link\n- x509-fingerprint-sha1',
}
|
|
|
|
|
|
class ThreatMiner():
    """Query ThreatMiner for an attribute value and collect the answers.

    Results are accumulated in ``self.results``, a mapping whose keys are
    tuples ``(queried value, query label, type, [type, ...])`` and whose
    values are sets of strings found for that key.  ``parsed_results``
    turns that mapping into a list of result dictionaries.
    """

    def __init__(self, timeout=10):
        """Prepare an empty result store.

        :param timeout: value passed as ``timeout`` to every ``requests.get``
            call so that an unresponsive server cannot block the module
            forever.
        """
        self.timeout = timeout
        self.results = defaultdict(set)
        self.comment = '{}: Threatminer - {}'
        # Input attribute type -> name of the method that queries ThreatMiner.
        self.types_mapping = {'domain': '_get_domain', 'hostname': '_get_domain',
                              'ip-dst': '_get_ip', 'ip-src': '_get_ip',
                              'md5': '_get_hash', 'sha1': '_get_hash',
                              'sha256': '_get_hash', 'sha512': '_get_hash'}

    @property
    def parsed_results(self):
        """Return the non-empty collected results as a list of dictionaries.

        Each dictionary has the keys ``types`` (list of attribute types),
        ``values`` (list of collected values, in no particular order) and
        ``comment`` (built from the queried value and the query label).
        """
        to_return = []
        for key, values in self.results.items():
            if not values:
                # Keys are created even when a query yielded nothing usable.
                continue
            input_value, comment = key[:2]
            to_return.append({'types': list(key[2:]), 'values': list(values),
                              'comment': self.comment.format(input_value, comment)})
        return to_return

    def parse_query(self, request):
        """Run the matching lookup for every supported type present in ``request``.

        :param request: mapping of attribute type to value; types that are
            missing or have a falsy value are ignored.
        """
        for input_type, to_call in self.types_mapping.items():
            if request.get(input_type):
                getattr(self, to_call)(request[input_type])

    def _query(self, url, q, queries_mapping):
        """Send one request per entry of ``queries_mapping`` and dispatch the results.

        :param url: ThreatMiner endpoint to query.
        :param q: value to look up.
        :param queries_mapping: ``rt`` flag -> ``(handler method name, label)``.

        A query is skipped when the status code is not 200, when the body is
        not JSON, when the decoded body is not a mapping, or when it carries
        no ``results``.  Connection errors and timeouts are not caught here.
        """
        for flag, (to_call, comment) in queries_mapping.items():
            req = requests.get(url, params={'q': q, 'api': 'True', 'rt': flag},
                               timeout=self.timeout)
            if req.status_code != 200:
                continue
            try:
                payload = req.json()
            except ValueError:
                # A 200 answer whose body is not JSON carries nothing usable.
                continue
            results = payload.get('results') if isinstance(payload, dict) else None
            if not results:
                continue
            getattr(self, to_call)(results, q, comment)

    def _get_domain(self, q):
        """Run the domain queries for ``q``."""
        queries_mapping = {1: ('_add_whois', 'whois'), 2: ('_add_ip', 'pdns'),
                           3: ('_add_uri', 'uri'), 4: ('_add_hash', 'samples'),
                           5: ('_add_domain', 'subdomain'), 6: ('_add_link', 'report')}
        self._query('https://www.threatminer.org/domain.php', q, queries_mapping)

    def _get_hash(self, q):
        """Run the sample (file hash) queries for ``q``."""
        # NOTE(review): 'report' is routed to _add_hash here, whereas the
        # domain and IP lookups route 'report' to _add_link -- confirm which
        # one matches the payload returned for samples.
        queries_mapping = {1: ('_add_filename', 'file'), 3: ('_add_network', 'network'),
                           6: ('_add_text', 'detection'), 7: ('_add_hash', 'report')}
        self._query('https://www.threatminer.org/sample.php', q, queries_mapping)

    def _get_ip(self, q):
        """Run the host (IP address) queries for ``q``."""
        queries_mapping = {1: ('_add_whois', 'whois'), 2: ('_add_ip', 'pdns'),
                           3: ('_add_uri', 'uri'), 4: ('_add_hash', 'samples'),
                           5: ('_add_x509', 'ssl'), 6: ('_add_link', 'report')}
        self._query('https://www.threatminer.org/host.php', q, queries_mapping)

    @staticmethod
    def _field_values(entries, field):
        """Return the truthy ``field`` values of the dict items in ``entries``.

        Items that are not dictionaries are ignored instead of raising.
        """
        return {entry[field] for entry in entries
                if isinstance(entry, dict) and entry.get(field)}

    @staticmethod
    def _strings(entries):
        """Return the items of ``entries`` that are strings."""
        return {entry for entry in entries if isinstance(entry, str)}

    def _add_domain(self, results, q, comment):
        """Store the string results as ``domain`` values."""
        self.results[(q, comment, 'domain')].update(self._strings(results))

    def _add_filename(self, results, q, comment):
        """Store the ``file_name`` field of each result as ``filename`` values."""
        self.results[(q, comment, 'filename')].update(self._field_values(results, 'file_name'))

    def _add_hash(self, results, q, comment):
        """Store the string results as ``sha256`` values."""
        self.results[(q, comment, 'sha256')].update(self._strings(results))

    def _add_ip(self, results, q, comment):
        """Store the ``ip`` field of each result as ``ip-src``/``ip-dst`` values."""
        self.results[(q, comment, 'ip-src', 'ip-dst')].update(self._field_values(results, 'ip'))

    def _add_link(self, results, q, comment):
        """Store the ``URL`` field of each result as ``link`` values."""
        self.results[(q, comment, 'link')].update(self._field_values(results, 'URL'))

    def _add_network(self, results, q, comment):
        """Store the domains and hosts listed in each result.

        ``domains`` entries contribute their ``domain`` field as ``domain``
        values; string entries of ``hosts`` become ``ip-src``/``ip-dst`` values.
        """
        for result in results:
            if not isinstance(result, dict):
                continue
            domains = result.get('domains')
            if domains:
                self.results[(q, comment, 'domain')].update(self._field_values(domains, 'domain'))
            hosts = result.get('hosts')
            if hosts:
                self.results[(q, comment, 'ip-src', 'ip-dst')].update(self._strings(hosts))

    def _add_text(self, results, q, comment):
        """Store the ``detection`` field of each ``av_detections`` entry as ``text`` values."""
        for result in results:
            if not isinstance(result, dict):
                continue
            detections = result.get('av_detections')
            if detections:
                self.results[(q, comment, 'text')].update(self._field_values(detections, 'detection'))

    def _add_uri(self, results, q, comment):
        """Store the ``uri`` field of each result as ``url`` values."""
        self.results[(q, comment, 'url')].update(self._field_values(results, 'uri'))

    def _add_whois(self, results, q, comment):
        """Store the registrant e-mail found in each result's whois data.

        Only the ``registrant`` entry of ``whois.emails`` is kept; results
        without usable whois data (missing, null or of another shape) are
        skipped.
        """
        for result in results:
            if not isinstance(result, dict):
                continue
            # 'whois' may be present but null, so do not rely on the default.
            whois = result.get('whois') or {}
            emails = whois.get('emails') if isinstance(whois, dict) else None
            if not isinstance(emails, dict):
                continue
            registrant = emails.get('registrant')
            if registrant:
                self.results[(q, comment, 'whois-registrant-email')].add(registrant)

    def _add_x509(self, results, q, comment):
        """Store the string results as ``x509-fingerprint-sha1`` values."""
        self.results[(q, comment, 'x509-fingerprint-sha1')].update(self._strings(results))
|
|
|
|
|
|
def handler(q=False):
    """Module entry point: enrich the attribute described by ``q``.

    :param q: JSON-encoded request, or ``False`` when no request is given.
    :return: ``False`` when ``q`` is ``False``; otherwise a dictionary whose
        ``results`` key holds the parsed ThreatMiner results.
    """
    if q is not False:
        request = json.loads(q)
        miner = ThreatMiner()
        miner.parse_query(request)
        return {'results': miner.parsed_results}
    return False
|
|
|
|
|
|
def introspection():
    """Return the module's ``mispattributes`` dictionary."""
    return mispattributes
|
|
|
|
|
|
def version():
    """Return the module's ``moduleinfo`` dictionary."""
    return moduleinfo
|