diff --git a/stix2/base.py b/stix2/base.py index 21b6011..38a5997 100644 --- a/stix2/base.py +++ b/stix2/base.py @@ -164,8 +164,10 @@ class _STIXBase(Mapping): defaulted = [] for name, prop in self._properties.items(): try: - if (not prop.required and not hasattr(prop, '_fixed_value') and - prop.default() == setting_kwargs[name]): + if ( + not prop.required and not hasattr(prop, '_fixed_value') and + prop.default() == setting_kwargs[name] + ): defaulted.append(name) except (AttributeError, KeyError): continue @@ -194,8 +196,10 @@ class _STIXBase(Mapping): unpickling = '_inner' not in self.__dict__ if not unpickling and name in self: return self.__getitem__(name) - raise AttributeError("'%s' object has no attribute '%s'" % - (self.__class__.__name__, name)) + raise AttributeError( + "'%s' object has no attribute '%s'" % + (self.__class__.__name__, name), + ) def __setattr__(self, name, value): if not name.startswith("_"): diff --git a/stix2/datastore/memory.py b/stix2/datastore/memory.py index f71b763..10fb76d 100644 --- a/stix2/datastore/memory.py +++ b/stix2/datastore/memory.py @@ -75,8 +75,10 @@ class _ObjectFamily(object): def add(self, obj): self.all_versions[obj["modified"]] = obj - if (self.latest_version is None or - obj["modified"] > self.latest_version["modified"]): + if ( + self.latest_version is None or + obj["modified"] > self.latest_version["modified"] + ): self.latest_version = obj def __str__(self): @@ -188,11 +190,13 @@ class MemorySink(DataSink): def save_to_file(self, path, encoding="utf-8"): path = os.path.abspath(path) - all_objs = list(itertools.chain.from_iterable( - value.all_versions.values() if isinstance(value, _ObjectFamily) - else [value] - for value in self._data.values() - )) + all_objs = list( + itertools.chain.from_iterable( + value.all_versions.values() if isinstance(value, _ObjectFamily) + else [value] + for value in self._data.values() + ), + ) if any("spec_version" in x for x in all_objs): bundle = v21.Bundle(all_objs, allow_custom=self.allow_custom) diff --git a/stix2/equivalence/pattern/transform/comparison.py b/stix2/equivalence/pattern/transform/comparison.py index 6db1055..248766d 100644 --- a/stix2/equivalence/pattern/transform/comparison.py +++ b/stix2/equivalence/pattern/transform/comparison.py @@ -144,7 +144,7 @@ class ComparisonExpressionTransformer(Transformer): class OrderDedupeTransformer( - ComparisonExpressionTransformer + ComparisonExpressionTransformer, ): """ Canonically order the children of all nodes in the AST. Because the @@ -247,7 +247,7 @@ class FlattenTransformer(ComparisonExpressionTransformer): class AbsorptionTransformer( - ComparisonExpressionTransformer + ComparisonExpressionTransformer, ): """ Applies boolean "absorption" rules for AST simplification. E.g.: diff --git a/stix2/equivalence/pattern/transform/observation.py b/stix2/equivalence/pattern/transform/observation.py index 8e2a4d2..ee698bd 100644 --- a/stix2/equivalence/pattern/transform/observation.py +++ b/stix2/equivalence/pattern/transform/observation.py @@ -152,9 +152,11 @@ class ObservationExpressionTransformer(Transformer): changed = True else: - raise TypeError("Not an observation expression: {}: {}".format( - type(ast).__name__, str(ast), - )) + raise TypeError( + "Not an observation expression: {}: {}".format( + type(ast).__name__, str(ast), + ), + ) return result, changed @@ -229,7 +231,7 @@ class FlattenTransformer(ObservationExpressionTransformer): class OrderDedupeTransformer( - ObservationExpressionTransformer + ObservationExpressionTransformer, ): """ Canonically order AND/OR expressions, and dedupe ORs. E.g.: @@ -272,7 +274,7 @@ class OrderDedupeTransformer( class AbsorptionTransformer( - ObservationExpressionTransformer + ObservationExpressionTransformer, ): """ Applies boolean "absorption" rules for observation expressions, for AST @@ -479,7 +481,7 @@ class DNFTransformer(ObservationExpressionTransformer): class CanonicalizeComparisonExpressionsTransformer( - ObservationExpressionTransformer + ObservationExpressionTransformer, ): """ Canonicalize all comparison expressions. diff --git a/stix2/parsing.py b/stix2/parsing.py index c0c7bf8..9ea90db 100644 --- a/stix2/parsing.py +++ b/stix2/parsing.py @@ -186,8 +186,10 @@ def parse_observable(data, _valid_refs=None, allow_custom=False, version=None): # flag allows for unknown custom objects too, but will not # be parsed into STIX observable object, just returned as is return obj - raise ParseError("Can't parse unknown observable type '%s'! For custom observables, " - "use the CustomObservable decorator." % obj['type']) + raise ParseError( + "Can't parse unknown observable type '%s'! For custom observables, " + "use the CustomObservable decorator." % obj['type'], + ) return obj_class(allow_custom=allow_custom, **obj) @@ -283,8 +285,12 @@ def _register_observable(new_observable, version=stix2.DEFAULT_VERSION): "'%s' is named like an object reference property but " "is not an ObjectReferenceProperty." % prop_name, ) - elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or - 'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained))): + elif ( + prop_name.endswith('_refs') and ( + 'ListProperty' not in get_class_hierarchy_names(prop) or + 'ObjectReferenceProperty' not in get_class_hierarchy_names(prop.contained) + ) + ): raise ValueError( "'%s' is named like an object reference list property but " "is not a ListProperty containing ObjectReferenceProperty." % prop_name, @@ -299,8 +305,12 @@ def _register_observable(new_observable, version=stix2.DEFAULT_VERSION): "'%s' is named like a reference property but " "is not a ReferenceProperty." % prop_name, ) - elif (prop_name.endswith('_refs') and ('ListProperty' not in get_class_hierarchy_names(prop) or - 'ReferenceProperty' not in get_class_hierarchy_names(prop.contained))): + elif ( + prop_name.endswith('_refs') and ( + 'ListProperty' not in get_class_hierarchy_names(prop) or + 'ReferenceProperty' not in get_class_hierarchy_names(prop.contained) + ) + ): raise ValueError( "'%s' is named like a reference list property but " "is not a ListProperty containing ReferenceProperty." % prop_name, diff --git a/stix2/pattern_visitor.py b/stix2/pattern_visitor.py index c4b2ec2..f63dd29 100644 --- a/stix2/pattern_visitor.py +++ b/stix2/pattern_visitor.py @@ -261,11 +261,13 @@ class STIXPatternVisitorForSTIX2(): property_path.append(self.instantiate("ListObjectPathComponent", current.property_name, next.getText())) i += 2 elif isinstance(next, IntegerConstant): - property_path.append(self.instantiate( - "ListObjectPathComponent", - current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current), - next.value, - )) + property_path.append( + self.instantiate( + "ListObjectPathComponent", + current.property_name if isinstance(current, BasicObjectPathComponent) else text_type(current), + next.value, + ), + ) i += 2 else: property_path.append(current) diff --git a/stix2/properties.py b/stix2/properties.py index 1ca2dbe..0b3d8bf 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -247,9 +247,11 @@ class ListProperty(Property): valid = self.contained(**item) else: - raise ValueError("Can't create a {} out of {}".format( - self.contained._type, str(item), - )) + raise ValueError( + "Can't create a {} out of {}".format( + self.contained._type, str(item), + ), + ) result.append(valid) @@ -688,8 +690,10 @@ class STIXObjectProperty(Property): def clean(self, value): # Any STIX Object (SDO, SRO, or Marking Definition) can be added to # a bundle with no further checks. - if any(x in ('_DomainObject', '_RelationshipObject', 'MarkingDefinition') - for x in get_class_hierarchy_names(value)): + if any( + x in ('_DomainObject', '_RelationshipObject', 'MarkingDefinition') + for x in get_class_hierarchy_names(value) + ): # A simple "is this a spec version 2.1+ object" test. For now, # limit 2.0 bundles to 2.0 objects. It's not possible yet to # have validation co-constraints among properties, e.g. have diff --git a/stix2/test/v20/test_datastore_memory.py b/stix2/test/v20/test_datastore_memory.py index 28d8e52..3357e2c 100644 --- a/stix2/test/v20/test_datastore_memory.py +++ b/stix2/test/v20/test_datastore_memory.py @@ -175,12 +175,14 @@ def test_memory_source_get_nonexistant_object(mem_source): def test_memory_store_all_versions(mem_store): # Add bundle of items to sink - mem_store.add(dict( - id="bundle--%s" % make_id(), - objects=STIX_OBJS2, - spec_version="2.0", - type="bundle", - )) + mem_store.add( + dict( + id="bundle--%s" % make_id(), + objects=STIX_OBJS2, + spec_version="2.0", + type="bundle", + ), + ) resp = mem_store.all_versions("indicator--00000000-0000-4000-8000-000000000001") assert len(resp) == 3 diff --git a/stix2/test/v20/test_environment.py b/stix2/test/v20/test_environment.py index 34ce596..e572aee 100644 --- a/stix2/test/v20/test_environment.py +++ b/stix2/test/v20/test_environment.py @@ -39,15 +39,19 @@ def ds2(): cam = stix2.v20.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS) idy = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS) ind = stix2.v20.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS) - indv2 = ind.new_version(external_references=[{ - "source_name": "unknown", - "url": "https://examplewebsite.com/", - }]) + indv2 = ind.new_version( + external_references=[{ + "source_name": "unknown", + "url": "https://examplewebsite.com/", + }], + ) mal = stix2.v20.Malware(id=MALWARE_ID, created_by_ref=idy.id, **MALWARE_KWARGS) - malv2 = mal.new_version(external_references=[{ - "source_name": "unknown", - "url": "https://examplewebsite2.com/", - }]) + malv2 = mal.new_version( + external_references=[{ + "source_name": "unknown", + "url": "https://examplewebsite2.com/", + }], + ) rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0]) rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1]) rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2]) diff --git a/stix2/test/v20/test_indicator.py b/stix2/test/v20/test_indicator.py index 1ae33ec..3fe1886 100644 --- a/stix2/test/v20/test_indicator.py +++ b/stix2/test/v20/test_indicator.py @@ -20,7 +20,8 @@ EXPECTED_INDICATOR = """{ ] }""" -EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join(""" +EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join( + """ type='indicator', id='indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7', created='2017-01-01T00:00:01.000Z', @@ -28,7 +29,8 @@ EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join(""" pattern="[file:hashes.MD5 = 'd41d8cd98f00b204e9800998ecf8427e']", valid_from='1970-01-01T00:00:01Z', labels=['malicious-activity'] -""".split()) + ")" +""".split() +) + ")" def test_indicator_with_all_required_properties(): diff --git a/stix2/test/v20/test_observed_data.py b/stix2/test/v20/test_observed_data.py index 354d70c..bd60383 100644 --- a/stix2/test/v20/test_observed_data.py +++ b/stix2/test/v20/test_observed_data.py @@ -1180,50 +1180,56 @@ def test_process_example_extensions_empty(): def test_process_example_with_WindowsProcessExt_Object(): - p = stix2.v20.Process(extensions={ - "windows-process-ext": stix2.v20.WindowsProcessExt( - aslr_enabled=True, - dep_enabled=True, - priority="HIGH_PRIORITY_CLASS", - owner_sid="S-1-5-21-186985262-1144665072-74031268-1309", - ), # noqa - }) + p = stix2.v20.Process( + extensions={ + "windows-process-ext": stix2.v20.WindowsProcessExt( + aslr_enabled=True, + dep_enabled=True, + priority="HIGH_PRIORITY_CLASS", + owner_sid="S-1-5-21-186985262-1144665072-74031268-1309", + ), # noqa + }, + ) assert p.extensions["windows-process-ext"].dep_enabled assert p.extensions["windows-process-ext"].owner_sid == "S-1-5-21-186985262-1144665072-74031268-1309" def test_process_example_with_WindowsServiceExt(): - p = stix2.v20.Process(extensions={ - "windows-service-ext": { - "service_name": "sirvizio", - "display_name": "Sirvizio", - "start_type": "SERVICE_AUTO_START", - "service_type": "SERVICE_WIN32_OWN_PROCESS", - "service_status": "SERVICE_RUNNING", + p = stix2.v20.Process( + extensions={ + "windows-service-ext": { + "service_name": "sirvizio", + "display_name": "Sirvizio", + "start_type": "SERVICE_AUTO_START", + "service_type": "SERVICE_WIN32_OWN_PROCESS", + "service_status": "SERVICE_RUNNING", + }, }, - }) + ) assert p.extensions["windows-service-ext"].service_name == "sirvizio" assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS" def test_process_example_with_WindowsProcessServiceExt(): - p = stix2.v20.Process(extensions={ - "windows-service-ext": { - "service_name": "sirvizio", - "display_name": "Sirvizio", - "start_type": "SERVICE_AUTO_START", - "service_type": "SERVICE_WIN32_OWN_PROCESS", - "service_status": "SERVICE_RUNNING", + p = stix2.v20.Process( + extensions={ + "windows-service-ext": { + "service_name": "sirvizio", + "display_name": "Sirvizio", + "start_type": "SERVICE_AUTO_START", + "service_type": "SERVICE_WIN32_OWN_PROCESS", + "service_status": "SERVICE_RUNNING", + }, + "windows-process-ext": { + "aslr_enabled": True, + "dep_enabled": True, + "priority": "HIGH_PRIORITY_CLASS", + "owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309", + }, }, - "windows-process-ext": { - "aslr_enabled": True, - "dep_enabled": True, - "priority": "HIGH_PRIORITY_CLASS", - "owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309", - }, - }) + ) assert p.extensions["windows-service-ext"].service_name == "sirvizio" assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS" diff --git a/stix2/test/v20/test_pattern_expressions.py b/stix2/test/v20/test_pattern_expressions.py index 526fe97..4d0073a 100644 --- a/stix2/test/v20/test_pattern_expressions.py +++ b/stix2/test/v20/test_pattern_expressions.py @@ -306,10 +306,12 @@ def test_multiple_qualifiers(): def test_set_op(): - exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression( - "network-traffic:dst_ref.value", - "2001:0db8:dead:beef:0000:0000:0000:0000/64", - )) + exp = stix2.ObservationExpression( + stix2.IsSubsetComparisonExpression( + "network-traffic:dst_ref.value", + "2001:0db8:dead:beef:0000:0000:0000:0000/64", + ), + ) assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']" diff --git a/stix2/test/v20/test_utils.py b/stix2/test/v20/test_utils.py index a66f3e8..de66332 100644 --- a/stix2/test/v20/test_utils.py +++ b/stix2/test/v20/test_utils.py @@ -71,7 +71,7 @@ def test_parse_datetime_invalid(ts): {"a": 1}, '{"a": 1}', StringIO(u'{"a": 1}'), - [("a", 1,)], + [("a", 1)], ], ) def test_get_dict(data): diff --git a/stix2/test/v20/test_versioning.py b/stix2/test/v20/test_versioning.py index 03d43cc..e2525e4 100644 --- a/stix2/test/v20/test_versioning.py +++ b/stix2/test/v20/test_versioning.py @@ -46,10 +46,12 @@ def test_making_new_version_with_embedded_object(): **CAMPAIGN_MORE_KWARGS ) - campaign_v2 = campaign_v1.new_version(external_references=[{ - "source_name": "capec", - "external_id": "CAPEC-164", - }]) + campaign_v2 = campaign_v1.new_version( + external_references=[{ + "source_name": "capec", + "external_id": "CAPEC-164", + }], + ) assert campaign_v1.id == campaign_v2.id assert campaign_v1.created_by_ref == campaign_v2.created_by_ref @@ -237,8 +239,10 @@ def test_remove_custom_stix_property(): mal_nc = stix2.versioning.remove_custom_stix(mal) assert "x_custom" not in mal_nc - assert (stix2.utils.parse_into_datetime(mal["modified"], precision="millisecond") < - stix2.utils.parse_into_datetime(mal_nc["modified"], precision="millisecond")) + assert ( + stix2.utils.parse_into_datetime(mal["modified"], precision="millisecond") < + stix2.utils.parse_into_datetime(mal_nc["modified"], precision="millisecond") + ) def test_remove_custom_stix_object(): diff --git a/stix2/test/v21/test_datastore_memory.py b/stix2/test/v21/test_datastore_memory.py index 60f577e..870f82e 100644 --- a/stix2/test/v21/test_datastore_memory.py +++ b/stix2/test/v21/test_datastore_memory.py @@ -191,11 +191,13 @@ def test_memory_source_get_nonexistant_object(mem_source): def test_memory_store_all_versions(mem_store): # Add bundle of items to sink - mem_store.add(dict( - id="bundle--%s" % make_id(), - objects=STIX_OBJS2, - type="bundle", - )) + mem_store.add( + dict( + id="bundle--%s" % make_id(), + objects=STIX_OBJS2, + type="bundle", + ), + ) resp = mem_store.all_versions("indicator--00000000-0000-4000-8000-000000000001") assert len(resp) == 3 diff --git a/stix2/test/v21/test_indicator.py b/stix2/test/v21/test_indicator.py index 8452f70..1264ac1 100644 --- a/stix2/test/v21/test_indicator.py +++ b/stix2/test/v21/test_indicator.py @@ -20,7 +20,8 @@ EXPECTED_INDICATOR = """{ "valid_from": "1970-01-01T00:00:01Z" }""" -EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join(""" +EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join( + """ type='indicator', spec_version='2.1', id='indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7', @@ -30,7 +31,8 @@ EXPECTED_INDICATOR_REPR = "Indicator(" + " ".join(""" pattern_type='stix', pattern_version='2.1', valid_from='1970-01-01T00:00:01Z' -""".split()) + ")" +""".split() +) + ")" def test_indicator_with_all_required_properties(): diff --git a/stix2/test/v21/test_location.py b/stix2/test/v21/test_location.py index 7517fdf..3e064b6 100644 --- a/stix2/test/v21/test_location.py +++ b/stix2/test/v21/test_location.py @@ -19,14 +19,16 @@ EXPECTED_LOCATION_1 = """{ "longitude": 2.3522 }""" -EXPECTED_LOCATION_1_REPR = "Location(" + " ".join(""" +EXPECTED_LOCATION_1_REPR = "Location(" + " ".join( + """ type='location', spec_version='2.1', id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64', created='2016-04-06T20:03:00.000Z', modified='2016-04-06T20:03:00.000Z', latitude=48.8566, - longitude=2.3522""".split()) + ")" + longitude=2.3522""".split() +) + ")" EXPECTED_LOCATION_2 = """{ "type": "location", @@ -38,13 +40,15 @@ EXPECTED_LOCATION_2 = """{ } """ -EXPECTED_LOCATION_2_REPR = "Location(" + " ".join(""" +EXPECTED_LOCATION_2_REPR = "Location(" + " ".join( + """ type='location', spec_version='2.1', id='location--a6e9345f-5a15-4c29-8bb3-7dcc5d168d64', created='2016-04-06T20:03:00.000Z', modified='2016-04-06T20:03:00.000Z', - region='north-america'""".split()) + ")" + region='north-america'""".split() +) + ")" def test_location_with_some_required_properties(): diff --git a/stix2/test/v21/test_observed_data.py b/stix2/test/v21/test_observed_data.py index 00ee055..c1cb38e 100644 --- a/stix2/test/v21/test_observed_data.py +++ b/stix2/test/v21/test_observed_data.py @@ -496,12 +496,14 @@ def test_parse_email_message_not_multipart(data): def test_parse_file_archive(data): odata_str = OBJECTS_REGEX.sub('"objects": { %s }' % data, EXPECTED) odata = stix2.parse(odata_str, version="2.1") - assert all(x in odata.objects["3"].extensions['archive-ext'].contains_refs - for x in [ - "file--ecd47d73-15e4-5250-afda-ef8897b22340", - "file--65f2873d-38c2-56b4-bfa5-e3ef21e8a3c3", - "file--ef2d6dca-ec7d-5ab7-8dd9-ec9c0dee0eac", - ]) + assert all( + x in odata.objects["3"].extensions['archive-ext'].contains_refs + for x in [ + "file--ecd47d73-15e4-5250-afda-ef8897b22340", + "file--65f2873d-38c2-56b4-bfa5-e3ef21e8a3c3", + "file--ef2d6dca-ec7d-5ab7-8dd9-ec9c0dee0eac", + ] + ) @pytest.mark.parametrize( @@ -904,14 +906,14 @@ def test_file_with_archive_ext_object(): f_obj = stix2.v21.File( name="foo", extensions={ "archive-ext": { - "contains_refs": [ad, ], + "contains_refs": [ad], }, }, ) f_ref = stix2.v21.File( name="foo", extensions={ "archive-ext": { - "contains_refs": [ad.id, ], + "contains_refs": [ad.id], }, }, ) @@ -1229,9 +1231,11 @@ def test_process_example_empty_error(): def test_process_example_empty_with_extensions(): with pytest.raises(stix2.exceptions.InvalidValueError) as excinfo: - stix2.v21.Process(extensions={ - "windows-process-ext": {}, - }) + stix2.v21.Process( + extensions={ + "windows-process-ext": {}, + }, + ) assert excinfo.value.cls == stix2.v21.Process @@ -1276,50 +1280,56 @@ def test_process_example_extensions_empty(): def test_process_example_with_WindowsProcessExt_Object(): - p = stix2.v21.Process(extensions={ - "windows-process-ext": stix2.v21.WindowsProcessExt( - aslr_enabled=True, - dep_enabled=True, - priority="HIGH_PRIORITY_CLASS", - owner_sid="S-1-5-21-186985262-1144665072-74031268-1309", - ), # noqa - }) + p = stix2.v21.Process( + extensions={ + "windows-process-ext": stix2.v21.WindowsProcessExt( + aslr_enabled=True, + dep_enabled=True, + priority="HIGH_PRIORITY_CLASS", + owner_sid="S-1-5-21-186985262-1144665072-74031268-1309", + ), # noqa + }, + ) assert p.extensions["windows-process-ext"].dep_enabled assert p.extensions["windows-process-ext"].owner_sid == "S-1-5-21-186985262-1144665072-74031268-1309" def test_process_example_with_WindowsServiceExt(): - p = stix2.v21.Process(extensions={ - "windows-service-ext": { - "service_name": "sirvizio", - "display_name": "Sirvizio", - "start_type": "SERVICE_AUTO_START", - "service_type": "SERVICE_WIN32_OWN_PROCESS", - "service_status": "SERVICE_RUNNING", + p = stix2.v21.Process( + extensions={ + "windows-service-ext": { + "service_name": "sirvizio", + "display_name": "Sirvizio", + "start_type": "SERVICE_AUTO_START", + "service_type": "SERVICE_WIN32_OWN_PROCESS", + "service_status": "SERVICE_RUNNING", + }, }, - }) + ) assert p.extensions["windows-service-ext"].service_name == "sirvizio" assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS" def test_process_example_with_WindowsProcessServiceExt(): - p = stix2.v21.Process(extensions={ - "windows-service-ext": { - "service_name": "sirvizio", - "display_name": "Sirvizio", - "start_type": "SERVICE_AUTO_START", - "service_type": "SERVICE_WIN32_OWN_PROCESS", - "service_status": "SERVICE_RUNNING", + p = stix2.v21.Process( + extensions={ + "windows-service-ext": { + "service_name": "sirvizio", + "display_name": "Sirvizio", + "start_type": "SERVICE_AUTO_START", + "service_type": "SERVICE_WIN32_OWN_PROCESS", + "service_status": "SERVICE_RUNNING", + }, + "windows-process-ext": { + "aslr_enabled": True, + "dep_enabled": True, + "priority": "HIGH_PRIORITY_CLASS", + "owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309", + }, }, - "windows-process-ext": { - "aslr_enabled": True, - "dep_enabled": True, - "priority": "HIGH_PRIORITY_CLASS", - "owner_sid": "S-1-5-21-186985262-1144665072-74031268-1309", - }, - }) + ) assert p.extensions["windows-service-ext"].service_name == "sirvizio" assert p.extensions["windows-service-ext"].service_type == "SERVICE_WIN32_OWN_PROCESS" diff --git a/stix2/test/v21/test_pattern_expressions.py b/stix2/test/v21/test_pattern_expressions.py index 4f365d7..d7afe5c 100644 --- a/stix2/test/v21/test_pattern_expressions.py +++ b/stix2/test/v21/test_pattern_expressions.py @@ -444,10 +444,12 @@ def test_multiple_qualifiers(): def test_set_op(): - exp = stix2.ObservationExpression(stix2.IsSubsetComparisonExpression( - "network-traffic:dst_ref.value", - "2001:0db8:dead:beef:0000:0000:0000:0000/64", - )) + exp = stix2.ObservationExpression( + stix2.IsSubsetComparisonExpression( + "network-traffic:dst_ref.value", + "2001:0db8:dead:beef:0000:0000:0000:0000/64", + ), + ) assert str(exp) == "[network-traffic:dst_ref.value ISSUBSET '2001:0db8:dead:beef:0000:0000:0000:0000/64']" @@ -712,12 +714,12 @@ def test_parsing_boolean(): def test_parsing_mixed_boolean_expression_1(): - patt_obj = create_pattern_object("[a:b = 1 AND a:b = 2 OR a:b = 3]",) + patt_obj = create_pattern_object("[a:b = 1 AND a:b = 2 OR a:b = 3]") assert str(patt_obj) == "[a:b = 1 AND a:b = 2 OR a:b = 3]" def test_parsing_mixed_boolean_expression_2(): - patt_obj = create_pattern_object("[a:b = 1 OR a:b = 2 AND a:b = 3]",) + patt_obj = create_pattern_object("[a:b = 1 OR a:b = 2 AND a:b = 3]") assert str(patt_obj) == "[a:b = 1 OR a:b = 2 AND a:b = 3]" diff --git a/stix2/test/v21/test_utils.py b/stix2/test/v21/test_utils.py index f64cec2..41bc087 100644 --- a/stix2/test/v21/test_utils.py +++ b/stix2/test/v21/test_utils.py @@ -71,7 +71,7 @@ def test_parse_datetime_invalid(ts): {"a": 1}, '{"a": 1}', StringIO(u'{"a": 1}'), - [("a", 1,)], + [("a", 1)], ], ) def test_get_dict(data): diff --git a/stix2/test/v21/test_versioning.py b/stix2/test/v21/test_versioning.py index adfa7a0..f10877f 100644 --- a/stix2/test/v21/test_versioning.py +++ b/stix2/test/v21/test_versioning.py @@ -50,10 +50,12 @@ def test_making_new_version_with_embedded_object(): **CAMPAIGN_MORE_KWARGS ) - campaign_v2 = campaign_v1.new_version(external_references=[{ - "source_name": "capec", - "external_id": "CAPEC-164", - }]) + campaign_v2 = campaign_v1.new_version( + external_references=[{ + "source_name": "capec", + "external_id": "CAPEC-164", + }], + ) assert campaign_v1.id == campaign_v2.id assert campaign_v1.spec_version == campaign_v2.spec_version diff --git a/stix2/utils.py b/stix2/utils.py index 1b88f72..22efcc2 100644 --- a/stix2/utils.py +++ b/stix2/utils.py @@ -71,9 +71,11 @@ def _to_enum(value, enum_type, enum_default=None): elif isinstance(value, six.string_types): value = enum_type[value.upper()] else: - raise TypeError("Not a valid {}: {}".format( - enum_type.__name__, value, - )) + raise TypeError( + "Not a valid {}: {}".format( + enum_type.__name__, value, + ), + ) return value diff --git a/stix2/v20/observables.py b/stix2/v20/observables.py index cec979e..56262b0 100644 --- a/stix2/v20/observables.py +++ b/stix2/v20/observables.py @@ -440,24 +440,28 @@ class SocketExt(_Extension): ('is_blocking', BooleanProperty()), ('is_listening', BooleanProperty()), ( - 'protocol_family', EnumProperty(allowed=[ - "PF_INET", - "PF_IPX", - "PF_APPLETALK", - "PF_INET6", - "PF_AX25", - "PF_NETROM", - ]), + 'protocol_family', EnumProperty( + allowed=[ + "PF_INET", + "PF_IPX", + "PF_APPLETALK", + "PF_INET6", + "PF_AX25", + "PF_NETROM", + ], + ), ), ('options', DictionaryProperty(spec_version="2.0")), ( - 'socket_type', EnumProperty(allowed=[ - "SOCK_STREAM", - "SOCK_DGRAM", - "SOCK_RAW", - "SOCK_RDM", - "SOCK_SEQPACKET", - ]), + 'socket_type', EnumProperty( + allowed=[ + "SOCK_STREAM", + "SOCK_DGRAM", + "SOCK_RAW", + "SOCK_RDM", + "SOCK_SEQPACKET", + ], + ), ), ('socket_descriptor', IntegerProperty()), ('socket_handle', IntegerProperty()), @@ -537,33 +541,39 @@ class WindowsServiceExt(_Extension): ('display_name', StringProperty()), ('group_name', StringProperty()), ( - 'start_type', EnumProperty(allowed=[ - "SERVICE_AUTO_START", - "SERVICE_BOOT_START", - "SERVICE_DEMAND_START", - "SERVICE_DISABLED", - "SERVICE_SYSTEM_ALERT", - ]), + 'start_type', EnumProperty( + allowed=[ + "SERVICE_AUTO_START", + "SERVICE_BOOT_START", + "SERVICE_DEMAND_START", + "SERVICE_DISABLED", + "SERVICE_SYSTEM_ALERT", + ], + ), ), ('service_dll_refs', ListProperty(ObjectReferenceProperty(valid_types='file'))), ( - 'service_type', EnumProperty(allowed=[ - "SERVICE_KERNEL_DRIVER", - "SERVICE_FILE_SYSTEM_DRIVER", - "SERVICE_WIN32_OWN_PROCESS", - "SERVICE_WIN32_SHARE_PROCESS", - ]), + 'service_type', EnumProperty( + allowed=[ + "SERVICE_KERNEL_DRIVER", + "SERVICE_FILE_SYSTEM_DRIVER", + "SERVICE_WIN32_OWN_PROCESS", + "SERVICE_WIN32_SHARE_PROCESS", + ], + ), ), ( - 'service_status', EnumProperty(allowed=[ - "SERVICE_CONTINUE_PENDING", - "SERVICE_PAUSE_PENDING", - "SERVICE_PAUSED", - "SERVICE_RUNNING", - "SERVICE_START_PENDING", - "SERVICE_STOP_PENDING", - "SERVICE_STOPPED", - ]), + 'service_status', EnumProperty( + allowed=[ + "SERVICE_CONTINUE_PENDING", + "SERVICE_PAUSE_PENDING", + "SERVICE_PAUSED", + "SERVICE_RUNNING", + "SERVICE_START_PENDING", + "SERVICE_STOP_PENDING", + "SERVICE_STOPPED", + ], + ), ), ]) @@ -687,21 +697,23 @@ class WindowsRegistryValueType(_STIXBase20): ('name', StringProperty(required=True)), ('data', StringProperty()), ( - 'data_type', EnumProperty(allowed=[ - "REG_NONE", - "REG_SZ", - "REG_EXPAND_SZ", - "REG_BINARY", - "REG_DWORD", - "REG_DWORD_BIG_ENDIAN", - "REG_LINK", - "REG_MULTI_SZ", - "REG_RESOURCE_LIST", - "REG_FULL_RESOURCE_DESCRIPTION", - "REG_RESOURCE_REQUIREMENTS_LIST", - "REG_QWORD", - "REG_INVALID_TYPE", - ]), + 'data_type', EnumProperty( + allowed=[ + "REG_NONE", + "REG_SZ", + "REG_EXPAND_SZ", + "REG_BINARY", + "REG_DWORD", + "REG_DWORD_BIG_ENDIAN", + "REG_LINK", + "REG_MULTI_SZ", + "REG_RESOURCE_LIST", + "REG_FULL_RESOURCE_DESCRIPTION", + "REG_RESOURCE_REQUIREMENTS_LIST", + "REG_QWORD", + "REG_INVALID_TYPE", + ], + ), ), ]) @@ -790,11 +802,13 @@ def CustomObservable(type='x-custom-observable', properties=None): """ def wrapper(cls): - _properties = list(itertools.chain.from_iterable([ - [('type', TypeProperty(type, spec_version='2.0'))], - properties, - [('extensions', ExtensionsProperty(spec_version="2.0", enclosing_type=type))], - ])) + _properties = list( + itertools.chain.from_iterable([ + [('type', TypeProperty(type, spec_version='2.0'))], + properties, + [('extensions', ExtensionsProperty(spec_version="2.0", enclosing_type=type))], + ]), + ) return _custom_observable_builder(cls, type, _properties, '2.0', _Observable) return wrapper diff --git a/stix2/v20/sdo.py b/stix2/v20/sdo.py index 8cbd94b..e1f6410 100644 --- a/stix2/v20/sdo.py +++ b/stix2/v20/sdo.py @@ -356,23 +356,25 @@ def CustomObject(type='x-custom-type', properties=None): """ def wrapper(cls): - _properties = list(itertools.chain.from_iterable([ - [ - ('type', TypeProperty(type, spec_version='2.0')), - ('id', IDProperty(type, spec_version='2.0')), - ('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.0')), - ('created', TimestampProperty(default=lambda: NOW, precision='millisecond')), - ('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')), - ], - [x for x in properties if not x[0].startswith('x_')], - [ - ('revoked', BooleanProperty(default=lambda: False)), - ('labels', ListProperty(StringProperty)), - ('external_references', ListProperty(ExternalReference)), - ('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.0'))), - ('granular_markings', ListProperty(GranularMarking)), - ], - sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]), - ])) + _properties = list( + itertools.chain.from_iterable([ + [ + ('type', TypeProperty(type, spec_version='2.0')), + ('id', IDProperty(type, spec_version='2.0')), + ('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.0')), + ('created', TimestampProperty(default=lambda: NOW, precision='millisecond')), + ('modified', TimestampProperty(default=lambda: NOW, precision='millisecond')), + ], + [x for x in properties if not x[0].startswith('x_')], + [ + ('revoked', BooleanProperty(default=lambda: False)), + ('labels', ListProperty(StringProperty)), + ('external_references', ListProperty(ExternalReference)), + ('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.0'))), + ('granular_markings', ListProperty(GranularMarking)), + ], + sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]), + ]), + ) return _custom_object_builder(cls, type, _properties, '2.0', _DomainObject) return wrapper diff --git a/stix2/v21/observables.py b/stix2/v21/observables.py index 0859564..359cc6f 100644 --- a/stix2/v21/observables.py +++ b/stix2/v21/observables.py @@ -505,13 +505,15 @@ class SocketExt(_Extension): ('is_listening', BooleanProperty()), ('options', DictionaryProperty(spec_version='2.1')), ( - 'socket_type', EnumProperty(allowed=[ - "SOCK_STREAM", - "SOCK_DGRAM", - "SOCK_RAW", - "SOCK_RDM", - "SOCK_SEQPACKET", - ]), + 'socket_type', EnumProperty( + allowed=[ + "SOCK_STREAM", + "SOCK_DGRAM", + "SOCK_RAW", + "SOCK_RDM", + "SOCK_SEQPACKET", + ], + ), ), ('socket_descriptor', IntegerProperty(min=0)), ('socket_handle', IntegerProperty()), @@ -612,12 +614,14 @@ class WindowsProcessExt(_Extension): ('window_title', StringProperty()), ('startup_info', DictionaryProperty(spec_version='2.1')), ( - 'integrity_level', EnumProperty(allowed=[ - "low", - "medium", - "high", - "system", - ]), + 'integrity_level', EnumProperty( + allowed=[ + "low", + "medium", + "high", + "system", + ], + ), ), ]) @@ -634,33 +638,39 @@ class WindowsServiceExt(_Extension): ('display_name', StringProperty()), ('group_name', StringProperty()), ( - 'start_type', EnumProperty(allowed=[ - "SERVICE_AUTO_START", - "SERVICE_BOOT_START", - "SERVICE_DEMAND_START", - "SERVICE_DISABLED", - "SERVICE_SYSTEM_ALERT", - ]), + 'start_type', EnumProperty( + allowed=[ + "SERVICE_AUTO_START", + "SERVICE_BOOT_START", + "SERVICE_DEMAND_START", + "SERVICE_DISABLED", + "SERVICE_SYSTEM_ALERT", + ], + ), ), ('service_dll_refs', ListProperty(ReferenceProperty(valid_types='file', spec_version="2.1"))), ( - 'service_type', EnumProperty(allowed=[ - "SERVICE_KERNEL_DRIVER", - "SERVICE_FILE_SYSTEM_DRIVER", - "SERVICE_WIN32_OWN_PROCESS", - "SERVICE_WIN32_SHARE_PROCESS", - ]), + 'service_type', EnumProperty( + allowed=[ + "SERVICE_KERNEL_DRIVER", + "SERVICE_FILE_SYSTEM_DRIVER", + "SERVICE_WIN32_OWN_PROCESS", + "SERVICE_WIN32_SHARE_PROCESS", + ], + ), ), ( - 'service_status', EnumProperty(allowed=[ - "SERVICE_CONTINUE_PENDING", - "SERVICE_PAUSE_PENDING", - "SERVICE_PAUSED", - "SERVICE_RUNNING", - "SERVICE_START_PENDING", - "SERVICE_STOP_PENDING", - "SERVICE_STOPPED", - ]), + 'service_status', EnumProperty( + allowed=[ + "SERVICE_CONTINUE_PENDING", + "SERVICE_PAUSE_PENDING", + "SERVICE_PAUSED", + "SERVICE_RUNNING", + "SERVICE_START_PENDING", + "SERVICE_STOP_PENDING", + "SERVICE_STOPPED", + ], + ), ), ]) @@ -808,21 +818,23 @@ class WindowsRegistryValueType(_STIXBase21): ('name', StringProperty()), ('data', StringProperty()), ( - 'data_type', EnumProperty(allowed=[ - "REG_NONE", - "REG_SZ", - "REG_EXPAND_SZ", - "REG_BINARY", - "REG_DWORD", - "REG_DWORD_BIG_ENDIAN", - "REG_LINK", - "REG_MULTI_SZ", - "REG_RESOURCE_LIST", - "REG_FULL_RESOURCE_DESCRIPTION", - "REG_RESOURCE_REQUIREMENTS_LIST", - "REG_QWORD", - "REG_INVALID_TYPE", - ]), + 'data_type', EnumProperty( + allowed=[ + "REG_NONE", + "REG_SZ", + "REG_EXPAND_SZ", + "REG_BINARY", + "REG_DWORD", + "REG_DWORD_BIG_ENDIAN", + "REG_LINK", + "REG_MULTI_SZ", + "REG_RESOURCE_LIST", + "REG_FULL_RESOURCE_DESCRIPTION", + "REG_RESOURCE_REQUIREMENTS_LIST", + "REG_QWORD", + "REG_INVALID_TYPE", + ], + ), ), ]) @@ -935,13 +947,15 @@ def CustomObservable(type='x-custom-observable', properties=None, id_contrib_pro """ def wrapper(cls): - _properties = list(itertools.chain.from_iterable([ - [('type', TypeProperty(type, spec_version='2.1'))], - [('spec_version', StringProperty(fixed='2.1'))], - [('id', IDProperty(type, spec_version='2.1'))], - properties, - [('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=type))], - ])) + _properties = list( + itertools.chain.from_iterable([ + [('type', TypeProperty(type, spec_version='2.1'))], + [('spec_version', StringProperty(fixed='2.1'))], + [('id', IDProperty(type, spec_version='2.1'))], + properties, + [('extensions', ExtensionsProperty(spec_version='2.1', enclosing_type=type))], + ]), + ) return _custom_observable_builder(cls, type, _properties, '2.1', _Observable, id_contrib_props) return wrapper diff --git a/stix2/v21/sdo.py b/stix2/v21/sdo.py index 4108001..c078967 100644 --- a/stix2/v21/sdo.py +++ b/stix2/v21/sdo.py @@ -789,27 +789,29 @@ def CustomObject(type='x-custom-type', properties=None): """ def wrapper(cls): - _properties = list(itertools.chain.from_iterable([ - [ - ('type', TypeProperty(type, spec_version='2.1')), - ('spec_version', StringProperty(fixed='2.1')), - ('id', IDProperty(type, spec_version='2.1')), - ('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.1')), - ('created', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')), - ('modified', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')), - ], - [x for x in properties if not x[0].startswith('x_')], - [ - ('revoked', BooleanProperty(default=lambda: False)), - ('labels', ListProperty(StringProperty)), - ('confidence', IntegerProperty()), - ('lang', StringProperty()), - ('external_references', ListProperty(ExternalReference)), - ('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))), - ('granular_markings', ListProperty(GranularMarking)), - ], - sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]), - ])) + _properties = list( + itertools.chain.from_iterable([ + [ + ('type', TypeProperty(type, spec_version='2.1')), + ('spec_version', StringProperty(fixed='2.1')), + ('id', IDProperty(type, spec_version='2.1')), + ('created_by_ref', ReferenceProperty(valid_types='identity', spec_version='2.1')), + ('created', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')), + ('modified', TimestampProperty(default=lambda: NOW, precision='millisecond', precision_constraint='min')), + ], + [x for x in properties if not x[0].startswith('x_')], + [ + ('revoked', BooleanProperty(default=lambda: False)), + ('labels', ListProperty(StringProperty)), + ('confidence', IntegerProperty()), + ('lang', StringProperty()), + ('external_references', ListProperty(ExternalReference)), + ('object_marking_refs', ListProperty(ReferenceProperty(valid_types='marking-definition', spec_version='2.1'))), + ('granular_markings', ListProperty(GranularMarking)), + ], + sorted([x for x in properties if x[0].startswith('x_')], key=lambda x: x[0]), + ]), + ) return _custom_object_builder(cls, type, _properties, '2.1', _DomainObject) return wrapper