Fix call to update(), add a register_marking decorator. Add type to Markings.

stix2.1
Emmanuelle Vargas-Gonzalez 2017-08-14 11:52:34 -04:00
parent 68afd6b38e
commit 26297f9730
3 changed files with 65 additions and 57 deletions


@ -10,7 +10,7 @@ class Bundle(_STIXBase):
_type = 'bundle' _type = 'bundle'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('id', IDProperty(_type)), ('id', IDProperty(_type)),
('spec_version', Property(fixed="2.0")), ('spec_version', Property(fixed="2.0")),


@ -78,7 +78,7 @@ class ExtensionsProperty(DictionaryProperty):
class Artifact(_Observable): class Artifact(_Observable):
_type = 'artifact' _type = 'artifact'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('mime_type', StringProperty()), ('mime_type', StringProperty()),
('payload_bin', BinaryProperty()), ('payload_bin', BinaryProperty()),
@ -95,7 +95,7 @@ class Artifact(_Observable):
class AutonomousSystem(_Observable): class AutonomousSystem(_Observable):
_type = 'autonomous-system' _type = 'autonomous-system'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('number', IntegerProperty()), ('number', IntegerProperty()),
('name', StringProperty()), ('name', StringProperty()),
@ -106,7 +106,7 @@ class AutonomousSystem(_Observable):
class Directory(_Observable): class Directory(_Observable):
_type = 'directory' _type = 'directory'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('path', StringProperty(required=True)), ('path', StringProperty(required=True)),
('path_enc', StringProperty()), ('path_enc', StringProperty()),
@ -121,7 +121,7 @@ class Directory(_Observable):
class DomainName(_Observable): class DomainName(_Observable):
_type = 'domain-name' _type = 'domain-name'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types=['ipv4-addr', 'ipv6-addr', 'domain-name']))),
@ -131,7 +131,7 @@ class DomainName(_Observable):
class EmailAddress(_Observable): class EmailAddress(_Observable):
_type = 'email-addr' _type = 'email-addr'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('display_name', StringProperty()), ('display_name', StringProperty()),
@ -141,7 +141,7 @@ class EmailAddress(_Observable):
class EmailMIMEComponent(_STIXBase): class EmailMIMEComponent(_STIXBase):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('body', StringProperty()), ('body', StringProperty()),
('body_raw_ref', ObjectReferenceProperty(valid_types=['artifact', 'file'])), ('body_raw_ref', ObjectReferenceProperty(valid_types=['artifact', 'file'])),
('content_type', StringProperty()), ('content_type', StringProperty()),
@ -156,7 +156,7 @@ class EmailMIMEComponent(_STIXBase):
class EmailMessage(_Observable): class EmailMessage(_Observable):
_type = 'email-message' _type = 'email-message'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('is_multipart', BooleanProperty(required=True)), ('is_multipart', BooleanProperty(required=True)),
('date', TimestampProperty()), ('date', TimestampProperty()),
@ -184,7 +184,7 @@ class EmailMessage(_Observable):
class ArchiveExt(_Extension): class ArchiveExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)), ('contains_refs', ListProperty(ObjectReferenceProperty(valid_types='file'), required=True)),
('version', StringProperty()), ('version', StringProperty()),
('comment', StringProperty()), ('comment', StringProperty()),
@ -193,7 +193,7 @@ class ArchiveExt(_Extension):
class AlternateDataStream(_STIXBase): class AlternateDataStream(_STIXBase):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('name', StringProperty(required=True)), ('name', StringProperty(required=True)),
('hashes', HashesProperty()), ('hashes', HashesProperty()),
('size', IntegerProperty()), ('size', IntegerProperty()),
@ -202,7 +202,7 @@ class AlternateDataStream(_STIXBase):
class NTFSExt(_Extension): class NTFSExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('sid', StringProperty()), ('sid', StringProperty()),
('alternate_data_streams', ListProperty(EmbeddedObjectProperty(type=AlternateDataStream))), ('alternate_data_streams', ListProperty(EmbeddedObjectProperty(type=AlternateDataStream))),
]) ])
@ -210,7 +210,7 @@ class NTFSExt(_Extension):
class PDFExt(_Extension): class PDFExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('version', StringProperty()), ('version', StringProperty()),
('is_optimized', BooleanProperty()), ('is_optimized', BooleanProperty()),
('document_info_dict', DictionaryProperty()), ('document_info_dict', DictionaryProperty()),
@ -221,7 +221,7 @@ class PDFExt(_Extension):
class RasterImageExt(_Extension): class RasterImageExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('image_height', IntegerProperty()), ('image_height', IntegerProperty()),
('image_weight', IntegerProperty()), ('image_weight', IntegerProperty()),
('bits_per_pixel', IntegerProperty()), ('bits_per_pixel', IntegerProperty()),
@ -232,7 +232,7 @@ class RasterImageExt(_Extension):
class WindowsPEOptionalHeaderType(_STIXBase): class WindowsPEOptionalHeaderType(_STIXBase):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('magic_hex', HexProperty()), ('magic_hex', HexProperty()),
('major_linker_version', IntegerProperty()), ('major_linker_version', IntegerProperty()),
('minor_linker_version', IntegerProperty()), ('minor_linker_version', IntegerProperty()),
@ -273,7 +273,7 @@ class WindowsPEOptionalHeaderType(_STIXBase):
class WindowsPESection(_STIXBase): class WindowsPESection(_STIXBase):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('name', StringProperty(required=True)), ('name', StringProperty(required=True)),
('size', IntegerProperty()), ('size', IntegerProperty()),
('entropy', FloatProperty()), ('entropy', FloatProperty()),
@ -283,7 +283,7 @@ class WindowsPESection(_STIXBase):
class WindowsPEBinaryExt(_Extension): class WindowsPEBinaryExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('pe_type', StringProperty(required=True)), # open_vocab ('pe_type', StringProperty(required=True)), # open_vocab
('imphash', StringProperty()), ('imphash', StringProperty()),
('machine_hex', HexProperty()), ('machine_hex', HexProperty()),
@ -302,7 +302,7 @@ class WindowsPEBinaryExt(_Extension):
class File(_Observable): class File(_Observable):
_type = 'file' _type = 'file'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(enclosing_type=_type)),
('hashes', HashesProperty()), ('hashes', HashesProperty()),
@ -332,7 +332,7 @@ class File(_Observable):
class IPv4Address(_Observable): class IPv4Address(_Observable):
_type = 'ipv4-addr' _type = 'ipv4-addr'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
@ -343,7 +343,7 @@ class IPv4Address(_Observable):
class IPv6Address(_Observable): class IPv6Address(_Observable):
_type = 'ipv6-addr' _type = 'ipv6-addr'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))), ('resolves_to_refs', ListProperty(ObjectReferenceProperty(valid_types='mac-addr'))),
@ -354,7 +354,7 @@ class IPv6Address(_Observable):
class MACAddress(_Observable): class MACAddress(_Observable):
_type = 'mac-addr' _type = 'mac-addr'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
]) ])
@ -363,7 +363,7 @@ class MACAddress(_Observable):
class Mutex(_Observable): class Mutex(_Observable):
_type = 'mutex' _type = 'mutex'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('name', StringProperty()), ('name', StringProperty()),
]) ])
@ -371,7 +371,7 @@ class Mutex(_Observable):
class HTTPRequestExt(_Extension): class HTTPRequestExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('request_method', StringProperty(required=True)), ('request_method', StringProperty(required=True)),
('request_value', StringProperty(required=True)), ('request_value', StringProperty(required=True)),
('request_version', StringProperty()), ('request_version', StringProperty()),
@ -383,7 +383,7 @@ class HTTPRequestExt(_Extension):
class ICMPExt(_Extension): class ICMPExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('icmp_type_hex', HexProperty(required=True)), ('icmp_type_hex', HexProperty(required=True)),
('icmp_code_hex', HexProperty(required=True)), ('icmp_code_hex', HexProperty(required=True)),
]) ])
@ -391,7 +391,7 @@ class ICMPExt(_Extension):
class SocketExt(_Extension): class SocketExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('address_family', EnumProperty([ ('address_family', EnumProperty([
"AF_UNSPEC", "AF_UNSPEC",
"AF_INET", "AF_INET",
@ -425,7 +425,7 @@ class SocketExt(_Extension):
class TCPExt(_Extension): class TCPExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('src_flags_hex', HexProperty()), ('src_flags_hex', HexProperty()),
('dst_flags_hex', HexProperty()), ('dst_flags_hex', HexProperty()),
]) ])
@ -434,7 +434,7 @@ class TCPExt(_Extension):
class NetworkTraffic(_Observable): class NetworkTraffic(_Observable):
_type = 'network-traffic' _type = 'network-traffic'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(enclosing_type=_type)),
('start', TimestampProperty()), ('start', TimestampProperty()),
@ -463,7 +463,7 @@ class NetworkTraffic(_Observable):
class WindowsProcessExt(_Extension): class WindowsProcessExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('aslr_enabled', BooleanProperty()), ('aslr_enabled', BooleanProperty()),
('dep_enabled', BooleanProperty()), ('dep_enabled', BooleanProperty()),
('priority', StringProperty()), ('priority', StringProperty()),
@ -475,7 +475,7 @@ class WindowsProcessExt(_Extension):
class WindowsServiceExt(_Extension): class WindowsServiceExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('service_name', StringProperty(required=True)), ('service_name', StringProperty(required=True)),
('descriptions', ListProperty(StringProperty)), ('descriptions', ListProperty(StringProperty)),
('display_name', StringProperty()), ('display_name', StringProperty()),
@ -509,7 +509,7 @@ class WindowsServiceExt(_Extension):
class Process(_Observable): class Process(_Observable):
_type = 'process' _type = 'process'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(enclosing_type=_type)),
('is_hidden', BooleanProperty()), ('is_hidden', BooleanProperty()),
@ -546,7 +546,7 @@ class Process(_Observable):
class Software(_Observable): class Software(_Observable):
_type = 'software' _type = 'software'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('name', StringProperty(required=True)), ('name', StringProperty(required=True)),
('cpe', StringProperty()), ('cpe', StringProperty()),
@ -559,7 +559,7 @@ class Software(_Observable):
class URL(_Observable): class URL(_Observable):
_type = 'url' _type = 'url'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('value', StringProperty(required=True)), ('value', StringProperty(required=True)),
]) ])
@ -567,7 +567,7 @@ class URL(_Observable):
class UNIXAccountExt(_Extension): class UNIXAccountExt(_Extension):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('gid', IntegerProperty()), ('gid', IntegerProperty()),
('groups', ListProperty(StringProperty)), ('groups', ListProperty(StringProperty)),
('home_dir', StringProperty()), ('home_dir', StringProperty()),
@ -578,7 +578,7 @@ class UNIXAccountExt(_Extension):
class UserAccount(_Observable): class UserAccount(_Observable):
_type = 'user-account' _type = 'user-account'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('extensions', ExtensionsProperty(enclosing_type=_type)), ('extensions', ExtensionsProperty(enclosing_type=_type)),
('user_id', StringProperty(required=True)), ('user_id', StringProperty(required=True)),
@ -600,7 +600,7 @@ class UserAccount(_Observable):
class WindowsRegistryValueType(_STIXBase): class WindowsRegistryValueType(_STIXBase):
_type = 'windows-registry-value-type' _type = 'windows-registry-value-type'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('name', StringProperty(required=True)), ('name', StringProperty(required=True)),
('data', StringProperty()), ('data', StringProperty()),
('data_type', EnumProperty([ ('data_type', EnumProperty([
@ -624,7 +624,7 @@ class WindowsRegistryValueType(_STIXBase):
class WindowsRegistryKey(_Observable): class WindowsRegistryKey(_Observable):
_type = 'windows-registry-key' _type = 'windows-registry-key'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('key', StringProperty(required=True)), ('key', StringProperty(required=True)),
('values', ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType))), ('values', ListProperty(EmbeddedObjectProperty(type=WindowsRegistryValueType))),
@ -643,7 +643,7 @@ class WindowsRegistryKey(_Observable):
class X509V3ExtenstionsType(_STIXBase): class X509V3ExtenstionsType(_STIXBase):
_type = 'x509-v3-extensions-type' _type = 'x509-v3-extensions-type'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('basic_constraints', StringProperty()), ('basic_constraints', StringProperty()),
('name_constraints', StringProperty()), ('name_constraints', StringProperty()),
('policy_constraints', StringProperty()), ('policy_constraints', StringProperty()),
@ -666,7 +666,7 @@ class X509V3ExtenstionsType(_STIXBase):
class X509Certificate(_Observable): class X509Certificate(_Observable):
_type = 'x509-certificate' _type = 'x509-certificate'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('is_self_signed', BooleanProperty()), ('is_self_signed', BooleanProperty()),
('hashes', HashesProperty()), ('hashes', HashesProperty()),
@ -795,7 +795,7 @@ def CustomObservable(type='x-custom-observable', properties=None):
class _Custom(cls, _Observable): class _Custom(cls, _Observable):
_type = type _type = type
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
]) ])
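Review bot: In the RasterImageExt hunk the key `('image_weight', IntegerProperty())` looks like a typo for `image_width`, the property the STIX 2.0 raster-image extension defines alongside `image_height`. As written, a spec-conformant `image_width` would presumably not be accepted as a known property of this extension. Since the commit already touches every `_properties` block in this file, it would be a natural place to change the line to `('image_width', IntegerProperty()),`. The class body continues past the end of that hunk, so check the remaining keys as well.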


@ -11,7 +11,7 @@ from .utils import NOW, get_dict
class ExternalReference(_STIXBase): class ExternalReference(_STIXBase):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('source_name', StringProperty(required=True)), ('source_name', StringProperty(required=True)),
('description', StringProperty()), ('description', StringProperty()),
('url', StringProperty()), ('url', StringProperty()),
@ -25,7 +25,7 @@ class ExternalReference(_STIXBase):
class KillChainPhase(_STIXBase): class KillChainPhase(_STIXBase):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('kill_chain_name', StringProperty(required=True)), ('kill_chain_name', StringProperty(required=True)),
('phase_name', StringProperty(required=True)), ('phase_name', StringProperty(required=True)),
]) ])
@ -33,23 +33,24 @@ class KillChainPhase(_STIXBase):
class GranularMarking(_STIXBase): class GranularMarking(_STIXBase):
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('marking_ref', ReferenceProperty(required=True, type="marking-definition")), ('marking_ref', ReferenceProperty(required=True, type="marking-definition")),
('selectors', ListProperty(SelectorProperty, required=True)), ('selectors', ListProperty(SelectorProperty, required=True)),
]) ])
class TLPMarking(_STIXBase): class TLPMarking(_STIXBase):
# TODO: don't allow the creation of any other TLPMarkings than the ones below _type = 'tlp'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('tlp', Property(required=True)) ('tlp', Property(required=True))
]) ])
class StatementMarking(_STIXBase): class StatementMarking(_STIXBase):
_type = 'statement'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('statement', StringProperty(required=True)) ('statement', StringProperty(required=True))
]) ])
@ -67,36 +68,32 @@ class MarkingProperty(Property):
""" """
def clean(self, value): def clean(self, value):
if type(value) in [TLPMarking, StatementMarking]: if type(value) in OBJ_MAP_MARKING.values():
return value return value
else: else:
raise ValueError("must be a Statement or TLP Marking.") raise ValueError("must be a Statement, TLP Marking or a registered marking.")
class MarkingDefinition(_STIXBase): class MarkingDefinition(_STIXBase):
_type = 'marking-definition' _type = 'marking-definition'
_properties = OrderedDict() _properties = OrderedDict()
_properties = _properties.update([ _properties.update([
('created', TimestampProperty(default=lambda: NOW)),
('external_references', ListProperty(ExternalReference)),
('created_by_ref', ReferenceProperty(type="identity")),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
('type', TypeProperty(_type)), ('type', TypeProperty(_type)),
('id', IDProperty(_type)), ('id', IDProperty(_type)),
('created_by_ref', ReferenceProperty(type="identity")),
('created', TimestampProperty(default=lambda: NOW)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(type="marking-definition"))),
('granular_markings', ListProperty(GranularMarking)),
('definition_type', StringProperty(required=True)), ('definition_type', StringProperty(required=True)),
('definition', MarkingProperty(required=True)), ('definition', MarkingProperty(required=True)),
]) ])
marking_map = {
'tlp': TLPMarking,
'statement': StatementMarking,
}
def __init__(self, **kwargs): def __init__(self, **kwargs):
if set(('definition_type', 'definition')).issubset(kwargs.keys()): if set(('definition_type', 'definition')).issubset(kwargs.keys()):
# Create correct marking type object # Create correct marking type object
try: try:
marking_type = self.marking_map[kwargs['definition_type']] marking_type = OBJ_MAP_MARKING[kwargs['definition_type']]
except KeyError: except KeyError:
raise ValueError("definition_type must be a valid marking type") raise ValueError("definition_type must be a valid marking type")
@ -107,6 +104,17 @@ class MarkingDefinition(_STIXBase):
super(MarkingDefinition, self).__init__(**kwargs) super(MarkingDefinition, self).__init__(**kwargs)
def register_marking(new_marking):
"""Register a custom STIX Marking Definition type.
"""
OBJ_MAP_MARKING[new_marking._type] = new_marking
OBJ_MAP_MARKING = {
'tlp': TLPMarking,
'statement': StatementMarking,
}
TLP_WHITE = MarkingDefinition( TLP_WHITE = MarkingDefinition(
id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9", id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
created="2017-01-20T00:00:00.000Z", created="2017-01-20T00:00:00.000Z",
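Review bot: `register_marking` has no return statement, so using it as the decorator the commit title describes (`@register_marking` above a class) would rebind that class name to `None`; add `return new_marking` as its last line. It also assigns unconditionally, so a class whose `_type` is `'tlp'` or `'statement'` silently replaces the built-in entry that `MarkingDefinition.__init__` and `MarkingProperty.clean` rely on. Since these objects carry data-handling restrictions, consider refusing a duplicate key with `if new_marking._type in OBJ_MAP_MARKING: raise ValueError("marking type already registered")`. The TLPMarking hunk also drops the TODO about restricting which TLP markings can be created, while `('tlp', Property(required=True))` still accepts any value, so the TODO should stay until that is enforced. `MarkingDefinition.__init__` continues outside the visible hunks.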