mirror of https://github.com/MISP/MISP-maltego
chg: [transform] in MISP? is now search in MISP
and also supports % search paramspull/40/head
parent
6fc393bed9
commit
a1ba3890eb
|
@ -1,8 +1,8 @@
|
|||
from canari.maltego.entities import Unknown
|
||||
from canari.maltego.entities import Unknown, Hashtag
|
||||
from canari.maltego.transform import Transform
|
||||
# from canari.framework import EnableDebugWindow
|
||||
from MISP_maltego.transforms.common.util import check_update, get_misp_connection, event_to_entity, object_to_entity, get_attribute_in_event, get_attribute_in_object, attribute_to_entity, get_entity_property
|
||||
from canari.maltego.message import LinkDirection
|
||||
from MISP_maltego.transforms.common.entities import MISPGalaxy
|
||||
from MISP_maltego.transforms.common.util import check_update, get_misp_connection, event_to_entity, object_to_entity, get_attribute_in_event, get_attribute_in_object, attribute_to_entity, get_entity_property, search_galaxy_cluster, galaxycluster_to_entity, tag_matches_note_prefix
|
||||
from canari.maltego.message import LinkDirection, Bookmark
|
||||
|
||||
__author__ = 'Christophe Vandeplas'
|
||||
__copyright__ = 'Copyright 2018, MISP_maltego Project'
|
||||
|
@ -15,34 +15,81 @@ __email__ = 'christophe@vandeplas.com'
|
|||
__status__ = 'Development'
|
||||
|
||||
|
||||
# @EnableDebugWindow
|
||||
class AttributeInMISP(Transform):
|
||||
"""Green bookmark if known in MISP"""
|
||||
class SearchInMISP(Transform):
|
||||
"""Search an attribute, event in MISP, allowing the use of % at the front and end"""
|
||||
input_type = Unknown
|
||||
display_name = 'in MISP?'
|
||||
display_name = 'Search in MISP'
|
||||
remote = True
|
||||
|
||||
def do_transform(self, request, response, config):
|
||||
response += check_update(config)
|
||||
maltego_misp_attribute = request.entity
|
||||
# skip MISP Events (value = int)
|
||||
try:
|
||||
int(maltego_misp_attribute.value)
|
||||
return response
|
||||
except Exception:
|
||||
pass
|
||||
link_label = 'Search result'
|
||||
|
||||
if 'properties.mispevent' in request.entity.fields:
|
||||
misp = get_misp_connection(config, request.parameters)
|
||||
events_json = misp.search(controller='events', value=maltego_misp_attribute.value, with_attachments=False)
|
||||
# if event_id
|
||||
try:
|
||||
if request.entity.value == '0':
|
||||
return response
|
||||
eventid = int(request.entity.value)
|
||||
events_json = misp.search(controller='events', eventid=eventid, with_attachments=False)
|
||||
for e in events_json:
|
||||
response += event_to_entity(e, link_label=link_label, link_direction=LinkDirection.OutputToInput)
|
||||
return response
|
||||
except ValueError:
|
||||
pass
|
||||
# if event_info string as value
|
||||
events_json = misp.search(controller='events', eventinfo=request.entity.value, with_attachments=False)
|
||||
for e in events_json:
|
||||
response += event_to_entity(e, link_label=link_label, link_direction=LinkDirection.OutputToInput)
|
||||
return response
|
||||
|
||||
# From galaxy or Hashtag
|
||||
if 'properties.mispgalaxy' in request.entity.fields or 'properties.temp' in request.entity.fields:
|
||||
if request.entity.value == '-':
|
||||
return response
|
||||
# First search in galaxies
|
||||
keyword = get_entity_property(request.entity, 'Temp')
|
||||
if not keyword:
|
||||
keyword = request.entity.value
|
||||
# assume the user is searching for a cluster based on a substring.
|
||||
# Search in the list for those that match and return galaxy entities'
|
||||
potential_clusters = search_galaxy_cluster(keyword)
|
||||
# LATER check if duplicates are possible
|
||||
if potential_clusters:
|
||||
for potential_cluster in potential_clusters:
|
||||
new_entity = galaxycluster_to_entity(potential_cluster, link_label=link_label)
|
||||
# LATER support the type_filter - unfortunately this is not possible, we need Canari to tell us the original entity type
|
||||
if isinstance(new_entity, MISPGalaxy):
|
||||
response += new_entity
|
||||
|
||||
# from Hashtag search also in tags
|
||||
if 'properties.temp' in request.entity.fields:
|
||||
keyword = get_entity_property(request.entity, 'Temp')
|
||||
if not keyword:
|
||||
keyword = request.entity.value
|
||||
misp = get_misp_connection(config, request.parameters)
|
||||
result = misp.direct_call('tags/search', {'name': keyword})
|
||||
for t in result:
|
||||
# skip misp-galaxies as we have processed them earlier on
|
||||
if t['Tag']['name'].startswith('misp-galaxy'):
|
||||
continue
|
||||
# In this case we do not filter away those we add as notes, as people might want to pivot on it explicitly.
|
||||
response += Hashtag(t['Tag']['name'], link_label=link_label, bookmark=Bookmark.Green)
|
||||
|
||||
return response
|
||||
|
||||
# for all other normal entities
|
||||
misp = get_misp_connection(config, request.parameters)
|
||||
events_json = misp.search(controller='events', value=request.entity.value, with_attachments=False)
|
||||
# we need to do really rebuild the Entity from scratch as request.entity is of type Unknown
|
||||
for e in events_json:
|
||||
attr = get_attribute_in_event(e, maltego_misp_attribute.value)
|
||||
attr = get_attribute_in_event(e, request.entity.value, substring=True)
|
||||
if attr:
|
||||
for item in attribute_to_entity(attr, only_self=True):
|
||||
response += item
|
||||
return response
|
||||
|
||||
|
||||
# placeholder for https://github.com/MISP/MISP-maltego/issues/11
|
||||
# waiting for support of CIDR search through the REST API
|
||||
# @EnableDebugWindow
|
||||
|
@ -56,7 +103,7 @@ class AttributeInMISP(Transform):
|
|||
# misp = get_misp_connection(config, request.parameters)
|
||||
# import ipaddress
|
||||
# ip_start, ip_end = maltego_misp_attribute.value.split('-')
|
||||
# # FIXME make this work with IPv4 and IPv6
|
||||
# # LATER make this work with IPv4 and IPv6
|
||||
# # automagically detect the different CIDRs
|
||||
# cidrs = ipaddress.summarize_address_range(ipaddress.IPv4Address(ip_start), ipaddress.IPv4Address(ip_end))
|
||||
# for cidr in cidrs:
|
||||
|
@ -66,7 +113,6 @@ class AttributeInMISP(Transform):
|
|||
# return response
|
||||
|
||||
|
||||
# @EnableDebugWindow
|
||||
class AttributeToEvent(Transform):
|
||||
input_type = Unknown
|
||||
display_name = 'to MISP Event'
|
||||
|
|
|
@ -300,13 +300,37 @@ def get_attribute_in_object(o, attribute_type=False, attribute_value=False, drop
|
|||
return found_attribute
|
||||
|
||||
|
||||
def get_attribute_in_event(e, attribute_value):
|
||||
def get_attribute_in_event(e, attribute_value, substring=False):
|
||||
for a in e['Event']["Attribute"]:
|
||||
if a['value'] == attribute_value:
|
||||
return a
|
||||
if '|' in a['type'] or a['type'] == 'malware-sample':
|
||||
if attribute_value in a['value'].split('|'):
|
||||
return a
|
||||
if substring:
|
||||
keyword = attribute_value.strip('%')
|
||||
if attribute_value.startswith('%') and attribute_value.endswith('%'):
|
||||
if attribute_value in a['value']:
|
||||
return a
|
||||
if '|' in a['type'] or a['type'] == 'malware-sample':
|
||||
val1, val2 = a['value'].split('|')
|
||||
if attribute_value in val1 or attribute_value in val2:
|
||||
return a
|
||||
elif attribute_value.startswith('%'):
|
||||
if a['value'].endswith(keyword):
|
||||
return a
|
||||
if '|' in a['type'] or a['type'] == 'malware-sample':
|
||||
val1, val2 = a['value'].split('|')
|
||||
if val1.endswith(keyword) or val2.endswith(keyword):
|
||||
return a
|
||||
|
||||
elif attribute_value.endswith('%'):
|
||||
if a['value'].startswith(keyword):
|
||||
return a
|
||||
if '|' in a['type'] or a['type'] == 'malware-sample':
|
||||
val1, val2 = a['value'].split('|')
|
||||
if val1.startswith(keyword) or val2.startswith(keyword):
|
||||
return a
|
||||
|
||||
return None
|
||||
|
||||
|
@ -329,7 +353,7 @@ def tag_matches_note_prefix(tag):
|
|||
return False
|
||||
|
||||
|
||||
def event_to_entity(e, link_style=LinkStyle.Normal, link_direction=LinkDirection.InputToOutput):
|
||||
def event_to_entity(e, link_style=LinkStyle.Normal, link_label=None, link_direction=LinkDirection.InputToOutput):
|
||||
tags = []
|
||||
if 'Tag' in e['Event']:
|
||||
for t in e['Event']['Tag']:
|
||||
|
@ -340,6 +364,7 @@ def event_to_entity(e, link_style=LinkStyle.Normal, link_direction=LinkDirection
|
|||
uuid=e['Event']['uuid'],
|
||||
info=e['Event']['info'],
|
||||
link_style=link_style,
|
||||
link_label=link_label,
|
||||
link_direction=link_direction,
|
||||
count_attributes=len(e['Event'].get('Attribute')),
|
||||
count_objects=len(e['Event'].get('Object')),
|
||||
|
@ -356,7 +381,7 @@ def galaxycluster_to_entity(c, link_label=None, link_direction=LinkDirection.Inp
|
|||
else:
|
||||
synonyms = ''
|
||||
|
||||
galaxy_cluster = get_galaxy_cluster(c['uuid'])
|
||||
galaxy_cluster = get_galaxy_cluster(uuid=c['uuid'])
|
||||
# map the 'icon' name from the cluster to the icon filename of the intelligence-icons repository
|
||||
try:
|
||||
icon_url = mapping_galaxy_icon[galaxy_cluster['icon']]
|
||||
|
@ -466,7 +491,7 @@ def galaxy_load_cluster_mapping():
|
|||
return cluster_uuids
|
||||
|
||||
|
||||
def get_galaxy_cluster(uuid=None, tag=None):
|
||||
def get_galaxy_cluster(uuid=None, tag=None, request_entity=None):
|
||||
global galaxy_cluster_uuids
|
||||
if not galaxy_cluster_uuids:
|
||||
galaxy_cluster_uuids = galaxy_load_cluster_mapping()
|
||||
|
@ -477,6 +502,13 @@ def get_galaxy_cluster(uuid=None, tag=None):
|
|||
for item in galaxy_cluster_uuids.values():
|
||||
if item['tag_name'] == tag:
|
||||
return item
|
||||
if request_entity:
|
||||
if request_entity.uuid:
|
||||
return get_galaxy_cluster(uuid=request_entity.uuid)
|
||||
elif request_entity.tag_name:
|
||||
return get_galaxy_cluster(tag=request_entity.tag_name)
|
||||
elif request_entity.name:
|
||||
return get_galaxy_cluster(tag=request_entity.name)
|
||||
|
||||
|
||||
def search_galaxy_cluster(keyword):
|
||||
|
@ -484,8 +516,36 @@ def search_galaxy_cluster(keyword):
|
|||
global galaxy_cluster_uuids
|
||||
if not galaxy_cluster_uuids:
|
||||
galaxy_cluster_uuids = galaxy_load_cluster_mapping()
|
||||
|
||||
# % only at start
|
||||
if keyword.startswith('%') and not keyword.endswith('%'):
|
||||
keyword = keyword.strip('%')
|
||||
for item in galaxy_cluster_uuids.values():
|
||||
if keyword in item['tag_name'].lower():
|
||||
if item['value'].lower().endswith(keyword):
|
||||
yield item
|
||||
else:
|
||||
if 'meta' in item and 'synonyms' in item['meta']:
|
||||
for synonym in item['meta']['synonyms']:
|
||||
if synonym.lower().endswith(keyword):
|
||||
yield item
|
||||
|
||||
# % only at end
|
||||
elif keyword.endswith('%') and not keyword.startswith('%'):
|
||||
keyword = keyword.strip('%')
|
||||
for item in galaxy_cluster_uuids.values():
|
||||
if item['value'].lower().startswith(keyword):
|
||||
yield item
|
||||
else:
|
||||
if 'meta' in item and 'synonyms' in item['meta']:
|
||||
for synonym in item['meta']['synonyms']:
|
||||
if synonym.lower().startswith(keyword):
|
||||
yield item
|
||||
|
||||
# search substring assuming % at start and end
|
||||
else:
|
||||
keyword = keyword.strip('%')
|
||||
for item in galaxy_cluster_uuids.values():
|
||||
if keyword in item['value'].lower():
|
||||
yield item
|
||||
else:
|
||||
if 'meta' in item and 'synonyms' in item['meta']:
|
||||
|
|
|
@ -41,25 +41,20 @@ class GalaxyToTransform(Transform):
|
|||
def do_transform(self, request, response, config, type_filter=MISPGalaxy):
|
||||
response += check_update(config)
|
||||
|
||||
current_cluster = None
|
||||
if request.entity.uuid:
|
||||
current_cluster = get_galaxy_cluster(uuid=request.entity.uuid)
|
||||
elif request.entity.tag_name:
|
||||
current_cluster = get_galaxy_cluster(tag=request.entity.tag_name)
|
||||
elif request.entity.name:
|
||||
current_cluster = get_galaxy_cluster(tag=request.entity.name)
|
||||
current_cluster = get_galaxy_cluster(request_entity=request.entity)
|
||||
|
||||
# legacy - replaced by Search in MISP
|
||||
if not current_cluster and request.entity.name != '-':
|
||||
# maybe the user is searching for a cluster based on a substring.
|
||||
# Search in the list for those that match and return galaxy entities
|
||||
potential_clusters = search_galaxy_cluster(request.entity.name)
|
||||
# TODO check if duplicates are possible
|
||||
if potential_clusters:
|
||||
for potential_cluster in potential_clusters:
|
||||
new_entity = galaxycluster_to_entity(potential_cluster, link_label='Search result')
|
||||
if isinstance(new_entity, type_filter):
|
||||
response += new_entity
|
||||
return response
|
||||
# end of legacy
|
||||
|
||||
if not current_cluster:
|
||||
response += UIMessage("Galaxy Cluster UUID not in local mapping. Please update local cache; non-public UUID are not supported yet.", type=UIMessageType.Inform)
|
||||
|
@ -67,7 +62,7 @@ class GalaxyToTransform(Transform):
|
|||
c = current_cluster
|
||||
|
||||
# update existing object
|
||||
galaxy_cluster = get_galaxy_cluster(c['uuid'])
|
||||
galaxy_cluster = get_galaxy_cluster(uuid=c['uuid'])
|
||||
icon_url = None
|
||||
if 'icon' in galaxy_cluster: # map the 'icon' name from the cluster to the icon filename of the intelligence-icons repository
|
||||
try:
|
||||
|
@ -92,7 +87,7 @@ class GalaxyToTransform(Transform):
|
|||
# find related objects
|
||||
if 'related' in current_cluster:
|
||||
for related in current_cluster['related']:
|
||||
related_cluster = get_galaxy_cluster(related['dest-uuid'])
|
||||
related_cluster = get_galaxy_cluster(uuid=related['dest-uuid'])
|
||||
if related_cluster:
|
||||
new_entity = galaxycluster_to_entity(related_cluster, link_label=related['type'])
|
||||
if isinstance(new_entity, type_filter):
|
||||
|
|
Loading…
Reference in New Issue