mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
			222 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			222 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
import logging
 | 
						|
import json
 | 
						|
 | 
						|
from pymisp import MISPAttribute, MISPEvent, MISPTag,  MISPObject
 | 
						|
from . import check_input_attribute, checking_error, standard_error_message
 | 
						|
 | 
						|
from qintel_helper import search_qsentry
 | 
						|
 | 
						|
logger = logging.getLogger('qintel_qsentry')
 | 
						|
logger.setLevel(logging.DEBUG)
 | 
						|
 | 
						|
moduleinfo = {
 | 
						|
    'version': '1.0',
 | 
						|
    'author': 'Qintel, LLC',
 | 
						|
    'description': 'Query Qintel QSentry for ip intelligence',
 | 
						|
    'module-type': ['hover', 'expansion']
 | 
						|
}
 | 
						|
 | 
						|
moduleconfig = ['token', 'remote']
 | 
						|
 | 
						|
misperrors = {'error': 'Error'}
 | 
						|
 | 
						|
mispattributes = {
 | 
						|
    'input': ['ip-src', 'ip-dst'],
 | 
						|
    'output': ['ip-src', 'ip-dst', 'AS', 'freetext'],
 | 
						|
    'format': 'misp_standard'
 | 
						|
}
 | 
						|
 | 
						|
TAG_COLOR = {
 | 
						|
    'benign': '#27ae60',
 | 
						|
    'suspicious': '#e6a902',
 | 
						|
    'malicious': '#c0392b'
 | 
						|
}
 | 
						|
 | 
						|
CLIENT_HEADERS = {
 | 
						|
        'User-Agent': f"MISP/{moduleinfo['version']}",
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
def _return_error(message):
 | 
						|
    misperrors['error'] = message
 | 
						|
    return misperrors
 | 
						|
 | 
						|
 | 
						|
def _make_tags(enriched_attr, result):
 | 
						|
 | 
						|
    for tag in result['tags']:
 | 
						|
        color = TAG_COLOR['suspicious']
 | 
						|
        if tag == 'criminal':
 | 
						|
            color = TAG_COLOR['malicious']
 | 
						|
 | 
						|
        t = MISPTag()
 | 
						|
        t.from_dict(**{
 | 
						|
            'name': f'qintel:tag="{tag}"',
 | 
						|
            'colour': color
 | 
						|
        })
 | 
						|
        enriched_attr.add_tag(**t)
 | 
						|
 | 
						|
    return enriched_attr
 | 
						|
 | 
						|
 | 
						|
def _make_enriched_attr(event, result, orig_attr):
 | 
						|
 | 
						|
    enriched_object = MISPObject('Qintel Threat Enrichment')
 | 
						|
    enriched_object.add_reference(orig_attr.uuid, 'related-to')
 | 
						|
 | 
						|
    enriched_attr = MISPAttribute()
 | 
						|
    enriched_attr.from_dict(**{
 | 
						|
        'value': orig_attr.value,
 | 
						|
        'type': orig_attr.type,
 | 
						|
        'distribution': 0,
 | 
						|
        'object_relation': 'enriched-attr',
 | 
						|
        'to_ids': orig_attr.to_ids
 | 
						|
    })
 | 
						|
 | 
						|
    enriched_attr = _make_tags(enriched_attr, result)
 | 
						|
    enriched_object.add_attribute(**enriched_attr)
 | 
						|
 | 
						|
    comment_attr = MISPAttribute()
 | 
						|
    comment_attr.from_dict(**{
 | 
						|
        'value': '\n'.join(result.get('descriptions', [])),
 | 
						|
        'type': 'text',
 | 
						|
        'object_relation': 'descriptions',
 | 
						|
        'distribution': 0
 | 
						|
    })
 | 
						|
    enriched_object.add_attribute(**comment_attr)
 | 
						|
 | 
						|
    last_seen = MISPAttribute()
 | 
						|
    last_seen.from_dict(**{
 | 
						|
        'value': result.get('last_seen'),
 | 
						|
        'type': 'datetime',
 | 
						|
        'object_relation': 'last-seen',
 | 
						|
        'distribution': 0
 | 
						|
    })
 | 
						|
    enriched_object.add_attribute(**last_seen)
 | 
						|
 | 
						|
    event.add_attribute(**orig_attr)
 | 
						|
    event.add_object(**enriched_object)
 | 
						|
 | 
						|
    return event
 | 
						|
 | 
						|
 | 
						|
def _make_asn_attr(event, result, orig_attr):
 | 
						|
 | 
						|
    asn_object = MISPObject('asn')
 | 
						|
    asn_object.add_reference(orig_attr.uuid, 'related-to')
 | 
						|
 | 
						|
    asn_attr = MISPAttribute()
 | 
						|
    asn_attr.from_dict(**{
 | 
						|
        'type': 'AS',
 | 
						|
        'value': result.get('asn'),
 | 
						|
        'object_relation': 'asn',
 | 
						|
        'distribution': 0
 | 
						|
    })
 | 
						|
    asn_object.add_attribute(**asn_attr)
 | 
						|
 | 
						|
    org_attr = MISPAttribute()
 | 
						|
    org_attr.from_dict(**{
 | 
						|
        'type': 'text',
 | 
						|
        'value': result.get('asn_name', 'unknown').title(),
 | 
						|
        'object_relation': 'description',
 | 
						|
        'distribution': 0
 | 
						|
    })
 | 
						|
    asn_object.add_attribute(**org_attr)
 | 
						|
 | 
						|
    event.add_object(**asn_object)
 | 
						|
 | 
						|
    return event
 | 
						|
 | 
						|
 | 
						|
def _format_hover(event, result):
 | 
						|
 | 
						|
    enriched_object = event.get_objects_by_name('Qintel Threat Enrichment')[0]
 | 
						|
 | 
						|
    tags = ', '.join(result.get('tags'))
 | 
						|
    enriched_object.add_attribute('Tags', type='text', value=tags)
 | 
						|
 | 
						|
    return event
 | 
						|
 | 
						|
 | 
						|
def _format_result(attribute, result):
 | 
						|
 | 
						|
    event = MISPEvent()
 | 
						|
 | 
						|
    orig_attr = MISPAttribute()
 | 
						|
    orig_attr.from_dict(**attribute)
 | 
						|
 | 
						|
    event = _make_enriched_attr(event, result, orig_attr)
 | 
						|
    event = _make_asn_attr(event, result, orig_attr)
 | 
						|
 | 
						|
    return event
 | 
						|
 | 
						|
 | 
						|
def _check_config(config):
 | 
						|
    if not config:
 | 
						|
        return False
 | 
						|
 | 
						|
    if not isinstance(config, dict):
 | 
						|
        return False
 | 
						|
 | 
						|
    if config.get('token', '') == '':
 | 
						|
        return False
 | 
						|
 | 
						|
    return True
 | 
						|
 | 
						|
 | 
						|
def _check_request(request):
 | 
						|
    if not request.get('attribute'):
 | 
						|
        return f'{standard_error_message}, {checking_error}'
 | 
						|
 | 
						|
    check_reqs = ('type', 'value')
 | 
						|
    if not check_input_attribute(request['attribute'],
 | 
						|
                                 requirements=check_reqs):
 | 
						|
        return f'{standard_error_message}, {checking_error}'
 | 
						|
 | 
						|
    if request['attribute']['type'] not in mispattributes['input']:
 | 
						|
        return 'Unsupported attribute type'
 | 
						|
 | 
						|
 | 
						|
def handler(q=False):
 | 
						|
    if not q:
 | 
						|
        return False
 | 
						|
 | 
						|
    request = json.loads(q)
 | 
						|
    config = request.get('config')
 | 
						|
 | 
						|
    if not _check_config(config):
 | 
						|
        return _return_error('Missing Qintel token')
 | 
						|
 | 
						|
    check_request_error = _check_request(request)
 | 
						|
    if check_request_error:
 | 
						|
        return _return_error(check_request_error)
 | 
						|
 | 
						|
    search_args = {
 | 
						|
        'token': config['token'],
 | 
						|
        'remote': config.get('remote')
 | 
						|
    }
 | 
						|
 | 
						|
    try:
 | 
						|
        result = search_qsentry(request['attribute']['value'], **search_args)
 | 
						|
    except Exception as e:
 | 
						|
        return _return_error(str(e))
 | 
						|
 | 
						|
    event = _format_result(request['attribute'], result)
 | 
						|
    if not request.get('event_id'):
 | 
						|
        event = _format_hover(event, result)
 | 
						|
 | 
						|
    event = json.loads(event.to_json())
 | 
						|
 | 
						|
    ret_result = {key: event[key] for key in ('Attribute', 'Object') if key
 | 
						|
                  in event}
 | 
						|
    return {'results': ret_result}
 | 
						|
 | 
						|
 | 
						|
def introspection():
 | 
						|
    return mispattributes
 | 
						|
 | 
						|
 | 
						|
def version():
 | 
						|
    moduleinfo['config'] = moduleconfig
 | 
						|
    return moduleinfo
 |