Merge branch 'parse-cyber-observables' of https://github.com/oasis-open/cti-python-stix2 into parse-cyber-observables

stix2.1
clenk 2017-05-18 11:08:12 -04:00
commit 931de31a10
4 changed files with 305 additions and 19 deletions

View File

@ -56,9 +56,14 @@ class _STIXBase(collections.Mapping):
if count > 1 or (at_least_one and count == 0):
raise MutuallyExclusivePropertiesError(self.__class__, list_of_properties)
def _check_at_least_one_property(self, list_of_properties):
def _check_at_least_one_property(self, list_of_properties=None):
if not list_of_properties:
list_of_properties = sorted(list(self.__class__._properties.keys()))
if "type" in list_of_properties:
list_of_properties.remove("type")
current_properties = self.properties_populated()
if not set(list_of_properties).intersection(current_properties):
list_of_properties_populated = set(list_of_properties).intersection(current_properties)
if list_of_properties and (not list_of_properties_populated or list_of_properties_populated == set(["extensions"])):
raise AtLeastOnePropertyError(self.__class__, list_of_properties)
def _check_properties_dependency(self, list_of_properties, list_of_dependent_properties, values=[]):
@ -72,7 +77,7 @@ class _STIXBase(collections.Mapping):
if failed_dependency_pairs:
raise DependentPropertiestError(self.__class__, failed_dependency_pairs)
def _check_object_constaints(self):
def _check_object_constraints(self):
if self.granular_markings:
for m in self.granular_markings:
# TODO: check selectors
@ -106,7 +111,7 @@ class _STIXBase(collections.Mapping):
self._inner = setting_kwargs
self._check_object_constaints()
self._check_object_constraints()
def __getitem__(self, key):
return self._inner[key]

View File

@ -6,9 +6,11 @@ and do not have a '_type' attribute.
"""
from .base import _Observable, _STIXBase
from .exceptions import AtLeastOnePropertyError
from .properties import (BinaryProperty, BooleanProperty, DictionaryProperty,
EmbeddedObjectProperty, EnumProperty, ExtensionsProperty, FloatProperty,
HashesProperty, HexProperty, IntegerProperty, ListProperty,
EmbeddedObjectProperty, EnumProperty,
ExtensionsProperty, FloatProperty, HashesProperty,
HexProperty, IntegerProperty, ListProperty,
ObjectReferenceProperty, StringProperty,
TimestampProperty, TypeProperty)
@ -23,8 +25,8 @@ class Artifact(_Observable):
'hashes': HashesProperty(),
}
def _check_object_constaints(self):
super(Artifact, self)._check_object_constaints()
def _check_object_constraints(self):
super(Artifact, self)._check_object_constraints()
self._check_mutually_exclusive_properties(["payload_bin", "url"])
self._check_properties_dependency(["hashes"], ["url"])
@ -80,8 +82,8 @@ class EmailMIMEComponent(_STIXBase):
'content_disposition': StringProperty(),
}
def _check_object_constaints(self):
super(EmailMIMEComponent, self)._check_object_constaints()
def _check_object_constraints(self):
super(EmailMIMEComponent, self)._check_object_constraints()
self._check_at_least_one_property(["body", "body_raw_ref"])
@ -105,8 +107,8 @@ class EmailMessage(_Observable):
'raw_email_ref': ObjectReferenceProperty(valid_types='artifact'),
}
def _check_object_constaints(self):
super(EmailMessage, self)._check_object_constaints()
def _check_object_constraints(self):
super(EmailMessage, self)._check_object_constraints()
self._check_properties_dependency(["is_multipart"], ["body_multipart"])
# self._dependency(["is_multipart"], ["body"], [False])
@ -133,6 +135,10 @@ class NTFSExt(_STIXBase):
'alternate_data_streams': ListProperty(EmbeddedObjectProperty(type=AlternateDataStream)),
}
def _check_object_constraints(self):
super(NTFSExt, self)._check_object_constraints()
self._check_at_least_one_property()
class PDFExt(_STIXBase):
_properties = {
@ -143,6 +149,10 @@ class PDFExt(_STIXBase):
'pdfid1': StringProperty(),
}
def _check_object_constraints(self):
super(PDFExt, self)._check_object_constraints()
self._check_at_least_one_property()
class RasterImageExt(_STIXBase):
_properties = {
@ -153,6 +163,10 @@ class RasterImageExt(_STIXBase):
'exif_tags': DictionaryProperty(),
}
def _check_object_constraints(self):
super(RasterImageExt, self)._check_object_constraints()
self._check_at_least_one_property()
class WindowsPEOptionalHeaderType(_STIXBase):
_properties = {
@ -184,11 +198,15 @@ class WindowsPEOptionalHeaderType(_STIXBase):
'size_of_stack_commit': IntegerProperty(),
'size_of_heap_reserve': IntegerProperty(),
'size_of_heap_commit': IntegerProperty(),
'loader_fkags_hex': HexProperty(),
'loader_flags_hex': HexProperty(),
'number_of_rva_and_sizes': IntegerProperty(),
'hashes': HashesProperty(),
}
def _check_object_constraints(self):
super(WindowsPEOptionalHeaderType, self)._check_object_constraints()
self._check_at_least_one_property()
class WindowsPESection(_STIXBase):
_properties = {
@ -239,8 +257,8 @@ class File(_Observable):
'content_ref': ObjectReferenceProperty(valid_types='artifact'),
}
def _check_object_constaints(self):
super(File, self)._check_object_constaints()
def _check_object_constraints(self):
super(File, self)._check_object_constraints()
self._check_properties_dependency(["is_encrypted"], ["encryption_algorithm", "decryption_key"])
self._check_at_least_one_property(["hashes", "name"])
@ -338,6 +356,10 @@ class TCPExt(_STIXBase):
'dst_flags_hex': HexProperty(),
}
def _check_object_constraints(self):
super(TCPExt, self)._check_object_constraints()
self._check_at_least_one_property()
class NetworkTraffic(_Observable):
_type = 'network-traffic'
@ -363,8 +385,8 @@ class NetworkTraffic(_Observable):
'encapsulates_by_ref': ObjectReferenceProperty(valid_types='network-traffic'),
}
def _check_object_constaints(self):
super(NetworkTraffic, self)._check_object_constaints()
def _check_object_constraints(self):
super(NetworkTraffic, self)._check_object_constraints()
self._check_at_least_one_property(["src_ref", "dst_ref"])
@ -432,6 +454,20 @@ class Process(_Observable):
'child_refs': ListProperty(ObjectReferenceProperty('process')),
}
def _check_object_constraints(self):
# no need to check windows-service-ext, since it has a required property
super(Process, self)._check_object_constraints()
try:
self._check_at_least_one_property()
if self.extensions and "windows-process-ext" in self.extensions:
self.extensions["windows-process-ext"]._check_at_least_one_property()
except AtLeastOnePropertyError as enclosing_exc:
if not self.extensions:
raise enclosing_exc
else:
if "windows-process-ext" in self.extensions:
self.extensions["windows-process-ext"]._check_at_least_one_property()
class Software(_Observable):
_type = 'software'

View File

@ -15,8 +15,8 @@ class ExternalReference(_STIXBase):
'external_id': StringProperty(),
}
def _check_object_constaints(self):
super(ExternalReference, self)._check_object_constaints()
def _check_object_constraints(self):
super(ExternalReference, self)._check_object_constraints()
self._check_at_least_one_property(["description", "external_id", "url"])

View File

@ -600,6 +600,35 @@ def test_file_example():
assert f.decryption_key == "fred" # does the key have a format we can test for?
def test_file_example_with_NTFSExt():
f = stix2.File(name="abc.txt",
extensions={
"ntfs-ext": {
"alternate_data_streams": [
{
"name": "second.stream",
"size": 25536
}
]
}
})
assert f.name == "abc.txt"
assert f.extensions["ntfs-ext"].alternate_data_streams[0].size == 25536
def test_file_example_with_empty_NTFSExt():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
stix2.File(name="abc.txt",
extensions={
"ntfs-ext": {
}
})
assert excinfo.value.cls == stix2.NTFSExt
assert excinfo.value.fields == sorted(list(stix2.NTFSExt._properties.keys()))
def test_file_example_with_PDFExt():
f = stix2.File(name="qwerty.dll",
extensions={
@ -622,6 +651,115 @@ def test_file_example_with_PDFExt():
assert f.extensions["pdf-ext"].document_info_dict["Title"] == "Sample document"
def test_file_example_with_PDFExt_Object():
f = stix2.File(name="qwerty.dll",
extensions={
"pdf-ext":
stix2.PDFExt(version="1.7",
document_info_dict={
"Title": "Sample document",
"Author": "Adobe Systems Incorporated",
"Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh",
"Producer": "Acrobat Distiller 3.01 for Power Macintosh",
"CreationDate": "20070412090123-02"
},
pdfid0="DFCE52BD827ECF765649852119D",
pdfid1="57A1E0F9ED2AE523E313C")
})
assert f.name == "qwerty.dll"
assert f.extensions["pdf-ext"].version == "1.7"
assert f.extensions["pdf-ext"].document_info_dict["Title"] == "Sample document"
def test_file_example_with_RasterImageExt_Object():
f = stix2.File(name="qwerty.jpeg",
extensions={
"raster-image-ext": {
"bits_per_pixel": 123,
"exif_tags": {
"Make": "Nikon",
"Model": "D7000",
"XResolution": 4928,
"YResolution": 3264
}
}
})
assert f.name == "qwerty.jpeg"
assert f.extensions["raster-image-ext"].bits_per_pixel == 123
assert f.extensions["raster-image-ext"].exif_tags["XResolution"] == 4928
def test_file_example_with_WindowsPEBinaryExt():
f = stix2.File(name="qwerty.dll",
extensions={
"windows-pebinary-ext": {
"pe_type": "exe",
"machine_hex": "014c",
"number_of_sections": 4,
"time_date_stamp": "2016-01-22T12:31:12Z",
"pointer_to_symbol_table_hex": "74726144",
"number_of_symbols": 4542568,
"size_of_optional_header": 224,
"characteristics_hex": "818f",
"optional_header": {
"magic_hex": "010b",
"major_linker_version": 2,
"minor_linker_version": 25,
"size_of_code": 512,
"size_of_initialized_data": 283648,
"size_of_uninitialized_data": 0,
"address_of_entry_point": 4096,
"base_of_code": 4096,
"base_of_data": 8192,
"image_base": 14548992,
"section_alignment": 4096,
"file_alignment": 4096,
"major_os_version": 1,
"minor_os_version": 0,
"major_image_version": 0,
"minor_image_version": 0,
"major_subsystem_version": 4,
"minor_subsystem_version": 0,
"win32_version_value_hex": "00",
"size_of_image": 299008,
"size_of_headers": 4096,
"checksum_hex": "00",
"subsystem_hex": "03",
"dll_characteristics_hex": "00",
"size_of_stack_reserve": 100000,
"size_of_stack_commit": 8192,
"size_of_heap_reserve": 100000,
"size_of_heap_commit": 4096,
"loader_flags_hex": "abdbffde",
"number_of_rva_and_sizes": 3758087646
},
"sections": [
{
"name": "CODE",
"entropy": 0.061089
},
{
"name": "DATA",
"entropy": 7.980693
},
{
"name": "NicolasB",
"entropy": 0.607433
},
{
"name": ".idata",
"entropy": 0.607433
}
]
}
})
assert f.name == "qwerty.dll"
assert f.extensions["windows-pebinary-ext"].sections[2].entropy == 0.607433
def test_file_example_encryption_error():
with pytest.raises(stix2.exceptions.DependentPropertiestError) as excinfo:
stix2.File(name="qwerty.dll",
@ -732,6 +870,113 @@ def test_mutex_example():
assert m.name == "barney"
def test_process_example():
p = stix2.Process(_valid_refs=["0"],
pid=1221,
name="gedit-bin",
created="2016-01-20T14:11:25.55Z",
arguments=["--new-window"],
binary_ref="0")
assert p.name == "gedit-bin"
assert p.arguments == ["--new-window"]
def test_process_example_empty_error():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
stix2.Process()
assert excinfo.value.cls == stix2.Process
properties_of_process = list(stix2.Process._properties.keys())
properties_of_process.remove("type")
assert excinfo.value.fields == sorted(properties_of_process)
def test_process_example_empty_with_extensions():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
stix2.Process(extensions={
"windows-process-ext": {}
})
assert excinfo.value.cls == stix2.WindowsProcessExt
properties_of_extension = list(stix2.WindowsProcessExt._properties.keys())
assert excinfo.value.fields == sorted(properties_of_extension)
def test_process_example_windows_process_ext_empty():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
stix2.Process(pid=1221,
name="gedit-bin",
extensions={
"windows-process-ext": {}
})
assert excinfo.value.cls == stix2.WindowsProcessExt
properties_of_extension = list(stix2.WindowsProcessExt._properties.keys())
assert excinfo.value.fields == sorted(properties_of_extension)
def test_process_example_extensions_empty():
with pytest.raises(stix2.exceptions.AtLeastOnePropertyError) as excinfo:
stix2.Process(extensions={
})
assert excinfo.value.cls == stix2.Process
properties_of_process = list(stix2.Process._properties.keys())
properties_of_process.remove("type")
assert excinfo.value.fields == sorted(properties_of_process)
def test_process_example_with_WindowsProcessExt_Object():
p = stix2.Process(extensions={
"windows-process-ext": stix2.WindowsProcessExt(aslr_enabled=True,
dep_enabled=True,
priority="HIGH_PRIORITY_CLASS",
owner_sid="S-1-5-21-186985262-1144665072-74031268-1309") # noqa
})
assert p.extensions["windows-process-ext"].dep_enabled
assert p.extensions["windows-process-ext"].owner_sid == "S-1-5-21-186985262-1144665072-74031268-1309"
def test_process_example_with_WindowsServiceExt():
p = stix2.Process(extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING"
}
})
assert p.extensions["windows-service-ext"].service_name == "sirvizio"
assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS"
def test_process_example_with_WindowsProcessServiceExt():
p = stix2.Process(extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING"
},
"windows-process-ext": {
"aslr_enabled": True,
"dep_enabled": True,
"priority": "HIGH_PRIORITY_CLASS",
"owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309"
}
})
assert p.extensions["windows-service-ext"].service_name == "sirvizio"
assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS"
assert p.extensions["windows-process-ext"].dep_enabled
assert p.extensions["windows-process-ext"].owner_sid == "S-1-5-21-186985262-1144665072-74031268-1309"
def test_software_example():
s = stix2.Software(name="Word",
cpe="cpe:2.3:a:microsoft:word:2000:*:*:*:*:*:*:*",