Add IP2Location.io module

pull/645/head
ip2location 2023-12-07 10:40:04 +08:00
parent 0db0f8c83c
commit 58265dc925
2 changed files with 96 additions and 1 deletions

View File

@ -20,7 +20,7 @@ __all__ = ['cuckoo_submit', 'vmray_submit', 'bgpranking', 'circl_passivedns', 'c
'trustar_enrich', 'recordedfuture', 'html_to_markdown', 'socialscan', 'passive-ssh', 'trustar_enrich', 'recordedfuture', 'html_to_markdown', 'socialscan', 'passive-ssh',
'qintel_qsentry', 'mwdb', 'hashlookup', 'mmdb_lookup', 'ipqs_fraud_and_risk_scoring', 'qintel_qsentry', 'mwdb', 'hashlookup', 'mmdb_lookup', 'ipqs_fraud_and_risk_scoring',
'clamav', 'jinja_template_rendering','hyasinsight', 'variotdbs', 'crowdsec', 'clamav', 'jinja_template_rendering','hyasinsight', 'variotdbs', 'crowdsec',
'extract_url_components', 'ipinfo', 'whoisfreaks'] 'extract_url_components', 'ipinfo', 'whoisfreaks', 'ip2locationio']
minimum_required_fields = ('type', 'uuid', 'value') minimum_required_fields = ('type', 'uuid', 'value')

View File

@ -0,0 +1,95 @@
import json
import requests
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
mispattributes = {
'input': ['ip-src', 'ip-dst'],
'format': 'misp_standard'
}
moduleinfo = {
'version': 1,
'author': 'IP2Location.io',
'description': 'An expansion module to query IP2Location.io for additional information on an IP address',
'module-type': ['expansion', 'hover']
}
moduleconfig = ['key']
_GEOLOCATION_OBJECT_MAPPING = {
'country_code': 'country code',
'country_name': 'country name',
'region_name': 'region name',
'city_name': 'city',
'zip_code': 'zipcode',
'latitude': 'latitude',
'longitude': 'longitude'
}
def handler(q=False):
# Input checks
if q is False:
return False
request = json.loads(q)
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request['attribute']
if attribute.get('type') not in mispattributes['input']:
return {'error': 'Wrong input attribute type.'}
if not request.get('config'):
return {'error': 'Missing ip2locationio config.'}
if not request['config'].get('key'):
return {'error': 'Missing ip2locationio API key.'}
# Query ip2location.io
query = requests.get(
f"https://api.ip2location.io/json?key={request['config']['key']&ip={attribute['value']}"
)
if query.status_code != 200:
return {'error': f'Error while querying ip2location.io - {query.status_code}: {query.reason}'}
iplio_result = query.json()
# Check if the IP address is not reserved for special use
# if ipinfo.get('bogon', False):
if '' in iplio_result and iplio_result[''] == 'RSV':
return {'error': 'The IP address is reserved for special use'}
# Initiate the MISP data structures
misp_event = MISPEvent()
input_attribute = MISPAttribute()
input_attribute.from_dict(**attribute)
misp_event.add_attribute(**input_attribute)
# Parse the geolocation information related to the IP address
geolocation = MISPObject('geolocation')
for field, relation in _GEOLOCATION_OBJECT_MAPPING.items():
geolocation.add_attribute(relation, iplio_result[field])
geolocation.add_reference(input_attribute.uuid, 'locates')
misp_event.add_object(geolocation)
# Parse proxy information
proxy = MISPObject('proxy')
proxy.add_reference(input_attribute.uuid, 'related-to')
if iplio_result.get('proxy') is not None:
proxy_info = iplio_result['proxy']
proxy.add_attribute('proxy_type', proxy_info['proxy_type'])
proxy.add_attribute('threat', proxy_info['threat'])
proxy.add_attribute('provider', proxy_info['provider'])
proxy.add_attribute('last_seen', proxy_info['last_seen'])
misp_event.add_object(proxy)
# Return the results in MISP format
event = json.loads(misp_event.to_json())
return {
'results': {key: event[key] for key in ('Attribute', 'Object')}
}
def introspection():
return mispattributes
def version():
moduleinfo['config'] = moduleconfig
return moduleinfo