From 3101e5bc26ee3e45afed16dfb7a5b45ac70a270c Mon Sep 17 00:00:00 2001 From: chrisr3d Date: Tue, 8 Sep 2020 16:08:57 +0200 Subject: [PATCH] chg: Updated the bgpranking expansion module to return MISP objects - The module no longer returns freetext, since the result returned to the freetext import as text only allowed MISP to parse the same AS number as the input attribute. - The new result returned with the updated module is an asn object describing more precisely the AS number, and its ranking for a given day --- misp_modules/modules/expansion/bgpranking.py | 72 ++++++++++++++++---- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/misp_modules/modules/expansion/bgpranking.py b/misp_modules/modules/expansion/bgpranking.py index b01088d..c021d62 100755 --- a/misp_modules/modules/expansion/bgpranking.py +++ b/misp_modules/modules/expansion/bgpranking.py @@ -1,13 +1,15 @@ # -*- coding: utf-8 -*- import json -from datetime import date, timedelta +from . import check_input_attribute, standard_error_message +from datetime import date, datetime, timedelta from pybgpranking import BGPRanking +from pymisp import MISPAttribute, MISPEvent, MISPObject misperrors = {'error': 'Error'} -mispattributes = {'input': ['AS'], 'output': ['freetext']} +mispattributes = {'input': ['AS'], 'format': 'misp_standard'} moduleinfo = {'version': '0.1', 'author': 'Raphaƫl Vinot', - 'description': 'Query an ASN Description history service (https://github.com/CIRCL/ASN-Description-History.git)', + 'description': 'Query BGP Ranking to get the ranking of an Autonomous System number.', 'module-type': ['expansion', 'hover']} @@ -15,19 +17,65 @@ def handler(q=False): if q is False: return False request = json.loads(q) - if request.get('AS'): - toquery = request['AS'] - else: - misperrors['error'] = "Unsupported attributes type" - return misperrors + if not request.get('attribute') or not check_input_attribute(request['attribute']): + return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'} + toquery = request['attribute'] + if toquery['type'] not in mispattributes['input']: + return {'error': 'Unsupported attribute type.'} bgpranking = BGPRanking() - values = bgpranking.query(toquery, date=(date.today() - timedelta(1)).isoformat()) + value_toquery = int(toquery['value'][2:]) if toquery['value'].startswith('AS') else int(toquery['value']) + values = bgpranking.query(value_toquery, date=(date.today() - timedelta(1)).isoformat()) - if not values: - misperrors['error'] = 'Unable to find the ASN in BGP Ranking' + if not values['response'] or not values['response']['asn_description']: + misperrors['error'] = 'There is no result about this ASN in BGP Ranking' return misperrors - return {'results': [{'types': mispattributes['output'], 'values': values}]} + + event = MISPEvent() + attribute = MISPAttribute() + attribute.from_dict(**toquery) + event.add_attribute(**attribute) + + asn_object = MISPObject('asn') + asn_object.add_attribute(**{ + 'type': 'AS', + 'object_relation': 'asn', + 'value': values['meta']['asn'] + }) + description, country = values['response']['asn_description'].split(', ') + for relation, value in zip(('description', 'country'), (description, country)): + asn_object.add_attribute(**{ + 'type': 'text', + 'object_relation': relation, + 'value': value + }) + + mapping = { + 'address_family': {'type': 'text', 'object_relation': 'address-family'}, + 'date': {'type': 'datetime', 'object_relation': 'date'}, + 'position': {'type': 'float', 'object_relation': 'position'}, + 'rank': {'type': 'float', 'object_relation': 'ranking'} + } + bgp_object = MISPObject('bgp-ranking') + for feature in ('rank', 'position'): + bgp_attribute = {'value': values['response']['ranking'][feature]} + bgp_attribute.update(mapping[feature]) + bgp_object.add_attribute(**bgp_attribute) + date_attribute = {'value': datetime.strptime(values['meta']['date'], '%Y-%m-%d')} + date_attribute.update(mapping['date']) + bgp_object.add_attribute(**date_attribute) + address_attribute = {'value': values['meta']['address_family']} + address_attribute.update(mapping['address_family']) + bgp_object.add_attribute(**address_attribute) + + asn_object.add_reference(attribute.uuid, 'describes') + asn_object.add_reference(bgp_object.uuid, 'ranked-with') + event.add_object(asn_object) + event.add_object(bgp_object) + + event = json.loads(event.to_json()) + results = {key: event[key] for key in ('Attribute', 'Object')} + return {'results': results} def introspection():