misp-modules/misp_modules/modules/expansion/ipgeolocation.py

106 lines
3.0 KiB
Python
Raw Normal View History

2023-06-19 09:52:11 +02:00
import json
import requests
from pymisp import MISPAttribute, MISPEvent, MISPObject
# Attribute types accepted as input and the output format announced to MISP.
mispattributes = {
    'input': ['ip-dst', 'ip-src'],
    'format': 'misp_standard',
}

# Module metadata; version() adds the 'config' entry to this dict.
moduleinfo = {
    'version': '1',
    'author': 'IpGeolocation',
    'description': 'Query Using IpGeolocation.io',
    'module-type': ['expansion', 'hover'],
}

# Configuration keys the handler reads from request['config'].
moduleconfig = ['apiKey']
def handler(q=False):
    """Expand an IP attribute with geolocation data from IpGeolocation.io.

    Args:
        q: JSON-encoded request string, or ``False`` when no request was
            supplied. The decoded request is read for ``config['apiKey']``
            and for ``attribute`` (its ``type`` and ``value``).

    Returns:
        ``False`` when ``q`` is ``False``; otherwise a dict holding either
        an ``'error'`` message or the ``'results'`` produced by
        ``handle_ip``.
    """
    # Input checks
    if q is False:
        return False
    request = json.loads(q)
    config = request.get('config')
    if not config:
        return {'error': 'IpGeolocation Configuration is missing'}
    if not config.get('apiKey'):
        return {'error': 'IpGeolocation apiKey is missing'}

    # Report a malformed request as an error dict instead of a KeyError.
    attribute = request.get('attribute')
    if (not isinstance(attribute, dict)
            or not attribute.get('type')
            or not attribute.get('value')):
        return {'error': 'IpGeolocation requires an attribute with a type and a value'}
    if attribute['type'] not in mispattributes['input']:
        return {'error': 'Unsupported attribute type.'}

    # Hand the lookup result back to the caller (it used to be discarded).
    return handle_ip(config['apiKey'], attribute['value'], attribute)


def handle_ip(apiKey, ip, attribute=None):
    """Query IpGeolocation.io for ``ip`` and wrap the answer in MISP format.

    Args:
        apiKey: API key forwarded to ``query_ipgeolocation``.
        ip: IP address to look up.
        attribute: Optional dict describing the input attribute. When
            given, it is loaded into a ``MISPAttribute`` and added to the
            returned event; when omitted, only the object is returned.

    Returns:
        ``{'results': {...}}`` with the ``'Attribute'`` and ``'Object'``
        entries present in the serialized event, or ``{'error': message}``
        when the lookup or the parsing fails.
    """
    try:
        results = query_ipgeolocation(apiKey, ip)
    except (requests.RequestException, ValueError):
        # Transport failure, or a reply body that is not valid JSON.
        return {'error': 'Error during querying IPGeolocation API.'}

    # query_ipgeolocation reports a non-200 reply as an error dict; pass it
    # on unchanged rather than treating it as geolocation data.
    if results.get('error'):
        return {'error': results['error']}

    # Check if the IP address is not reserved for special use
    message = results.get('message')
    if message:
        if 'bogon' in message:
            return {'error': 'The IP address(bogon IP) is reserved for special use'}
        return {'error': 'Error Occurred during IP data Extraction from Message'}

    # Initiate the MISP data structures
    misp_event = MISPEvent()
    if attribute:
        input_attribute = MISPAttribute()
        input_attribute.from_dict(**attribute)
        misp_event.add_attribute(**input_attribute)

    # Parse the geolocation information related to the IP address
    ip_object = MISPObject('ip-api-address')
    try:
        for field, relation in get_mapping().items():
            ip_object.add_attribute(relation, results[field])
    except Exception:
        return {'error': 'Error while Adding attributes'}
    misp_event.add_object(ip_object)

    # Return the results in MISP format
    event = json.loads(misp_event.to_json())
    return {
        'results': {
            key: event[key] for key in ('Attribute', 'Object') if key in event
        }
    }
def query_ipgeolocation(apiKey, ip):
    """Fetch the geolocation record for ``ip`` from api.ipgeolocation.io.

    Args:
        apiKey: API key sent as the ``apiKey`` query parameter.
        ip: IP address sent as the ``ip`` query parameter.

    Returns:
        The decoded JSON body on HTTP 200; otherwise a dict with a single
        ``'error'`` entry carrying the status code and reason.
    """
    # params= lets requests URL-encode the values; the timeout keeps an
    # unresponsive API from blocking the caller indefinitely.
    query = requests.get(
        "https://api.ipgeolocation.io/ipgeo",
        params={'apiKey': apiKey, 'ip': ip},
        timeout=30,
    )
    if query.status_code != 200:
        return {'error': f'Error while querying ipGeolocation.io - {query.status_code}: {query.reason}'}
    return query.json()
def get_mapping():
    """Return the field-to-relation mapping used to fill the MISP object.

    Keys are field names read from the IpGeolocation.io response; values
    are the object relations passed to ``MISPObject.add_attribute``. A new
    dict is built on every call.
    """
    return {
        'isp': 'ISP',
        'asn': 'asn',
        'city': 'city',
        'country_name': 'country',
        'country_code2': 'country-code',
        'latitude': 'latitude',
        'longitude': 'longitude',
        'organization': 'organization',
        'continent_name': 'region',
        'continent_code': 'region-code',
        'state_prov': 'state',
        'zipcode': 'zipcode',
        'ip': 'ip-src',
    }
def introspection():
    """Return the module-level ``mispattributes`` declaration."""
    return mispattributes
def version():
    """Return ``moduleinfo`` after storing ``moduleconfig`` under 'config'."""
    moduleinfo.update(config=moduleconfig)
    return moduleinfo