Merge branch 'master' of github.com:oasis-open/cti-python-stix2

master
chrisr3d 2020-01-08 14:53:36 +01:00
commit c8cd84925b
No known key found for this signature in database
GPG Key ID: 6BBED1B63A6D639F
22 changed files with 1915 additions and 468 deletions

1
.gitignore vendored
View File

@ -55,6 +55,7 @@ coverage.xml
# Sphinx documentation # Sphinx documentation
docs/_build/ docs/_build/
.ipynb_checkpoints .ipynb_checkpoints
default_sem_eq_weights.rst
# PyBuilder # PyBuilder
target/ target/

View File

@ -8,8 +8,11 @@ python:
- "3.5" - "3.5"
- "3.6" - "3.6"
- "3.7" - "3.7"
- "3.8"
install: install:
- pip install -U pip setuptools - pip install -U pip setuptools
# remove pyyaml line when we drop py3.4 support
- pip install "pyyaml<5.3"
- pip install tox-travis pre-commit - pip install tox-travis pre-commit
- pip install codecov - pip install codecov
script: script:

View File

@ -1,6 +1,18 @@
CHANGELOG CHANGELOG
========= =========
1.3.0 - 2020-01-04
* #305 Updates support of STIX 2.1 to WD06
* #304 Updates semantic equivalence to latest draft, and allows programmatic
detailed logging
* Adds Python 3.8 support
* #297 Fixes bug with File.contains_refs
* #311 Fixes several DeprecationWarnings
* #315 Fixes parsing embedded external references with custom properties
* #316 Fix socket extension key checking
* #317 Fixes checking of Indicator's pattern property based on pattern_version
1.2.1 - 2019-10-16 1.2.1 - 2019-10-16
* #301 Adds more detailed debugging semantic equivalence output * #301 Adds more detailed debugging semantic equivalence output

View File

@ -1,4 +1,5 @@
import datetime import datetime
import json
import os import os
import re import re
import sys import sys
@ -7,6 +8,7 @@ from six import class_types
from sphinx.ext.autodoc import ClassDocumenter from sphinx.ext.autodoc import ClassDocumenter
from stix2.base import _STIXBase from stix2.base import _STIXBase
from stix2.environment import WEIGHTS
from stix2.version import __version__ from stix2.version import __version__
sys.path.insert(0, os.path.abspath('..')) sys.path.insert(0, os.path.abspath('..'))
@ -59,6 +61,14 @@ latex_documents = [
(master_doc, 'stix2.tex', 'stix2 Documentation', 'OASIS', 'manual'), (master_doc, 'stix2.tex', 'stix2 Documentation', 'OASIS', 'manual'),
] ]
# Add a formatted version of environment.WEIGHTS
default_sem_eq_weights = json.dumps(WEIGHTS, indent=4, default=lambda o: o.__name__)
default_sem_eq_weights = default_sem_eq_weights.replace('\n', '\n ')
default_sem_eq_weights = default_sem_eq_weights.replace(' "', ' ')
default_sem_eq_weights = default_sem_eq_weights.replace('"\n', '\n')
with open('default_sem_eq_weights.rst', 'w') as f:
f.write(".. code-block:: py\n\n {}\n\n".format(default_sem_eq_weights))
def get_property_type(prop): def get_property_type(prop):
"""Convert property classname into pretty string name of property. """Convert property classname into pretty string name of property.

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,5 @@
[bumpversion] [bumpversion]
current_version = 1.2.1 current_version = 1.3.0
commit = True commit = True
tag = True tag = True

View File

@ -46,6 +46,7 @@ setup(
'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
], ],
keywords='stix stix2 json cti cyber threat intelligence', keywords='stix stix2 json cti cyber threat intelligence',
packages=find_packages(exclude=['*.test', '*.test.*']), packages=find_packages(exclude=['*.test', '*.test.*']),
@ -64,6 +65,6 @@ setup(
}, },
extras_require={ extras_require={
'taxii': ['taxii2-client'], 'taxii': ['taxii2-client'],
'semantic': ['haversine', 'pyjarowinkler'], 'semantic': ['haversine', 'fuzzywuzzy'],
}, },
) )

View File

@ -1,6 +1,5 @@
"""Base classes for type definitions in the STIX2 library.""" """Base classes for type definitions in the STIX2 library."""
import collections
import copy import copy
import datetime as dt import datetime as dt
import uuid import uuid
@ -20,6 +19,12 @@ from .utils import NOW, find_property_index, format_datetime, get_timestamp
from .utils import new_version as _new_version from .utils import new_version as _new_version
from .utils import revoke as _revoke from .utils import revoke as _revoke
try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping
__all__ = ['STIXJSONEncoder', '_STIXBase'] __all__ = ['STIXJSONEncoder', '_STIXBase']
DEFAULT_ERROR = "{type} must have {property}='{expected}'." DEFAULT_ERROR = "{type} must have {property}='{expected}'."
@ -68,7 +73,7 @@ def get_required_properties(properties):
return (k for k, v in properties.items() if v.required) return (k for k, v in properties.items() if v.required)
class _STIXBase(collections.Mapping): class _STIXBase(Mapping):
"""Base class for STIX object types""" """Base class for STIX object types"""
def object_properties(self): def object_properties(self):
@ -143,7 +148,7 @@ class _STIXBase(collections.Mapping):
def __init__(self, allow_custom=False, interoperability=False, **kwargs): def __init__(self, allow_custom=False, interoperability=False, **kwargs):
cls = self.__class__ cls = self.__class__
self.__allow_custom = allow_custom self._allow_custom = allow_custom
self.__interoperability = interoperability self.__interoperability = interoperability
# Use the same timestamp for any auto-generated datetimes # Use the same timestamp for any auto-generated datetimes
@ -153,12 +158,12 @@ class _STIXBase(collections.Mapping):
custom_props = kwargs.pop('custom_properties', {}) custom_props = kwargs.pop('custom_properties', {})
if custom_props and not isinstance(custom_props, dict): if custom_props and not isinstance(custom_props, dict):
raise ValueError("'custom_properties' must be a dictionary") raise ValueError("'custom_properties' must be a dictionary")
if not self.__allow_custom: if not self._allow_custom:
extra_kwargs = list(set(kwargs) - set(self._properties)) extra_kwargs = list(set(kwargs) - set(self._properties))
if extra_kwargs: if extra_kwargs:
raise ExtraPropertiesError(cls, extra_kwargs) raise ExtraPropertiesError(cls, extra_kwargs)
if custom_props: if custom_props:
self.__allow_custom = True self._allow_custom = True
# Remove any keyword arguments whose value is None or [] (i.e. empty list) # Remove any keyword arguments whose value is None or [] (i.e. empty list)
setting_kwargs = {} setting_kwargs = {}
@ -236,7 +241,7 @@ class _STIXBase(collections.Mapping):
if isinstance(self, _Observable): if isinstance(self, _Observable):
# Assume: valid references in the original object are still valid in the new version # Assume: valid references in the original object are still valid in the new version
new_inner['_valid_refs'] = {'*': '*'} new_inner['_valid_refs'] = {'*': '*'}
new_inner['allow_custom'] = self.__allow_custom new_inner['allow_custom'] = self._allow_custom
new_inner['interoperability'] = self.__interoperability new_inner['interoperability'] = self.__interoperability
return cls(**new_inner) return cls(**new_inner)
@ -308,7 +313,7 @@ class _Observable(_STIXBase):
# the constructor might be called independently of an observed data object # the constructor might be called independently of an observed data object
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', []) self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])
self.__allow_custom = kwargs.get('allow_custom', False) self._allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False) self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
try: try:

View File

@ -193,7 +193,7 @@ class Environment(DataStoreMixin):
return None return None
@staticmethod @staticmethod
def semantically_equivalent(obj1, obj2, **weight_dict): def semantically_equivalent(obj1, obj2, prop_scores={}, **weight_dict):
"""This method is meant to verify if two objects of the same type are """This method is meant to verify if two objects of the same type are
semantically equivalent. semantically equivalent.
@ -210,68 +210,17 @@ class Environment(DataStoreMixin):
Course of Action, Intrusion-Set, Observed-Data, Report are not supported Course of Action, Intrusion-Set, Observed-Data, Report are not supported
by this implementation. Indicator pattern check is also limited. by this implementation. Indicator pattern check is also limited.
Note:
Default weights_dict:
.. include:: ../default_sem_eq_weights.rst
Note: Note:
This implementation follows the Committee Note on semantic equivalence. This implementation follows the Committee Note on semantic equivalence.
see `the Committee Note <link here>`__. see `the Committee Note <link here>`__.
""" """
# default weights used for the semantic equivalence process weights = WEIGHTS.copy()
weights = {
"attack-pattern": {
"name": 30,
"external_references": 70,
"method": _attack_pattern_checks,
},
"campaign": {
"name": 60,
"aliases": 40,
"method": _campaign_checks,
},
"identity": {
"name": 60,
"identity_class": 20,
"sectors": 20,
"method": _identity_checks,
},
"indicator": {
"indicator_types": 15,
"pattern": 80,
"valid_from": 5,
"tdelta": 1, # One day interval
"method": _indicator_checks,
},
"location": {
"longitude_latitude": 34,
"region": 33,
"country": 33,
"threshold": 1000.0,
"method": _location_checks,
},
"malware": {
"malware_types": 20,
"name": 80,
"method": _malware_checks,
},
"threat-actor": {
"name": 60,
"threat_actor_types": 20,
"aliases": 20,
"method": _threat_actor_checks,
},
"tool": {
"tool_types": 20,
"name": 80,
"method": _tool_checks,
},
"vulnerability": {
"name": 30,
"external_references": 70,
"method": _vulnerability_checks,
},
"_internal": {
"ignore_spec_version": False,
},
}
if weight_dict: if weight_dict:
weights.update(weight_dict) weights.update(weight_dict)
@ -286,17 +235,54 @@ class Environment(DataStoreMixin):
raise ValueError('The objects to compare must be of the same spec version!') raise ValueError('The objects to compare must be of the same spec version!')
try: try:
method = weights[type1]["method"] weights[type1]
except KeyError: except KeyError:
logger.warning("'%s' type has no semantic equivalence method to call!", type1) logger.warning("'%s' type has no 'weights' dict specified & thus no semantic equivalence method to call!", type1)
sum_weights = matching_score = 0 sum_weights = matching_score = 0
else: else:
try:
method = weights[type1]["method"]
except KeyError:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"]) logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
matching_score = 0.0
sum_weights = 0.0
for prop in weights[type1]:
if check_property_present(prop, obj1, obj2) or prop == "longitude_latitude":
w = weights[type1][prop][0]
comp_funct = weights[type1][prop][1]
if comp_funct == partial_timestamp_based:
contributing_score = w * comp_funct(obj1[prop], obj2[prop], weights[type1]["tdelta"])
elif comp_funct == partial_location_distance:
threshold = weights[type1]["threshold"]
contributing_score = w * comp_funct(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], threshold)
else:
contributing_score = w * comp_funct(obj1[prop], obj2[prop])
sum_weights += w
matching_score += contributing_score
prop_scores[prop] = {
"weight": w,
"contributing_score": contributing_score,
}
logger.debug("'%s' check -- weight: %s, contributing score: %s", prop, w, contributing_score)
prop_scores["matching_score"] = matching_score
prop_scores["sum_weights"] = sum_weights
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
else:
logger.debug("Starting semantic equivalence process between: '%s' and '%s'", obj1["id"], obj2["id"])
try:
matching_score, sum_weights = method(obj1, obj2, prop_scores, **weights[type1])
except TypeError:
# method doesn't support detailed output with prop_scores
matching_score, sum_weights = method(obj1, obj2, **weights[type1]) matching_score, sum_weights = method(obj1, obj2, **weights[type1])
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
if sum_weights <= 0: if sum_weights <= 0:
return 0 return 0
equivalence_score = (matching_score / sum_weights) * 100.0 equivalence_score = (matching_score / sum_weights) * 100.0
return equivalence_score return equivalence_score
@ -377,10 +363,10 @@ def partial_string_based(str1, str2):
float: Number between 0.0 and 1.0 depending on match criteria. float: Number between 0.0 and 1.0 depending on match criteria.
""" """
from pyjarowinkler import distance from fuzzywuzzy import fuzz
result = distance.get_jaro_distance(str1, str2) result = fuzz.token_sort_ratio(str1, str2, force_ascii=False)
logger.debug("--\t\tpartial_string_based '%s' '%s'\tresult: '%s'", str1, str2, result) logger.debug("--\t\tpartial_string_based '%s' '%s'\tresult: '%s'", str1, str2, result)
return result return result / 100.0
def custom_pattern_based(pattern1, pattern2): def custom_pattern_based(pattern1, pattern2):
@ -485,207 +471,51 @@ def partial_location_distance(lat1, long1, lat2, long2, threshold):
return result return result
def _attack_pattern_checks(obj1, obj2, **weights): # default weights used for the semantic equivalence process
matching_score = 0.0 WEIGHTS = {
sum_weights = 0.0 "attack-pattern": {
if check_property_present("name", obj1, obj2): "name": (30, partial_string_based),
w = weights["name"] "external_references": (70, partial_external_reference_based),
contributing_score = w * partial_string_based(obj1["name"], obj2["name"]) },
sum_weights += w "campaign": {
matching_score += contributing_score "name": (60, partial_string_based),
logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score) "aliases": (40, partial_list_based),
if check_property_present("external_references", obj1, obj2): },
w = weights["external_references"] "identity": {
contributing_score = ( "name": (60, partial_string_based),
w * partial_external_reference_based(obj1["external_references"], obj2["external_references"]) "identity_class": (20, exact_match),
) "sectors": (20, partial_list_based),
sum_weights += w },
matching_score += contributing_score "indicator": {
logger.debug("'external_references' check -- weight: %s, contributing score: %s", w, contributing_score) "indicator_types": (15, partial_list_based),
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) "pattern": (80, custom_pattern_based),
return matching_score, sum_weights "valid_from": (5, partial_timestamp_based),
"tdelta": 1, # One day interval
},
def _campaign_checks(obj1, obj2, **weights): "location": {
matching_score = 0.0 "longitude_latitude": (34, partial_location_distance),
sum_weights = 0.0 "region": (33, exact_match),
if check_property_present("name", obj1, obj2): "country": (33, exact_match),
w = weights["name"] "threshold": 1000.0,
contributing_score = w * partial_string_based(obj1["name"], obj2["name"]) },
sum_weights += w "malware": {
matching_score += contributing_score "malware_types": (20, partial_list_based),
logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score) "name": (80, partial_string_based),
if check_property_present("aliases", obj1, obj2): },
w = weights["aliases"] "threat-actor": {
contributing_score = w * partial_list_based(obj1["aliases"], obj2["aliases"]) "name": (60, partial_string_based),
sum_weights += w "threat_actor_types": (20, partial_list_based),
matching_score += contributing_score "aliases": (20, partial_list_based),
logger.debug("'aliases' check -- weight: %s, contributing score: %s", w, contributing_score) },
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights) "tool": {
return matching_score, sum_weights "tool_types": (20, partial_list_based),
"name": (80, partial_string_based),
},
def _identity_checks(obj1, obj2, **weights): "vulnerability": {
matching_score = 0.0 "name": (30, partial_string_based),
sum_weights = 0.0 "external_references": (70, partial_external_reference_based),
if check_property_present("name", obj1, obj2): },
w = weights["name"] "_internal": {
contributing_score = w * exact_match(obj1["name"], obj2["name"]) "ignore_spec_version": False,
sum_weights += w },
matching_score += contributing_score } #: :autodoc-skip:
logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("identity_class", obj1, obj2):
w = weights["identity_class"]
contributing_score = w * exact_match(obj1["identity_class"], obj2["identity_class"])
sum_weights += w
matching_score += contributing_score
logger.debug("'identity_class' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("sectors", obj1, obj2):
w = weights["sectors"]
contributing_score = w * partial_list_based(obj1["sectors"], obj2["sectors"])
sum_weights += w
matching_score += contributing_score
logger.debug("'sectors' check -- weight: %s, contributing score: %s", w, contributing_score)
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
return matching_score, sum_weights
def _indicator_checks(obj1, obj2, **weights):
matching_score = 0.0
sum_weights = 0.0
if check_property_present("indicator_types", obj1, obj2):
w = weights["indicator_types"]
contributing_score = w * partial_list_based(obj1["indicator_types"], obj2["indicator_types"])
sum_weights += w
matching_score += contributing_score
logger.debug("'indicator_types' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("pattern", obj1, obj2):
w = weights["pattern"]
contributing_score = w * custom_pattern_based(obj1["pattern"], obj2["pattern"])
sum_weights += w
matching_score += contributing_score
logger.debug("'pattern' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("valid_from", obj1, obj2):
w = weights["valid_from"]
contributing_score = (
w *
partial_timestamp_based(obj1["valid_from"], obj2["valid_from"], weights["tdelta"])
)
sum_weights += w
matching_score += contributing_score
logger.debug("'valid_from' check -- weight: %s, contributing score: %s", w, contributing_score)
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
return matching_score, sum_weights
def _location_checks(obj1, obj2, **weights):
matching_score = 0.0
sum_weights = 0.0
if check_property_present("latitude", obj1, obj2) and check_property_present("longitude", obj1, obj2):
w = weights["longitude_latitude"]
contributing_score = (
w *
partial_location_distance(obj1["latitude"], obj1["longitude"], obj2["latitude"], obj2["longitude"], weights["threshold"])
)
sum_weights += w
matching_score += contributing_score
logger.debug("'longitude_latitude' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("region", obj1, obj2):
w = weights["region"]
contributing_score = w * exact_match(obj1["region"], obj2["region"])
sum_weights += w
matching_score += contributing_score
logger.debug("'region' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("country", obj1, obj2):
w = weights["country"]
contributing_score = w * exact_match(obj1["country"], obj2["country"])
sum_weights += w
matching_score += contributing_score
logger.debug("'country' check -- weight: %s, contributing score: %s", w, contributing_score)
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
return matching_score, sum_weights
def _malware_checks(obj1, obj2, **weights):
matching_score = 0.0
sum_weights = 0.0
if check_property_present("malware_types", obj1, obj2):
w = weights["malware_types"]
contributing_score = w * partial_list_based(obj1["malware_types"], obj2["malware_types"])
sum_weights += w
matching_score += contributing_score
logger.debug("'malware_types' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("name", obj1, obj2):
w = weights["name"]
contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
sum_weights += w
matching_score += contributing_score
logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
return matching_score, sum_weights
def _threat_actor_checks(obj1, obj2, **weights):
matching_score = 0.0
sum_weights = 0.0
if check_property_present("name", obj1, obj2):
w = weights["name"]
contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
sum_weights += w
matching_score += contributing_score
logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("threat_actor_types", obj1, obj2):
w = weights["threat_actor_types"]
contributing_score = w * partial_list_based(obj1["threat_actor_types"], obj2["threat_actor_types"])
sum_weights += w
matching_score += contributing_score
logger.debug("'threat_actor_types' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("aliases", obj1, obj2):
w = weights["aliases"]
contributing_score = w * partial_list_based(obj1["aliases"], obj2["aliases"])
sum_weights += w
matching_score += contributing_score
logger.debug("'aliases' check -- weight: %s, contributing score: %s", w, contributing_score)
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
return matching_score, sum_weights
def _tool_checks(obj1, obj2, **weights):
matching_score = 0.0
sum_weights = 0.0
if check_property_present("tool_types", obj1, obj2):
w = weights["tool_types"]
contributing_score = w * partial_list_based(obj1["tool_types"], obj2["tool_types"])
sum_weights += w
matching_score += contributing_score
logger.debug("'tool_types' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("name", obj1, obj2):
w = weights["name"]
contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
sum_weights += w
matching_score += contributing_score
logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
return matching_score, sum_weights
def _vulnerability_checks(obj1, obj2, **weights):
matching_score = 0.0
sum_weights = 0.0
if check_property_present("name", obj1, obj2):
w = weights["name"]
contributing_score = w * partial_string_based(obj1["name"], obj2["name"])
sum_weights += w
matching_score += contributing_score
logger.debug("'name' check -- weight: %s, contributing score: %s", w, contributing_score)
if check_property_present("external_references", obj1, obj2):
w = weights["external_references"]
contributing_score = w * partial_external_reference_based(
obj1["external_references"],
obj2["external_references"],
)
sum_weights += w
matching_score += contributing_score
logger.debug("'external_references' check -- weight: %s, contributing score: %s", w, contributing_score)
logger.debug("Matching Score: %s, Sum of Weights: %s", matching_score, sum_weights)
return matching_score, sum_weights

View File

@ -2,14 +2,12 @@
import base64 import base64
import binascii import binascii
import collections
import copy import copy
import inspect import inspect
import re import re
import uuid import uuid
from six import string_types, text_type from six import string_types, text_type
from stix2patterns.validator import run_validator
import stix2 import stix2
@ -27,6 +25,11 @@ ID_REGEX_interoperability = re.compile(r"[0-9a-fA-F]{8}-"
"[0-9a-fA-F]{4}-" "[0-9a-fA-F]{4}-"
"[0-9a-fA-F]{12}$") "[0-9a-fA-F]{12}$")
try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping
ERROR_INVALID_ID = ( ERROR_INVALID_ID = (
"not a valid STIX identifier, must match <object-type>--<UUID>: {}" "not a valid STIX identifier, must match <object-type>--<UUID>: {}"
) )
@ -208,8 +211,13 @@ class ListProperty(Property):
else: else:
obj_type = self.contained obj_type = self.contained
if isinstance(valid, collections.Mapping): if isinstance(valid, Mapping):
try:
valid._allow_custom
except AttributeError:
result.append(obj_type(**valid)) result.append(obj_type(**valid))
else:
result.append(obj_type(allow_custom=True, **valid))
else: else:
result.append(obj_type(valid)) result.append(obj_type(valid))
@ -403,7 +411,7 @@ class HashesProperty(DictionaryProperty):
def clean(self, value): def clean(self, value):
clean_dict = super(HashesProperty, self).clean(value) clean_dict = super(HashesProperty, self).clean(value)
for k, v in clean_dict.items(): for k, v in copy.deepcopy(clean_dict).items():
key = k.upper().replace('-', '') key = k.upper().replace('-', '')
if key in HASHES_REGEX: if key in HASHES_REGEX:
vocab_key = HASHES_REGEX[key][1] vocab_key = HASHES_REGEX[key][1]
@ -562,14 +570,7 @@ class EnumProperty(StringProperty):
class PatternProperty(StringProperty): class PatternProperty(StringProperty):
pass
def clean(self, value):
cleaned_value = super(PatternProperty, self).clean(value)
errors = run_validator(cleaned_value)
if errors:
raise ValueError(str(errors[0]))
return cleaned_value
class ObservableProperty(Property): class ObservableProperty(Property):

View File

@ -192,3 +192,23 @@ def test_invalid_indicator_pattern():
assert excinfo.value.cls == stix2.v20.Indicator assert excinfo.value.cls == stix2.v20.Indicator
assert excinfo.value.prop_name == 'pattern' assert excinfo.value.prop_name == 'pattern'
assert 'mismatched input' in excinfo.value.reason assert 'mismatched input' in excinfo.value.reason
def test_indicator_stix21_invalid_pattern():
now = dt.datetime(2017, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
epoch = dt.datetime(1970, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
patrn = "[EXISTS windows-registry-key:values]"
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.v20.Indicator(
type="indicator",
id=INDICATOR_ID,
created=now,
modified=now,
pattern=patrn,
valid_from=epoch,
labels=["malicious-activity"],
)
assert excinfo.value.cls == stix2.v20.Indicator
assert "FAIL: Error found at line 1:8. no viable alternative at input 'EXISTS" in str(excinfo.value)

View File

@ -521,7 +521,7 @@ def test_semantic_equivalence_on_same_vulnerability2():
], ],
) )
VULN_KWARGS2 = dict( VULN_KWARGS2 = dict(
name="Zot", name="Foo",
external_references=[ external_references=[
{ {
"url": "https://example2", "url": "https://example2",
@ -550,7 +550,7 @@ def test_semantic_equivalence_on_unknown_object():
CUSTOM_KWARGS2 = dict( CUSTOM_KWARGS2 = dict(
type="x-foobar", type="x-foobar",
id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061", id="x-foobar--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061",
name="Zot", name="Foo",
external_references=[ external_references=[
{ {
"url": "https://example2", "url": "https://example2",
@ -622,11 +622,10 @@ def test_semantic_equivalence_zero_match():
) )
weights = { weights = {
"indicator": { "indicator": {
"indicator_types": 15, "indicator_types": (15, stix2.environment.partial_list_based),
"pattern": 80, "pattern": (80, stix2.environment.custom_pattern_based),
"valid_from": 0, "valid_from": (5, stix2.environment.partial_timestamp_based),
"tdelta": 1, # One day interval "tdelta": 1, # One day interval
"method": stix2.environment._indicator_checks,
}, },
"_internal": { "_internal": {
"ignore_spec_version": False, "ignore_spec_version": False,
@ -645,11 +644,10 @@ def test_semantic_equivalence_different_spec_version():
) )
weights = { weights = {
"indicator": { "indicator": {
"indicator_types": 15, "indicator_types": (15, stix2.environment.partial_list_based),
"pattern": 80, "pattern": (80, stix2.environment.custom_pattern_based),
"valid_from": 0, "valid_from": (5, stix2.environment.partial_timestamp_based),
"tdelta": 1, # One day interval "tdelta": 1, # One day interval
"method": stix2.environment._indicator_checks,
}, },
"_internal": { "_internal": {
"ignore_spec_version": True, # Disables spec_version check. "ignore_spec_version": True, # Disables spec_version check.
@ -750,3 +748,75 @@ def test_non_existent_config_for_object():
r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS) r1 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS) r2 = stix2.v21.Report(id=REPORT_ID, **REPORT_KWARGS)
assert stix2.Environment().semantically_equivalent(r1, r2) == 0.0 assert stix2.Environment().semantically_equivalent(r1, r2) == 0.0
def custom_semantic_equivalence_method(obj1, obj2, **weights):
return 96.0, 100.0
def test_semantic_equivalence_method_provided():
# Because `method` is provided, `partial_list_based` will be ignored
TOOL2_KWARGS = dict(
name="Random Software",
tool_types=["information-gathering"],
)
weights = {
"tool": {
"tool_types": (20, stix2.environment.partial_list_based),
"name": (80, stix2.environment.partial_string_based),
"method": custom_semantic_equivalence_method,
},
}
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2, **weights)
assert round(env) == 96
def test_semantic_equivalence_prop_scores():
TOOL2_KWARGS = dict(
name="Random Software",
tool_types=["information-gathering"],
)
prop_scores = {}
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores)
assert len(prop_scores) == 4
assert round(prop_scores["matching_score"], 1) == 8.8
assert round(prop_scores["sum_weights"], 1) == 100.0
def custom_semantic_equivalence_method_prop_scores(obj1, obj2, prop_scores, **weights):
prop_scores["matching_score"] = 96.0
prop_scores["sum_weights"] = 100.0
return 96.0, 100.0
def test_semantic_equivalence_prop_scores_method_provided():
TOOL2_KWARGS = dict(
name="Random Software",
tool_types=["information-gathering"],
)
weights = {
"tool": {
"tool_types": 20,
"name": 80,
"method": custom_semantic_equivalence_method_prop_scores,
},
}
prop_scores = {}
tool1 = stix2.v21.Tool(id=TOOL_ID, **TOOL_KWARGS)
tool2 = stix2.v21.Tool(id=TOOL_ID, **TOOL2_KWARGS)
env = stix2.Environment().semantically_equivalent(tool1, tool2, prop_scores, **weights)
assert round(env) == 96
assert len(prop_scores) == 2
assert prop_scores["matching_score"] == 96.0
assert prop_scores["sum_weights"] == 100.0

View File

@ -207,3 +207,86 @@ def test_invalid_indicator_pattern():
assert excinfo.value.cls == stix2.v21.Indicator assert excinfo.value.cls == stix2.v21.Indicator
assert excinfo.value.prop_name == 'pattern' assert excinfo.value.prop_name == 'pattern'
assert 'mismatched input' in excinfo.value.reason assert 'mismatched input' in excinfo.value.reason
def test_indicator_with_custom_embedded_objs():
now = dt.datetime(2017, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
epoch = dt.datetime(1970, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
ext_ref = stix2.v21.ExternalReference(
source_name="Test",
description="Example Custom Ext Ref",
random_custom_prop="This is a custom property",
allow_custom=True,
)
ind = stix2.v21.Indicator(
type="indicator",
id=INDICATOR_ID,
created=now,
modified=now,
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
pattern_type="stix",
valid_from=epoch,
indicator_types=['malicious-activity'],
external_references=[ext_ref],
)
assert ind.indicator_types == ['malicious-activity']
assert len(ind.external_references) == 1
assert ind.external_references[0] == ext_ref
def test_indicator_with_custom_embed_objs_extra_props_error():
ext_ref = stix2.v21.ExternalReference(
source_name="Test",
description="Example Custom Ext Ref",
random_custom_prop="This is a custom property",
allow_custom=True,
)
with pytest.raises(stix2.exceptions.ExtraPropertiesError) as excinfo:
stix2.v21.Indicator(external_references=[ext_ref], bad_custom_prop="shouldn't be here", **INDICATOR_KWARGS)
assert excinfo.value.cls == stix2.v21.Indicator
assert excinfo.value.properties == ['bad_custom_prop']
assert str(excinfo.value) == "Unexpected properties for Indicator: (bad_custom_prop)."
def test_indicator_stix20_invalid_pattern():
    """A pattern that is invalid under 2.1 rules fails by default, but is
    accepted when pattern_version is explicitly set to "2.0"."""
    created_ts = dt.datetime(2017, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
    epoch_ts = dt.datetime(1970, 1, 1, 0, 0, 1, tzinfo=pytz.utc)
    legacy_pattern = "[win-registry-key:key = 'hkey_local_machine\\\\foo\\\\bar'] WITHIN 5 SECONDS WITHIN 6 SECONDS"

    common_kwargs = dict(
        type="indicator",
        id=INDICATOR_ID,
        created=created_ts,
        modified=created_ts,
        pattern=legacy_pattern,
        pattern_type="stix",
        valid_from=epoch_ts,
        indicator_types=['malicious-activity'],
    )

    # With no pattern_version the pattern is validated against 2.1, where a
    # duplicated qualifier is an error.
    with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
        stix2.v21.Indicator(**common_kwargs)
    assert excinfo.value.cls == stix2.v21.Indicator
    assert "FAIL: The same qualifier is used more than once" in str(excinfo.value)

    # Declaring pattern_version "2.0" relaxes validation and the object builds.
    ind = stix2.v21.Indicator(pattern_version="2.0", **common_kwargs)
    assert ind.id == INDICATOR_ID
    assert ind.indicator_types == ['malicious-activity']
    assert ind.pattern == legacy_pattern
    assert ind.pattern_type == "stix"
    assert ind.pattern_version == "2.0"

View File

@ -1117,6 +1117,20 @@ def test_network_traffic_socket_example():
assert nt.extensions['socket-ext'].socket_type == "SOCK_STREAM" assert nt.extensions['socket-ext'].socket_type == "SOCK_STREAM"
def test_correct_socket_options():
    """Option keys with an accepted prefix (here ICMP6_) pass SocketExt validation."""
    opts = {"ICMP6_RCVTIMEO": 100}
    sock_ext = stix2.v21.SocketExt(
        socket_type="SOCK_STREAM",
        address_family="AF_INET",
        protocol_family="PF_INET",
        is_listening=True,
        options=opts,
    )

    # Construction succeeded; spot-check that the values round-trip.
    for prop, expected in (
        ("address_family", "AF_INET"),
        ("socket_type", "SOCK_STREAM"),
        ("options", opts),
    ):
        assert getattr(sock_ext, prop) == expected
def test_incorrect_socket_options(): def test_incorrect_socket_options():
with pytest.raises(ValueError) as excinfo: with pytest.raises(ValueError) as excinfo:
stix2.v21.SocketExt( stix2.v21.SocketExt(

View File

@ -32,7 +32,7 @@ class Bundle(_STIXBase):
kwargs['objects'] = list(args) + kwargs.get('objects', []) kwargs['objects'] = list(args) + kwargs.get('objects', [])
allow_custom = kwargs.get('allow_custom', False) allow_custom = kwargs.get('allow_custom', False)
self.__allow_custom = allow_custom self._allow_custom = allow_custom
self._properties['objects'].contained.allow_custom = allow_custom self._properties['objects'].contained.allow_custom = allow_custom
interoperability = kwargs.get('interoperability', False) interoperability = kwargs.get('interoperability', False)
self.__interoperability = interoperability self.__interoperability = interoperability

View File

@ -3,8 +3,11 @@
from collections import OrderedDict from collections import OrderedDict
import itertools import itertools
from stix2patterns.validator import run_validator
from ..core import STIXDomainObject from ..core import STIXDomainObject
from ..custom import _custom_object_builder from ..custom import _custom_object_builder
from ..exceptions import InvalidValueError
from ..properties import ( from ..properties import (
BooleanProperty, IDProperty, IntegerProperty, ListProperty, BooleanProperty, IDProperty, IntegerProperty, ListProperty,
ObservableProperty, PatternProperty, ReferenceProperty, StringProperty, ObservableProperty, PatternProperty, ReferenceProperty, StringProperty,
@ -135,6 +138,11 @@ class Indicator(STIXDomainObject):
('granular_markings', ListProperty(GranularMarking)), ('granular_markings', ListProperty(GranularMarking)),
]) ])
def _check_object_constraints(self):
    """Validate that the 'pattern' property is a syntactically valid
    STIX 2.0 pattern.

    Raises:
        InvalidValueError: If the pattern fails validation; the first
            validator error message is reported.
    """
    # Chain to the base class so inherited constraint checks still run;
    # overriding without this silently skips them.
    super(Indicator, self)._check_object_constraints()

    # run_validator returns a list of error objects; an empty list means
    # the pattern is valid under the 2.0 pattern grammar.
    errors = run_validator(self.get('pattern'), '2.0')
    if errors:
        raise InvalidValueError(self.__class__, 'pattern', str(errors[0]))
class IntrusionSet(STIXDomainObject): class IntrusionSet(STIXDomainObject):
"""For more detailed information on this object's properties, see """For more detailed information on this object's properties, see
@ -212,7 +220,7 @@ class ObservedData(STIXDomainObject):
]) ])
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False) self._allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False) self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
super(ObservedData, self).__init__(*args, **kwargs) super(ObservedData, self).__init__(*args, **kwargs)

View File

@ -30,7 +30,7 @@ class Bundle(_STIXBase):
kwargs['objects'] = list(args) + kwargs.get('objects', []) kwargs['objects'] = list(args) + kwargs.get('objects', [])
allow_custom = kwargs.get('allow_custom', False) allow_custom = kwargs.get('allow_custom', False)
self.__allow_custom = allow_custom self._allow_custom = allow_custom
self._properties['objects'].contained.allow_custom = allow_custom self._properties['objects'].contained.allow_custom = allow_custom
interoperability = kwargs.get('interoperability', False) interoperability = kwargs.get('interoperability', False)
self.__interoperability = interoperability self.__interoperability = interoperability

View File

@ -598,8 +598,9 @@ class SocketExt(_Extension):
options = self.get('options') options = self.get('options')
if options is not None: if options is not None:
acceptable_prefixes = ["SO_", "ICMP_", "ICMP6_", "IP_", "IPV6_", "MCAST_", "TCP_", "IRLMP_"]
for key, val in options.items(): for key, val in options.items():
if key[:3] != "SO_": if key[:key.find('_') + 1] not in acceptable_prefixes:
raise ValueError("Incorrect options key") raise ValueError("Incorrect options key")
if not isinstance(val, int): if not isinstance(val, int):
raise ValueError("Options value must be an integer") raise ValueError("Options value must be an integer")

View File

@ -5,10 +5,13 @@ import itertools
import warnings import warnings
from six.moves.urllib.parse import quote_plus from six.moves.urllib.parse import quote_plus
from stix2patterns.validator import run_validator
from ..core import STIXDomainObject from ..core import STIXDomainObject
from ..custom import _custom_object_builder from ..custom import _custom_object_builder
from ..exceptions import PropertyPresenceError, STIXDeprecationWarning from ..exceptions import (
InvalidValueError, PropertyPresenceError, STIXDeprecationWarning,
)
from ..properties import ( from ..properties import (
BinaryProperty, BooleanProperty, EmbeddedObjectProperty, EnumProperty, BinaryProperty, BooleanProperty, EmbeddedObjectProperty, EnumProperty,
FloatProperty, IDProperty, IntegerProperty, ListProperty, FloatProperty, IDProperty, IntegerProperty, ListProperty,
@ -232,6 +235,16 @@ class Indicator(STIXDomainObject):
msg = "{0.id} 'valid_until' must be greater than 'valid_from'" msg = "{0.id} 'valid_until' must be greater than 'valid_from'"
raise ValueError(msg.format(self)) raise ValueError(msg.format(self))
if self.get('pattern_type') == "stix":
try:
pat_ver = self.get('pattern_version')
except AttributeError:
pat_ver = '2.1'
errors = run_validator(self.get('pattern'), pat_ver)
if errors:
raise InvalidValueError(self.__class__, 'pattern', str(errors[0]))
class Infrastructure(STIXDomainObject): class Infrastructure(STIXDomainObject):
# TODO: Add link # TODO: Add link
@ -578,7 +591,7 @@ class ObservedData(STIXDomainObject):
]) ])
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.__allow_custom = kwargs.get('allow_custom', False) self._allow_custom = kwargs.get('allow_custom', False)
self._properties['objects'].allow_custom = kwargs.get('allow_custom', False) self._properties['objects'].allow_custom = kwargs.get('allow_custom', False)
if "objects" in kwargs: if "objects" in kwargs:

View File

@ -1 +1 @@
__version__ = "1.2.1" __version__ = "1.3.0"

View File

@ -106,7 +106,7 @@ STIX_OBJ_DOCS = """
""".format( """.format(
_environ.creator_of.__doc__, _environ.creator_of.__doc__,
_environ.relationships.__doc__, _environ.relationships.__doc__,
_environ.related_to.__doc__ _environ.related_to.__doc__,
) )

14
tox.ini
View File

@ -1,5 +1,5 @@
[tox] [tox]
envlist = py27,py34,py35,py36,py37,style,isort-check,packaging envlist = py27,py34,py35,py36,py37,py38,style,isort-check,packaging
[testenv] [testenv]
deps = deps =
@ -9,8 +9,9 @@ deps =
pytest-cov pytest-cov
coverage coverage
taxii2-client taxii2-client
pyjarowinkler fuzzywuzzy
haversine haversine
python-Levenshtein
medallion medallion
commands = commands =
python -m pytest --cov=stix2 stix2/test/ --cov-report term-missing -W ignore::stix2.exceptions.STIXDeprecationWarning python -m pytest --cov=stix2 stix2/test/ --cov-report term-missing -W ignore::stix2.exceptions.STIXDeprecationWarning
@ -42,7 +43,8 @@ commands =
[travis] [travis]
python = python =
2.7: py27, style 2.7: py27, style
3.4: py34, style 3.4: py34
3.5: py35, style 3.5: py35
3.6: py36, style, packaging 3.6: py36
3.7: py37, style 3.7: py37
3.8: py38, style, packaging