diff --git a/README.md b/README.md index a260d20..a9184b5 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ For more information: [Extending MISP with Python modules](https://www.misp-proj * [hashdd](misp_modules/modules/expansion/hashdd.py) - a hover module to check file hashes against [hashdd.com](http://www.hashdd.com) including NSLR dataset. * [hibp](misp_modules/modules/expansion/hibp.py) - a hover module to lookup against Have I Been Pwned? * [html_to_markdown](misp_modules/modules/expansion/html_to_markdown.py) - Simple HTML to markdown converter +* [HYAS Insight](misp_modules/modules/expansion/hyasinsight.py) - a hover and expansion module to get information from [HYAS Insight](https://www.hyas.com/hyas-insight). * [intel471](misp_modules/modules/expansion/intel471.py) - an expansion module to get info from [Intel471](https://intel471.com). * [IPASN](misp_modules/modules/expansion/ipasn.py) - a hover and expansion to get the BGP ASN of an IP address. * [iprep](misp_modules/modules/expansion/iprep.py) - an expansion module to get IP reputation from packetmail.net. diff --git a/documentation/logos/hyas.png b/documentation/logos/hyas.png new file mode 100644 index 0000000..42acf22 Binary files /dev/null and b/documentation/logos/hyas.png differ diff --git a/documentation/website/expansion/hyasinsight.json b/documentation/website/expansion/hyasinsight.json new file mode 100644 index 0000000..9647047 --- /dev/null +++ b/documentation/website/expansion/hyasinsight.json @@ -0,0 +1,12 @@ +{ + "description": "HYAS Insight integration to MISP provides direct, high volume access to HYAS Insight data. It enables investigators and analysts to understand and defend against cyber adversaries and their infrastructure.", + "logo": "hyasinsight.png", + "requirements": [ + "A HYAS Insight API Key." + ], + "input": "A MISP attribute of type IP Address(ip-src, ip-dst), Domain(hostname, domain), Email Address(email, email-src, email-dst, target-email, whois-registrant-email), Phone Number(phone-number, whois-registrant-phone), MDS(md5, x509-fingerprint-md5, ja3-fingerprint-md5, hassh-md5, hasshserver-md5), SHA1(sha1, x509-fingerprint-sha1), SHA256(sha256, x509-fingerprint-sha256), SHA512(sha512)", + "output": "Hyas Insight objects, resulting from the query on the HYAS Insight API.", + "references": [ + "https://www.hyas.com/hyas-insight/" + ], + "features": "This Module takes the IP Address, Domain, URL, Email, Phone Number, MD5, SHA1, Sha256, SHA512 MISP Attributes as input to query the HYAS Insight API.\n The results of the HYAS Insight API are than are then returned and parsed into Hyas Insight Objects. \n\nAn API key is required to submit queries to the HYAS Insight API.\n" \ No newline at end of file diff --git a/misp_modules/modules/expansion/__init__.py b/misp_modules/modules/expansion/__init__.py index 8e3b243..2506265 100644 --- a/misp_modules/modules/expansion/__init__.py +++ b/misp_modules/modules/expansion/__init__.py @@ -18,7 +18,7 @@ __all__ = ['cuckoo_submit', 'vmray_submit', 'bgpranking', 'circl_passivedns', 'c 'assemblyline_submit', 'assemblyline_query', 'ransomcoindb', 'malwarebazaar', 'lastline_query', 'lastline_submit', 'sophoslabs_intelix', 'cytomic_orion', 'censys_enrich', 'trustar_enrich', 'recordedfuture', 'html_to_markdown', 'socialscan', 'passive-ssh', - 'qintel_qsentry', 'mwdb', 'hashlookup', 'mmdb_lookup', 'ipqs_fraud_and_risk_scoring', 'clamav', 'jinja_template_rendering'] + 'qintel_qsentry', 'mwdb', 'hashlookup', 'mmdb_lookup', 'ipqs_fraud_and_risk_scoring', 'clamav', 'jinja_template_rendering','hyasinsight'] minimum_required_fields = ('type', 'uuid', 'value') diff --git a/misp_modules/modules/expansion/hyasinsight.py b/misp_modules/modules/expansion/hyasinsight.py new file mode 100644 index 0000000..cfd1e20 --- /dev/null +++ b/misp_modules/modules/expansion/hyasinsight.py @@ -0,0 +1,812 @@ +import json +import logging +from typing import Dict, List, Any + +import requests +import re +from requests.exceptions import ( + HTTPError, + ProxyError, + InvalidURL, + ConnectTimeout +) +from . import check_input_attribute, standard_error_message +from pymisp import MISPEvent, MISPObject, Distribution + +ip_query_input_type = [ + 'ip-src', + 'ip-dst' +] +domain_query_input_type = [ + 'hostname', + 'domain' +] +email_query_input_type = [ + 'email', + 'email-src', + 'email-dst', + 'target-email', + 'whois-registrant-email' +] +phone_query_input_type = [ + 'phone-number', + 'whois-registrant-phone' +] + +md5_query_input_type = [ + 'md5', + 'x509-fingerprint-md5', + 'ja3-fingerprint-md5', + 'hassh-md5', + 'hasshserver-md5' +] + +sha1_query_input_type = [ + 'sha1', + 'x509-fingerprint-sha1' +] + +sha256_query_input_type = [ + 'sha256', + 'x509-fingerprint-sha256' +] + +sha512_query_input_type = [ + 'sha512' +] + +misperrors = { + 'error': 'Error' +} +mispattributes = { + 'input': ip_query_input_type + domain_query_input_type + email_query_input_type + phone_query_input_type + + md5_query_input_type + sha1_query_input_type + sha256_query_input_type + sha512_query_input_type, + 'format': 'misp_standard' +} + +moduleinfo = { + 'version': '0.1', + 'author': 'Mike Champ', + 'description': '', + 'module-type': ['expansion', 'hover'] +} +moduleconfig = ['apikey'] +TIMEOUT = 60 +logger = logging.getLogger('hyasinsight') +logger.setLevel(logging.DEBUG) +HYAS_API_BASE_URL = 'https://insight.hyas.com/api/ext/' +WHOIS_CURRENT_BASE_URL = 'https://api.hyas.com/' +DEFAULT_DISTRIBUTION_SETTING = Distribution.your_organisation_only.value +IPV4_REGEX = r'\b((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b([^\/]|$)' +IPV6_REGEX = r'\b(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:(?:(:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))\b' # noqa: E501 +# Enrichment Types +# HYAS API endpoints +PASSIVE_DNS_ENDPOINT = 'passivedns' +DYNAMIC_DNS_ENDPOINT = 'dynamicdns' +PASSIVE_HASH_ENDPOINT = 'passivehash' +SINKHOLE_ENDPOINT = 'sinkhole' +SSL_CERTIFICATE_ENDPOINT = 'ssl_certificate' +DEVICE_GEO_ENDPOINT = 'device_geo' +WHOIS_HISTORIC_ENDPOINT = 'whois' +WHOIS_CURRENT_ENDPOINT = 'whois/v1' +MALWARE_RECORDS_ENDPOINT = 'sample' +MALWARE_INFORMATION_ENDPOINT = 'sample/information' +C2ATTRIBUTION_ENDPOINT = 'c2attribution' +OPEN_SOURCE_INDICATORS_ENDPOINT = 'os_indicators' + +# HYAS API endpoint params +DOMAIN_PARAM = 'domain' +IP_PARAM = 'ip' +IPV4_PARAM = 'ipv4' +IPV6_PARAM = 'ipv6' +EMAIL_PARAM = 'email' +PHONE_PARAM = 'phone' +MD5_PARAM = 'md5' +SHA256_PARAM = 'sha256' +SHA512_PARAM = 'sha512' +HASH_PARAM = 'hash' +SHA1_PARAM = 'sha1' + +HYAS_IP_ENRICHMENT_ENDPOINTS_LIST = [DYNAMIC_DNS_ENDPOINT, PASSIVE_HASH_ENDPOINT, SINKHOLE_ENDPOINT, + SSL_CERTIFICATE_ENDPOINT, DEVICE_GEO_ENDPOINT, C2ATTRIBUTION_ENDPOINT] +HYAS_DOMAIN_ENRICHMENT_ENDPOINTS_LIST = [PASSIVE_DNS_ENDPOINT, WHOIS_HISTORIC_ENDPOINT, WHOIS_CURRENT_ENDPOINT, + C2ATTRIBUTION_ENDPOINT] +HYAS_EMAIL_ENRICHMENT_ENDPOINTS_LIST = [DYNAMIC_DNS_ENDPOINT, WHOIS_HISTORIC_ENDPOINT, C2ATTRIBUTION_ENDPOINT] +HYAS_PHONE_ENRICHMENT_ENDPOINTS_LIST = [WHOIS_HISTORIC_ENDPOINT] +HYAS_SHA1_ENRICHMENT_ENDPOINTS_LIST = [SSL_CERTIFICATE_ENDPOINT, MALWARE_INFORMATION_ENDPOINT, + OPEN_SOURCE_INDICATORS_ENDPOINT] +HYAS_SHA256_ENRICHMENT_ENDPOINTS_LIST = [C2ATTRIBUTION_ENDPOINT, MALWARE_INFORMATION_ENDPOINT, + OPEN_SOURCE_INDICATORS_ENDPOINT] +HYAS_SHA512_ENRICHMENT_ENDPOINTS_LIST = [MALWARE_INFORMATION_ENDPOINT] +HYAS_MD5_ENRICHMENT_ENDPOINTS_LIST = [MALWARE_RECORDS_ENDPOINT, MALWARE_INFORMATION_ENDPOINT, + OPEN_SOURCE_INDICATORS_ENDPOINT] + +HYAS_OBJECT_NAMES = { + DYNAMIC_DNS_ENDPOINT: "Dynamic DNS Information", + PASSIVE_HASH_ENDPOINT: "Passive Hash Information", + SINKHOLE_ENDPOINT: "Sinkhole Information", + SSL_CERTIFICATE_ENDPOINT: "SSL Certificate Information", + DEVICE_GEO_ENDPOINT: "Mobile Geolocation Information", + C2ATTRIBUTION_ENDPOINT: "C2 Attribution Information", + PASSIVE_DNS_ENDPOINT: "Passive DNS Information", + WHOIS_HISTORIC_ENDPOINT: "Whois Related Information", + WHOIS_CURRENT_ENDPOINT: "Whois Current Related Information", + MALWARE_INFORMATION_ENDPOINT: "Malware Sample Information", + OPEN_SOURCE_INDICATORS_ENDPOINT: "Open Source Intel for malware, ssl certificates and other indicators Information", + MALWARE_RECORDS_ENDPOINT: "Malware Sample Records Information" +} + + +def parse_attribute(comment, feature, value): + """Generic Method for parsing the attributes in the object""" + attribute = { + 'type': 'text', + 'value': value, + 'comment': comment, + 'distribution': DEFAULT_DISTRIBUTION_SETTING, + 'object_relation': feature + } + return attribute + + +def misp_object(endpoint, attribute_value): + object_name = HYAS_OBJECT_NAMES[endpoint] + hyas_object = MISPObject(object_name) + hyas_object.distribution = DEFAULT_DISTRIBUTION_SETTING + hyas_object.template_uuid = "d69d3d15-7b4d-49b1-9e0a-bb29f3d421d9" + hyas_object.template_id = "1" + hyas_object.description = "HYAS INSIGHT " + object_name + hyas_object.comment = "HYAS INSIGHT " + object_name + " for " + attribute_value + setattr(hyas_object, 'meta-category', 'network') + description = ( + "An object containing the enriched attribute and " + "related entities from HYAS Insight." + ) + hyas_object.from_dict( + **{"meta-category": "misc", "description": description, + "distribution": DEFAULT_DISTRIBUTION_SETTING} + ) + return hyas_object + + +def flatten_json(y: Dict) -> Dict[str, Any]: + """ + :param y: raw_response from HYAS api + :return: Flatten json response + """ + out = {} + + def flatten(x, name=''): + # If the Nested key-value + # pair is of dict type + if type(x) is dict: + for a in x: + flatten(x[a], name + a + '_') + else: + out[name[:-1]] = x + + flatten(y) + return out + + +def get_flatten_json_response(raw_api_response: List[Dict]) -> List[Dict]: + """ + :param raw_api_response: raw_api response from the API + :return: Flatten Json response + """ + flatten_json_response = [] + if raw_api_response: + for obj in raw_api_response: + flatten_json_response.append(flatten_json(obj)) + + return flatten_json_response + + +def request_body(query_input, query_param, current): + """ + This Method returns the request body for specific endpoint. + """ + + if current: + return { + "applied_filters": { + query_input: query_param, + "current": True + } + } + else: + return { + "applied_filters": { + query_input: query_param + } + } + + +class RequestHandler: + """A class for handling any outbound requests from this module.""" + + def __init__(self, apikey): + self.session = requests.Session() + self.api_key = apikey + + def get(self, url: str, headers: dict = None, req_body=None) -> requests.Response: + """General post method to fetch the response from HYAS Insight.""" + response = [] + try: + response = self.session.post( + url, headers=headers, json=req_body + ) + if response: + response = response.json() + except (ConnectTimeout, ProxyError, InvalidURL) as error: + msg = "Error connecting with the HYAS Insight." + logger.error(f"{msg} Error: {error}") + misperrors["error"] = msg + return response + + def hyas_lookup(self, end_point: str, query_input, query_param, current=False) -> requests.Response: + """Do a lookup call.""" + # Building the request + if current: + url = f'{WHOIS_CURRENT_BASE_URL}{WHOIS_CURRENT_ENDPOINT}' + else: + url = f'{HYAS_API_BASE_URL}{end_point}' + headers = { + 'Content-type': 'application/json', + 'X-API-Key': self.api_key, + } + req_body = request_body(query_input, query_param, current) + try: + response = self.get(url, headers, req_body) + except HTTPError as error: + msg = f"Error when requesting data from HYAS Insight. {error.response}: {error.response.reason}" + logger.error(msg) + misperrors["error"] = msg + raise + return response + + +class HyasInsightParser: + """A class for handling the enrichment objects""" + + def __init__(self, attribute): + self.attribute = attribute + self.misp_event = MISPEvent() + self.misp_event.add_attribute(**attribute) + + self.c2_attribution_data_items = [ + 'actor_ipv4', + 'c2_domain', + 'c2_ipv4', + 'c2_url', + 'datetime', + 'email', + 'email_domain', + 'referrer_domain', + 'referrer_ipv4', + 'referrer_url', + 'sha256' + ] + self.c2_attribution_data_items_friendly_names = { + 'actor_ipv4': 'Actor IPv4', + 'c2_domain': 'C2 Domain', + 'c2_ipv4': 'C2 IPv4', + 'c2_url': 'C2 URL', + 'datetime': 'DateTime', + 'email': 'Email', + 'email_domain': 'Email Domain', + 'referrer_domain': 'Referrer Domain', + 'referrer_ipv4': 'Referrer IPv4', + 'referrer_url': 'Referrer URL', + 'sha256': 'SHA256' + } + + self.device_geo_data_items = [ + 'datetime', + 'device_user_agent', + 'geo_country_alpha_2', + 'geo_horizontal_accuracy', + 'ipv4', + 'ipv6', + 'latitude', + 'longitude', + 'wifi_bssid' + ] + + self.device_geo_data_items_friendly_names = { + 'datetime': 'DateTime', + 'device_user_agent': 'Device User Agent', + 'geo_country_alpha_2': 'Alpha-2 Code', + 'geo_horizontal_accuracy': 'GPS Horizontal Accuracy', + 'ipv4': 'IPv4 Address', + 'ipv6': 'IPv6 Address', + 'latitude': 'Latitude', + 'longitude': 'Longitude', + 'wifi_bssid': 'WIFI BSSID' + } + + self.dynamic_dns_data_items = [ + 'a_record', + 'account', + 'created', + 'created_ip', + 'domain', + 'domain_creator_ip', + 'email', + ] + + self.dynamic_dns_data_items_friendly_names = { + 'a_record': 'A Record', + 'account': 'Account Holder', + 'created': 'Created Date', + 'created_ip': 'Account Holder IP Address', + 'domain': 'Domain', + 'domain_creator_ip': 'Domain Creator IP Address', + 'email': 'Email Address', + } + + self.os_indicators_data_items = [ + 'context', + 'datetime', + 'domain', + 'domain_2tld', + 'first_seen', + 'ipv4', + 'ipv6', + 'last_seen', + 'md5', + 'sha1', + 'sha256', + 'source_name', + 'source_url', + 'url' + ] + + self.os_indicators_data_items_friendly_names = { + 'context': 'Context', + 'datetime': 'DateTime', + 'domain': 'Domain', + 'domain_2tld': 'Domain 2TLD', + 'first_seen': 'First Seen', + 'ipv4': 'IPv4 Address', + 'ipv6': 'IPv6 Address', + 'last_seen': 'Last Seen', + 'md5': 'MD5', + 'sha1': 'SHA1', + 'sha256': 'SHA256', + 'source_name': 'Source Name', + 'source_url': 'Source URL', + 'url': 'URL' + } + + self.passive_dns_data_items = [ + 'cert_name', + 'count', + 'domain', + 'first_seen', + 'ip_geo_city_name', + 'ip_geo_country_iso_code', + 'ip_geo_country_name', + 'ip_geo_location_latitude', + 'ip_geo_location_longitude', + 'ip_geo_postal_code', + 'ip_ip', + 'ip_isp_autonomous_system_number', + 'ip_isp_autonomous_system_organization', + 'ip_isp_ip_address', + 'ip_isp_isp', + 'ip_isp_organization', + 'ipv4', + 'ipv6', + 'last_seen' + ] + + self.passive_dns_data_items_friendly_names = { + 'cert_name': 'Certificate Provider Name', + 'count': 'Passive DNS Count', + 'domain': 'Domain', + 'first_seen': 'First Seen', + 'ip_geo_city_name': 'IP Organization City', + 'ip_geo_country_iso_code': 'IP Organization Country ISO Code', + 'ip_geo_country_name': 'IP Organization Country Name', + 'ip_geo_location_latitude': 'IP Organization Latitude', + 'ip_geo_location_longitude': 'IP Organization Longitude', + 'ip_geo_postal_code': 'IP Organization Postal Code', + 'ip_ip': 'IP Address', + 'ip_isp_autonomous_system_number': 'ASN IP', + 'ip_isp_autonomous_system_organization': 'ASO IP', + 'ip_isp_ip_address': 'IP Address', + 'ip_isp_isp': 'ISP', + 'ip_isp_organization': 'ISP Organization', + 'ipv4': 'IPv4 Address', + 'ipv6': 'IPv6 Address', + 'last_seen': 'Last Seen' + } + + self.passive_hash_data_items = [ + 'domain', + 'md5_count' + ] + + self.passive_hash_data_items_friendly_names = { + 'domain': 'Domain', + 'md5_count': 'Passive DNS Count' + } + + self.malware_records_data_items = [ + 'datetime', + 'domain', + 'ipv4', + 'ipv6', + 'md5', + 'sha1', + 'sha256' + ] + + self.malware_records_data_items_friendly_names = { + 'datetime': 'DateTime', + 'domain': 'Domain', + 'ipv4': 'IPv4 Address', + 'ipv6': 'IPv6 Address', + 'md5': 'MD5', + 'sha1': 'SHA1', + 'sha256': 'SHA256' + } + + self.malware_information_data_items = [ + 'avscan_score', + 'md5', + 'av_name', + 'def_time', + 'threat_found', + 'scan_time', + 'sha1', + 'sha256', + 'sha512' + ] + + self.malware_information_data_items_friendly_names = { + 'avscan_score': 'AV Scan Score', + 'md5': 'MD5', + 'av_name': 'AV Name', + 'def_time': 'AV DateTime', + 'threat_found': 'Source', + 'scan_time': 'Scan DateTime', + 'sha1': 'SHA1', + 'sha256': 'SHA256', + 'sha512': 'SHA512' + } + + self.sinkhole_data_items = [ + 'count', + 'country_name', + 'data_port', + 'datetime', + 'ipv4', + 'last_seen', + 'organization_name', + 'sink_source' + ] + + self.sinkhole_data_items_friendly_names = { + 'count': 'Sinkhole Count', + 'country_name': 'IP Address Country', + 'data_port': 'Data Port', + 'datetime': 'First Seen', + 'ipv4': 'IP Address', + 'last_seen': 'Last Seen', + 'organization_name': 'ISP Organization', + 'sink_source': 'Sink Source IP' + } + + self.ssl_certificate_data_items = [ + 'ip', + 'ssl_cert_cert_key', + 'ssl_cert_expire_date', + 'ssl_cert_issue_date', + 'ssl_cert_issuer_commonName', + 'ssl_cert_issuer_countryName', + 'ssl_cert_issuer_localityName', + 'ssl_cert_issuer_organizationName', + 'ssl_cert_issuer_organizationalUnitName', + 'ssl_cert_issuer_stateOrProvinceName', + 'ssl_cert_md5', + 'ssl_cert_serial_number', + 'ssl_cert_sha1', + 'ssl_cert_sha_256', + 'ssl_cert_sig_algo', + 'ssl_cert_ssl_version', + 'ssl_cert_subject_commonName', + 'ssl_cert_subject_countryName', + 'ssl_cert_subject_localityName', + 'ssl_cert_subject_organizationName', + 'ssl_cert_subject_organizationalUnitName', + 'ssl_cert_timestamp' + ] + + self.ssl_certificate_data_items_friendly_names = { + 'ip': 'IP Address', + 'ssl_cert_cert_key': 'Certificate Key', + 'ssl_cert_expire_date': 'Certificate Expiration Date', + 'ssl_cert_issue_date': 'Certificate Issue Date', + 'ssl_cert_issuer_commonName': 'Issuer Common Name', + 'ssl_cert_issuer_countryName': 'Issuer Country Name', + 'ssl_cert_issuer_localityName': 'Issuer City Name', + 'ssl_cert_issuer_organizationName': 'Issuer Organization Name', + 'ssl_cert_issuer_organizationalUnitName': 'Issuer Organization Unit Name', + 'ssl_cert_issuer_stateOrProvinceName': 'Issuer State or Province Name', + 'ssl_cert_md5': 'Certificate MD5', + 'ssl_cert_serial_number': 'Certificate Serial Number', + 'ssl_cert_sha1': 'Certificate SHA1', + 'ssl_cert_sha_256': 'Certificate SHA256', + 'ssl_cert_sig_algo': 'Certificate Signature Algorith', + 'ssl_cert_ssl_version': 'SSL Version', + 'ssl_cert_subject_commonName': 'Reciever Subject Name', + 'ssl_cert_subject_countryName': 'Receiver Country Name', + 'ssl_cert_subject_localityName': 'Receiver City Name', + 'ssl_cert_subject_organizationName': 'Receiver Organization Name', + 'ssl_cert_subject_organizationalUnitName': 'Receiver Organization Unit Name', + 'ssl_cert_timestamp': 'Certificate DateTime' + } + + self.whois_historic_data_items = [ + 'address', + 'city', + 'country', + 'domain', + 'domain_2tld', + 'domain_created_datetime', + 'domain_expires_datetime', + 'domain_updated_datetime', + 'email', + 'idn_name', + 'nameserver', + 'phone', + 'privacy_punch', + 'registrar' + ] + + self.whois_historic_data_items_friendly_names = { + 'address': 'Address', + 'city': 'City', + 'country': 'Country', + 'domain': 'Domain', + 'domain_2tld': 'Domain 2tld', + 'domain_created_datetime': 'Domain Created Time', + 'domain_expires_datetime': 'Domain Expires Time', + 'domain_updated_datetime': 'Domain Updated Time', + 'email': 'Email Address', + 'idn_name': 'IDN Name', + 'nameserver': 'Nameserver', + 'phone': 'Phone Info', + 'privacy_punch': 'Privacy Punch', + 'registrar': 'Registrar' + } + + self.whois_current_data_items = [ + 'abuse_emails', + 'address', + 'city', + 'country', + 'domain', + 'domain_2tld', + 'domain_created_datetime', + 'domain_expires_datetime', + 'domain_updated_datetime', + 'email', + 'idn_name', + 'nameserver', + 'organization', + 'phone', + 'registrar', + 'state' + ] + + self.whois_current_data_items_friendly_names = { + 'abuse_emails': 'Abuse Emails', + 'address': 'Address', + 'city': 'City', + 'country': 'Country', + 'domain': 'Domain', + 'domain_2tld': 'Domain 2tld', + 'domain_created_datetime': 'Domain Created Time', + 'domain_expires_datetime': 'Domain Expires Time', + 'domain_updated_datetime': 'Domain Updated Time', + 'email': 'Email Address', + 'idn_name': 'IDN Name', + 'nameserver': 'Nameserver', + 'organization': 'Organization', + 'phone': 'Phone Info', + 'registrar': 'Registrar', + 'state': 'State' + } + + def create_misp_attributes_and_objects(self, response, endpoint, attribute_value): + flatten_json_response = get_flatten_json_response(response) + data_items: List[str] = [] + data_items_friendly_names: Dict[str, str] = {} + if endpoint == DEVICE_GEO_ENDPOINT: + data_items: List[str] = self.device_geo_data_items + data_items_friendly_names: Dict[str, str] = self.device_geo_data_items_friendly_names + elif endpoint == DYNAMIC_DNS_ENDPOINT: + data_items: List[str] = self.dynamic_dns_data_items + data_items_friendly_names: Dict[str, str] = self.dynamic_dns_data_items_friendly_names + elif endpoint == PASSIVE_DNS_ENDPOINT: + data_items: List[str] = self.passive_dns_data_items + data_items_friendly_names: Dict[str, str] = self.passive_dns_data_items_friendly_names + elif endpoint == PASSIVE_HASH_ENDPOINT: + data_items: List[str] = self.passive_hash_data_items + data_items_friendly_names: Dict[str, str] = self.passive_hash_data_items_friendly_names + elif endpoint == SINKHOLE_ENDPOINT: + data_items: List[str] = self.sinkhole_data_items + data_items_friendly_names: Dict[str, str] = self.sinkhole_data_items_friendly_names + elif endpoint == WHOIS_HISTORIC_ENDPOINT: + data_items = self.whois_historic_data_items + data_items_friendly_names = self.whois_historic_data_items_friendly_names + elif endpoint == WHOIS_CURRENT_ENDPOINT: + data_items: List[str] = self.whois_current_data_items + data_items_friendly_names: Dict[str, str] = self.whois_current_data_items_friendly_names + elif endpoint == SSL_CERTIFICATE_ENDPOINT: + data_items: List[str] = self.ssl_certificate_data_items + data_items_friendly_names: Dict[str, str] = self.ssl_certificate_data_items_friendly_names + elif endpoint == MALWARE_INFORMATION_ENDPOINT: + data_items: List[str] = self.malware_information_data_items + data_items_friendly_names = self.malware_information_data_items_friendly_names + elif endpoint == MALWARE_RECORDS_ENDPOINT: + data_items: List[str] = self.malware_records_data_items + data_items_friendly_names = self.malware_records_data_items_friendly_names + elif endpoint == OPEN_SOURCE_INDICATORS_ENDPOINT: + data_items: List[str] = self.os_indicators_data_items + data_items_friendly_names = self.os_indicators_data_items_friendly_names + elif endpoint == C2ATTRIBUTION_ENDPOINT: + data_items: List[str] = self.c2_attribution_data_items + data_items_friendly_names = self.c2_attribution_data_items_friendly_names + for result in flatten_json_response: + hyas_object = misp_object(endpoint, attribute_value) + for data_item in result.keys(): + if data_item in data_items: + data_item_text = data_items_friendly_names[data_item] + data_item_value = str(result[data_item]) + hyas_object.add_attribute( + **parse_attribute(hyas_object.comment, data_item_text, data_item_value)) + hyas_object.add_reference(self.attribute['uuid'], 'related-to') + self.misp_event.add_object(hyas_object) + + def get_results(self): + """returns the dictionary object to MISP Instance""" + event = json.loads(self.misp_event.to_json()) + results = {key: event[key] for key in ('Attribute', 'Object')} + return {'results': results} + + +def handler(q=False): + """The function which accepts a JSON document to expand the values and return a dictionary of the expanded + values. """ + if q is False: + return False + request = json.loads(q) + # check if the apikey is provided + if not request.get('config') or not request['config'].get('apikey'): + misperrors['error'] = 'HYAS Insight apikey is missing' + return misperrors + apikey = request['config'].get('apikey') + # check attribute is added to the event + if not request.get('attribute') or not check_input_attribute(request['attribute']): + return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'} + + attribute = request['attribute'] + attribute_type = attribute['type'] + attribute_value = attribute['value'] + + # check if the attribute type is supported by IPQualityScore + if attribute_type not in mispattributes['input']: + return {'error': 'Unsupported attributes type for HYAS Insight Enrichment'} + request_handler = RequestHandler(apikey) + parser = HyasInsightParser(attribute) + has_results = False + if attribute_type in ip_query_input_type: + ip_param = '' + for endpoint in HYAS_IP_ENRICHMENT_ENDPOINTS_LIST: + if endpoint == DEVICE_GEO_ENDPOINT: + if re.match(IPV4_REGEX, attribute_value): + ip_param = IPV4_PARAM + elif re.match(IPV6_REGEX, attribute_value): + ip_param = IPV6_PARAM + elif endpoint == PASSIVE_HASH_ENDPOINT: + ip_param = IPV4_PARAM + elif endpoint == SINKHOLE_ENDPOINT: + ip_param = IPV4_PARAM + else: + ip_param = IP_PARAM + enrich_response = request_handler.hyas_lookup(endpoint, ip_param, attribute_value) + if endpoint == SSL_CERTIFICATE_ENDPOINT: + enrich_response = enrich_response.get('ssl_certs') + if enrich_response: + has_results = True + parser.create_misp_attributes_and_objects(enrich_response, endpoint, attribute_value) + elif attribute_type in domain_query_input_type: + for endpoint in HYAS_DOMAIN_ENRICHMENT_ENDPOINTS_LIST: + if not endpoint == WHOIS_CURRENT_ENDPOINT: + enrich_response = request_handler.hyas_lookup(endpoint, DOMAIN_PARAM, attribute_value) + else: + enrich_response = request_handler.hyas_lookup(endpoint, DOMAIN_PARAM, attribute_value, + endpoint == WHOIS_CURRENT_ENDPOINT) + enrich_response = enrich_response.get('items') + if enrich_response: + has_results = True + parser.create_misp_attributes_and_objects(enrich_response, endpoint, attribute_value) + elif attribute_type in email_query_input_type: + for endpoint in HYAS_EMAIL_ENRICHMENT_ENDPOINTS_LIST: + enrich_response = request_handler.hyas_lookup(endpoint, EMAIL_PARAM, attribute_value) + if enrich_response: + has_results = True + parser.create_misp_attributes_and_objects(enrich_response, endpoint, attribute_value) + elif attribute_type in phone_query_input_type: + for endpoint in HYAS_PHONE_ENRICHMENT_ENDPOINTS_LIST: + enrich_response = request_handler.hyas_lookup(endpoint, PHONE_PARAM, attribute_value) + if enrich_response: + has_results = True + parser.create_misp_attributes_and_objects(enrich_response, endpoint, attribute_value) + elif attribute_type in md5_query_input_type: + for endpoint in HYAS_MD5_ENRICHMENT_ENDPOINTS_LIST: + if endpoint == MALWARE_INFORMATION_ENDPOINT: + md5_param = HASH_PARAM + else: + md5_param = MD5_PARAM + enrich_response = request_handler.hyas_lookup(endpoint, md5_param, attribute_value) + if endpoint == MALWARE_INFORMATION_ENDPOINT: + if not enrich_response.get("Message"): + enrich_response = enrich_response.get("scan_results") + if enrich_response: + has_results = True + parser.create_misp_attributes_and_objects(enrich_response, endpoint, attribute_value) + elif attribute_type in sha1_query_input_type: + for endpoint in HYAS_SHA1_ENRICHMENT_ENDPOINTS_LIST: + enrich_response = request_handler.hyas_lookup(endpoint, SHA1_PARAM, attribute_value) + if endpoint == MALWARE_INFORMATION_ENDPOINT: + if not enrich_response.get("Message"): + enrich_response = enrich_response.get("scan_results") + if enrich_response: + has_results = True + parser.create_misp_attributes_and_objects(enrich_response, endpoint, attribute_value) + elif attribute_type in sha256_query_input_type: + for endpoint in HYAS_SHA256_ENRICHMENT_ENDPOINTS_LIST: + if endpoint == MALWARE_INFORMATION_ENDPOINT: + sha256_param = HASH_PARAM + else: + sha256_param = SHA256_PARAM + enrich_response = request_handler.hyas_lookup(endpoint, sha256_param, attribute_value) + if endpoint == MALWARE_INFORMATION_ENDPOINT: + if not enrich_response.get("Message"): + enrich_response = enrich_response.get("scan_results") + if enrich_response: + has_results = True + parser.create_misp_attributes_and_objects(enrich_response, endpoint, attribute_value) + elif attribute_type in sha512_query_input_type: + for endpoint in HYAS_SHA512_ENRICHMENT_ENDPOINTS_LIST: + if endpoint == MALWARE_INFORMATION_ENDPOINT: + sha512_param = HASH_PARAM + enrich_response = request_handler.hyas_lookup(endpoint, sha512_param, attribute_value) + if endpoint == MALWARE_INFORMATION_ENDPOINT: + if not enrich_response.get("Message"): + enrich_response = enrich_response.get("scan_results") + if enrich_response: + has_results = True + parser.create_misp_attributes_and_objects(enrich_response, endpoint, attribute_value) + + if has_results: + return parser.get_results() + else: + return {'error': 'No records found in HYAS Insight for the provided attribute.'} + + +def introspection(): + """The function that returns a dict of the supported attributes (input and output) by your expansion module.""" + return mispattributes + + +def version(): + """The function that returns a dict with the version and the associated meta-data including potential + configurations required of the module. """ + moduleinfo['config'] = moduleconfig + return moduleinfo diff --git a/tests/test_expansions.py b/tests/test_expansions.py index 9f11f8c..0a7bcf7 100644 --- a/tests/test_expansions.py +++ b/tests/test_expansions.py @@ -263,6 +263,21 @@ class TestExpansions(unittest.TestCase): self.assertEqual(to_check, 'OK (Not Found)', response) else: self.assertEqual(self.get_errors(response), 'Have I Been Pwned authentication is incomplete (no API key)') + + def test_hyasinsight(self): + module_name = "hyasinsight" + query = {"module": module_name, + "attribute": {"type": "phone-number", + "value": "+84853620279", + "uuid": "b698dc2b-94c1-487d-8b65-3114bad5a40c"}, + "config": {}} + if module_name in self.configs: + query['config'] = self.configs[module_name] + response = self.misp_modules_post(query) + self.assertEqual(self.get_values(response)['domain'], 'tienichphongnet.com') + else: + response = self.misp_modules_post(query) + self.assertEqual(self.get_errors(response), 'HYAS Insight apikey is missing') def test_greynoise(self): module_name = 'greynoise' @@ -308,6 +323,7 @@ class TestExpansions(unittest.TestCase): response = self.misp_modules_post(query) self.assertEqual(self.get_errors(response), 'IPQualityScore apikey is missing') + def test_macaddess_io(self): module_name = 'macaddress_io' query = {"module": module_name, "mac-address": "44:38:39:ff:ef:57"}