diff --git a/misp_modules/modules/expansion/ipqualityscore.py b/misp_modules/modules/expansion/ipqualityscore.py new file mode 100644 index 0000000..ebdb395 --- /dev/null +++ b/misp_modules/modules/expansion/ipqualityscore.py @@ -0,0 +1,128 @@ +import json +import logging +import requests +import urllib.parse +from . import check_input_attribute, standard_error_message +from pymisp import MISPAttribute, MISPEvent, MISPTag, MISPObject, Distribution + +logger = logging.getLogger('ipqualityscore') +logger.setLevel(logging.DEBUG) + +misperrors = {'error': 'Error'} +mispattributes = {'input': ['hostname', 'domain', 'url', 'uri', 'ip-src', 'ip-dst', 'email', 'email-src', 'email-dst', 'target-email', 'whois-registrant-email', 'phone-number','whois-registrant-phone'], 'output': ['text'], 'format': 'misp_standard'} +moduleinfo = {'version': '0.1', 'author': 'David Mackler', 'description': 'Query IPQualityScore for IP reputation, Email Validation, Phone Number Validation and Malicious Domain/URL Scanner.', + 'module-type': ['hover', 'expansion']} +moduleconfig = ['apikey'] + +BASE_URL = 'https://ipqualityscore.com/api/json' +DEFAULT_DISTRIBUTION_SETTING = Distribution.your_organisation_only.value + +IP_API_ATTRIBUTE_TYPES = ['ip-src', 'ip-dst'] +URL_API_ATTRIBUTE_TYPES = ['hostname', 'domain', 'url', 'uri'] +EMAIL_API_ATTRIBUTE_TYPES = ['email', 'email-src', 'email-dst', 'target-email', 'whois-registrant-email'] +PHONE_API_ATTRIBUTE_TYPES = ['phone-number','whois-registrant-phone'] + +def _format_result(attribute, result, enrichment_type): + + event = MISPEvent() + + orig_attr = MISPAttribute() + orig_attr.from_dict(**attribute) + + event = _make_enriched_attr(event, result, orig_attr) + + return event + +def _make_enriched_attr(event, result, orig_attr): + + enriched_object = MISPObject('IPQualityScore Enrichment') + enriched_object.add_reference(orig_attr.uuid, 'related-to') + + enriched_attr = MISPAttribute() + enriched_attr.from_dict(**{ + 'value': orig_attr.value, + 'type': orig_attr.type, + 'distribution': 0, + 'object_relation': 'enriched-attr', + 'to_ids': orig_attr.to_ids + }) + + # enriched_attr = _make_tags(enriched_attr, result) + # enriched_object.add_attribute(**enriched_attr) + + + fraud_score_attr = MISPAttribute() + fraud_score_attr.from_dict(**{ + 'value': result.get('fraud_score'), + 'type': 'text', + 'object_relation': 'fraud_score', + 'distribution': 0 + }) + enriched_object.add_attribute(**fraud_score_attr) + + latitude = MISPAttribute() + latitude.from_dict(**{ + 'value': result.get('latitude'), + 'type': 'text', + 'object_relation': 'latitude', + 'distribution': 0 + }) + enriched_object.add_attribute(**latitude) + + event.add_attribute(**orig_attr) + event.add_object(**enriched_object) + + longitude = MISPAttribute() + longitude.from_dict(**{ + 'value': result.get('longitude'), + 'type': 'text', + 'object_relation': 'longitude', + 'distribution': 0 + }) + enriched_object.add_attribute(**longitude) + + return event + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + + # check if the apikey is pprovided + if not request.get('config') or not request['config'].get('apikey'): + misperrors['error'] = 'IPQualityScore apikey is missing' + return misperrors + apikey = request['config'].get('apikey') + # check attribute is added to the event + if not request.get('attribute') or not check_input_attribute(request['attribute']): + return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'} + + input_attribute = request['attribute'] + input_attribute_type = input_attribute['type'] + input_attribute_value = attribute['value'] + # check if the attribute type is supported by IPQualityScore + if input_attribute_type not in mispattributes['input']: + return {'error': 'Unsupported attributes type for IPqualityScore Enrichment'} + + if input_attribute_type in IP_API_ATTRIBUTE_TYPES: + url = f"{BASE_URL}/ip/{input_attribute_value}" + headers = {"IPQS-KEY": apikey} + response = self.get(url, headers) + data = response.data + if str(data.get('success')) == "True": + event = _format_result(input_attribute, data, "ip") + event = json.loads(event.to_json()) + ret_result = {key: event[key] for key in ('Attribute', 'Object') if key + in event} + return {'results': ret_result} + else: + return {'error', str(data.get('message')) + +def introspection(): + return mispattributes + + +def version(): + moduleinfo['config'] = moduleconfig + return moduleinfo + \ No newline at end of file