mirror of https://github.com/MISP/misp-modules
				
				
				
			
		
			
				
	
	
		
			305 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			305 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
| """vt_graph_parser.helpers.rules.
 | |
| 
 | |
| This module provides rules that helps MISP importers to connect MISP attributes
 | |
| between them using VirusTotal relationship. Check all available relationship
 | |
| here:
 | |
| 
 | |
| - File: https://developers.virustotal.com/v3/reference/#files-relationships
 | |
| - URL: https://developers.virustotal.com/v3/reference/#urls-relationships
 | |
| - Domain: https://developers.virustotal.com/v3/reference/#domains-relationships
 | |
| - IP: https://developers.virustotal.com/v3/reference/#ip-relationships
 | |
| """
 | |
| 
 | |
| 
 | |
| import abc
 | |
| 
 | |
| 
 | |
| class MispEventRule(object):
 | |
|     """Rules for MISP event nodes connection object wrapper."""
 | |
| 
 | |
|     def __init__(self, last_rule=None, node=None):
 | |
|         """Create a MispEventRule instance.
 | |
| 
 | |
|         MispEventRule is a collection of rules that can infer the relationships
 | |
|         between nodes from MISP events.
 | |
| 
 | |
|         Args:
 | |
|           last_rule (MispEventRule): previous rule.
 | |
|           node (Node): actual node.
 | |
|         """
 | |
|         self.last_rule = last_rule
 | |
|         self.node = node
 | |
|         self.relation_event = {
 | |
|             "ip_address": self.__ip_transition,
 | |
|             "url": self.__url_transition,
 | |
|             "domain": self.__domain_transition,
 | |
|             "file": self.__file_transition
 | |
|         }
 | |
| 
 | |
|     def get_last_different_rule(self):
 | |
|         """Search the last rule whose event was different from actual.
 | |
| 
 | |
|         Returns:
 | |
|           MispEventRule: the last different rule.
 | |
|         """
 | |
|         if not isinstance(self, self.last_rule.__class__):
 | |
|             return self.last_rule
 | |
|         else:
 | |
|             return self.last_rule.get_last_different_rule()
 | |
| 
 | |
|     def resolve_relation(self, graph, node, misp_category):
 | |
|         """Try to infer a relationship between two nodes.
 | |
| 
 | |
|         This method is based on a non-deterministic finite automaton for
 | |
|         this reason the future rule only depends on the actual rule and the input
 | |
|         node.
 | |
| 
 | |
|         For example if the actual rule is a MISPEventDomainRule and the given node
 | |
|         is an ip_address node, the connection type between them will be
 | |
|         `resolutions` and the this rule will transit to MISPEventIPRule.
 | |
| 
 | |
|         Args:
 | |
|           graph (VTGraph): graph to be computed.
 | |
|           node (Node): the node to be linked.
 | |
|           misp_category: (str): MISP category of the given node.
 | |
| 
 | |
|         Returns:
 | |
|           MispEventRule: the transited rule.
 | |
|         """
 | |
|         if node.node_type in self.relation_event:
 | |
|             return self.relation_event[node.node_type](graph, node, misp_category)
 | |
|         else:
 | |
|             return self.manual_link(graph, node)
 | |
| 
 | |
|     def manual_link(self, graph, node):
 | |
|         """Creates a manual link between self.node and the given node.
 | |
| 
 | |
|         We accept MISP types that VirusTotal does not know how to link, so we create
 | |
|         a end to end relationship instead of create an unknown relationship node.
 | |
| 
 | |
|         Args:
 | |
|           graph (VTGraph): graph to be computed.
 | |
|           node (Node): the node to be linked.
 | |
| 
 | |
|         Returns:
 | |
|           MispEventRule: the transited rule.
 | |
|         """
 | |
|         graph.add_link(self.node.node_id, node.node_id, "manual")
 | |
|         return self
 | |
| 
 | |
|     @abc.abstractmethod
 | |
|     def __file_transition(self, graph, node, misp_category):
 | |
|         """Make a new transition due to file attribute event.
 | |
| 
 | |
|         Args:
 | |
|           graph (VTGraph): graph to be computed.
 | |
|           node (Node): the node to be linked.
 | |
|           misp_category: (str): MISP category of the given node.
 | |
| 
 | |
|         Returns:
 | |
|           MispEventRule: the transited rule.
 | |
|         """
 | |
|         pass
 | |
| 
 | |
|     @abc.abstractmethod
 | |
|     def __ip_transition(self, graph, node, misp_category):
 | |
|         """Make a new transition due to ip attribute event.
 | |
| 
 | |
|         Args:
 | |
|           graph (VTGraph): graph to be computed.
 | |
|           node (Node): the node to be linked.
 | |
|           misp_category: (str): MISP category of the given node.
 | |
| 
 | |
|         Returns:
 | |
|           MispEventRule: the transited rule.
 | |
|         """
 | |
|         pass
 | |
| 
 | |
|     @abc.abstractmethod
 | |
|     def __url_transition(self, graph, node, misp_category):
 | |
|         """Make a new transition due to url attribute event.
 | |
| 
 | |
|         Args:
 | |
|           graph (VTGraph): graph to be computed.
 | |
|           node (Node): the node to be linked.
 | |
|           misp_category: (str): MISP category of the given node.
 | |
| 
 | |
|         Returns:
 | |
|           MispEventRule: the transited rule.
 | |
|         """
 | |
|         pass
 | |
| 
 | |
|     @abc.abstractmethod
 | |
|     def __domain_transition(self, graph, node, misp_category):
 | |
|         """Make a new transition due to domain attribute event.
 | |
| 
 | |
|         Args:
 | |
|           graph (VTGraph): graph to be computed.
 | |
|           node (Node): the node to be linked.
 | |
|           misp_category: (str): MISP category of the given node.
 | |
| 
 | |
|         Returns:
 | |
|           MispEventRule: the transited rule.
 | |
|         """
 | |
|         pass
 | |
| 
 | |
| 
 | |
| class MispEventURLRule(MispEventRule):
 | |
|     """Rule for URL event."""
 | |
| 
 | |
|     def __init__(self, last_rule=None, node=None):
 | |
|         super(MispEventURLRule, self).__init__(last_rule, node)
 | |
|         self.relation_event = {
 | |
|             "ip_address": self.__ip_transition,
 | |
|             "url": self.__url_transition,
 | |
|             "domain": self.__domain_transition,
 | |
|             "file": self.__file_transition
 | |
|         }
 | |
| 
 | |
|     def __file_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "downloaded_files")
 | |
|         return MispEventFileRule(self, node)
 | |
| 
 | |
|     def __ip_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "contacted_ips")
 | |
|         return MispEventIPRule(self, node)
 | |
| 
 | |
|     def __url_transition(self, graph, node, misp_category):
 | |
|         suitable_rule = self.get_last_different_rule()
 | |
|         if not isinstance(suitable_rule, MispEventInitialRule):
 | |
|             return suitable_rule.resolve_relation(graph, node, misp_category)
 | |
|         else:
 | |
|             return MispEventURLRule(self, node)
 | |
| 
 | |
|     def __domain_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "contacted_domains")
 | |
|         return MispEventDomainRule(self, node)
 | |
| 
 | |
| 
 | |
| class MispEventIPRule(MispEventRule):
 | |
|     """Rule for IP event."""
 | |
| 
 | |
|     def __init__(self, last_rule=None, node=None):
 | |
|         super(MispEventIPRule, self).__init__(last_rule, node)
 | |
|         self.relation_event = {
 | |
|             "ip_address": self.__ip_transition,
 | |
|             "url": self.__url_transition,
 | |
|             "domain": self.__domain_transition,
 | |
|             "file": self.__file_transition
 | |
|         }
 | |
| 
 | |
|     def __file_transition(self, graph, node, misp_category):
 | |
|         connection_type = "communicating_files"
 | |
|         if misp_category == "Artifacts dropped":
 | |
|             connection_type = "downloaded_files"
 | |
|         graph.add_link(self.node.node_id, node.node_id, connection_type)
 | |
|         return MispEventFileRule(self, node)
 | |
| 
 | |
|     def __ip_transition(self, graph, node, misp_category):
 | |
|         suitable_rule = self.get_last_different_rule()
 | |
|         if not isinstance(suitable_rule, MispEventInitialRule):
 | |
|             return suitable_rule.resolve_relation(graph, node, misp_category)
 | |
|         else:
 | |
|             return MispEventIPRule(self, node)
 | |
| 
 | |
|     def __url_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "urls")
 | |
|         return MispEventURLRule(self, node)
 | |
| 
 | |
|     def __domain_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "resolutions")
 | |
|         return MispEventDomainRule(self, node)
 | |
| 
 | |
| 
 | |
| class MispEventDomainRule(MispEventRule):
 | |
|     """Rule for domain event."""
 | |
| 
 | |
|     def __init__(self, last_rule=None, node=None):
 | |
|         super(MispEventDomainRule, self).__init__(last_rule, node)
 | |
|         self.relation_event = {
 | |
|             "ip_address": self.__ip_transition,
 | |
|             "url": self.__url_transition,
 | |
|             "domain": self.__domain_transition,
 | |
|             "file": self.__file_transition
 | |
|         }
 | |
| 
 | |
|     def __file_transition(self, graph, node, misp_category):
 | |
|         connection_type = "communicating_files"
 | |
|         if misp_category == "Artifacts dropped":
 | |
|             connection_type = "downloaded_files"
 | |
|         graph.add_link(self.node.node_id, node.node_id, connection_type)
 | |
|         return MispEventFileRule(self, node)
 | |
| 
 | |
|     def __ip_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "resolutions")
 | |
|         return MispEventIPRule(self, node)
 | |
| 
 | |
|     def __url_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "urls")
 | |
|         return MispEventURLRule(self, node)
 | |
| 
 | |
|     def __domain_transition(self, graph, node, misp_category):
 | |
|         suitable_rule = self.get_last_different_rule()
 | |
|         if not isinstance(suitable_rule, MispEventInitialRule):
 | |
|             return suitable_rule.resolve_relation(graph, node, misp_category)
 | |
|         else:
 | |
|             graph.add_link(self.node.node_id, node.node_id, "siblings")
 | |
|             return MispEventDomainRule(self, node)
 | |
| 
 | |
| 
 | |
| class MispEventFileRule(MispEventRule):
 | |
|     """Rule for File event."""
 | |
| 
 | |
|     def __init__(self, last_rule=None, node=None):
 | |
|         super(MispEventFileRule, self).__init__(last_rule, node)
 | |
|         self.relation_event = {
 | |
|             "ip_address": self.__ip_transition,
 | |
|             "url": self.__url_transition,
 | |
|             "domain": self.__domain_transition,
 | |
|             "file": self.__file_transition
 | |
|         }
 | |
| 
 | |
|     def __file_transition(self, graph, node, misp_category):
 | |
|         suitable_rule = self.get_last_different_rule()
 | |
|         if not isinstance(suitable_rule, MispEventInitialRule):
 | |
|             return suitable_rule.resolve_relation(graph, node, misp_category)
 | |
|         else:
 | |
|             return MispEventFileRule(self, node)
 | |
| 
 | |
|     def __ip_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "contacted_ips")
 | |
|         return MispEventIPRule(self, node)
 | |
| 
 | |
|     def __url_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "contacted_urls")
 | |
|         return MispEventURLRule(self, node)
 | |
| 
 | |
|     def __domain_transition(self, graph, node, misp_category):
 | |
|         graph.add_link(self.node.node_id, node.node_id, "contacted_domains")
 | |
|         return MispEventDomainRule(self, node)
 | |
| 
 | |
| 
 | |
| class MispEventInitialRule(MispEventRule):
 | |
|     """Initial rule."""
 | |
| 
 | |
|     def __init__(self, last_rule=None, node=None):
 | |
|         super(MispEventInitialRule, self).__init__(last_rule, node)
 | |
|         self.relation_event = {
 | |
|             "ip_address": self.__ip_transition,
 | |
|             "url": self.__url_transition,
 | |
|             "domain": self.__domain_transition,
 | |
|             "file": self.__file_transition
 | |
|         }
 | |
| 
 | |
|     def __file_transition(self, graph, node, misp_category):
 | |
|         return MispEventFileRule(self, node)
 | |
| 
 | |
|     def __ip_transition(self, graph, node, misp_category):
 | |
|         return MispEventIPRule(self, node)
 | |
| 
 | |
|     def __url_transition(self, graph, node, misp_category):
 | |
|         return MispEventURLRule(self, node)
 | |
| 
 | |
|     def __domain_transition(self, graph, node, misp_category):
 | |
|         return MispEventDomainRule(self, node)
 |