fix logging messages, typos and add tests for the semantic equivalence method

master
Emmanuelle Vargas-Gonzalez 2019-09-16 14:35:14 -04:00
parent 6fa77adfe3
commit e8eb7bcca2
2 changed files with 183 additions and 11 deletions

View File

@ -203,7 +203,7 @@ class Environment(DataStoreMixin):
in the semantic equivalence process
Returns:
float: A number between 0.0 and 1.0 as a measurement of equivalence.
float: A number between 0.0 and 100.0 as a measurement of equivalence.
Warnings:
Not all objects are supported.
@ -256,7 +256,7 @@ class Environment(DataStoreMixin):
"external_references": 70,
},
"_internal": {
"tdelta": 1,
"tdelta": 1, # One day interval
},
}
@ -270,7 +270,7 @@ class Environment(DataStoreMixin):
if type1 != type2:
raise ValueError('The objects to compare must be of the same type!')
if obj1.get("spec_version", "") != obj2.get("spec_version", ""):
if obj1.get("spec_version", "2.0") != obj2.get("spec_version", "2.0"):
raise ValueError('The objects to compare must be of the same spec version!')
if type1 == "attack-pattern":
@ -297,7 +297,8 @@ class Environment(DataStoreMixin):
matching_score += w * _partial_list_based(obj1["aliases"], obj2["aliases"])
elif type1 == "course-of-action":
logger.warning("%s type is not supported for semantic equivalence", type1)
logger.warning("%s type has no semantic equivalence implementation", type1)
return 0
elif type1 == "identity":
if _check_property_present("name", obj1, obj2):
@ -330,8 +331,9 @@ class Environment(DataStoreMixin):
_partial_timestamp_based(obj1["valid_from"], obj2["valid_from"], weigths["_internal"]["tdelta"])
)
elif type1 == "instrusion-set":
logger.warning("%s type is not supported for semantic equivalence", type1)
elif type1 == "intrusion-set":
logger.warning("%s type has no semantic equivalence implementation", type1)
return 0
elif type1 == "location":
if _check_property_present("latitude", obj1, obj2) and _check_property_present("longitude", obj1, obj2):
@ -361,10 +363,12 @@ class Environment(DataStoreMixin):
matching_score += w * _partial_string_based(obj1["name"], obj2["name"])
elif type1 == "observed-data":
logger.warning("%s type is not supported for semantic equivalence", type1)
logger.warning("%s type has no semantic equivalence implementation", type1)
return 0
elif type1 == "report":
logger.warning("%s type is not supported for semantic equivalence", type1)
logger.warning("%s type has no semantic equivalence implementation", type1)
return 0
elif type1 == "threat-actor":
if _check_property_present("name", obj1, obj2):
@ -400,6 +404,9 @@ class Environment(DataStoreMixin):
sum_weights += w
matching_score += w * _partial_external_reference_based(obj1["external_references"], obj2["external_references"])
if sum_weights <= 0:
return 0
equivalence_score = (matching_score / sum_weights) * 100.0
return equivalence_score
@ -433,6 +440,7 @@ def _partial_string_based(str1, str2):
def _custom_pattern_based(pattern1, pattern2):
logger.warning("Checking for Indicator pattern equivalence is currently not implemented!")
return 0 # TODO: Needs to be implemented

View File

@ -3,9 +3,13 @@ import pytest
import stix2
from .constants import (
CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS,
INDICATOR_ID, INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS,
RELATIONSHIP_IDS,
ATTACK_PATTERN_ID, ATTACK_PATTERN_KWARGS, CAMPAIGN_ID, CAMPAIGN_KWARGS,
COURSE_OF_ACTION_ID, COURSE_OF_ACTION_KWARGS, FAKE_TIME, IDENTITY_ID,
IDENTITY_KWARGS, INDICATOR_ID, INDICATOR_KWARGS, INTRUSION_SET_ID,
INTRUSION_SET_KWARGS, LOCATION_ID, MALWARE_ID, MALWARE_KWARGS,
OBSERVED_DATA_ID, OBSERVED_DATA_KWARGS, RELATIONSHIP_IDS, REPORT_ID,
REPORT_KWARGS, THREAT_ACTOR_ID, THREAT_ACTOR_KWARGS, TOOL_ID, TOOL_KWARGS,
VULNERABILITY_ID, VULNERABILITY_KWARGS,
)
@ -372,3 +376,163 @@ def test_related_to_by_target(ds):
assert len(resp) == 2
assert any(x['id'] == CAMPAIGN_ID for x in resp)
assert any(x['id'] == INDICATOR_ID for x in resp)
def test_semantic_equivalence_on_same_attack_pattern():
ap1 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
ap2 = stix2.v21.AttackPattern(id=ATTACK_PATTERN_ID, **ATTACK_PATTERN_KWARGS)
env = stix2.Environment().semantically_equivalent(ap1, ap2)
assert round(env) == 100
def test_semantic_equivalence_on_same_campaign():
camp1 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
camp2 = stix2.v21.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
env = stix2.Environment().semantically_equivalent(camp1, camp2)
assert round(env) == 100
def test_semantic_equivalence_on_same_identity():
iden1 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
iden2 = stix2.v21.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
env = stix2.Environment().semantically_equivalent(iden1, iden2)
assert round(env) == 100
def test_semantic_equivalence_on_same_indicator():
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
env = stix2.Environment().semantically_equivalent(ind1, ind2)
assert round(env) == 20 # No support for pattern, hence the 20
def test_semantic_equivalence_on_same_location():
LOCATION_KWARGS = dict(latitude=45, longitude=179)
loc1 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS)
loc2 = stix2.v21.Location(id=LOCATION_ID, **LOCATION_KWARGS)
env = stix2.Environment().semantically_equivalent(loc1, loc2)
assert round(env) == 100
def test_semantic_equivalence_on_same_malware():
malw1 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
malw2 = stix2.v21.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
env = stix2.Environment().semantically_equivalent(malw1, malw2)
assert round(env) == 100
def test_semantic_equivalence_on_same_threat_actor():
ta1 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
ta2 = stix2.v21.ThreatActor(id=THREAT_ACTOR_ID, **THREAT_ACTOR_KWARGS)
env = stix2.Environment().semantically_equivalent(ta1, ta2)
assert round(env) == 100
def test_semantic_equivalence_on_same_tool():
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2)
assert round(env) == 100
def test_semantic_equivalence_on_same_vulnerability():
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
vul2 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
env = stix2.Environment().semantically_equivalent(vul1, vul2)
assert round(env) == 100
def test_semantic_equivalence_different_type_raises():
with pytest.raises(ValueError) as excinfo:
vul1 = stix2.v21.Vulnerability(id=VULNERABILITY_ID, **VULNERABILITY_KWARGS)
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
stix2.Environment().semantically_equivalent(vul1, ind1)
assert str(excinfo.value) == "The objects to compare must be of the same type!"
def test_semantic_equivalence_different_spec_version_raises():
with pytest.raises(ValueError) as excinfo:
V20_KWARGS = dict(
labels=['malicious-activity'],
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
)
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v20.Indicator(id=INDICATOR_ID, **V20_KWARGS)
stix2.Environment().semantically_equivalent(ind1, ind2)
assert str(excinfo.value) == "The objects to compare must be of the same spec version!"
def test_semantic_equivalence_on_unsupported_types():
coa1 = stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS)
ints1 = stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS)
obs1 = stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS)
rep1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
coa2 = stix2.v21.CourseOfAction(id=COURSE_OF_ACTION_ID, **COURSE_OF_ACTION_KWARGS)
ints2 = stix2.v21.IntrusionSet(id=INTRUSION_SET_ID, **INTRUSION_SET_KWARGS)
obs2 = stix2.v21.ObservedData(id=OBSERVED_DATA_ID, **OBSERVED_DATA_KWARGS)
rep2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
obj_list = [(coa1, coa2), (ints1, ints2), (obs1, obs2), (rep1, rep2)]
for obj1, obj2 in obj_list:
env = stix2.Environment().semantically_equivalent(obj1, obj2)
assert round(env) == 0
def test_semantic_equivalence_zero_match():
IND_KWARGS = dict(
indicator_types=["APTX"],
pattern="[ipv4-addr:value = '192.168.1.1']",
)
weigths = {
"attack-pattern": {
"name": 30,
"external_references": 70,
},
"campaign": {
"name": 60,
"aliases": 40,
},
"identity": {
"name": 60,
"identity_class": 20,
"sectors": 20,
},
"indicator": {
"indicator_types": 15,
"pattern": 85,
"valid_from": 0,
},
"location": {
"longitude_latitude": 34,
"region": 33,
"country": 33,
},
"malware": {
"malware_types": 20,
"name": 80,
},
"threat-actor": {
"name": 60,
"threat_actor_types": 20,
"aliases": 20,
},
"tool": {
"tool_types": 20,
"name": 80,
},
"vulnerability": {
"name": 30,
"external_references": 70,
},
"_internal": {
"tdelta": 1,
},
}
ind1 = stix2.v21.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
ind2 = stix2.v21.Indicator(id=INDICATOR_ID, **IND_KWARGS)
env = stix2.Environment().semantically_equivalent(ind1, ind2, **weigths)
assert round(env) == 0