cti-python-stix2/stix2/__init__.py

141 lines
4.4 KiB
Python

"""Python APIs for STIX 2."""
# flake8: noqa
from . import exceptions
from .bundle import Bundle
from .observables import (URL, AlternateDataStream, ArchiveExt, Artifact,
AutonomousSystem, Directory, DomainName,
EmailAddress, EmailMessage, EmailMIMEComponent, File,
HTTPRequestExt, ICMPExt, IPv4Address, IPv6Address,
MACAddress, Mutex, NetworkTraffic, NTFSExt, PDFExt,
Process, RasterImageExt, SocketExt, Software, TCPExt,
UNIXAccountExt, UserAccount, WindowsPEBinaryExt,
WindowsPEOptionalHeaderType, WindowsPESection,
WindowsProcessExt, WindowsRegistryKey,
WindowsRegistryValueType, WindowsServiceExt,
X509Certificate, X509V3ExtenstionsType)
from .other import (ExternalReference, GranularMarking, KillChainPhase,
MarkingDefinition, StatementMarking, TLPMarking)
from .sdo import (AttackPattern, Campaign, CourseOfAction, Identity, Indicator,
IntrusionSet, Malware, ObservedData, Report, ThreatActor,
Tool, Vulnerability)
from .sro import Relationship, Sighting
from .utils import get_dict
OBJ_MAP = {
'attack-pattern': AttackPattern,
'campaign': Campaign,
'course-of-action': CourseOfAction,
'identity': Identity,
'indicator': Indicator,
'intrusion-set': IntrusionSet,
'malware': Malware,
'marking-definition': MarkingDefinition,
'observed-data': ObservedData,
'report': Report,
'relationship': Relationship,
'threat-actor': ThreatActor,
'tool': Tool,
'sighting': Sighting,
'vulnerability': Vulnerability,
}
OBJ_MAP_OBSERVABLE = {
'artifact': Artifact,
'autonomous-system': AutonomousSystem,
'directory': Directory,
'domain-name': DomainName,
'email-address': EmailAddress,
'email-message': EmailMessage,
'file': File,
'ipv4-addr': IPv4Address,
'ipv6-addr': IPv6Address,
'mac-addr': MACAddress,
'mutex': Mutex,
'network-traffic': NetworkTraffic,
'process': Process,
'software': Software,
'url': URL,
'user-account': UserAccount,
'windows-registry-key': WindowsRegistryKey,
'x509-certificate': X509Certificate,
}
EXT_MAP_FILE = {
'archive-ext': ArchiveExt,
'ntfs-ext': NTFSExt,
'pdf-ext': PDFExt,
'raster-image-ext': RasterImageExt,
'windows-pebinary-ext': WindowsPEBinaryExt
}
EXT_MAP_NETWORK_TRAFFIC = {
'http-request-ext': HTTPRequestExt,
'icmp-ext': ICMPExt,
'socket-ext': SocketExt,
'tcp-ext': TCPExt,
}
EXT_MAP_PROCESS = {
'windows-process-ext': WindowsProcessExt,
'windows-service-ext': WindowsServiceExt,
}
EXT_MAP_USER_ACCOUNT = {
'unix-account-ext': UNIXAccountExt,
}
EXT_MAP = {
'file': EXT_MAP_FILE,
'network-traffic': EXT_MAP_NETWORK_TRAFFIC,
'process': EXT_MAP_PROCESS,
'user-account': EXT_MAP_USER_ACCOUNT,
}
def parse(data):
"""Deserialize a string or file-like object into a STIX object"""
obj = get_dict(data)
if 'type' not in obj:
# TODO parse external references, kill chain phases, and granular markings
pass
else:
try:
obj_class = OBJ_MAP[obj['type']]
except KeyError:
# TODO handle custom objects
raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
return obj_class(**obj)
return obj
def parse_observable(data, _valid_refs):
"""Deserialize a string or file-like object into a STIX Cyber Observable
object.
"""
obj = get_dict(data)
obj['_valid_refs'] = _valid_refs
if 'type' not in obj:
raise ValueError("'type' is a required field!")
try:
obj_class = OBJ_MAP_OBSERVABLE[obj['type']]
except KeyError:
# TODO handle custom observable objects
raise ValueError("Can't parse unknown object type '%s'!" % obj['type'])
if 'extensions' in obj and obj['type'] in EXT_MAP:
for name, ext in obj['extensions'].items():
if name not in EXT_MAP[obj['type']]:
raise ValueError("Can't parse Unknown extension type '%s' for object type '%s'!" % (name, obj['type']))
ext_class = EXT_MAP[obj['type']][name]
obj['extensions'][name] = ext_class(**obj['extensions'][name])
return obj_class(**obj)