Merge branch 'parse-cyber-observables'
						commit
						f4c813d84b
					
				|  | @ -3,8 +3,10 @@ | |||
| # flake8: noqa | ||||
| 
 | ||||
| from .bundle import Bundle | ||||
| from .observables import Artifact, AutonomousSystem, EmailAddress, \ | ||||
|     EmailMessage, File | ||||
| from .observables import Artifact, AutonomousSystem, Directory, DomainName, \ | ||||
|     EmailAddress, EmailMessage, File, IPv4Address, IPv6Address, MACAddress, \ | ||||
|     Mutex, NetworkTraffic, Process, Software, URL, UserAccount, WindowsRegistryKey, \ | ||||
|     X509Certificate | ||||
| from .other import ExternalReference, KillChainPhase, MarkingDefinition, \ | ||||
|     GranularMarking, StatementMarking, TLPMarking | ||||
| from .sdo import AttackPattern, Campaign, CourseOfAction, Identity, Indicator, \ | ||||
|  | @ -36,9 +38,22 @@ OBJ_MAP = { | |||
| OBJ_MAP_OBSERVABLE = { | ||||
|     'artifact': Artifact, | ||||
|     'autonomous-system': AutonomousSystem, | ||||
|     'directory': Directory, | ||||
|     'domain-name': DomainName, | ||||
|     'email-address': EmailAddress, | ||||
|     'email-message': EmailMessage, | ||||
|     'file': File, | ||||
|     'ipv4-addr': IPv4Address, | ||||
|     'ipv6-addr': IPv6Address, | ||||
|     'mac-addr': MACAddress, | ||||
|     'mutex': Mutex, | ||||
|     'network-traffic': NetworkTraffic, | ||||
|     'process': Process, | ||||
|     'software': Software, | ||||
|     'url': URL, | ||||
|     'user-account': UserAccount, | ||||
|     'windows-registry-key': WindowsRegistryKey, | ||||
|     'x509-certificate': X509Certificate, | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -148,7 +148,11 @@ class _STIXBase(collections.Mapping): | |||
| class Observable(_STIXBase): | ||||
| 
 | ||||
|     def __init__(self, **kwargs): | ||||
|         self._STIXBase__valid_refs = kwargs.pop('_valid_refs') | ||||
|         # the constructor might be called independently of an observed data object | ||||
|         if '_valid_refs' in kwargs: | ||||
|             self._STIXBase__valid_refs = kwargs.pop('_valid_refs') | ||||
|         else: | ||||
|             self._STIXBase__valid_refs = [] | ||||
|         super(Observable, self).__init__(**kwargs) | ||||
| 
 | ||||
|     def _check_property(self, prop_name, prop, kwargs): | ||||
|  |  | |||
|  | @ -6,11 +6,10 @@ and do not have a '_type' attribute. | |||
| """ | ||||
| 
 | ||||
| from .base import _STIXBase, Observable | ||||
| from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty, | ||||
|                          EmbeddedObjectProperty, HashesProperty, | ||||
|                          IntegerProperty, ListProperty, | ||||
|                          ObjectReferenceProperty, StringProperty, | ||||
|                          TimestampProperty, TypeProperty) | ||||
| from .properties import BinaryProperty, BooleanProperty, DictionaryProperty, \ | ||||
|     EmbeddedObjectProperty, HashesProperty, HexProperty, IntegerProperty, \ | ||||
|     ListProperty, ObjectReferenceProperty, Property, StringProperty, \ | ||||
|     TimestampProperty, TypeProperty | ||||
| 
 | ||||
| 
 | ||||
| class Artifact(Observable): | ||||
|  | @ -34,6 +33,29 @@ class AutonomousSystem(Observable): | |||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class Directory(Observable): | ||||
|     _type = 'directory' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'path': StringProperty(required=True), | ||||
|         'path_enc': StringProperty(), | ||||
|         # these are not the created/modified timestamps of the object itself | ||||
|         'created': TimestampProperty(), | ||||
|         'modified': TimestampProperty(), | ||||
|         'accessed': TimestampProperty(), | ||||
|         'contains_refs': ListProperty(ObjectReferenceProperty), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class DomainName(Observable): | ||||
|     _type = 'domain-name' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'value': StringProperty(required=True), | ||||
|         'resolves_to_refs': ListProperty(ObjectReferenceProperty), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class EmailAddress(Observable): | ||||
|     _type = 'email-address' | ||||
|     _properties = { | ||||
|  | @ -78,4 +100,187 @@ class File(Observable): | |||
|     _type = 'file' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         # extensions | ||||
|         'hashes': HashesProperty(), | ||||
|         'size': IntegerProperty(), | ||||
|         'name': StringProperty(), | ||||
|         'name_enc': StringProperty(), | ||||
|         'magic_number_hex': HexProperty(), | ||||
|         'mime_type': StringProperty(), | ||||
|         # these are not the created/modified timestamps of the object itself | ||||
|         'created': TimestampProperty(), | ||||
|         'modified': TimestampProperty(), | ||||
|         'accessed': TimestampProperty(), | ||||
|         'parent_directory_ref': ObjectReferenceProperty(), | ||||
|         'is_encrypted': BooleanProperty(), | ||||
|         'encyption_algorithm': StringProperty(), | ||||
|         'decryption_key': StringProperty(), | ||||
|         'contains_refs': ListProperty(ObjectReferenceProperty), | ||||
|         'content_ref': ObjectReferenceProperty(), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class IPv4Address(Observable): | ||||
|     _type = 'ipv4-addr' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'value': StringProperty(required=True), | ||||
|         'resolves_to_refs': ListProperty(ObjectReferenceProperty), | ||||
|         'belongs_to_refs': ListProperty(ObjectReferenceProperty), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class IPv6Address(Observable): | ||||
|     _type = 'ipv6-addr' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'value': StringProperty(required=True), | ||||
|         'resolves_to_refs': ListProperty(ObjectReferenceProperty), | ||||
|         'belongs_to_refs': ListProperty(ObjectReferenceProperty), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class MACAddress(Observable): | ||||
|     _type = 'mac-addr' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'value': StringProperty(required=True), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class Mutex(Observable): | ||||
|     _type = 'mutex' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'name': StringProperty(), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class NetworkTraffic(Observable): | ||||
|     _type = 'network-traffic' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         # extensions | ||||
|         'start': TimestampProperty(), | ||||
|         'end': TimestampProperty(), | ||||
|         'is_active': BooleanProperty(), | ||||
|         'src_ref': ObjectReferenceProperty(), | ||||
|         'dst_ref': ObjectReferenceProperty(), | ||||
|         'src_port': IntegerProperty(), | ||||
|         'dst_port': IntegerProperty(), | ||||
|         'protocols': ListProperty(StringProperty), | ||||
|         'src_byte_count': IntegerProperty(), | ||||
|         'dst_byte_count': IntegerProperty(), | ||||
|         'src_packets': IntegerProperty(), | ||||
|         'dst_packets': IntegerProperty(), | ||||
|         'ipfix': DictionaryProperty(), | ||||
|         'src_payload_ref': ObjectReferenceProperty(), | ||||
|         'dst_payload_ref': ObjectReferenceProperty(), | ||||
|         'encapsulates_refs': ListProperty(ObjectReferenceProperty), | ||||
|         'encapsulates_by_ref': ObjectReferenceProperty(), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class Process(Observable): | ||||
|     _type = 'process' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         # extensions | ||||
|         'is_hidden': BooleanProperty(), | ||||
|         'pid': IntegerProperty(), | ||||
|         'name': StringProperty(), | ||||
|         # this is not the created timestamps of the object itself | ||||
|         'created': TimestampProperty(), | ||||
|         'cwd': StringProperty(), | ||||
|         'arguments': ListProperty(StringProperty), | ||||
|         'command_line': StringProperty(), | ||||
|         'environment_variables': DictionaryProperty(), | ||||
|         'opened_connection_refs': ListProperty(ObjectReferenceProperty), | ||||
|         'creator_user_ref': ObjectReferenceProperty(), | ||||
|         'binary_ref': ObjectReferenceProperty(), | ||||
|         'parent_ref': ObjectReferenceProperty(), | ||||
|         'child_refs': ListProperty(ObjectReferenceProperty), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class Software(Observable): | ||||
|     _type = 'software' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'name': StringProperty(required=True), | ||||
|         'cpe': StringProperty(), | ||||
|         'languages': ListProperty(StringProperty), | ||||
|         'vendor': StringProperty(), | ||||
|         'version': StringProperty(), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class URL(Observable): | ||||
|     _type = 'url' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'value': StringProperty(required=True), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class UserAccount(Observable): | ||||
|     _type = 'user-account' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         # extensions | ||||
|         'user_id': StringProperty(required=True), | ||||
|         'account_login': StringProperty(), | ||||
|         'account_type': StringProperty(), | ||||
|         'display_name': StringProperty(), | ||||
|         'is_service_account': BooleanProperty(), | ||||
|         'is_privileged': BooleanProperty(), | ||||
|         'can_escalate_privs': BooleanProperty(), | ||||
|         'is_disabled': BooleanProperty(), | ||||
|         'account_created': TimestampProperty(), | ||||
|         'account_expires': TimestampProperty(), | ||||
|         'password_last_changed': TimestampProperty(), | ||||
|         'account_first_login': TimestampProperty(), | ||||
|         'account_last_login': TimestampProperty(), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class WindowsRegistryValueType(_STIXBase): | ||||
|     _type = 'windows-registry-value-type' | ||||
|     _properties = { | ||||
|         'name': StringProperty(required=True), | ||||
|         'data': StringProperty(), | ||||
|         'data_type': Property() | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class WindowsRegistryKey(Observable): | ||||
|     _type = 'windows-registry-key' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'key': StringProperty(required=True), | ||||
|         'values': ListProperty(WindowsRegistryValueType), | ||||
|         # this is not the modified timestamps of the object itself | ||||
|         'modified': TimestampProperty(), | ||||
|         'creator_user_ref': ObjectReferenceProperty(), | ||||
|         'number_of_subkeys': IntegerProperty(), | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class X509Certificate(Observable): | ||||
|     _type = 'x509-certificate' | ||||
|     _properties = { | ||||
|         'type': TypeProperty(_type), | ||||
|         'is_self_signed': BooleanProperty(), | ||||
|         'hashes': HashesProperty(), | ||||
|         'version': StringProperty(), | ||||
|         'serial_number': StringProperty(), | ||||
|         'signature_algorithm': StringProperty(), | ||||
|         'issuer': StringProperty(), | ||||
|         'validity_not_before': TimestampProperty(), | ||||
|         'validity_not_after': TimestampProperty(), | ||||
|         'subject': StringProperty(), | ||||
|         'subject_public_key_algorithm': StringProperty(), | ||||
|         'subject_public_key_modulus': StringProperty(), | ||||
|         'subject_public_key_exponent': IntegerProperty(), | ||||
|         'x509_v3_extensions': Property(), | ||||
|     } | ||||
|  |  | |||
|  | @ -44,6 +44,84 @@ def test_observed_data_example(): | |||
|     assert str(observed_data) == EXPECTED | ||||
| 
 | ||||
| 
 | ||||
| EXPECTED_WITH_REF = """{ | ||||
|     "created": "2016-04-06T19:58:16Z", | ||||
|     "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", | ||||
|     "first_observed": "2015-12-21T19:00:00Z", | ||||
|     "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", | ||||
|     "last_observed": "2015-12-21T19:00:00Z", | ||||
|     "modified": "2016-04-06T19:58:16Z", | ||||
|     "number_observed": 50, | ||||
|     "objects": { | ||||
|         "0": { | ||||
|             "name": "foo.exe", | ||||
|             "type": "file" | ||||
|         }, | ||||
|         "1": { | ||||
|             "contains_refs": [ | ||||
|                 "0" | ||||
|             ], | ||||
|             "path": "/usr/home", | ||||
|             "type": "directory" | ||||
|         } | ||||
|     }, | ||||
|     "type": "observed-data" | ||||
| }""" | ||||
| 
 | ||||
| 
 | ||||
| def test_observed_data_example_with_refs(): | ||||
|     observed_data = stix2.ObservedData( | ||||
|         id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", | ||||
|         created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", | ||||
|         created="2016-04-06T19:58:16Z", | ||||
|         modified="2016-04-06T19:58:16Z", | ||||
|         first_observed="2015-12-21T19:00:00Z", | ||||
|         last_observed="2015-12-21T19:00:00Z", | ||||
|         number_observed=50, | ||||
|         objects={ | ||||
|             "0": { | ||||
|                 "type": "file", | ||||
|                 "name": "foo.exe" | ||||
|             }, | ||||
|             "1": { | ||||
|                 "type": "directory", | ||||
|                 "path": "/usr/home", | ||||
|                 "contains_refs": ["0"] | ||||
|             } | ||||
|         }, | ||||
|     ) | ||||
| 
 | ||||
|     assert str(observed_data) == EXPECTED_WITH_REF | ||||
| 
 | ||||
| 
 | ||||
| def test_observed_data_example_with_bad_refs(): | ||||
|     with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: | ||||
|         stix2.ObservedData( | ||||
|             id="observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", | ||||
|             created_by_ref="identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", | ||||
|             created="2016-04-06T19:58:16Z", | ||||
|             modified="2016-04-06T19:58:16Z", | ||||
|             first_observed="2015-12-21T19:00:00Z", | ||||
|             last_observed="2015-12-21T19:00:00Z", | ||||
|             number_observed=50, | ||||
|             objects={ | ||||
|                 "0": { | ||||
|                     "type": "file", | ||||
|                     "name": "foo.exe" | ||||
|                 }, | ||||
|                 "1": { | ||||
|                     "type": "directory", | ||||
|                     "path": "/usr/home", | ||||
|                     "contains_refs": ["2"] | ||||
|                 } | ||||
|             }, | ||||
|         ) | ||||
| 
 | ||||
|     assert excinfo.value.cls == stix2.ObservedData | ||||
|     assert excinfo.value.prop_name == "objects" | ||||
|     assert excinfo.value.reason == "Invalid object reference for 'Directory:contains_refs': '2' is not a valid object in local scope" | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize("data", [ | ||||
|     EXPECTED, | ||||
|     { | ||||
|  | @ -198,4 +276,124 @@ def test_parse_email_message(data): | |||
|     assert odata.body_multipart[0].content_disposition == "inline" | ||||
| 
 | ||||
| 
 | ||||
| # TODO: Add other examples | ||||
| #  creating cyber observables directly | ||||
| 
 | ||||
| def test_directory_example(): | ||||
|     dir = stix2.Directory(_valid_refs=["1"], | ||||
|                           path='/usr/lib', | ||||
|                           created="2015-12-21T19:00:00Z", | ||||
|                           modified="2015-12-24T19:00:00Z", | ||||
|                           accessed="2015-12-21T20:00:00Z", | ||||
|                           contains_refs=["1"]) | ||||
| 
 | ||||
|     assert dir.path == '/usr/lib' | ||||
|     assert dir.created == dt.datetime(2015, 12, 21, 19, 0, 0, tzinfo=pytz.utc) | ||||
|     assert dir.modified == dt.datetime(2015, 12, 24, 19, 0, 0, tzinfo=pytz.utc) | ||||
|     assert dir.accessed == dt.datetime(2015, 12, 21, 20, 0, 0, tzinfo=pytz.utc) | ||||
|     assert dir.contains_refs == ["1"] | ||||
| 
 | ||||
| 
 | ||||
| def test_directory_example_ref_error(): | ||||
|     with pytest.raises(stix2.exceptions.InvalidObjRefError) as excinfo: | ||||
|         stix2.Directory(_valid_refs=[], | ||||
|                         path='/usr/lib', | ||||
|                         created="2015-12-21T19:00:00Z", | ||||
|                         modified="2015-12-24T19:00:00Z", | ||||
|                         accessed="2015-12-21T20:00:00Z", | ||||
|                         contains_refs=["1"]) | ||||
| 
 | ||||
|     assert excinfo.value.cls == stix2.Directory | ||||
|     assert excinfo.value.prop_name == "contains_refs" | ||||
| 
 | ||||
| 
 | ||||
| def test_domain_name_example(): | ||||
|     dn = stix2.DomainName(_valid_refs=["1"], | ||||
|                           value="example.com", | ||||
|                           resolves_to_refs=["1"]) | ||||
| 
 | ||||
|     assert dn.value == "example.com" | ||||
|     assert dn.resolves_to_refs == ["1"] | ||||
| 
 | ||||
| 
 | ||||
| def test_file_example(): | ||||
|     f = stix2.File(name="qwerty.dll", | ||||
|                    hashes={ | ||||
|                     "SHA-256": "ceafbfd424be2ca4a5f0402cae090dda2fb0526cf521b60b60077c0f622b285a"}, | ||||
|                    size=100, | ||||
|                    magic_number_hex="1C", | ||||
|                    mime_type="application/msword", | ||||
|                    created="2016-12-21T19:00:00Z", | ||||
|                    modified="2016-12-24T19:00:00Z", | ||||
|                    accessed="2016-12-21T20:00:00Z", | ||||
|                    is_encrypted=True, | ||||
|                    encyption_algorithm="AES128-CBC", | ||||
|                    decryption_key="fred" | ||||
|                    ) | ||||
| 
 | ||||
|     assert f.name == "qwerty.dll" | ||||
|     assert f.size == 100 | ||||
|     assert f.magic_number_hex == "1C" | ||||
|     assert f.hashes["SHA-256"] == "ceafbfd424be2ca4a5f0402cae090dda2fb0526cf521b60b60077c0f622b285a" | ||||
|     assert f.mime_type == "application/msword" | ||||
|     assert f.created == dt.datetime(2016, 12, 21, 19, 0, 0, tzinfo=pytz.utc) | ||||
|     assert f.modified == dt.datetime(2016, 12, 24, 19, 0, 0, tzinfo=pytz.utc) | ||||
|     assert f.accessed == dt.datetime(2016, 12, 21, 20, 0, 0, tzinfo=pytz.utc) | ||||
|     assert f.is_encrypted | ||||
|     assert f.encyption_algorithm == "AES128-CBC" | ||||
|     assert f.decryption_key == "fred"   # does the key have a format we can test for? | ||||
| 
 | ||||
| 
 | ||||
| # def test_file_example_encyption_error(): | ||||
| #     f = stix2.File(name="qwerty.dll", | ||||
| #                    is_encrypted=False, | ||||
| #                    encyption_algorithm="AES128-CBC" | ||||
| #                    ) | ||||
| # | ||||
| #     assert f.name == "qwerty.dll" | ||||
| #     assert f.is_encrypted == False | ||||
| #     assert f.encyption_algorithm == "AES128-CBC" | ||||
| 
 | ||||
| 
 | ||||
| def test_ip4_address_example(): | ||||
|     ip4 = stix2.IPv4Address(_valid_refs=["1", "4", "5"], | ||||
|                             value="198.51.100.3", | ||||
|                             resolves_to_refs=["4", "5"]) | ||||
| 
 | ||||
|     assert ip4.value == "198.51.100.3" | ||||
|     assert ip4.resolves_to_refs == ["4", "5"] | ||||
| 
 | ||||
| 
 | ||||
| def test_ip4_address_example_cidr(): | ||||
|     ip4 = stix2.IPv4Address(value="198.51.100.0/24") | ||||
| 
 | ||||
|     assert ip4.value == "198.51.100.0/24" | ||||
| 
 | ||||
| 
 | ||||
| def test_ip6_address_example(): | ||||
|     ip6 = stix2.IPv6Address(value="2001:0db8:85a3:0000:0000:8a2e:0370:7334") | ||||
| 
 | ||||
|     assert ip6.value == "2001:0db8:85a3:0000:0000:8a2e:0370:7334" | ||||
| 
 | ||||
| 
 | ||||
| def test_mac_address_example(): | ||||
|     ip6 = stix2.MACAddress(value="d2:fb:49:24:37:18") | ||||
| 
 | ||||
|     assert ip6.value == "d2:fb:49:24:37:18" | ||||
| 
 | ||||
| 
 | ||||
| def test_mutex_example(): | ||||
|     m = stix2.Mutex(name="barney") | ||||
| 
 | ||||
|     assert m.name == "barney" | ||||
| 
 | ||||
| 
 | ||||
| def test_software_example(): | ||||
|     s = stix2.Software(name="Word", | ||||
|                        cpe="cpe:2.3:a:microsoft:word:2000:*:*:*:*:*:*:*", | ||||
|                        version="2002", | ||||
|                        vendor="Microsoft") | ||||
| 
 | ||||
|     assert s.name == "Word" | ||||
|     assert s.cpe == "cpe:2.3:a:microsoft:word:2000:*:*:*:*:*:*:*" | ||||
|     assert s.version == "2002" | ||||
|     assert s.vendor == "Microsoft" | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	 clenk
						clenk