mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
		
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| import json
 | |
| from . import check_input_attribute, standard_error_message
 | |
| from datetime import date, datetime, timedelta
 | |
| from pybgpranking import BGPRanking
 | |
| from pymisp import MISPAttribute, MISPEvent, MISPObject
 | |
| 
 | |
# Base error payload; handler() overwrites its message when BGP Ranking has no result.
misperrors = {
    'error': 'Error',
}

# Only 'AS' attributes are accepted; results are returned in the misp_standard format.
mispattributes = {
    'input': ['AS'],
    'format': 'misp_standard',
}

# Metadata returned by version().
moduleinfo = {
    'version': '0.1',
    'author': 'Raphaël Vinot',
    'description': 'Query BGP Ranking to get the ranking of an Autonomous System number.',
    'module-type': ['expansion', 'hover'],
}
 | |
| 
 | |
| 
 | |
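def handler(q=False):
    """Enrich an ``AS`` attribute with its BGP Ranking data.

    Args:
        q: JSON-encoded request string holding an ``attribute`` dict, or
            ``False`` when no request is supplied.

    Returns:
        ``False`` when ``q`` is ``False``; a dict with an ``error`` key when
        the request is rejected or BGP Ranking has no description for the
        ASN; otherwise ``{'results': {'Attribute': ..., 'Object': ...}}``
        taken from the serialised MISP event.

    Note:
        Both the attribute value and the BGP Ranking answer come from outside
        this module. The value is validated before it is sent out; of the
        answer, only ``response`` and ``asn_description`` are checked, so a
        reply missing ``meta`` or ``ranking`` still raises ``KeyError``.
    """
    if q is False:
        return False
    request = json.loads(q)
    if not request.get('attribute') or not check_input_attribute(request['attribute']):
        return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
    toquery = request['attribute']
    if toquery['type'] not in mispattributes['input']:
        return {'error': 'Unsupported attribute type.'}

    # The value is caller-supplied: reject anything that is not a plain AS
    # number instead of letting int() raise out of the handler.
    raw_value = str(toquery['value']).strip()
    asn_text = raw_value[2:] if raw_value.startswith('AS') else raw_value
    try:
        value_toquery = int(asn_text)
    except ValueError:
        return {'error': 'The AS attribute value is not a valid AS number.'}
    # AS numbers are unsigned 32-bit values; nothing outside that range is queried.
    if not 0 <= value_toquery <= 0xFFFFFFFF:
        return {'error': 'The AS attribute value is not a valid AS number.'}

    bgpranking = BGPRanking()
    # The ranking is requested for yesterday's date.
    yesterday = (date.today() - timedelta(1)).isoformat()
    values = bgpranking.query(value_toquery, date=yesterday)

    response = values.get('response')
    if not response or not response.get('asn_description'):
        misperrors['error'] = 'There is no result about this ASN in BGP Ranking'
        return misperrors

    event = MISPEvent()
    attribute = MISPAttribute()
    attribute.from_dict(**toquery)
    event.add_attribute(**attribute)

    asn_object = MISPObject('asn')
    asn_object.add_attribute(**{
        'type': 'AS',
        'object_relation': 'asn',
        'value': values['meta']['asn']
    })
    # Split on the last ', ' only: a name that itself contains ', ' would
    # otherwise break the two-value unpacking with a ValueError.
    asn_description = response['asn_description']
    if ', ' in asn_description:
        description, country = asn_description.rsplit(', ', 1)
    else:
        description, country = asn_description, ''
    for relation, value in (('description', description), ('country', country)):
        if not value:
            # Nothing to record for this relation.
            continue
        asn_object.add_attribute(**{
            'type': 'text',
            'object_relation': relation,
            'value': value
        })

    mapping = {
        'address_family': {'type': 'text', 'object_relation': 'address-family'},
        'date': {'type': 'datetime', 'object_relation': 'date'},
        'position': {'type': 'float', 'object_relation': 'position'},
        'rank': {'type': 'float', 'object_relation': 'ranking'}
    }
    bgp_object = MISPObject('bgp-ranking')
    for feature in ('rank', 'position'):
        bgp_attribute = {'value': response['ranking'][feature]}
        bgp_attribute.update(mapping[feature])
        bgp_object.add_attribute(**bgp_attribute)
    date_attribute = {'value': datetime.strptime(values['meta']['date'], '%Y-%m-%d')}
    date_attribute.update(mapping['date'])
    bgp_object.add_attribute(**date_attribute)
    address_attribute = {'value': values['meta']['address_family']}
    address_attribute.update(mapping['address_family'])
    bgp_object.add_attribute(**address_attribute)

    # Link the new objects to the attribute that triggered the lookup.
    asn_object.add_reference(attribute.uuid, 'describes')
    asn_object.add_reference(bgp_object.uuid, 'ranked-with')
    event.add_object(asn_object)
    event.add_object(bgp_object)

    # Round-trip through JSON so only plain, serialisable data is returned.
    event = json.loads(event.to_json())
    results = {key: event[key] for key in ('Attribute', 'Object')}
    return {'results': results}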
 | |
| 
 | |
| 
 | |
def introspection():
    """Return the attribute types this module accepts and its output format."""
    return mispattributes
 | |
| 
 | |
| 
 | |
def version():
    """Return the module metadata (version, author, description, module types)."""
    return moduleinfo
 |