mirror of https://github.com/MISP/misp-modules
305 lines
11 KiB
Python
305 lines
11 KiB
Python
"""vt_graph_parser.helpers.rules.
|
|
|
|
This module provides rules that helps MISP importers to connect MISP attributes
|
|
between them using VirusTotal relationship. Check all available relationship
|
|
here:
|
|
|
|
- File: https://docs.virustotal.com/reference/files#relationships
|
|
- URL: https://docs.virustotal.com/reference/url-object#relationships
|
|
- Domain: https://docs.virustotal.com/reference/domains-object#relationships
|
|
- IP: https://docs.virustotal.com/reference/ip-object#relationships
|
|
"""
|
|
|
|
|
|
import abc
|
|
|
|
|
|
class MispEventRule(object):
|
|
"""Rules for MISP event nodes connection object wrapper."""
|
|
|
|
def __init__(self, last_rule=None, node=None):
|
|
"""Create a MispEventRule instance.
|
|
|
|
MispEventRule is a collection of rules that can infer the relationships
|
|
between nodes from MISP events.
|
|
|
|
Args:
|
|
last_rule (MispEventRule): previous rule.
|
|
node (Node): actual node.
|
|
"""
|
|
self.last_rule = last_rule
|
|
self.node = node
|
|
self.relation_event = {
|
|
"ip_address": self.__ip_transition,
|
|
"url": self.__url_transition,
|
|
"domain": self.__domain_transition,
|
|
"file": self.__file_transition
|
|
}
|
|
|
|
def get_last_different_rule(self):
|
|
"""Search the last rule whose event was different from actual.
|
|
|
|
Returns:
|
|
MispEventRule: the last different rule.
|
|
"""
|
|
if not isinstance(self, self.last_rule.__class__):
|
|
return self.last_rule
|
|
else:
|
|
return self.last_rule.get_last_different_rule()
|
|
|
|
def resolve_relation(self, graph, node, misp_category):
|
|
"""Try to infer a relationship between two nodes.
|
|
|
|
This method is based on a non-deterministic finite automaton for
|
|
this reason the future rule only depends on the actual rule and the input
|
|
node.
|
|
|
|
For example if the actual rule is a MISPEventDomainRule and the given node
|
|
is an ip_address node, the connection type between them will be
|
|
`resolutions` and the this rule will transit to MISPEventIPRule.
|
|
|
|
Args:
|
|
graph (VTGraph): graph to be computed.
|
|
node (Node): the node to be linked.
|
|
misp_category: (str): MISP category of the given node.
|
|
|
|
Returns:
|
|
MispEventRule: the transited rule.
|
|
"""
|
|
if node.node_type in self.relation_event:
|
|
return self.relation_event[node.node_type](graph, node, misp_category)
|
|
else:
|
|
return self.manual_link(graph, node)
|
|
|
|
def manual_link(self, graph, node):
|
|
"""Creates a manual link between self.node and the given node.
|
|
|
|
We accept MISP types that VirusTotal does not know how to link, so we create
|
|
a end to end relationship instead of create an unknown relationship node.
|
|
|
|
Args:
|
|
graph (VTGraph): graph to be computed.
|
|
node (Node): the node to be linked.
|
|
|
|
Returns:
|
|
MispEventRule: the transited rule.
|
|
"""
|
|
graph.add_link(self.node.node_id, node.node_id, "manual")
|
|
return self
|
|
|
|
@abc.abstractmethod
|
|
def __file_transition(self, graph, node, misp_category):
|
|
"""Make a new transition due to file attribute event.
|
|
|
|
Args:
|
|
graph (VTGraph): graph to be computed.
|
|
node (Node): the node to be linked.
|
|
misp_category: (str): MISP category of the given node.
|
|
|
|
Returns:
|
|
MispEventRule: the transited rule.
|
|
"""
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def __ip_transition(self, graph, node, misp_category):
|
|
"""Make a new transition due to ip attribute event.
|
|
|
|
Args:
|
|
graph (VTGraph): graph to be computed.
|
|
node (Node): the node to be linked.
|
|
misp_category: (str): MISP category of the given node.
|
|
|
|
Returns:
|
|
MispEventRule: the transited rule.
|
|
"""
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def __url_transition(self, graph, node, misp_category):
|
|
"""Make a new transition due to url attribute event.
|
|
|
|
Args:
|
|
graph (VTGraph): graph to be computed.
|
|
node (Node): the node to be linked.
|
|
misp_category: (str): MISP category of the given node.
|
|
|
|
Returns:
|
|
MispEventRule: the transited rule.
|
|
"""
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def __domain_transition(self, graph, node, misp_category):
|
|
"""Make a new transition due to domain attribute event.
|
|
|
|
Args:
|
|
graph (VTGraph): graph to be computed.
|
|
node (Node): the node to be linked.
|
|
misp_category: (str): MISP category of the given node.
|
|
|
|
Returns:
|
|
MispEventRule: the transited rule.
|
|
"""
|
|
pass
|
|
|
|
|
|
class MispEventURLRule(MispEventRule):
|
|
"""Rule for URL event."""
|
|
|
|
def __init__(self, last_rule=None, node=None):
|
|
super(MispEventURLRule, self).__init__(last_rule, node)
|
|
self.relation_event = {
|
|
"ip_address": self.__ip_transition,
|
|
"url": self.__url_transition,
|
|
"domain": self.__domain_transition,
|
|
"file": self.__file_transition
|
|
}
|
|
|
|
def __file_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "downloaded_files")
|
|
return MispEventFileRule(self, node)
|
|
|
|
def __ip_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "contacted_ips")
|
|
return MispEventIPRule(self, node)
|
|
|
|
def __url_transition(self, graph, node, misp_category):
|
|
suitable_rule = self.get_last_different_rule()
|
|
if not isinstance(suitable_rule, MispEventInitialRule):
|
|
return suitable_rule.resolve_relation(graph, node, misp_category)
|
|
else:
|
|
return MispEventURLRule(self, node)
|
|
|
|
def __domain_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "contacted_domains")
|
|
return MispEventDomainRule(self, node)
|
|
|
|
|
|
class MispEventIPRule(MispEventRule):
|
|
"""Rule for IP event."""
|
|
|
|
def __init__(self, last_rule=None, node=None):
|
|
super(MispEventIPRule, self).__init__(last_rule, node)
|
|
self.relation_event = {
|
|
"ip_address": self.__ip_transition,
|
|
"url": self.__url_transition,
|
|
"domain": self.__domain_transition,
|
|
"file": self.__file_transition
|
|
}
|
|
|
|
def __file_transition(self, graph, node, misp_category):
|
|
connection_type = "communicating_files"
|
|
if misp_category == "Artifacts dropped":
|
|
connection_type = "downloaded_files"
|
|
graph.add_link(self.node.node_id, node.node_id, connection_type)
|
|
return MispEventFileRule(self, node)
|
|
|
|
def __ip_transition(self, graph, node, misp_category):
|
|
suitable_rule = self.get_last_different_rule()
|
|
if not isinstance(suitable_rule, MispEventInitialRule):
|
|
return suitable_rule.resolve_relation(graph, node, misp_category)
|
|
else:
|
|
return MispEventIPRule(self, node)
|
|
|
|
def __url_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "urls")
|
|
return MispEventURLRule(self, node)
|
|
|
|
def __domain_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "resolutions")
|
|
return MispEventDomainRule(self, node)
|
|
|
|
|
|
class MispEventDomainRule(MispEventRule):
|
|
"""Rule for domain event."""
|
|
|
|
def __init__(self, last_rule=None, node=None):
|
|
super(MispEventDomainRule, self).__init__(last_rule, node)
|
|
self.relation_event = {
|
|
"ip_address": self.__ip_transition,
|
|
"url": self.__url_transition,
|
|
"domain": self.__domain_transition,
|
|
"file": self.__file_transition
|
|
}
|
|
|
|
def __file_transition(self, graph, node, misp_category):
|
|
connection_type = "communicating_files"
|
|
if misp_category == "Artifacts dropped":
|
|
connection_type = "downloaded_files"
|
|
graph.add_link(self.node.node_id, node.node_id, connection_type)
|
|
return MispEventFileRule(self, node)
|
|
|
|
def __ip_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "resolutions")
|
|
return MispEventIPRule(self, node)
|
|
|
|
def __url_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "urls")
|
|
return MispEventURLRule(self, node)
|
|
|
|
def __domain_transition(self, graph, node, misp_category):
|
|
suitable_rule = self.get_last_different_rule()
|
|
if not isinstance(suitable_rule, MispEventInitialRule):
|
|
return suitable_rule.resolve_relation(graph, node, misp_category)
|
|
else:
|
|
graph.add_link(self.node.node_id, node.node_id, "siblings")
|
|
return MispEventDomainRule(self, node)
|
|
|
|
|
|
class MispEventFileRule(MispEventRule):
|
|
"""Rule for File event."""
|
|
|
|
def __init__(self, last_rule=None, node=None):
|
|
super(MispEventFileRule, self).__init__(last_rule, node)
|
|
self.relation_event = {
|
|
"ip_address": self.__ip_transition,
|
|
"url": self.__url_transition,
|
|
"domain": self.__domain_transition,
|
|
"file": self.__file_transition
|
|
}
|
|
|
|
def __file_transition(self, graph, node, misp_category):
|
|
suitable_rule = self.get_last_different_rule()
|
|
if not isinstance(suitable_rule, MispEventInitialRule):
|
|
return suitable_rule.resolve_relation(graph, node, misp_category)
|
|
else:
|
|
return MispEventFileRule(self, node)
|
|
|
|
def __ip_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "contacted_ips")
|
|
return MispEventIPRule(self, node)
|
|
|
|
def __url_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "contacted_urls")
|
|
return MispEventURLRule(self, node)
|
|
|
|
def __domain_transition(self, graph, node, misp_category):
|
|
graph.add_link(self.node.node_id, node.node_id, "contacted_domains")
|
|
return MispEventDomainRule(self, node)
|
|
|
|
|
|
class MispEventInitialRule(MispEventRule):
|
|
"""Initial rule."""
|
|
|
|
def __init__(self, last_rule=None, node=None):
|
|
super(MispEventInitialRule, self).__init__(last_rule, node)
|
|
self.relation_event = {
|
|
"ip_address": self.__ip_transition,
|
|
"url": self.__url_transition,
|
|
"domain": self.__domain_transition,
|
|
"file": self.__file_transition
|
|
}
|
|
|
|
def __file_transition(self, graph, node, misp_category):
|
|
return MispEventFileRule(self, node)
|
|
|
|
def __ip_transition(self, graph, node, misp_category):
|
|
return MispEventIPRule(self, node)
|
|
|
|
def __url_transition(self, graph, node, misp_category):
|
|
return MispEventURLRule(self, node)
|
|
|
|
def __domain_transition(self, graph, node, misp_category):
|
|
return MispEventDomainRule(self, node)
|