cti-python-stix2/stix2/__init__.py

166 lines
5.4 KiB
Python

"""Python APIs for STIX 2."""
# flake8: noqa
from . import exceptions
from .bundle import Bundle
from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
AutonomousSystem, Directory, DomainName,
EmailAddress, EmailMessage, EmailMIMEComponent, File,
HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address,
MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt,
Process, RasterImageExt, SocketExt, Software, TCPExt,
UNIXAccountExt, UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey,
WindowsRegistryValueType, WindowsServiceExt,
X509Certificate, X509V3ExtenstionsType)
from .other import (TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE,
ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,
IntrusionSet, Malware, ObservedData, Report, ThreatActor,
Tool, Vulnerability)
from .sro import Relationship, Sighting
from .utils import get_dict
OBJ_MAP = {
'attack-pattern': AttackPattern,
'campaign': Campaign,
'course-of-action': CourseOfAction,
'identity': Identity,
'indicator': Indicator,
'intrusion-set': IntrusionSet,
'malware': Malware,
'marking-definition': MarkingDefinition,
'observed-data': ObservedData,
'report': Report,
'relationship': Relationship,
'threat-actor': ThreatActor,
'tool': Tool,
'sighting': Sighting,
'vulnerability': Vulnerability,
}
OBJ_MAP_OBSERVABLE = {
'artifact': Artifact,
'autonomous-system': AutonomousSystem,
'directory': Directory,
'domain-name': DomainName,
'email-address': EmailAddress,
'email-message': EmailMessage,
'file': File,
'ipv4-addr': IPv4Address,
'ipv6-addr': IPv6Address,
'mac-addr': MACAddress,
'mutex': Mutex,
'network-traffic': NetworkTraffic,
'process': Process,
'software': Software,
'url': URL,
'user-account': UserAccount,
'windows-registry-key': WindowsRegistryKey,
'x509-certificate': X509Certificate,
}
EXT_MAP_FILE = {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
}
EXT_MAP_NETWORK_TRAFFIC = {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
}
EXT_MAP_PROCESS = {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
}
EXT_MAP_USER_ACCOUNT = {
'unix-account-ext': UNIXAccountExt,
}
EXT_MAP = {
'file': EXT_MAP_FILE,
'network-traffic': EXT_MAP_NETWORK_TRAFFIC,
'process': EXT_MAP_PROCESS,
'user-account': EXT_MAP_USER_ACCOUNT,
}
def parse(data, allow_custom=False):
"""Deserialize a string or file-like object into a STIX object.
Args:
data: The STIX 2 string to be parsed.
allow_custom (bool): Whether to allow custom properties or not. Default: False.
Returns:
An instantiated Python STIX object.
"""
obj = get_dict(data)
if 'type' not in obj:
raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj))
try:
obj_class = OBJ_MAP[obj['type']]
except KeyError:
raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom types, use the CustomObject decorator." % obj['type'])
return obj_class(allow_custom=allow_custom, **obj)
def parse_observable(data, _valid_refs=[], allow_custom=False):
"""Deserialize a string or file-like object into a STIX Cyber Observable object.
Args:
data: The STIX 2 string to be parsed.
_valid_refs: A list of object references valid for the scope of the object being parsed.
allow_custom: Whether to allow custom properties or not. Default: False.
Returns:
An instantiated Python STIX Cyber Observable object.
"""
obj = get_dict(data)
obj['_valid_refs'] = _valid_refs
if 'type' not in obj:
raise exceptions.ParseError("Can't parse object with no 'type' property: %s" % str(obj))
try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
raise exceptions.ParseError("Can't parse unknown object type '%s'! For custom observables, use the CustomObservable decorator." % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
if name not in EXT_MAP[obj['type']]:
raise exceptions.ParseError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
ext_class = EXT_MAP[obj['type']][name]
obj['extensions'][name] = ext_class(allow_custom=allow_custom, **obj['extensions'][name])
return obj_class(allow_custom=allow_custom, **obj)
def _register_type(new_type):
"""Register a custom STIX Object type.
"""
OBJ_MAP[new_type._type] = new_type
def _register_observable(new_observable):
"""Register a custom STIX Cyber Observable type.
"""
OBJ_MAP_OBSERVABLE[new_observable._type] = new_observable