mirror of https://github.com/MISP/misp-modules
Extend MMDB with max_country_qt
When querying MMDB there are sometimes multiple country_info objects returned, mostly due to the different db_source. Sometimes customers are not interested in the db_source, and only the geo-info. This change adds max_country_qt. When - Set to None or 0, has no effect - Set to a value higher than 0, the number of country_info entries is limited to max_country_qtpull/697/head
parent
6a3557bae7
commit
4c6a215802
|
@ -5,30 +5,22 @@ from pymisp import MISPEvent, MISPObject
|
||||||
|
|
||||||
misperrors = {'error': 'Error'}
|
misperrors = {'error': 'Error'}
|
||||||
mispattributes = {'input': ['ip-src', 'ip-src|port', 'ip-dst', 'ip-dst|port'], 'format': 'misp_standard'}
|
mispattributes = {'input': ['ip-src', 'ip-src|port', 'ip-dst', 'ip-dst|port'], 'format': 'misp_standard'}
|
||||||
moduleinfo = {
|
moduleinfo = {'version': '1', 'author': 'Jeroen Pinoy',
|
||||||
'version': '1',
|
'description': "An expansion module to enrich an ip with geolocation and asn information from an mmdb server "
|
||||||
'author': 'Jeroen Pinoy',
|
"such as ip.circl.lu.",
|
||||||
'description': "A hover and expansion module to enrich an ip with geolocation and ASN information from an mmdb server instance, such as CIRCL's ip.circl.lu.",
|
'module-type': ['expansion', 'hover']}
|
||||||
'module-type': ['expansion', 'hover'],
|
moduleconfig = ["custom_API", "db_source_filter", "max_country_qt"]
|
||||||
'name': 'GeoIP Enrichment',
|
|
||||||
'logo': 'circl.png',
|
|
||||||
'requirements': [],
|
|
||||||
'features': 'The module takes an IP address related attribute as input.\n It queries the public CIRCL.lu mmdb-server instance, available at ip.circl.lu, by default. The module can be configured with a custom mmdb server url if required.\n It is also possible to filter results on 1 db_source by configuring db_source_filter.',
|
|
||||||
'references': ['https://data.public.lu/fr/datasets/geo-open-ip-address-geolocation-per-country-in-mmdb-format/', 'https://github.com/adulau/mmdb-server'],
|
|
||||||
'input': 'An IP address attribute (for example ip-src or ip-src|port).',
|
|
||||||
'output': 'Geolocation and asn objects.',
|
|
||||||
}
|
|
||||||
moduleconfig = ["custom_API", "db_source_filter"]
|
|
||||||
mmdblookup_url = 'https://ip.circl.lu/'
|
mmdblookup_url = 'https://ip.circl.lu/'
|
||||||
|
|
||||||
|
|
||||||
class MmdbLookupParser():
|
class MmdbLookupParser():
|
||||||
def __init__(self, attribute, mmdblookupresult, api_url):
|
def __init__(self, attribute, mmdblookupresult, api_url, max_country_qt=0):
|
||||||
self.attribute = attribute
|
self.attribute = attribute
|
||||||
self.mmdblookupresult = mmdblookupresult
|
self.mmdblookupresult = mmdblookupresult
|
||||||
self.api_url = api_url
|
self.api_url = api_url
|
||||||
self.misp_event = MISPEvent()
|
self.misp_event = MISPEvent()
|
||||||
self.misp_event.add_attribute(**attribute)
|
self.misp_event.add_attribute(**attribute)
|
||||||
|
self.max_country_qt = int(max_country_qt)
|
||||||
|
|
||||||
def get_result(self):
|
def get_result(self):
|
||||||
event = json.loads(self.misp_event.to_json())
|
event = json.loads(self.misp_event.to_json())
|
||||||
|
@ -37,26 +29,29 @@ class MmdbLookupParser():
|
||||||
|
|
||||||
def parse_mmdblookup_information(self):
|
def parse_mmdblookup_information(self):
|
||||||
# There is a chance some db's have a hit while others don't so we have to check if entry is empty each time
|
# There is a chance some db's have a hit while others don't so we have to check if entry is empty each time
|
||||||
|
country_info_qt = 0
|
||||||
for result_entry in self.mmdblookupresult:
|
for result_entry in self.mmdblookupresult:
|
||||||
if result_entry['country_info']:
|
if result_entry['country_info']:
|
||||||
mmdblookup_object = MISPObject('geolocation')
|
if (self.max_country_qt == 0) or (self.max_country_qt > 0 and country_info_qt < self.max_country_qt):
|
||||||
mmdblookup_object.add_attribute('country',
|
mmdblookup_object = MISPObject('geolocation')
|
||||||
**{'type': 'text', 'value': result_entry['country_info']['Country']})
|
mmdblookup_object.add_attribute('country',
|
||||||
mmdblookup_object.add_attribute('countrycode',
|
**{'type': 'text', 'value': result_entry['country_info']['Country']})
|
||||||
**{'type': 'text', 'value': result_entry['country']['iso_code']})
|
mmdblookup_object.add_attribute('countrycode',
|
||||||
mmdblookup_object.add_attribute('latitude',
|
**{'type': 'text', 'value': result_entry['country']['iso_code']})
|
||||||
**{'type': 'float',
|
mmdblookup_object.add_attribute('latitude',
|
||||||
'value': result_entry['country_info']['Latitude (average)']})
|
**{'type': 'float',
|
||||||
mmdblookup_object.add_attribute('longitude',
|
'value': result_entry['country_info']['Latitude (average)']})
|
||||||
**{'type': 'float',
|
mmdblookup_object.add_attribute('longitude',
|
||||||
'value': result_entry['country_info']['Longitude (average)']})
|
**{'type': 'float',
|
||||||
mmdblookup_object.add_attribute('text',
|
'value': result_entry['country_info']['Longitude (average)']})
|
||||||
**{'type': 'text',
|
mmdblookup_object.add_attribute('text',
|
||||||
'value': 'db_source: {}. build_db: {}. Latitude and longitude are country average.'.format(
|
**{'type': 'text',
|
||||||
result_entry['meta']['db_source'],
|
'value': 'db_source: {}. build_db: {}. Latitude and longitude are country average.'.format(
|
||||||
result_entry['meta']['build_db'])})
|
result_entry['meta']['db_source'],
|
||||||
mmdblookup_object.add_reference(self.attribute['uuid'], 'related-to')
|
result_entry['meta']['build_db'])})
|
||||||
self.misp_event.add_object(mmdblookup_object)
|
mmdblookup_object.add_reference(self.attribute['uuid'], 'related-to')
|
||||||
|
self.misp_event.add_object(mmdblookup_object)
|
||||||
|
country_info_qt += 1
|
||||||
if 'AutonomousSystemNumber' in result_entry['country']:
|
if 'AutonomousSystemNumber' in result_entry['country']:
|
||||||
mmdblookup_object_asn = MISPObject('asn')
|
mmdblookup_object_asn = MISPObject('asn')
|
||||||
mmdblookup_object_asn.add_attribute('asn',
|
mmdblookup_object_asn.add_attribute('asn',
|
||||||
|
@ -96,6 +91,9 @@ def handler(q=False):
|
||||||
else:
|
else:
|
||||||
misperrors['error'] = 'There is no attribute of type ip-src or ip-dst provided as input'
|
misperrors['error'] = 'There is no attribute of type ip-src or ip-dst provided as input'
|
||||||
return misperrors
|
return misperrors
|
||||||
|
max_country_qt = request['config'].get('max_country_qt', 0)
|
||||||
|
if max_country_qt is None:
|
||||||
|
max_country_qt = 0
|
||||||
api_url = check_url(request['config']['custom_API']) if 'config' in request and request['config'].get(
|
api_url = check_url(request['config']['custom_API']) if 'config' in request and request['config'].get(
|
||||||
'custom_API') else mmdblookup_url
|
'custom_API') else mmdblookup_url
|
||||||
r = requests.get("{}/geolookup/{}".format(api_url, toquery))
|
r = requests.get("{}/geolookup/{}".format(api_url, toquery))
|
||||||
|
@ -123,7 +121,7 @@ def handler(q=False):
|
||||||
else:
|
else:
|
||||||
misperrors['error'] = 'API not accessible - http status code {} was returned'.format(r.status_code)
|
misperrors['error'] = 'API not accessible - http status code {} was returned'.format(r.status_code)
|
||||||
return misperrors
|
return misperrors
|
||||||
parser = MmdbLookupParser(attribute, mmdblookupresult, api_url)
|
parser = MmdbLookupParser(attribute, mmdblookupresult, api_url, max_country_qt)
|
||||||
parser.parse_mmdblookup_information()
|
parser.parse_mmdblookup_information()
|
||||||
result = parser.get_result()
|
result = parser.get_result()
|
||||||
return result
|
return result
|
||||||
|
|
Loading…
Reference in New Issue