mirror of https://github.com/MISP/misp-modules
delete
parent
17541e2938
commit
47dde7943b
|
@ -1,128 +0,0 @@
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import requests
|
|
||||||
import urllib.parse
|
|
||||||
from . import check_input_attribute, standard_error_message
|
|
||||||
from pymisp import MISPAttribute, MISPEvent, MISPTag, MISPObject, Distribution
|
|
||||||
|
|
||||||
logger = logging.getLogger('ipqualityscore')
|
|
||||||
logger.setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
misperrors = {'error': 'Error'}
|
|
||||||
mispattributes = {'input': ['hostname', 'domain', 'url', 'uri', 'ip-src', 'ip-dst', 'email', 'email-src', 'email-dst', 'target-email', 'whois-registrant-email', 'phone-number','whois-registrant-phone'], 'output': ['text'], 'format': 'misp_standard'}
|
|
||||||
moduleinfo = {'version': '0.1', 'author': 'David Mackler', 'description': 'Query IPQualityScore for IP reputation, Email Validation, Phone Number Validation and Malicious Domain/URL Scanner.',
|
|
||||||
'module-type': ['hover', 'expansion']}
|
|
||||||
moduleconfig = ['apikey']
|
|
||||||
|
|
||||||
BASE_URL = 'https://ipqualityscore.com/api/json'
|
|
||||||
DEFAULT_DISTRIBUTION_SETTING = Distribution.your_organisation_only.value
|
|
||||||
|
|
||||||
IP_API_ATTRIBUTE_TYPES = ['ip-src', 'ip-dst']
|
|
||||||
URL_API_ATTRIBUTE_TYPES = ['hostname', 'domain', 'url', 'uri']
|
|
||||||
EMAIL_API_ATTRIBUTE_TYPES = ['email', 'email-src', 'email-dst', 'target-email', 'whois-registrant-email']
|
|
||||||
PHONE_API_ATTRIBUTE_TYPES = ['phone-number','whois-registrant-phone']
|
|
||||||
|
|
||||||
def _format_result(attribute, result, enrichment_type):
|
|
||||||
|
|
||||||
event = MISPEvent()
|
|
||||||
|
|
||||||
orig_attr = MISPAttribute()
|
|
||||||
orig_attr.from_dict(**attribute)
|
|
||||||
|
|
||||||
event = _make_enriched_attr(event, result, orig_attr)
|
|
||||||
|
|
||||||
return event
|
|
||||||
|
|
||||||
def _make_enriched_attr(event, result, orig_attr):
|
|
||||||
|
|
||||||
enriched_object = MISPObject('IPQualityScore Enrichment')
|
|
||||||
enriched_object.add_reference(orig_attr.uuid, 'related-to')
|
|
||||||
|
|
||||||
enriched_attr = MISPAttribute()
|
|
||||||
enriched_attr.from_dict(**{
|
|
||||||
'value': orig_attr.value,
|
|
||||||
'type': orig_attr.type,
|
|
||||||
'distribution': 0,
|
|
||||||
'object_relation': 'enriched-attr',
|
|
||||||
'to_ids': orig_attr.to_ids
|
|
||||||
})
|
|
||||||
|
|
||||||
# enriched_attr = _make_tags(enriched_attr, result)
|
|
||||||
# enriched_object.add_attribute(**enriched_attr)
|
|
||||||
|
|
||||||
|
|
||||||
fraud_score_attr = MISPAttribute()
|
|
||||||
fraud_score_attr.from_dict(**{
|
|
||||||
'value': result.get('fraud_score'),
|
|
||||||
'type': 'text',
|
|
||||||
'object_relation': 'fraud_score',
|
|
||||||
'distribution': 0
|
|
||||||
})
|
|
||||||
enriched_object.add_attribute(**fraud_score_attr)
|
|
||||||
|
|
||||||
latitude = MISPAttribute()
|
|
||||||
latitude.from_dict(**{
|
|
||||||
'value': result.get('latitude'),
|
|
||||||
'type': 'text',
|
|
||||||
'object_relation': 'latitude',
|
|
||||||
'distribution': 0
|
|
||||||
})
|
|
||||||
enriched_object.add_attribute(**latitude)
|
|
||||||
|
|
||||||
event.add_attribute(**orig_attr)
|
|
||||||
event.add_object(**enriched_object)
|
|
||||||
|
|
||||||
longitude = MISPAttribute()
|
|
||||||
longitude.from_dict(**{
|
|
||||||
'value': result.get('longitude'),
|
|
||||||
'type': 'text',
|
|
||||||
'object_relation': 'longitude',
|
|
||||||
'distribution': 0
|
|
||||||
})
|
|
||||||
enriched_object.add_attribute(**longitude)
|
|
||||||
|
|
||||||
return event
|
|
||||||
|
|
||||||
def handler(q=False):
|
|
||||||
if q is False:
|
|
||||||
return False
|
|
||||||
request = json.loads(q)
|
|
||||||
|
|
||||||
# check if the apikey is pprovided
|
|
||||||
if not request.get('config') or not request['config'].get('apikey'):
|
|
||||||
misperrors['error'] = 'IPQualityScore apikey is missing'
|
|
||||||
return misperrors
|
|
||||||
apikey = request['config'].get('apikey')
|
|
||||||
# check attribute is added to the event
|
|
||||||
if not request.get('attribute') or not check_input_attribute(request['attribute']):
|
|
||||||
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
|
|
||||||
|
|
||||||
input_attribute = request['attribute']
|
|
||||||
input_attribute_type = input_attribute['type']
|
|
||||||
input_attribute_value = attribute['value']
|
|
||||||
# check if the attribute type is supported by IPQualityScore
|
|
||||||
if input_attribute_type not in mispattributes['input']:
|
|
||||||
return {'error': 'Unsupported attributes type for IPqualityScore Enrichment'}
|
|
||||||
|
|
||||||
if input_attribute_type in IP_API_ATTRIBUTE_TYPES:
|
|
||||||
url = f"{BASE_URL}/ip/{input_attribute_value}"
|
|
||||||
headers = {"IPQS-KEY": apikey}
|
|
||||||
response = self.get(url, headers)
|
|
||||||
data = response.data
|
|
||||||
if str(data.get('success')) == "True":
|
|
||||||
event = _format_result(input_attribute, data, "ip")
|
|
||||||
event = json.loads(event.to_json())
|
|
||||||
ret_result = {key: event[key] for key in ('Attribute', 'Object') if key
|
|
||||||
in event}
|
|
||||||
return {'results': ret_result}
|
|
||||||
else:
|
|
||||||
return {'error', str(data.get('message'))
|
|
||||||
|
|
||||||
def introspection():
|
|
||||||
return mispattributes
|
|
||||||
|
|
||||||
|
|
||||||
def version():
|
|
||||||
moduleinfo['config'] = moduleconfig
|
|
||||||
return moduleinfo
|
|
||||||
|
|
Loading…
Reference in New Issue