diff --git a/misp_modules/modules/expansion/ipqualityscore.py b/misp_modules/modules/expansion/ipqualityscore.py deleted file mode 100644 index ebdb395..0000000 --- a/misp_modules/modules/expansion/ipqualityscore.py +++ /dev/null @@ -1,128 +0,0 @@ -import json -import logging -import requests -import urllib.parse -from . import check_input_attribute, standard_error_message -from pymisp import MISPAttribute, MISPEvent, MISPTag, MISPObject, Distribution - -logger = logging.getLogger('ipqualityscore') -logger.setLevel(logging.DEBUG) - -misperrors = {'error': 'Error'} -mispattributes = {'input': ['hostname', 'domain', 'url', 'uri', 'ip-src', 'ip-dst', 'email', 'email-src', 'email-dst', 'target-email', 'whois-registrant-email', 'phone-number','whois-registrant-phone'], 'output': ['text'], 'format': 'misp_standard'} -moduleinfo = {'version': '0.1', 'author': 'David Mackler', 'description': 'Query IPQualityScore for IP reputation, Email Validation, Phone Number Validation and Malicious Domain/URL Scanner.', - 'module-type': ['hover', 'expansion']} -moduleconfig = ['apikey'] - -BASE_URL = 'https://ipqualityscore.com/api/json' -DEFAULT_DISTRIBUTION_SETTING = Distribution.your_organisation_only.value - -IP_API_ATTRIBUTE_TYPES = ['ip-src', 'ip-dst'] -URL_API_ATTRIBUTE_TYPES = ['hostname', 'domain', 'url', 'uri'] -EMAIL_API_ATTRIBUTE_TYPES = ['email', 'email-src', 'email-dst', 'target-email', 'whois-registrant-email'] -PHONE_API_ATTRIBUTE_TYPES = ['phone-number','whois-registrant-phone'] - -def _format_result(attribute, result, enrichment_type): - - event = MISPEvent() - - orig_attr = MISPAttribute() - orig_attr.from_dict(**attribute) - - event = _make_enriched_attr(event, result, orig_attr) - - return event - -def _make_enriched_attr(event, result, orig_attr): - - enriched_object = MISPObject('IPQualityScore Enrichment') - enriched_object.add_reference(orig_attr.uuid, 'related-to') - - enriched_attr = MISPAttribute() - enriched_attr.from_dict(**{ - 'value': orig_attr.value, - 'type': orig_attr.type, - 'distribution': 0, - 'object_relation': 'enriched-attr', - 'to_ids': orig_attr.to_ids - }) - - # enriched_attr = _make_tags(enriched_attr, result) - # enriched_object.add_attribute(**enriched_attr) - - - fraud_score_attr = MISPAttribute() - fraud_score_attr.from_dict(**{ - 'value': result.get('fraud_score'), - 'type': 'text', - 'object_relation': 'fraud_score', - 'distribution': 0 - }) - enriched_object.add_attribute(**fraud_score_attr) - - latitude = MISPAttribute() - latitude.from_dict(**{ - 'value': result.get('latitude'), - 'type': 'text', - 'object_relation': 'latitude', - 'distribution': 0 - }) - enriched_object.add_attribute(**latitude) - - event.add_attribute(**orig_attr) - event.add_object(**enriched_object) - - longitude = MISPAttribute() - longitude.from_dict(**{ - 'value': result.get('longitude'), - 'type': 'text', - 'object_relation': 'longitude', - 'distribution': 0 - }) - enriched_object.add_attribute(**longitude) - - return event - -def handler(q=False): - if q is False: - return False - request = json.loads(q) - - # check if the apikey is pprovided - if not request.get('config') or not request['config'].get('apikey'): - misperrors['error'] = 'IPQualityScore apikey is missing' - return misperrors - apikey = request['config'].get('apikey') - # check attribute is added to the event - if not request.get('attribute') or not check_input_attribute(request['attribute']): - return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'} - - input_attribute = request['attribute'] - input_attribute_type = input_attribute['type'] - input_attribute_value = attribute['value'] - # check if the attribute type is supported by IPQualityScore - if input_attribute_type not in mispattributes['input']: - return {'error': 'Unsupported attributes type for IPqualityScore Enrichment'} - - if input_attribute_type in IP_API_ATTRIBUTE_TYPES: - url = f"{BASE_URL}/ip/{input_attribute_value}" - headers = {"IPQS-KEY": apikey} - response = self.get(url, headers) - data = response.data - if str(data.get('success')) == "True": - event = _format_result(input_attribute, data, "ip") - event = json.loads(event.to_json()) - ret_result = {key: event[key] for key in ('Attribute', 'Object') if key - in event} - return {'results': ret_result} - else: - return {'error', str(data.get('message')) - -def introspection(): - return mispattributes - - -def version(): - moduleinfo['config'] = moduleconfig - return moduleinfo - \ No newline at end of file