commit
						a55666f1a5
					
				|  | @ -4,7 +4,9 @@ not_skip = __init__.py | |||
| known_third_party = | ||||
|     antlr4, | ||||
|     dateutil, | ||||
|     haversine, | ||||
|     medallion, | ||||
|     pyjarowinkler, | ||||
|     pytest, | ||||
|     pytz, | ||||
|     requests, | ||||
|  |  | |||
|  | @ -1,16 +1,13 @@ | |||
| sudo: false | ||||
| language: python | ||||
| cache: pip | ||||
| dist: xenial | ||||
| python: | ||||
|   - "2.7" | ||||
|   - "3.4" | ||||
|   - "3.5" | ||||
|   - "3.6" | ||||
| matrix: | ||||
|   include: | ||||
|     - python: 3.7 # https://github.com/travis-ci/travis-ci/issues/9069#issuecomment-425720905 | ||||
|       dist: xenial | ||||
|       sudo: true | ||||
|   - "3.7" | ||||
| install: | ||||
|   - pip install -U pip setuptools | ||||
|   - pip install tox-travis pre-commit | ||||
|  |  | |||
							
								
								
									
										1
									
								
								setup.py
								
								
								
								
							
							
						
						
									
										1
									
								
								setup.py
								
								
								
								
							|  | @ -64,5 +64,6 @@ setup( | |||
|     }, | ||||
|     extras_require={ | ||||
|         'taxii': ['taxii2-client'], | ||||
|         'semantic': ['haversine', 'pyjarowinkler'], | ||||
|     }, | ||||
| ) | ||||
|  |  | |||
|  | @ -1,9 +1,15 @@ | |||
| """Python STIX2 Environment API.""" | ||||
| 
 | ||||
| import copy | ||||
| import logging | ||||
| import time | ||||
| 
 | ||||
| from .core import parse as _parse | ||||
| from .datastore import CompositeDataSource, DataStoreMixin | ||||
| from .exceptions import SemanticEquivalenceUnsupportedTypeError | ||||
| from .utils import STIXdatetime, parse_into_datetime | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class ObjectFactory(object): | ||||
|  | @ -186,3 +192,448 @@ class Environment(DataStoreMixin): | |||
|             return self.get(creator_id) | ||||
|         else: | ||||
|             return None | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def semantically_equivalent(obj1, obj2, **weight_dict): | ||||
|         """This method is meant to verify if two objects of the same type are | ||||
|         semantically equivalent. | ||||
| 
 | ||||
|         Args: | ||||
|             obj1: A stix2 object instance | ||||
|             obj2: A stix2 object instance | ||||
|             weight_dict: A dictionary that can be used to override settings | ||||
|                 in the semantic equivalence process | ||||
| 
 | ||||
|         Returns: | ||||
|             float: A number between 0.0 and 100.0 as a measurement of equivalence. | ||||
| 
 | ||||
|         Warning: | ||||
|             Course of Action, Intrusion-Set, Observed-Data, Report are not supported | ||||
|             by this implementation. Indicator pattern check is also limited. | ||||
| 
 | ||||
|         Note: | ||||
|             This implementation follows the Committee Note on semantic equivalence. | ||||
|             see `the Committee Note <link here>`__. | ||||
| 
 | ||||
|         """ | ||||
|         # default weights used for the semantic equivalence process | ||||
|         weights = { | ||||
|             "attack-pattern": { | ||||
|                 "name": 30, | ||||
|                 "external_references": 70, | ||||
|                 "method": _attack_pattern_checks, | ||||
|             }, | ||||
|             "campaign": { | ||||
|                 "name": 60, | ||||
|                 "aliases": 40, | ||||
|                 "method": _campaign_checks, | ||||
|             }, | ||||
|             "course-of-action": { | ||||
|                 "method": _course_of_action_checks, | ||||
|             }, | ||||
|             "identity": { | ||||
|                 "name": 60, | ||||
|                 "identity_class": 20, | ||||
|                 "sectors": 20, | ||||
|                 "method": _identity_checks, | ||||
|             }, | ||||
|             "indicator": { | ||||
|                 "indicator_types": 15, | ||||
|                 "pattern": 80, | ||||
|                 "valid_from": 5, | ||||
|                 "tdelta": 1,  # One day interval | ||||
|                 "method": _indicator_checks, | ||||
|             }, | ||||
|             "intrusion-set": { | ||||
|                 "method": _intrusion_set_checks, | ||||
|             }, | ||||
|             "location": { | ||||
|                 "longitude_latitude": 34, | ||||
|                 "region": 33, | ||||
|                 "country": 33, | ||||
|                 "threshold": 1000.0, | ||||
|                 "method": _location_checks, | ||||
|             }, | ||||
|             "malware": { | ||||
|                 "malware_types": 20, | ||||
|                 "name": 80, | ||||
|                 "method": _malware_checks, | ||||
|             }, | ||||
|             "observed-data": { | ||||
|                 "method": _observed_data_checks, | ||||
|             }, | ||||
|             "report": { | ||||
|                 "method": _report_checks, | ||||
|             }, | ||||
|             "threat-actor": { | ||||
|                 "name": 60, | ||||
|                 "threat_actor_types": 20, | ||||
|                 "aliases": 20, | ||||
|                 "method": _threat_actor_checks, | ||||
|             }, | ||||
|             "tool": { | ||||
|                 "tool_types": 20, | ||||
|                 "name": 80, | ||||
|                 "method": _tool_checks, | ||||
|             }, | ||||
|             "vulnerability": { | ||||
|                 "name": 30, | ||||
|                 "external_references": 70, | ||||
|                 "method": _vulnerability_checks, | ||||
|             }, | ||||
|             "_internal": { | ||||
|                 "ignore_spec_version": False, | ||||
|             }, | ||||
|         } | ||||
| 
 | ||||
|         if weight_dict: | ||||
|             weights.update(weight_dict) | ||||
| 
 | ||||
|         type1, type2 = obj1["type"], obj2["type"] | ||||
|         ignore_spec_version = weights["_internal"]["ignore_spec_version"] | ||||
| 
 | ||||
|         if type1 != type2: | ||||
|             raise ValueError('The objects to compare must be of the same type!') | ||||
| 
 | ||||
|         if ignore_spec_version is False and obj1.get("spec_version", "2.0") != obj2.get("spec_version", "2.0"): | ||||
|             raise ValueError('The objects to compare must be of the same spec version!') | ||||
| 
 | ||||
|         method = weights[type1]["method"] | ||||
|         matching_score, sum_weights = method(obj1, obj2, **weights[type1]) | ||||
| 
 | ||||
|         if sum_weights <= 0: | ||||
|             return 0 | ||||
| 
 | ||||
|         equivalence_score = (matching_score / sum_weights) * 100.0 | ||||
|         return equivalence_score | ||||
| 
 | ||||
| 
 | ||||
| def check_property_present(prop, obj1, obj2): | ||||
|     """Helper method checks if a property is present on both objects.""" | ||||
|     if prop in obj1 and prop in obj2: | ||||
|         return True | ||||
|     return False | ||||
| 
 | ||||
| 
 | ||||
| def partial_timestamp_based(t1, t2, tdelta): | ||||
|     """Performs a timestamp-based matching via checking how close one timestamp is to another. | ||||
| 
 | ||||
|     Args: | ||||
|         t1: A datetime string or STIXdatetime object. | ||||
|         t2: A datetime string or STIXdatetime object. | ||||
|         tdelta (float): A given time delta. This number is multiplied by 86400 (1 day) to | ||||
|             extend or shrink your time change tolerance. | ||||
| 
 | ||||
|     Returns: | ||||
|         float: Number between 0.0 and 1.0 depending on match criteria. | ||||
| 
 | ||||
|     """ | ||||
|     if not isinstance(t1, STIXdatetime): | ||||
|         t1 = parse_into_datetime(t1) | ||||
|     if not isinstance(t2, STIXdatetime): | ||||
|         t2 = parse_into_datetime(t2) | ||||
|     t1, t2 = time.mktime(t1.timetuple()), time.mktime(t2.timetuple()) | ||||
|     return 1 - min(abs(t1 - t2) / (86400 * tdelta), 1) | ||||
| 
 | ||||
| 
 | ||||
| def partial_list_based(l1, l2): | ||||
|     """Performs a partial list matching via finding the intersection between common values. | ||||
| 
 | ||||
|     Args: | ||||
|         l1: A list of values. | ||||
|         l2: A list of values. | ||||
| 
 | ||||
|     Returns: | ||||
|         float: 1.0 if the value matches exactly, 0.0 otherwise. | ||||
| 
 | ||||
|     """ | ||||
|     l1_set, l2_set = set(l1), set(l2) | ||||
|     return len(l1_set.intersection(l2_set)) / max(len(l1), len(l2)) | ||||
| 
 | ||||
| 
 | ||||
| def exact_match(val1, val2): | ||||
|     """Performs an exact value match based on two values | ||||
| 
 | ||||
|     Args: | ||||
|         val1: A value suitable for an equality test. | ||||
|         val2: A value suitable for an equality test. | ||||
| 
 | ||||
|     Returns: | ||||
|         float: 1.0 if the value matches exactly, 0.0 otherwise. | ||||
| 
 | ||||
|     """ | ||||
|     if val1 == val2: | ||||
|         return 1.0 | ||||
|     return 0.0 | ||||
| 
 | ||||
| 
 | ||||
| def partial_string_based(str1, str2): | ||||
|     """Performs a partial string match using the Jaro-Winkler distance algorithm. | ||||
| 
 | ||||
|     Args: | ||||
|         str1: A string value to check. | ||||
|         str2: A string value to check. | ||||
| 
 | ||||
|     Returns: | ||||
|         float: Number between 0.0 and 1.0 depending on match criteria. | ||||
| 
 | ||||
|     """ | ||||
|     from pyjarowinkler import distance | ||||
|     return distance.get_jaro_distance(str1, str2) | ||||
| 
 | ||||
| 
 | ||||
| def custom_pattern_based(pattern1, pattern2): | ||||
|     """Performs a matching on Indicator Patterns. | ||||
| 
 | ||||
|     Args: | ||||
|         pattern1: An Indicator pattern | ||||
|         pattern2: An Indicator pattern | ||||
| 
 | ||||
|     Returns: | ||||
|         float: Number between 0.0 and 1.0 depending on match criteria. | ||||
| 
 | ||||
|     """ | ||||
|     logger.warning("Indicator pattern equivalence is not fully defined; will default to zero if not completely identical") | ||||
|     return exact_match(pattern1, pattern2)  # TODO: Implement pattern based equivalence | ||||
| 
 | ||||
| 
 | ||||
| def partial_external_reference_based(refs1, refs2): | ||||
|     """Performs a matching on External References. | ||||
| 
 | ||||
|     Args: | ||||
|         refs1: A list of external references. | ||||
|         refs2: A list of external references. | ||||
| 
 | ||||
|     Returns: | ||||
|         float: Number between 0.0 and 1.0 depending on matches. | ||||
| 
 | ||||
|     """ | ||||
|     allowed = set(("veris", "cve", "capec", "mitre-attack")) | ||||
|     matches = 0 | ||||
| 
 | ||||
|     if len(refs1) >= len(refs2): | ||||
|         l1 = refs1 | ||||
|         l2 = refs2 | ||||
|     else: | ||||
|         l1 = refs2 | ||||
|         l2 = refs1 | ||||
| 
 | ||||
|     for ext_ref1 in l1: | ||||
|         for ext_ref2 in l2: | ||||
|             sn_match = False | ||||
|             ei_match = False | ||||
|             url_match = False | ||||
|             source_name = None | ||||
| 
 | ||||
|             if check_property_present("source_name", ext_ref1, ext_ref2): | ||||
|                 if ext_ref1["source_name"] == ext_ref2["source_name"]: | ||||
|                     source_name = ext_ref1["source_name"] | ||||
|                     sn_match = True | ||||
|             if check_property_present("external_id", ext_ref1, ext_ref2): | ||||
|                 if ext_ref1["external_id"] == ext_ref2["external_id"]: | ||||
|                     ei_match = True | ||||
|             if check_property_present("url", ext_ref1, ext_ref2): | ||||
|                 if ext_ref1["url"] == ext_ref2["url"]: | ||||
|                     url_match = True | ||||
| 
 | ||||
|             # Special case: if source_name is a STIX defined name and either | ||||
|             # external_id or url match then its a perfect match and other entries | ||||
|             # can be ignored. | ||||
|             if sn_match and (ei_match or url_match) and source_name in allowed: | ||||
|                 return 1.0 | ||||
| 
 | ||||
|             # Regular check. If the source_name (not STIX-defined) or external_id or | ||||
|             # url matches then we consider the entry a match. | ||||
|             if (sn_match or ei_match or url_match) and source_name not in allowed: | ||||
|                 matches += 1 | ||||
| 
 | ||||
|     return matches / max(len(refs1), len(refs2)) | ||||
| 
 | ||||
| 
 | ||||
| def partial_location_distance(lat1, long1, lat2, long2, threshold): | ||||
|     """Given two coordinates perform a matching based on its distance using the Haversine Formula. | ||||
| 
 | ||||
|     Args: | ||||
|         lat1: Latitude value for first coordinate point. | ||||
|         lat2: Latitude value for second coordinate point. | ||||
|         long1: Longitude value for first coordinate point. | ||||
|         long2: Longitude value for second coordinate point. | ||||
|         threshold (float): A kilometer measurement for the threshold distance between these two points. | ||||
| 
 | ||||
|     Returns: | ||||
|         float: Number between 0.0 and 1.0 depending on match. | ||||
| 
 | ||||
|     """ | ||||
|     from haversine import haversine, Unit | ||||
|     distance = haversine((lat1, long1), (lat2, long2), unit=Unit.KILOMETERS) | ||||
|     return 1 - (distance / threshold) | ||||
| 
 | ||||
| 
 | ||||
| def _attack_pattern_checks(obj1, obj2, **weights): | ||||
|     matching_score = 0.0 | ||||
|     sum_weights = 0.0 | ||||
|     if check_property_present("name", obj1, obj2): | ||||
|         w = weights["name"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_string_based(obj1["name"], obj2["name"]) | ||||
|     if check_property_present("external_references", obj1, obj2): | ||||
|         w = weights["external_references"] | ||||
|         sum_weights += w | ||||
|         matching_score += ( | ||||
|                 w * | ||||
|                 partial_external_reference_based(obj1["external_references"], obj2["external_references"]) | ||||
|         ) | ||||
|     return matching_score, sum_weights | ||||
| 
 | ||||
| 
 | ||||
| def _campaign_checks(obj1, obj2, **weights): | ||||
|     matching_score = 0.0 | ||||
|     sum_weights = 0.0 | ||||
|     if check_property_present("name", obj1, obj2): | ||||
|         w = weights["name"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_string_based(obj1["name"], obj2["name"]) | ||||
|     if check_property_present("aliases", obj1, obj2): | ||||
|         w = weights["aliases"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_list_based(obj1["aliases"], obj2["aliases"]) | ||||
|     return matching_score, sum_weights | ||||
| 
 | ||||
| 
 | ||||
| def _identity_checks(obj1, obj2, **weights): | ||||
|     matching_score = 0.0 | ||||
|     sum_weights = 0.0 | ||||
|     if check_property_present("name", obj1, obj2): | ||||
|         w = weights["name"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * exact_match(obj1["name"], obj2["name"]) | ||||
|     if check_property_present("identity_class", obj1, obj2): | ||||
|         w = weights["identity_class"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * exact_match(obj1["identity_class"], obj2["identity_class"]) | ||||
|     if check_property_present("sectors", obj1, obj2): | ||||
|         w = weights["sectors"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_list_based(obj1["sectors"], obj2["sectors"]) | ||||
|     return matching_score, sum_weights | ||||
| 
 | ||||
| 
 | ||||
| def _indicator_checks(obj1, obj2, **weights): | ||||
|     matching_score = 0.0 | ||||
|     sum_weights = 0.0 | ||||
|     if check_property_present("indicator_types", obj1, obj2): | ||||
|         w = weights["indicator_types"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_list_based(obj1["indicator_types"], obj2["indicator_types"]) | ||||
|     if check_property_present("pattern", obj1, obj2): | ||||
|         w = weights["pattern"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * custom_pattern_based(obj1["pattern"], obj2["pattern"]) | ||||
|     if check_property_present("valid_from", obj1, obj2): | ||||
|         w = weights["valid_from"] | ||||
|         sum_weights += w | ||||
|         matching_score += ( | ||||
|                 w * | ||||
|                 partial_timestamp_based(obj1["valid_from"], obj2["valid_from"], weights["tdelta"]) | ||||
|         ) | ||||
|     return matching_score, sum_weights | ||||
| 
 | ||||
| 
 | ||||
| def _location_checks(obj1, obj2, **weights): | ||||
|     matching_score = 0.0 | ||||
|     sum_weights = 0.0 | ||||
|     if check_property_present("latitude", obj1, obj2) and check_property_present("longitude", obj1, obj2): | ||||
|         w = weights["longitude_latitude"] | ||||
|         sum_weights += w | ||||
|         matching_score += ( | ||||
|                 w * | ||||
|                 partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], weights["threshold"]) | ||||
|         ) | ||||
|     if check_property_present("region", obj1, obj2): | ||||
|         w = weights["region"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * exact_match(obj1["region"], obj2["region"]) | ||||
|     if check_property_present("country", obj1, obj2): | ||||
|         w = weights["country"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * exact_match(obj1["country"], obj2["country"]) | ||||
|     return matching_score, sum_weights | ||||
| 
 | ||||
| 
 | ||||
| def _malware_checks(obj1, obj2, **weights): | ||||
|     matching_score = 0.0 | ||||
|     sum_weights = 0.0 | ||||
|     if check_property_present("malware_types", obj1, obj2): | ||||
|         w = weights["malware_types"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_list_based(obj1["malware_types"], obj2["malware_types"]) | ||||
|     if check_property_present("name", obj1, obj2): | ||||
|         w = weights["name"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_string_based(obj1["name"], obj2["name"]) | ||||
|     return matching_score, sum_weights | ||||
| 
 | ||||
| 
 | ||||
| def _threat_actor_checks(obj1, obj2, **weights): | ||||
|     matching_score = 0.0 | ||||
|     sum_weights = 0.0 | ||||
|     if check_property_present("name", obj1, obj2): | ||||
|         w = weights["name"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_string_based(obj1["name"], obj2["name"]) | ||||
|     if check_property_present("threat_actor_types", obj1, obj2): | ||||
|         w = weights["threat_actor_types"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"]) | ||||
|     if check_property_present("aliases", obj1, obj2): | ||||
|         w = weights["aliases"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_list_based(obj1["aliases"], obj2["aliases"]) | ||||
|     return matching_score, sum_weights | ||||
| 
 | ||||
| 
 | ||||
| def _tool_checks(obj1, obj2, **weights): | ||||
|     matching_score = 0.0 | ||||
|     sum_weights = 0.0 | ||||
|     if check_property_present("tool_types", obj1, obj2): | ||||
|         w = weights["tool_types"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_list_based(obj1["tool_types"], obj2["tool_types"]) | ||||
|     if check_property_present("name", obj1, obj2): | ||||
|         w = weights["name"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_string_based(obj1["name"], obj2["name"]) | ||||
|     return matching_score, sum_weights | ||||
| 
 | ||||
| 
 | ||||
| def _vulnerability_checks(obj1, obj2, **weights): | ||||
|     matching_score = 0.0 | ||||
|     sum_weights = 0.0 | ||||
|     if check_property_present("name", obj1, obj2): | ||||
|         w = weights["name"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_string_based(obj1["name"], obj2["name"]) | ||||
|     if check_property_present("external_references", obj1, obj2): | ||||
|         w = weights["external_references"] | ||||
|         sum_weights += w | ||||
|         matching_score += w * partial_external_reference_based( | ||||
|             obj1["external_references"], | ||||
|             obj2["external_references"], | ||||
|         ) | ||||
|     return matching_score, sum_weights | ||||
| 
 | ||||
| 
 | ||||
| def _course_of_action_checks(obj1, obj2, **weights): | ||||
|     raise SemanticEquivalenceUnsupportedTypeError("course-of-action type has no semantic equivalence implementation!") | ||||
| 
 | ||||
| 
 | ||||
| def _intrusion_set_checks(obj1, obj2, **weights): | ||||
|     raise SemanticEquivalenceUnsupportedTypeError("intrusion-set type has no semantic equivalence implementation!") | ||||
| 
 | ||||
| 
 | ||||
| def _observed_data_checks(obj1, obj2, **weights): | ||||
|     raise SemanticEquivalenceUnsupportedTypeError("observed-data type has no semantic equivalence implementation!") | ||||
| 
 | ||||
| 
 | ||||
| def _report_checks(obj1, obj2, **weights): | ||||
|     raise SemanticEquivalenceUnsupportedTypeError("report type has no semantic equivalence implementation!") | ||||
|  |  | |||
|  | @ -233,3 +233,10 @@ class STIXDeprecationWarning(DeprecationWarning): | |||
|     Represents usage of a deprecated component of a STIX specification. | ||||
|     """ | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| class SemanticEquivalenceUnsupportedTypeError(STIXError, TypeError): | ||||
|     """STIX object type not supported by the semantic equivalence approach.""" | ||||
| 
 | ||||
|     def __init__(self, msg): | ||||
|         super(SemanticEquivalenceUnsupportedTypeError, self).__init__(msg) | ||||
|  |  | |||
|  | @ -1,11 +1,17 @@ | |||
| import pytest | ||||
| 
 | ||||
| import stix2 | ||||
| import stix2.environment | ||||
| import stix2.exceptions | ||||
| 
 | ||||
| from .constants import ( | ||||
|     CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS, | ||||
|     INDICATOR_ID, INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS, | ||||
|     RELATIONSHIP_IDS, | ||||
|     ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID, CAMPAIGN_KWARGS, | ||||
|     COURSE_OF_ACTION_ID, COURSE_OF_ACTION_KWARGS, FAKE_TIME, IDENTITY_ID, | ||||
|     IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS, INTRUSION_SET_ID, | ||||
|     INTRUSION_SET_KWARGS, LOCATION_ID, MALWARE_ID, MALWARE_KWARGS, | ||||
|     OBSERVED_DATA_ID, OBSERVED_DATA_KWARGS, RELATIONSHIP_IDS, REPORT_ID, | ||||
|     REPORT_KWARGS, THREAT_ACTOR_ID, THREAT_ACTOR_KWARGS, TOOL_ID, TOOL_KWARGS, | ||||
|     VULNERABILITY_ID, VULNERABILITY_KWARGS, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -375,3 +381,399 @@ def test_related_to_by_target(ds): | |||
|     assert len(resp) == 2 | ||||
|     assert any(x['id'] == CAMPAIGN_ID for x in resp) | ||||
|     assert any(x['id'] == INDICATOR_ID for x in resp) | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_attack_pattern1(): | ||||
|     ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS) | ||||
|     ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(ap1, ap2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_attack_pattern2(): | ||||
|     ATTACK_KWARGS = dict( | ||||
|         name="Phishing", | ||||
|         external_references=[ | ||||
|             { | ||||
|                 "url": "https://example2", | ||||
|                 "source_name": "some-source2", | ||||
|             }, | ||||
|         ], | ||||
|     ) | ||||
|     ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS) | ||||
|     ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(ap1, ap2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_campaign1(): | ||||
|     camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS) | ||||
|     camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(camp1, camp2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_campaign2(): | ||||
|     CAMP_KWARGS = dict( | ||||
|         name="Green Group Attacks Against Finance", | ||||
|         description="Campaign by Green Group against a series of targets in the financial services sector.", | ||||
|         aliases=["super-green", "some-green"], | ||||
|     ) | ||||
|     camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS) | ||||
|     camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMP_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(camp1, camp2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_identity1(): | ||||
|     iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS) | ||||
|     iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(iden1, iden2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_identity2(): | ||||
|     IDEN_KWARGS = dict( | ||||
|         name="John Smith", | ||||
|         identity_class="individual", | ||||
|         sectors=["government", "critical-infrastructure"], | ||||
|     ) | ||||
|     iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS) | ||||
|     iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDEN_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(iden1, iden2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_indicator(): | ||||
|     ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) | ||||
|     ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(ind1, ind2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_location1(): | ||||
|     LOCATION_KWARGS = dict(latitude=45, longitude=179) | ||||
|     loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS) | ||||
|     loc2 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(loc1, loc2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_location2(): | ||||
|     LOCATION_KWARGS = dict( | ||||
|         latitude=38.889, | ||||
|         longitude=-77.023, | ||||
|         region="northern-america", | ||||
|         country="us", | ||||
|     ) | ||||
|     loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS) | ||||
|     loc2 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(loc1, loc2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_malware(): | ||||
|     malw1 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS) | ||||
|     malw2 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(malw1, malw2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_threat_actor1(): | ||||
|     ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS) | ||||
|     ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(ta1, ta2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_threat_actor2(): | ||||
|     THREAT_KWARGS = dict( | ||||
|         threat_actor_types=["crime-syndicate"], | ||||
|         aliases=["super-evil"], | ||||
|         name="Evil Org", | ||||
|     ) | ||||
|     ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS) | ||||
|     ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(ta1, ta2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_tool(): | ||||
|     tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS) | ||||
|     tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(tool1, tool2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_vulnerability1(): | ||||
|     vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS) | ||||
|     vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(vul1, vul2) | ||||
|     assert round(env) == 100 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_same_vulnerability2(): | ||||
|     VULN_KWARGS1 = dict( | ||||
|         name="Heartbleed", | ||||
|         external_references=[ | ||||
|             { | ||||
|                 "url": "https://example", | ||||
|                 "source_name": "some-source", | ||||
|             }, | ||||
|         ], | ||||
|     ) | ||||
|     VULN_KWARGS2 = dict( | ||||
|         name="Zot", | ||||
|         external_references=[ | ||||
|             { | ||||
|                 "url": "https://example2", | ||||
|                 "source_name": "some-source2", | ||||
|             }, | ||||
|         ], | ||||
|     ) | ||||
|     vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS1) | ||||
|     vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULN_KWARGS2) | ||||
|     env = stix2.Environment().semantically_equivalent(vul1, vul2) | ||||
|     assert round(env) == 0.0 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_on_unknown_object(): | ||||
|     CUSTOM_KWARGS1 = dict( | ||||
|         type="x-foobar", | ||||
|         id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061", | ||||
|         name="Heartbleed", | ||||
|         external_references=[ | ||||
|             { | ||||
|                 "url": "https://example", | ||||
|                 "source_name": "some-source", | ||||
|             }, | ||||
|         ], | ||||
|     ) | ||||
|     CUSTOM_KWARGS2 = dict( | ||||
|         type="x-foobar", | ||||
|         id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061", | ||||
|         name="Zot", | ||||
|         external_references=[ | ||||
|             { | ||||
|                 "url": "https://example2", | ||||
|                 "source_name": "some-source2", | ||||
|             }, | ||||
|         ], | ||||
|     ) | ||||
| 
 | ||||
|     def _x_foobar_checks(obj1, obj2, **weights): | ||||
|         matching_score = 0.0 | ||||
|         sum_weights = 0.0 | ||||
|         if stix2.environment.check_property_present("external_references", obj1, obj2): | ||||
|             w = weights["external_references"] | ||||
|             sum_weights += w | ||||
|             matching_score += w * stix2.environment.partial_external_reference_based( | ||||
|                 obj1["external_references"], | ||||
|                 obj2["external_references"], | ||||
|             ) | ||||
|         if stix2.environment.check_property_present("name", obj1, obj2): | ||||
|             w = weights["name"] | ||||
|             sum_weights += w | ||||
|             matching_score += w * stix2.environment.partial_string_based(obj1["name"], obj2["name"]) | ||||
|         return matching_score, sum_weights | ||||
| 
 | ||||
|     weights = { | ||||
|         "x-foobar": { | ||||
|             "external_references": 40, | ||||
|             "name": 60, | ||||
|             "method": _x_foobar_checks, | ||||
|         }, | ||||
|         "_internal": { | ||||
|             "ignore_spec_version": False, | ||||
|         }, | ||||
|     } | ||||
|     cust1 = stix2.parse(CUSTOM_KWARGS1, allow_custom=True) | ||||
|     cust2 = stix2.parse(CUSTOM_KWARGS2, allow_custom=True) | ||||
|     env = stix2.Environment().semantically_equivalent(cust1, cust2, **weights) | ||||
|     assert round(env) == 0 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_different_type_raises(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS) | ||||
|         ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) | ||||
|         stix2.Environment().semantically_equivalent(vul1, ind1) | ||||
| 
 | ||||
|     assert str(excinfo.value) == "The objects to compare must be of the same type!" | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_different_spec_version_raises(): | ||||
|     with pytest.raises(ValueError) as excinfo: | ||||
|         V20_KWARGS = dict( | ||||
|             labels=['malicious-activity'], | ||||
|             pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']", | ||||
|         ) | ||||
|         ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) | ||||
|         ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **V20_KWARGS) | ||||
|         stix2.Environment().semantically_equivalent(ind1, ind2) | ||||
| 
 | ||||
|     assert str(excinfo.value) == "The objects to compare must be of the same spec version!" | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     "obj1,obj2,ret_val", | ||||
|     [ | ||||
|         ( | ||||
|              stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS), | ||||
|              stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS), | ||||
|              "course-of-action type has no semantic equivalence implementation!", | ||||
|         ), | ||||
|         ( | ||||
|              stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS), | ||||
|              stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS), | ||||
|              "intrusion-set type has no semantic equivalence implementation!", | ||||
|         ), | ||||
|         ( | ||||
|              stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS), | ||||
|              stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS), | ||||
|              "observed-data type has no semantic equivalence implementation!", | ||||
|         ), | ||||
|         ( | ||||
|              stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS), | ||||
|              stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS), | ||||
|              "report type has no semantic equivalence implementation!", | ||||
|         ), | ||||
|     ], | ||||
| ) | ||||
| def test_semantic_equivalence_on_unsupported_types(obj1, obj2, ret_val): | ||||
|     with pytest.raises(stix2.exceptions.SemanticEquivalenceUnsupportedTypeError) as excinfo: | ||||
|         stix2.Environment().semantically_equivalent(obj1, obj2) | ||||
|     assert ret_val == str(excinfo.value) | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_zero_match(): | ||||
|     IND_KWARGS = dict( | ||||
|         indicator_types=["APTX"], | ||||
|         pattern="[ipv4-addr:value = '192.168.1.1']", | ||||
|         pattern_type="stix", | ||||
|         valid_from="2019-01-01T12:34:56Z", | ||||
|     ) | ||||
|     weights = { | ||||
|         "indicator": { | ||||
|             "indicator_types": 15, | ||||
|             "pattern": 80, | ||||
|             "valid_from": 0, | ||||
|             "tdelta": 1,  # One day interval | ||||
|             "method": stix2.environment._indicator_checks, | ||||
|         }, | ||||
|         "_internal": { | ||||
|             "ignore_spec_version": False, | ||||
|         }, | ||||
|     } | ||||
|     ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) | ||||
|     ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **IND_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights) | ||||
|     assert round(env) == 0 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_different_spec_version(): | ||||
|     IND_KWARGS = dict( | ||||
|         labels=["APTX"], | ||||
|         pattern="[ipv4-addr:value = '192.168.1.1']", | ||||
|     ) | ||||
|     weights = { | ||||
|         "indicator": { | ||||
|             "indicator_types": 15, | ||||
|             "pattern": 80, | ||||
|             "valid_from": 0, | ||||
|             "tdelta": 1,  # One day interval | ||||
|             "method": stix2.environment._indicator_checks, | ||||
|         }, | ||||
|         "_internal": { | ||||
|             "ignore_spec_version": True,  # Disables spec_version check. | ||||
|         }, | ||||
|     } | ||||
|     ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS) | ||||
|     ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **IND_KWARGS) | ||||
|     env = stix2.Environment().semantically_equivalent(ind1, ind2, **weights) | ||||
|     assert round(env) == 0 | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     "refs1,refs2,ret_val", [ | ||||
|         ( | ||||
|             [ | ||||
|                 { | ||||
|                     "url": "https://attack.mitre.org/techniques/T1150", | ||||
|                     "source_name": "mitre-attack", | ||||
|                     "external_id": "T1150", | ||||
|                 }, | ||||
|                 { | ||||
|                     "url": "https://researchcenter.paloaltonetworks.com/2016/09/unit42-sofacys-komplex-os-x-trojan/", | ||||
|                     "source_name": "Sofacy Komplex Trojan", | ||||
|                     "description": "Dani Creus, Tyler Halfpop, Robert Falcone. (2016, September 26). Sofacy's 'Komplex' OS X Trojan. Retrieved July 8, 2017.", | ||||
|                 }, | ||||
|             ], | ||||
|             [ | ||||
|                 { | ||||
|                     "url": "https://attack.mitre.org/techniques/T1129", | ||||
|                     "source_name": "mitre-attack", | ||||
|                     "external_id": "T1129", | ||||
|                 }, | ||||
|                 { | ||||
|                     "url": "https://en.wikipedia.org/wiki/Microsoft_Windows_library_files", | ||||
|                     "source_name": "Wikipedia Windows Library Files", | ||||
|                     "description": "Wikipedia. (2017, January 31). Microsoft Windows library files. Retrieved February 13, 2017.", | ||||
|                 }, | ||||
|             ], | ||||
|             0.0, | ||||
|         ), | ||||
|         ( | ||||
|             [ | ||||
|                 { | ||||
|                     "url": "https://attack.mitre.org/techniques/T1129", | ||||
|                     "source_name": "mitre-attack", | ||||
|                     "external_id": "T1129", | ||||
|                 }, | ||||
|             ], | ||||
|             [ | ||||
|                 { | ||||
|                     "url": "https://attack.mitre.org/techniques/T1129", | ||||
|                     "source_name": "mitre-attack", | ||||
|                     "external_id": "T1129", | ||||
|                 }, | ||||
|                 { | ||||
|                     "url": "https://en.wikipedia.org/wiki/Microsoft_Windows_library_files", | ||||
|                     "source_name": "Wikipedia Windows Library Files", | ||||
|                     "description": "Wikipedia. (2017, January 31). Microsoft Windows library files. Retrieved February 13, 2017.", | ||||
|                 }, | ||||
|             ], | ||||
|             1.0, | ||||
|         ), | ||||
|         ( | ||||
|             [ | ||||
|                 { | ||||
|                     "url": "https://example", | ||||
|                     "source_name": "some-source", | ||||
|                 }, | ||||
|             ], | ||||
|             [ | ||||
|                 { | ||||
|                     "url": "https://example", | ||||
|                     "source_name": "some-source", | ||||
|                 }, | ||||
|             ], | ||||
|             1.0, | ||||
|         ), | ||||
|     ], | ||||
| ) | ||||
| def test_semantic_equivalence_external_references(refs1, refs2, ret_val): | ||||
|     value = stix2.environment.partial_external_reference_based(refs1, refs2) | ||||
|     assert value == ret_val | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_timetamp(): | ||||
|     t1 = "2018-10-17T00:14:20.652Z" | ||||
|     t2 = "2018-10-17T12:14:20.652Z" | ||||
|     assert stix2.environment.partial_timestamp_based(t1, t2, 1) == 0.5 | ||||
| 
 | ||||
| 
 | ||||
| def test_semantic_equivalence_exact_match(): | ||||
|     t1 = "2018-10-17T00:14:20.652Z" | ||||
|     t2 = "2018-10-17T12:14:20.652Z" | ||||
|     assert stix2.environment.exact_match(t1, t2) == 0.0 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 Chris Lenk
						Chris Lenk