all changes from add-trailing-commas v2.0.2

pull/1/head
Emmanuelle Vargas-Gonzalez 2021-01-13 17:52:15 -05:00
parent acc90c2f4c
commit 85c14d1502
26 changed files with 413 additions and 311 deletions

View File

@ -164,8 +164,10 @@ class _STIXBase(Mapping):
defaulted = []
for name, prop in self._properties.items():
try:
if (not prop.required and not hasattr(prop, '_fixed_value') and
prop.default() == setting_kwargs[name]):
if (
not prop.required and not hasattr(prop, '_fixed_value') and
prop.default() == setting_kwargs[name]
):
defaulted.append(name)
except (AttributeError, KeyError):
continue
@ -194,8 +196,10 @@ class _STIXBase(Mapping):
unpickling = '_inner' not in self.__dict__
if not unpickling and name in self:
return self.__getitem__(name)
raise AttributeError("'%s' object has no attribute '%s'" %
(self.__class__.__name__, name))
raise AttributeError(
"'%s' object has no attribute '%s'" %
(self.__class__.__name__, name),
)
def __setattr__(self, name, value):
if not name.startswith("_"):

View File

@ -75,8 +75,10 @@ class _ObjectFamily(object):
def add(self, obj):
self.all_versions[obj["modified"]] = obj
if (self.latest_version is None or
obj["modified"] > self.latest_version["modified"]):
if (
self.latest_version is None or
obj["modified"] > self.latest_version["modified"]
):
self.latest_version = obj
def __str__(self):
@ -188,11 +190,13 @@ class MemorySink(DataSink):
def save_to_file(self, path, encoding="utf-8"):
path = os.path.abspath(path)
all_objs = list(itertools.chain.from_iterable(
value.all_versions.values() if isinstance(value, _ObjectFamily)
else [value]
for value in self._data.values()
))
all_objs = list(
itertools.chain.from_iterable(
value.all_versions.values() if isinstance(value, _ObjectFamily)
else [value]
for value in self._data.values()
),
)
if any("spec_version" in x for x in all_objs):
bundle = v21.Bundle(all_objs, allow_custom=self.allow_custom)

View File

@ -144,7 +144,7 @@ class ComparisonExpressionTransformer(Transformer):
class OrderDedupeTransformer(
ComparisonExpressionTransformer
ComparisonExpressionTransformer,
):
"""
Canonically order the children of all nodes in the AST. Because the
@ -247,7 +247,7 @@ class FlattenTransformer(ComparisonExpressionTransformer):
class AbsorptionTransformer(
ComparisonExpressionTransformer
ComparisonExpressionTransformer,
):
"""
Applies boolean "absorption" rules for AST simplification. E.g.:

View File

@ -152,9 +152,11 @@ class ObservationExpressionTransformer(Transformer):
changed = True
else:
raise TypeError("Not an observation expression: {}: {}".format(
type(ast).__name__, str(ast),
))
raise TypeError(
"Not an observation expression: {}: {}".format(
type(ast).__name__, str(ast),
),
)
return result, changed
@ -229,7 +231,7 @@ class FlattenTransformer(ObservationExpressionTransformer):
class OrderDedupeTransformer(
ObservationExpressionTransformer
ObservationExpressionTransformer,
):
"""
Canonically order AND/OR expressions, and dedupe ORs. E.g.:
@ -272,7 +274,7 @@ class OrderDedupeTransformer(
class AbsorptionTransformer(
ObservationExpressionTransformer
ObservationExpressionTransformer,
):
"""
Applies boolean "absorption" rules for observation expressions, for AST
@ -479,7 +481,7 @@ class DNFTransformer(ObservationExpressionTransformer):
class CanonicalizeComparisonExpressionsTransformer(
ObservationExpressionTransformer
ObservationExpressionTransformer,
):
"""
Canonicalize all comparison expressions.

View File

@ -186,8 +186,10 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None):
# flag allows for unknown custom objects too, but will not
# be parsed into STIX observable object, just returned as is
return obj
raise ParseError("Can't parse unknown observable type '%s'! For custom observables, "
"use the CustomObservable decorator." % obj['type'])
raise ParseError(
"Can't parse unknown observable type '%s'! For custom observables, "
"use the CustomObservable decorator." % obj['type'],
)
return obj_class(allow_custom=allow_custom, **obj)
@ -283,8 +285,12 @@ def _register_observable(new_observable, version=stix2.DEFAULT_VERSION):
"'%s' is named like an object reference property but "
"is not an ObjectReferenceProperty." % prop_name,
)
elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or
'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained))):
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like an object reference list property but "
"is not a ListProperty containing ObjectReferenceProperty." % prop_name,
@ -299,8 +305,12 @@ def _register_observable(new_observable, version=stix2.DEFAULT_VERSION):
"'%s' is named like a reference property but "
"is not a ReferenceProperty." % prop_name,
)
elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or
'ReferenceProperty' not in get_class_hierarchy_names(prop.contained))):
elif (
prop_name.endswith('_refs') and (
'ListProperty' not in get_class_hierarchy_names(prop) or
'ReferenceProperty' not in get_class_hierarchy_names(prop.contained)
)
):
raise ValueError(
"'%s' is named like a reference list property but "
"is not a ListProperty containing ReferenceProperty." % prop_name,

View File

@ -261,11 +261,13 @@ class STIXPatternVisitorForSTIX2():
property_path.append(self.instantiate("ListObjectPathComponent", current.property_name, next.getText()))
i += 2
elif isinstance(next, IntegerConstant):
property_path.append(self.instantiate(
"ListObjectPathComponent",
current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current),
next.value,
))
property_path.append(
self.instantiate(
"ListObjectPathComponent",
current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current),
next.value,
),
)
i += 2
else:
property_path.append(current)

View File

@ -247,9 +247,11 @@ class ListProperty(Property):
valid = self.contained(**item)
else:
raise ValueError("Can't create a {} out of {}".format(
self.contained._type, str(item),
))
raise ValueError(
"Can't create a {} out of {}".format(
self.contained._type, str(item),
),
)
result.append(valid)
@ -688,8 +690,10 @@ class STIXObjectProperty(Property):
def clean(self, value):
# Any STIX Object (SDO, SRO, or Marking Definition) can be added to
# a bundle with no further checks.
if any(x in ('_DomainObject', '_RelationshipObject', 'MarkingDefinition')
for x in get_class_hierarchy_names(value)):
if any(
x in ('_DomainObject', '_RelationshipObject', 'MarkingDefinition')
for x in get_class_hierarchy_names(value)
):
# A simple "is this a spec version 2.1+ object" test. For now,
# limit 2.0 bundles to 2.0 objects. It's not possible yet to
# have validation co-constraints among properties, e.g. have

View File

@ -175,12 +175,14 @@ def test_memory_source_get_nonexistant_object(mem_source):
def test_memory_store_all_versions(mem_store):
# Add bundle of items to sink
mem_store.add(dict(
id="bundle--%s" % make_id(),
objects=STIX_OBJS2,
spec_version="2.0",
type="bundle",
))
mem_store.add(
dict(
id="bundle--%s" % make_id(),
objects=STIX_OBJS2,
spec_version="2.0",
type="bundle",
),
)
resp = mem_store.all_versions("indicator--00000000-0000-4000-8000-000000000001")
assert len(resp) == 3

View File

@ -39,15 +39,19 @@ def ds2():
cam = stix2.v20.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
idy = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
ind = stix2.v20.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS)
indv2 = ind.new_version(external_references=[{
"source_name": "unknown",
"url": "https://examplewebsite.com/",
}])
indv2 = ind.new_version(
external_references=[{
"source_name": "unknown",
"url": "https://examplewebsite.com/",
}],
)
mal = stix2.v20.Malware(id=MALWARE_ID, created_by_ref=idy.id, **MALWARE_KWARGS)
malv2 = mal.new_version(external_references=[{
"source_name": "unknown",
"url": "https://examplewebsite2.com/",
}])
malv2 = mal.new_version(
external_references=[{
"source_name": "unknown",
"url": "https://examplewebsite2.com/",
}],
)
rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])

View File

@ -20,7 +20,8 @@ EXPECTED_INDICATOR = """{
]
}"""
EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join("""
EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join(
"""
type='indicator',
id='indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7',
created='2017-01-01T00:00:01.000Z',
@ -28,7 +29,8 @@ EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join("""
pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']",
valid_from='1970-01-01T00:00:01Z',
labels=['malicious-activity']
""".split()) + ")"
""".split()
) + ")"
def test_indicator_with_all_required_properties():

View File

@ -1180,50 +1180,56 @@ def test_process_example_extensions_empty():
def test_process_example_with_WindowsProcessExt_Object():
p = stix2.v20.Process(extensions={
"windows-process-ext": stix2.v20.WindowsProcessExt(
aslr_enabled=True,
dep_enabled=True,
priority="HIGH_PRIORITY_CLASS",
owner_sid="S-1-5-21-186985262-1144665072-74031268-1309",
), # noqa
})
p = stix2.v20.Process(
extensions={
"windows-process-ext": stix2.v20.WindowsProcessExt(
aslr_enabled=True,
dep_enabled=True,
priority="HIGH_PRIORITY_CLASS",
owner_sid="S-1-5-21-186985262-1144665072-74031268-1309",
), # noqa
},
)
assert p.extensions["windows-process-ext"].dep_enabled
assert p.extensions["windows-process-ext"].owner_sid == "S-1-5-21-186985262-1144665072-74031268-1309"
def test_process_example_with_WindowsServiceExt():
p = stix2.v20.Process(extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING",
p = stix2.v20.Process(
extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING",
},
},
})
)
assert p.extensions["windows-service-ext"].service_name == "sirvizio"
assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS"
def test_process_example_with_WindowsProcessServiceExt():
p = stix2.v20.Process(extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING",
p = stix2.v20.Process(
extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING",
},
"windows-process-ext": {
"aslr_enabled": True,
"dep_enabled": True,
"priority": "HIGH_PRIORITY_CLASS",
"owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309",
},
},
"windows-process-ext": {
"aslr_enabled": True,
"dep_enabled": True,
"priority": "HIGH_PRIORITY_CLASS",
"owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309",
},
})
)
assert p.extensions["windows-service-ext"].service_name == "sirvizio"
assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS"

View File

@ -306,10 +306,12 @@ def test_multiple_qualifiers():
def test_set_op():
exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression(
"network-traffic:dst_ref.value",
"2001:0db8:dead:beef:0000:0000:0000:0000/64",
))
exp = stix2.ObservationExpression(
stix2.IsSubsetComparisonExpression(
"network-traffic:dst_ref.value",
"2001:0db8:dead:beef:0000:0000:0000:0000/64",
),
)
assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"

View File

@ -71,7 +71,7 @@ def test_parse_datetime_invalid(ts):
{"a": 1},
'{"a": 1}',
StringIO(u'{"a": 1}'),
[("a", 1,)],
[("a", 1)],
],
)
def test_get_dict(data):

View File

@ -46,10 +46,12 @@ def test_making_new_version_with_embedded_object():
**CAMPAIGN_MORE_KWARGS
)
campaign_v2 = campaign_v1.new_version(external_references=[{
"source_name": "capec",
"external_id": "CAPEC-164",
}])
campaign_v2 = campaign_v1.new_version(
external_references=[{
"source_name": "capec",
"external_id": "CAPEC-164",
}],
)
assert campaign_v1.id == campaign_v2.id
assert campaign_v1.created_by_ref == campaign_v2.created_by_ref
@ -237,8 +239,10 @@ def test_remove_custom_stix_property():
mal_nc = stix2.versioning.remove_custom_stix(mal)
assert "x_custom" not in mal_nc
assert (stix2.utils.parse_into_datetime(mal["modified"], precision="millisecond") <
stix2.utils.parse_into_datetime(mal_nc["modified"], precision="millisecond"))
assert (
stix2.utils.parse_into_datetime(mal["modified"], precision="millisecond") <
stix2.utils.parse_into_datetime(mal_nc["modified"], precision="millisecond")
)
def test_remove_custom_stix_object():

View File

@ -191,11 +191,13 @@ def test_memory_source_get_nonexistant_object(mem_source):
def test_memory_store_all_versions(mem_store):
# Add bundle of items to sink
mem_store.add(dict(
id="bundle--%s" % make_id(),
objects=STIX_OBJS2,
type="bundle",
))
mem_store.add(
dict(
id="bundle--%s" % make_id(),
objects=STIX_OBJS2,
type="bundle",
),
)
resp = mem_store.all_versions("indicator--00000000-0000-4000-8000-000000000001")
assert len(resp) == 3

View File

@ -20,7 +20,8 @@ EXPECTED_INDICATOR = """{
"valid_from": "1970-01-01T00:00:01Z"
}"""
EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join("""
EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join(
"""
type='indicator',
spec_version='2.1',
id='indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7',
@ -30,7 +31,8 @@ EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join("""
pattern_type='stix',
pattern_version='2.1',
valid_from='1970-01-01T00:00:01Z'
""".split()) + ")"
""".split()
) + ")"
def test_indicator_with_all_required_properties():

View File

@ -19,14 +19,16 @@ EXPECTED_LOCATION_1 = """{
"longitude": 2.3522
}"""
EXPECTED_LOCATION_1_REPR = "Location(" + " ".join("""
EXPECTED_LOCATION_1_REPR = "Location(" + " ".join(
"""
type='location',
spec_version='2.1',
id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64',
created='2016-04-06T20:03:00.000Z',
modified='2016-04-06T20:03:00.000Z',
latitude=48.8566,
longitude=2.3522""".split()) + ")"
longitude=2.3522""".split()
) + ")"
EXPECTED_LOCATION_2 = """{
"type": "location",
@ -38,13 +40,15 @@ EXPECTED_LOCATION_2 = """{
}
"""
EXPECTED_LOCATION_2_REPR = "Location(" + " ".join("""
EXPECTED_LOCATION_2_REPR = "Location(" + " ".join(
"""
type='location',
spec_version='2.1',
id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64',
created='2016-04-06T20:03:00.000Z',
modified='2016-04-06T20:03:00.000Z',
region='north-america'""".split()) + ")"
region='north-america'""".split()
) + ")"
def test_location_with_some_required_properties():

View File

@ -496,12 +496,14 @@ def test_parse_email_message_not_multipart(data):
def test_parse_file_archive(data):
odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED)
odata = stix2.parse(odata_str, version="2.1")
assert all(x in odata.objects["3"].extensions['archive-ext'].contains_refs
for x in [
"file--ecd47d73-15e4-5250-afda-ef8897b22340",
"file--65f2873d-38c2-56b4-bfa5-e3ef21e8a3c3",
"file--ef2d6dca-ec7d-5ab7-8dd9-ec9c0dee0eac",
])
assert all(
x in odata.objects["3"].extensions['archive-ext'].contains_refs
for x in [
"file--ecd47d73-15e4-5250-afda-ef8897b22340",
"file--65f2873d-38c2-56b4-bfa5-e3ef21e8a3c3",
"file--ef2d6dca-ec7d-5ab7-8dd9-ec9c0dee0eac",
]
)
@pytest.mark.parametrize(
@ -904,14 +906,14 @@ def test_file_with_archive_ext_object():
f_obj = stix2.v21.File(
name="foo", extensions={
"archive-ext": {
"contains_refs": [ad, ],
"contains_refs": [ad],
},
},
)
f_ref = stix2.v21.File(
name="foo", extensions={
"archive-ext": {
"contains_refs": [ad.id, ],
"contains_refs": [ad.id],
},
},
)
@ -1229,9 +1231,11 @@ def test_process_example_empty_error():
def test_process_example_empty_with_extensions():
with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo:
stix2.v21.Process(extensions={
"windows-process-ext": {},
})
stix2.v21.Process(
extensions={
"windows-process-ext": {},
},
)
assert excinfo.value.cls == stix2.v21.Process
@ -1276,50 +1280,56 @@ def test_process_example_extensions_empty():
def test_process_example_with_WindowsProcessExt_Object():
p = stix2.v21.Process(extensions={
"windows-process-ext": stix2.v21.WindowsProcessExt(
aslr_enabled=True,
dep_enabled=True,
priority="HIGH_PRIORITY_CLASS",
owner_sid="S-1-5-21-186985262-1144665072-74031268-1309",
), # noqa
})
p = stix2.v21.Process(
extensions={
"windows-process-ext": stix2.v21.WindowsProcessExt(
aslr_enabled=True,
dep_enabled=True,
priority="HIGH_PRIORITY_CLASS",
owner_sid="S-1-5-21-186985262-1144665072-74031268-1309",
), # noqa
},
)
assert p.extensions["windows-process-ext"].dep_enabled
assert p.extensions["windows-process-ext"].owner_sid == "S-1-5-21-186985262-1144665072-74031268-1309"
def test_process_example_with_WindowsServiceExt():
p = stix2.v21.Process(extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING",
p = stix2.v21.Process(
extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING",
},
},
})
)
assert p.extensions["windows-service-ext"].service_name == "sirvizio"
assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS"
def test_process_example_with_WindowsProcessServiceExt():
p = stix2.v21.Process(extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING",
p = stix2.v21.Process(
extensions={
"windows-service-ext": {
"service_name": "sirvizio",
"display_name": "Sirvizio",
"start_type": "SERVICE_AUTO_START",
"service_type": "SERVICE_WIN32_OWN_PROCESS",
"service_status": "SERVICE_RUNNING",
},
"windows-process-ext": {
"aslr_enabled": True,
"dep_enabled": True,
"priority": "HIGH_PRIORITY_CLASS",
"owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309",
},
},
"windows-process-ext": {
"aslr_enabled": True,
"dep_enabled": True,
"priority": "HIGH_PRIORITY_CLASS",
"owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309",
},
})
)
assert p.extensions["windows-service-ext"].service_name == "sirvizio"
assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS"

View File

@ -444,10 +444,12 @@ def test_multiple_qualifiers():
def test_set_op():
exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression(
"network-traffic:dst_ref.value",
"2001:0db8:dead:beef:0000:0000:0000:0000/64",
))
exp = stix2.ObservationExpression(
stix2.IsSubsetComparisonExpression(
"network-traffic:dst_ref.value",
"2001:0db8:dead:beef:0000:0000:0000:0000/64",
),
)
assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']"
@ -712,12 +714,12 @@ def test_parsing_boolean():
def test_parsing_mixed_boolean_expression_1():
patt_obj = create_pattern_object("[a:b = 1 AND a:b = 2 OR a:b = 3]",)
patt_obj = create_pattern_object("[a:b = 1 AND a:b = 2 OR a:b = 3]")
assert str(patt_obj) == "[a:b = 1 AND a:b = 2 OR a:b = 3]"
def test_parsing_mixed_boolean_expression_2():
patt_obj = create_pattern_object("[a:b = 1 OR a:b = 2 AND a:b = 3]",)
patt_obj = create_pattern_object("[a:b = 1 OR a:b = 2 AND a:b = 3]")
assert str(patt_obj) == "[a:b = 1 OR a:b = 2 AND a:b = 3]"

View File

@ -71,7 +71,7 @@ def test_parse_datetime_invalid(ts):
{"a": 1},
'{"a": 1}',
StringIO(u'{"a": 1}'),
[("a", 1,)],
[("a", 1)],
],
)
def test_get_dict(data):

View File

@ -50,10 +50,12 @@ def test_making_new_version_with_embedded_object():
**CAMPAIGN_MORE_KWARGS
)
campaign_v2 = campaign_v1.new_version(external_references=[{
"source_name": "capec",
"external_id": "CAPEC-164",
}])
campaign_v2 = campaign_v1.new_version(
external_references=[{
"source_name": "capec",
"external_id": "CAPEC-164",
}],
)
assert campaign_v1.id == campaign_v2.id
assert campaign_v1.spec_version == campaign_v2.spec_version

View File

@ -71,9 +71,11 @@ def _to_enum(value, enum_type, enum_default=None):
elif isinstance(value, six.string_types):
value = enum_type[value.upper()]
else:
raise TypeError("Not a valid {}: {}".format(
enum_type.__name__, value,
))
raise TypeError(
"Not a valid {}: {}".format(
enum_type.__name__, value,
),
)
return value

View File

@ -440,24 +440,28 @@ class SocketExt(_Extension):
('is_blocking', BooleanProperty()),
('is_listening', BooleanProperty()),
(
'protocol_family', EnumProperty(allowed=[
"PF_INET",
"PF_IPX",
"PF_APPLETALK",
"PF_INET6",
"PF_AX25",
"PF_NETROM",
]),
'protocol_family', EnumProperty(
allowed=[
"PF_INET",
"PF_IPX",
"PF_APPLETALK",
"PF_INET6",
"PF_AX25",
"PF_NETROM",
],
),
),
('options', DictionaryProperty(spec_version="2.0")),
(
'socket_type', EnumProperty(allowed=[
"SOCK_STREAM",
"SOCK_DGRAM",
"SOCK_RAW",
"SOCK_RDM",
"SOCK_SEQPACKET",
]),
'socket_type', EnumProperty(
allowed=[
"SOCK_STREAM",
"SOCK_DGRAM",
"SOCK_RAW",
"SOCK_RDM",
"SOCK_SEQPACKET",
],
),
),
('socket_descriptor', IntegerProperty()),
('socket_handle', IntegerProperty()),
@ -537,33 +541,39 @@ class WindowsServiceExt(_Extension):
('display_name', StringProperty()),
('group_name', StringProperty()),
(
'start_type', EnumProperty(allowed=[
"SERVICE_AUTO_START",
"SERVICE_BOOT_START",
"SERVICE_DEMAND_START",
"SERVICE_DISABLED",
"SERVICE_SYSTEM_ALERT",
]),
'start_type', EnumProperty(
allowed=[
"SERVICE_AUTO_START",
"SERVICE_BOOT_START",
"SERVICE_DEMAND_START",
"SERVICE_DISABLED",
"SERVICE_SYSTEM_ALERT",
],
),
),
('service_dll_refs', ListProperty(ObjectReferenceProperty(valid_types='file'))),
(
'service_type', EnumProperty(allowed=[
"SERVICE_KERNEL_DRIVER",
"SERVICE_FILE_SYSTEM_DRIVER",
"SERVICE_WIN32_OWN_PROCESS",
"SERVICE_WIN32_SHARE_PROCESS",
]),
'service_type', EnumProperty(
allowed=[
"SERVICE_KERNEL_DRIVER",
"SERVICE_FILE_SYSTEM_DRIVER",
"SERVICE_WIN32_OWN_PROCESS",
"SERVICE_WIN32_SHARE_PROCESS",
],
),
),
(
'service_status', EnumProperty(allowed=[
"SERVICE_CONTINUE_PENDING",
"SERVICE_PAUSE_PENDING",
"SERVICE_PAUSED",
"SERVICE_RUNNING",
"SERVICE_START_PENDING",
"SERVICE_STOP_PENDING",
"SERVICE_STOPPED",
]),
'service_status', EnumProperty(
allowed=[
"SERVICE_CONTINUE_PENDING",
"SERVICE_PAUSE_PENDING",
"SERVICE_PAUSED",
"SERVICE_RUNNING",
"SERVICE_START_PENDING",
"SERVICE_STOP_PENDING",
"SERVICE_STOPPED",
],
),
),
])
@ -687,21 +697,23 @@ class WindowsRegistryValueType(_STIXBase20):
('name', StringProperty(required=True)),
('data', StringProperty()),
(
'data_type', EnumProperty(allowed=[
"REG_NONE",
"REG_SZ",
"REG_EXPAND_SZ",
"REG_BINARY",
"REG_DWORD",
"REG_DWORD_BIG_ENDIAN",
"REG_LINK",
"REG_MULTI_SZ",
"REG_RESOURCE_LIST",
"REG_FULL_RESOURCE_DESCRIPTION",
"REG_RESOURCE_REQUIREMENTS_LIST",
"REG_QWORD",
"REG_INVALID_TYPE",
]),
'data_type', EnumProperty(
allowed=[
"REG_NONE",
"REG_SZ",
"REG_EXPAND_SZ",
"REG_BINARY",
"REG_DWORD",
"REG_DWORD_BIG_ENDIAN",
"REG_LINK",
"REG_MULTI_SZ",
"REG_RESOURCE_LIST",
"REG_FULL_RESOURCE_DESCRIPTION",
"REG_RESOURCE_REQUIREMENTS_LIST",
"REG_QWORD",
"REG_INVALID_TYPE",
],
),
),
])
@ -790,11 +802,13 @@ def CustomObservable(type='x-custom-observable', properties=None):
"""
def wrapper(cls):
_properties = list(itertools.chain.from_iterable([
[('type', TypeProperty(type, spec_version='2.0'))],
properties,
[('extensions', ExtensionsProperty(spec_version="2.0", enclosing_type=type))],
]))
_properties = list(
itertools.chain.from_iterable([
[('type', TypeProperty(type, spec_version='2.0'))],
properties,
[('extensions', ExtensionsProperty(spec_version="2.0", enclosing_type=type))],
]),
)
return _custom_observable_builder(cls, type, _properties, '2.0', _Observable)
return wrapper

View File

@ -356,23 +356,25 @@ def CustomObject(type='x-custom-type', properties=None):
"""
def wrapper(cls):
_properties = list(itertools.chain.from_iterable([
[
('type', TypeProperty(type, spec_version='2.0')),
('id', IDProperty(type, spec_version='2.0')),
('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.0')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
],
[x for x in properties if not x[0].startswith('x_')],
[
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.0'))),
('granular_markings', ListProperty(GranularMarking)),
],
sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]),
]))
_properties = list(
itertools.chain.from_iterable([
[
('type', TypeProperty(type, spec_version='2.0')),
('id', IDProperty(type, spec_version='2.0')),
('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.0')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')),
],
[x for x in properties if not x[0].startswith('x_')],
[
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.0'))),
('granular_markings', ListProperty(GranularMarking)),
],
sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]),
]),
)
return _custom_object_builder(cls, type, _properties, '2.0', _DomainObject)
return wrapper

View File

@ -505,13 +505,15 @@ class SocketExt(_Extension):
('is_listening', BooleanProperty()),
('options', DictionaryProperty(spec_version='2.1')),
(
'socket_type', EnumProperty(allowed=[
"SOCK_STREAM",
"SOCK_DGRAM",
"SOCK_RAW",
"SOCK_RDM",
"SOCK_SEQPACKET",
]),
'socket_type', EnumProperty(
allowed=[
"SOCK_STREAM",
"SOCK_DGRAM",
"SOCK_RAW",
"SOCK_RDM",
"SOCK_SEQPACKET",
],
),
),
('socket_descriptor', IntegerProperty(min=0)),
('socket_handle', IntegerProperty()),
@ -612,12 +614,14 @@ class WindowsProcessExt(_Extension):
('window_title', StringProperty()),
('startup_info', DictionaryProperty(spec_version='2.1')),
(
'integrity_level', EnumProperty(allowed=[
"low",
"medium",
"high",
"system",
]),
'integrity_level', EnumProperty(
allowed=[
"low",
"medium",
"high",
"system",
],
),
),
])
@ -634,33 +638,39 @@ class WindowsServiceExt(_Extension):
('display_name', StringProperty()),
('group_name', StringProperty()),
(
'start_type', EnumProperty(allowed=[
"SERVICE_AUTO_START",
"SERVICE_BOOT_START",
"SERVICE_DEMAND_START",
"SERVICE_DISABLED",
"SERVICE_SYSTEM_ALERT",
]),
'start_type', EnumProperty(
allowed=[
"SERVICE_AUTO_START",
"SERVICE_BOOT_START",
"SERVICE_DEMAND_START",
"SERVICE_DISABLED",
"SERVICE_SYSTEM_ALERT",
],
),
),
('service_dll_refs', ListProperty(ReferenceProperty(valid_types='file', spec_version="2.1"))),
(
'service_type', EnumProperty(allowed=[
"SERVICE_KERNEL_DRIVER",
"SERVICE_FILE_SYSTEM_DRIVER",
"SERVICE_WIN32_OWN_PROCESS",
"SERVICE_WIN32_SHARE_PROCESS",
]),
'service_type', EnumProperty(
allowed=[
"SERVICE_KERNEL_DRIVER",
"SERVICE_FILE_SYSTEM_DRIVER",
"SERVICE_WIN32_OWN_PROCESS",
"SERVICE_WIN32_SHARE_PROCESS",
],
),
),
(
'service_status', EnumProperty(allowed=[
"SERVICE_CONTINUE_PENDING",
"SERVICE_PAUSE_PENDING",
"SERVICE_PAUSED",
"SERVICE_RUNNING",
"SERVICE_START_PENDING",
"SERVICE_STOP_PENDING",
"SERVICE_STOPPED",
]),
'service_status', EnumProperty(
allowed=[
"SERVICE_CONTINUE_PENDING",
"SERVICE_PAUSE_PENDING",
"SERVICE_PAUSED",
"SERVICE_RUNNING",
"SERVICE_START_PENDING",
"SERVICE_STOP_PENDING",
"SERVICE_STOPPED",
],
),
),
])
@ -808,21 +818,23 @@ class WindowsRegistryValueType(_STIXBase21):
('name', StringProperty()),
('data', StringProperty()),
(
'data_type', EnumProperty(allowed=[
"REG_NONE",
"REG_SZ",
"REG_EXPAND_SZ",
"REG_BINARY",
"REG_DWORD",
"REG_DWORD_BIG_ENDIAN",
"REG_LINK",
"REG_MULTI_SZ",
"REG_RESOURCE_LIST",
"REG_FULL_RESOURCE_DESCRIPTION",
"REG_RESOURCE_REQUIREMENTS_LIST",
"REG_QWORD",
"REG_INVALID_TYPE",
]),
'data_type', EnumProperty(
allowed=[
"REG_NONE",
"REG_SZ",
"REG_EXPAND_SZ",
"REG_BINARY",
"REG_DWORD",
"REG_DWORD_BIG_ENDIAN",
"REG_LINK",
"REG_MULTI_SZ",
"REG_RESOURCE_LIST",
"REG_FULL_RESOURCE_DESCRIPTION",
"REG_RESOURCE_REQUIREMENTS_LIST",
"REG_QWORD",
"REG_INVALID_TYPE",
],
),
),
])
@ -935,13 +947,15 @@ def CustomObservable(type='x-custom-observable', properties=None, id_contrib_pro
"""
def wrapper(cls):
_properties = list(itertools.chain.from_iterable([
[('type', TypeProperty(type, spec_version='2.1'))],
[('spec_version', StringProperty(fixed='2.1'))],
[('id', IDProperty(type, spec_version='2.1'))],
properties,
[('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=type))],
]))
_properties = list(
itertools.chain.from_iterable([
[('type', TypeProperty(type, spec_version='2.1'))],
[('spec_version', StringProperty(fixed='2.1'))],
[('id', IDProperty(type, spec_version='2.1'))],
properties,
[('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=type))],
]),
)
return _custom_observable_builder(cls, type, _properties, '2.1', _Observable, id_contrib_props)
return wrapper

View File

@ -789,27 +789,29 @@ def CustomObject(type='x-custom-type', properties=None):
"""
def wrapper(cls):
_properties = list(itertools.chain.from_iterable([
[
('type', TypeProperty(type, spec_version='2.1')),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(type, spec_version='2.1')),
('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.1')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
],
[x for x in properties if not x[0].startswith('x_')],
[
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
('granular_markings', ListProperty(GranularMarking)),
],
sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]),
]))
_properties = list(
itertools.chain.from_iterable([
[
('type', TypeProperty(type, spec_version='2.1')),
('spec_version', StringProperty(fixed='2.1')),
('id', IDProperty(type, spec_version='2.1')),
('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.1')),
('created', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
('modified', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')),
],
[x for x in properties if not x[0].startswith('x_')],
[
('revoked', BooleanProperty(default=lambda: False)),
('labels', ListProperty(StringProperty)),
('confidence', IntegerProperty()),
('lang', StringProperty()),
('external_references', ListProperty(ExternalReference)),
('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))),
('granular_markings', ListProperty(GranularMarking)),
],
sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]),
]),
)
return _custom_object_builder(cls, type, _properties, '2.1', _DomainObject)
return wrapper