mirror of https://github.com/MISP/MISP
chg: [stix1 export] Improvement of some functions
parent
d12b47c73a
commit
8c9f925ec2
|
@ -38,6 +38,7 @@ from cybox.common import Hash, ByteRun, ByteRuns
|
||||||
from stix.extensions.test_mechanism.snort_test_mechanism import *
|
from stix.extensions.test_mechanism.snort_test_mechanism import *
|
||||||
from stix.extensions.identity.ciq_identity_3_0 import CIQIdentity3_0Instance, STIXCIQIdentity3_0, PartyName, ElectronicAddressIdentifier, FreeTextAddress
|
from stix.extensions.identity.ciq_identity_3_0 import CIQIdentity3_0Instance, STIXCIQIdentity3_0, PartyName, ElectronicAddressIdentifier, FreeTextAddress
|
||||||
from stix.extensions.identity.ciq_identity_3_0 import Address as ciq_Address
|
from stix.extensions.identity.ciq_identity_3_0 import Address as ciq_Address
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from stix.utils import idgen
|
from stix.utils import idgen
|
||||||
|
@ -55,7 +56,7 @@ TLP_mapping = {'0' : 'AMBER', '1' : 'GREEN', '2' : 'GREEN', '3' : 'GREEN', '4' :
|
||||||
TLP_order = {'RED' : 4, 'AMBER' : 3, 'GREEN' : 2, 'WHITE' : 1}
|
TLP_order = {'RED' : 4, 'AMBER' : 3, 'GREEN' : 2, 'WHITE' : 1}
|
||||||
confidence_mapping = {False : 'None', True : 'High'}
|
confidence_mapping = {False : 'None', True : 'High'}
|
||||||
|
|
||||||
not_implemented_attributes = ['yara', 'pattern-in-traffic', 'pattern-in-memory']
|
not_implemented_attributes = ['yara', 'snort', 'pattern-in-traffic', 'pattern-in-memory']
|
||||||
|
|
||||||
non_indicator_attributes = ['text', 'comment', 'other', 'link', 'target-user', 'target-email', 'target-machine', 'target-org', 'target-location', 'target-external', 'vulnerability', 'attachment']
|
non_indicator_attributes = ['text', 'comment', 'other', 'link', 'target-user', 'target-email', 'target-machine', 'target-org', 'target-location', 'target-external', 'vulnerability', 'attachment']
|
||||||
|
|
||||||
|
@ -64,7 +65,13 @@ hash_type_attributes = {"single":["md5", "sha1", "sha224", "sha256", "sha384", "
|
||||||
# mapping for the attributes that can go through the simpleobservable script
|
# mapping for the attributes that can go through the simpleobservable script
|
||||||
misp_cybox_name = {"domain" : "DomainName", "hostname" : "Hostname", "url" : "URI", "AS" : "AutonomousSystem", "mutex" : "Mutex", "named pipe" : "Pipe", "link" : "URI"}
|
misp_cybox_name = {"domain" : "DomainName", "hostname" : "Hostname", "url" : "URI", "AS" : "AutonomousSystem", "mutex" : "Mutex", "named pipe" : "Pipe", "link" : "URI"}
|
||||||
cybox_name_attribute = {"DomainName" : "value", "Hostname" : "hostname_value", "URI" : "value", "AutonomousSystem" : "number", "Pipe" : "name", "Mutex" : "name"}
|
cybox_name_attribute = {"DomainName" : "value", "Hostname" : "hostname_value", "URI" : "value", "AutonomousSystem" : "number", "Pipe" : "name", "Mutex" : "name"}
|
||||||
misp_indicator_type = {"domain" : "Domain Watchlist", "hostname" : "Domain Watchlist", "url" : "URL Watchlist", "AS" : "", "mutex" : "Host Characteristics", "named pipe" : "Host Characteristics", "link" : ""}
|
misp_indicator_type = {"AS" : "", "mutex" : "Host Characteristics", "named pipe" : "Host Characteristics",
|
||||||
|
"email-attachment": "Malicious E-mail", "url" : "URL Watchlist"}
|
||||||
|
misp_indicator_type.update(dict.fromkeys(hash_type_attributes["single"] + hash_type_attributes["composite"] + ["filename"] + ["attachment"], "File Hash Watchlist"))
|
||||||
|
misp_indicator_type.update(dict.fromkeys(["email-src", "email-dst", "email-subject", "email-reply-to", "email-attachment"], "Malicious E-mail"))
|
||||||
|
misp_indicator_type.update(dict.fromkeys(["ip-src", "ip-dst", "ip-src|port", "ip-dst|port"], "IP Watchlist"))
|
||||||
|
misp_indicator_type.update(dict.fromkeys(["domain", "domain|ip", "hostname"], "Domain Watchlist"))
|
||||||
|
misp_indicator_type.update(dict.fromkeys(["regkey", "regkey|value"], "Host Characteristics"))
|
||||||
cybox_validation = {"AutonomousSystem": "isInt"}
|
cybox_validation = {"AutonomousSystem": "isInt"}
|
||||||
|
|
||||||
# mapping Windows Registry Hives and their abbreviations
|
# mapping Windows Registry Hives and their abbreviations
|
||||||
|
@ -110,6 +117,7 @@ class StixBuilder(object):
|
||||||
except TypeError:
|
except TypeError:
|
||||||
idgen.set_id_namespace(Namespace(namespace[0], namespace[1], "MISP"))
|
idgen.set_id_namespace(Namespace(namespace[0], namespace[1], "MISP"))
|
||||||
self.namespace_prefix = idgen.get_id_namespace_alias()
|
self.namespace_prefix = idgen.get_id_namespace_alias()
|
||||||
|
## MAPPING FOR ATTRIBUTES
|
||||||
self.simple_type_to_method = {"port": self.generate_port_observable, "domain|ip": self.generate_domain_ip_observable}
|
self.simple_type_to_method = {"port": self.generate_port_observable, "domain|ip": self.generate_domain_ip_observable}
|
||||||
self.simple_type_to_method.update(dict.fromkeys(hash_type_attributes["single"] + hash_type_attributes["composite"] + ["filename"] + ["attachment"], self.resolve_file_observable))
|
self.simple_type_to_method.update(dict.fromkeys(hash_type_attributes["single"] + hash_type_attributes["composite"] + ["filename"] + ["attachment"], self.resolve_file_observable))
|
||||||
self.simple_type_to_method.update(dict.fromkeys(["ip-src", "ip-dst", "ip-src|port", "ip-dst|port"], self.generate_ip_observable))
|
self.simple_type_to_method.update(dict.fromkeys(["ip-src", "ip-dst", "ip-src|port", "ip-dst|port"], self.generate_ip_observable))
|
||||||
|
@ -167,18 +175,23 @@ class StixBuilder(object):
|
||||||
event_tags = self.misp_event.Tag
|
event_tags = self.misp_event.Tag
|
||||||
if event_tags:
|
if event_tags:
|
||||||
Tags['event'] = event_tags
|
Tags['event'] = event_tags
|
||||||
self.set_tag(incident, event_tags)
|
for tag in event_tags:
|
||||||
|
tag_name = "MISP Tag: {}".format(tag['name'])
|
||||||
|
self.add_journal_entry(incident, tag_name)
|
||||||
external_id = ExternalID(value=str(self.misp_event.id), source="MISP Event")
|
external_id = ExternalID(value=str(self.misp_event.id), source="MISP Event")
|
||||||
incident.add_external_id(external_id)
|
incident.add_external_id(external_id)
|
||||||
incident_status_name = status_mapping.get(str(self.misp_event.analysis), None)
|
incident_status_name = status_mapping.get(str(self.misp_event.analysis), None)
|
||||||
if incident_status_name is not None:
|
if incident_status_name is not None:
|
||||||
incident.status = IncidentStatus(incident_status_name)
|
incident.status = IncidentStatus(incident_status_name)
|
||||||
self.set_tlp(incident, self.misp_event.distribution, event_tags)
|
try:
|
||||||
self.set_src(incident, self.misp_event.Org.get('name'))
|
incident.handling = self.set_tlp(self.misp_event.distribution, event_tags)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
incident.information_source = self.set_src()
|
||||||
self.orgc_name = self.misp_event.Orgc.get('name')
|
self.orgc_name = self.misp_event.Orgc.get('name')
|
||||||
self.set_rep(incident)
|
incident.reporter = self.set_rep()
|
||||||
self.ttps = []
|
self.ttps = []
|
||||||
self.resolve_attributes(incident, self.misp_event.attributes, Tags)
|
self.resolve_attributes(incident, Tags)
|
||||||
self.resolve_objects(incident, Tags)
|
self.resolve_objects(incident, Tags)
|
||||||
self.add_related_indicators(incident)
|
self.add_related_indicators(incident)
|
||||||
return incident
|
return incident
|
||||||
|
@ -195,17 +208,20 @@ class StixBuilder(object):
|
||||||
incident_time.incident_reported = timestamp
|
incident_time.incident_reported = timestamp
|
||||||
incident.time = incident_time
|
incident.time = incident_time
|
||||||
|
|
||||||
def resolve_attributes(self, incident, attributes, tags):
|
def resolve_attributes(self, incident, tags):
|
||||||
for attribute in attributes:
|
for attribute in self.misp_event.attributes:
|
||||||
attribute_type = attribute.type
|
attribute_type = attribute.type
|
||||||
if attribute_type in not_implemented_attributes:
|
if attribute_type in not_implemented_attributes:
|
||||||
journal_entry = "!Not implemented attribute category/type combination caught! attribute[{}][{}]: {}".format(attribute.category,
|
if attribute_type == "snort":
|
||||||
attribute_type, attribute.value)
|
self.generate_TM(incident, attribute, tags)
|
||||||
self.add_journal_entry(incident, journal_entry)
|
else:
|
||||||
|
journal_entry = "!Not implemented attribute category/type combination caught! attribute[{}][{}]: {}".format(attribute.category,
|
||||||
|
attribute_type, attribute.value)
|
||||||
|
self.add_journal_entry(incident, journal_entry)
|
||||||
elif attribute_type in non_indicator_attributes:
|
elif attribute_type in non_indicator_attributes:
|
||||||
self.handle_non_indicator_attribute(incident, tags, attribute)
|
self.handle_non_indicator_attribute(incident, attribute, tags)
|
||||||
else:
|
else:
|
||||||
self.handle_indicator_attribute(incident, tags, attribute)
|
self.handle_indicator_attribute(incident, attribute, tags)
|
||||||
|
|
||||||
def resolve_objects(self, incident, tags):
|
def resolve_objects(self, incident, tags):
|
||||||
for misp_object in self.misp_event.objects:
|
for misp_object in self.misp_event.objects:
|
||||||
|
@ -237,34 +253,50 @@ class StixBuilder(object):
|
||||||
ittp = TTP(idref=ttp.id_, timestamp=ttp.timestamp)
|
ittp = TTP(idref=ttp.id_, timestamp=ttp.timestamp)
|
||||||
rindicator.item.add_indicated_ttp(ittp)
|
rindicator.item.add_indicated_ttp(ittp)
|
||||||
|
|
||||||
def handle_indicator_attribute(self, incident, tags, attribute):
|
def handle_indicator_attribute(self, incident, attribute, tags):
|
||||||
indicator = self.generate_indicator(attribute, tags, self.orgc_name)
|
if attribute.to_ids:
|
||||||
indicator.add_indicator_type("Malware Artifacts")
|
indicator = self.generate_indicator(attribute, tags)
|
||||||
indicator.add_valid_time_position(ValidTime())
|
indicator.add_indicator_type("Malware Artifacts")
|
||||||
if attribute.type == 'email-attachment':
|
try:
|
||||||
indicator.add_indicator_type("Malicious E-mail")
|
indicator.add_indicator_type(misp_indicator_type[attribute.type])
|
||||||
self.generate_email_attachment_object(indicator, attribute)
|
except:
|
||||||
else:
|
pass
|
||||||
self.generate_observable(indicator, attribute)
|
indicator.add_valid_time_position(ValidTime())
|
||||||
if 'data' in attribute and attribute.type == 'malware-sample':
|
observable = self.handle_attribute(attribute)
|
||||||
observable = self.create_artifact_object(attribute)
|
|
||||||
indicator.add_observable(observable)
|
indicator.add_observable(observable)
|
||||||
related_indicator = RelatedIndicator(indicator, relationship=attribute.category)
|
if 'data' in attribute and attribute.type == "malware-sample":
|
||||||
incident.related_indicators.append(related_indicator)
|
artifact = self.create_artifact_object(attribute)
|
||||||
|
indicator.add_observable(artifact)
|
||||||
|
related_indicator = RelatedIndicator(indicator, relationship=attribute.category)
|
||||||
|
incident.related_indicators.append(related_indicator)
|
||||||
|
else:
|
||||||
|
observable = self.handle_attribute(attribute)
|
||||||
|
related_observable = RelatedObservable(observable, relationship=attribute.category)
|
||||||
|
incident.related_observables.append(related_observable)
|
||||||
|
if attribute.data and attribute.type == "malware-sample":
|
||||||
|
artifact = self.create_artifact_object(attribute)
|
||||||
|
related_artifact = RelatedObservable(artifact)
|
||||||
|
incident.related_observables.append(related_artifact)
|
||||||
|
|
||||||
def handle_non_indicator_attribute(self, incident, tags, attribute):
|
def handle_attribute(self, attribute):
|
||||||
|
if attribute.type == 'email-attachment':
|
||||||
|
observable = self.generate_email_attachment_object(attribute)
|
||||||
|
else:
|
||||||
|
observable = self.generate_observable(attribute)
|
||||||
|
return observable
|
||||||
|
|
||||||
|
def handle_non_indicator_attribute(self, incident, attribute, tags):
|
||||||
attribute_type = attribute.type
|
attribute_type = attribute.type
|
||||||
attribute_category = attribute.category
|
attribute_category = attribute.category
|
||||||
if attribute_type == "vulnerability":
|
if attribute_type == "vulnerability":
|
||||||
self.generate_vulnerability(incident, tags, attribute)
|
ttp = self.generate_vulnerability(attribute, tags)
|
||||||
|
incident.leveraged_ttps.append(self.append_ttp(attribute_category, ttp))
|
||||||
elif attribute_type == "link":
|
elif attribute_type == "link":
|
||||||
if attribute_category == "Payload delivery":
|
self.add_reference(incident, attribute.value)
|
||||||
self.handle_indicator_attribute(incident, tags, attribute)
|
|
||||||
else:
|
|
||||||
self.add_reference(incident, attribute.value)
|
|
||||||
elif attribute_type in ('comment', 'text', 'other'):
|
elif attribute_type in ('comment', 'text', 'other'):
|
||||||
if attribute_category == "Payload type":
|
if attribute_category == "Payload type":
|
||||||
self.generate_ttp(incident, tags, attribute)
|
ttp = self.generate_ttp(attribute, tags)
|
||||||
|
incident.leveraged_ttps.append(self.append_ttp(attribute_category, ttp))
|
||||||
elif attribute_category == "Attribution":
|
elif attribute_category == "Attribution":
|
||||||
ta = self.generate_threat_actor(attribute)
|
ta = self.generate_threat_actor(attribute)
|
||||||
rta = RelatedThreatActor(ta, relationship="Attribution")
|
rta = RelatedThreatActor(ta, relationship="Attribution")
|
||||||
|
@ -280,7 +312,7 @@ class StixBuilder(object):
|
||||||
aa.description = description
|
aa.description = description
|
||||||
incident.affected_assets.append(aa)
|
incident.affected_assets.append(aa)
|
||||||
elif attribute_type.startswith('target-'):
|
elif attribute_type.startswith('target-'):
|
||||||
self.resolve_identity_attribute(incident, attribute)
|
incident.add_victim(self.resolve_identity_attribute(attribute))
|
||||||
elif attribute_type == "attachment":
|
elif attribute_type == "attachment":
|
||||||
observable = self.return_attachment_composition(attribute)
|
observable = self.return_attachment_composition(attribute)
|
||||||
related_observable = RelatedObservable(observable, relationship=attribute.category)
|
related_observable = RelatedObservable(observable, relationship=attribute.category)
|
||||||
|
@ -299,8 +331,7 @@ class StixBuilder(object):
|
||||||
observable.id_ = "{}:{}-{}".format(self.namespace_prefix, id_type, attribute.uuid)
|
observable.id_ = "{}:{}-{}".format(self.namespace_prefix, id_type, attribute.uuid)
|
||||||
return observable
|
return observable
|
||||||
|
|
||||||
def generate_domain_ip_observable(self, indicator, attribute):
|
def generate_domain_ip_observable(self, attribute):
|
||||||
indicator.add_indicator_type("Domain Watchlist")
|
|
||||||
domain, ip = attribute.value.split('|')
|
domain, ip = attribute.value.split('|')
|
||||||
address_object = self.resolve_ip_type(attribute.type, ip)
|
address_object = self.resolve_ip_type(attribute.type, ip)
|
||||||
address_object.parent.id_ = "{}:AddressObject-{}".format(self.namespace_prefix, attribute.uuid)
|
address_object.parent.id_ = "{}:AddressObject-{}".format(self.namespace_prefix, attribute.uuid)
|
||||||
|
@ -318,7 +349,7 @@ class StixBuilder(object):
|
||||||
observable.observable_composition = composite_object
|
observable.observable_composition = composite_object
|
||||||
return observable
|
return observable
|
||||||
|
|
||||||
def generate_email_attachment_object(self, indicator, attribute):
|
def generate_email_attachment_object(self, attribute):
|
||||||
attribute_uuid = attribute.uuid
|
attribute_uuid = attribute.uuid
|
||||||
file_object = File()
|
file_object = File()
|
||||||
file_object.file_name = attribute.value
|
file_object.file_name = attribute.value
|
||||||
|
@ -331,19 +362,12 @@ class StixBuilder(object):
|
||||||
email.parent.id_ = "{}:EmailMessageObject-{}".format(self.namespace_prefix, attribute_uuid)
|
email.parent.id_ = "{}:EmailMessageObject-{}".format(self.namespace_prefix, attribute_uuid)
|
||||||
observable = Observable(email)
|
observable = Observable(email)
|
||||||
observable.id_ = "{}:observable-{}".format(self.namespace_prefix, attribute_uuid)
|
observable.id_ = "{}:observable-{}".format(self.namespace_prefix, attribute_uuid)
|
||||||
indicator.observable = observable
|
return observable
|
||||||
|
|
||||||
def generate_file_observable(self, filename, h_value, fuzzy):
|
def generate_file_observable(self, filename, h_value, fuzzy):
|
||||||
file_object = File()
|
file_object = File()
|
||||||
if filename:
|
if filename:
|
||||||
if '/' in filename or '\\' in filename:
|
self.resolve_filename(file_object, filename)
|
||||||
file_object.file_path = ntpath.dirname(filename)
|
|
||||||
file_object.file_path.condition = "Equals"
|
|
||||||
file_object.file_name = ntpath.basename(filename)
|
|
||||||
file_object.file_name.condition = "Equals"
|
|
||||||
else:
|
|
||||||
file_object.file_name = filename
|
|
||||||
file_object.file_name.condition = "Equals"
|
|
||||||
if h_value:
|
if h_value:
|
||||||
file_object.add_hash(Hash(hash_value=h_value, exact=True))
|
file_object.add_hash(Hash(hash_value=h_value, exact=True))
|
||||||
if fuzzy:
|
if fuzzy:
|
||||||
|
@ -367,13 +391,13 @@ class StixBuilder(object):
|
||||||
file_object._fields[field_type]._inner[0].type_ = Hash.TYPE_SSDEEP
|
file_object._fields[field_type]._inner[0].type_ = Hash.TYPE_SSDEEP
|
||||||
file_object._fields[field_type]._inner[0].type_.condition = "Equals"
|
file_object._fields[field_type]._inner[0].type_.condition = "Equals"
|
||||||
|
|
||||||
def generate_indicator(self, attribute, tags, org):
|
def generate_indicator(self, attribute, tags):
|
||||||
indicator = Indicator(timestamp=attribute.timestamp)
|
indicator = Indicator(timestamp=attribute.timestamp)
|
||||||
indicator.id_ = "{}:indicator-{}".format(namespace[1], attribute.uuid)
|
indicator.id_ = "{}:indicator-{}".format(namespace[1], attribute.uuid)
|
||||||
self.set_prod(indicator, org)
|
indicator.producer = self.set_prod(self.orgc_name)
|
||||||
if attribute.comment:
|
if attribute.comment:
|
||||||
indicator.description = attribute.comment
|
indicator.description = attribute.comment
|
||||||
self.set_tlp(indicator, attribute.distribution, self.merge_tags(tags, attribute))
|
indicator.handling = self.set_tlp(attribute.distribution, self.merge_tags(tags, attribute))
|
||||||
indicator.title = "{}: {} (MISP Attribute #{})".format(attribute.category, attribute.value, attribute.id)
|
indicator.title = "{}: {} (MISP Attribute #{})".format(attribute.category, attribute.value, attribute.id)
|
||||||
indicator.description = indicator.title
|
indicator.description = indicator.title
|
||||||
confidence_description = "Derived from MISP's IDS flag. If an attribute is marked for IDS exports, the confidence will be high, otherwise none"
|
confidence_description = "Derived from MISP's IDS flag. If an attribute is marked for IDS exports, the confidence will be high, otherwise none"
|
||||||
|
@ -383,8 +407,7 @@ class StixBuilder(object):
|
||||||
indicator.confidence = Confidence(value=confidence_value, description=confidence_description, timestamp=attribute.timestamp)
|
indicator.confidence = Confidence(value=confidence_value, description=confidence_description, timestamp=attribute.timestamp)
|
||||||
return indicator
|
return indicator
|
||||||
|
|
||||||
def generate_ip_observable(self, indicator, attribute):
|
def generate_ip_observable(self, attribute):
|
||||||
indicator.add_indicator_type("IP Watchlist")
|
|
||||||
address_object = self.resolve_ip_type(attribute.type, attribute.value)
|
address_object = self.resolve_ip_type(attribute.type, attribute.value)
|
||||||
address_object.parent.id_ = "{}:AddressObject-{}".format(self.namespace_prefix, attribute.uuid)
|
address_object.parent.id_ = "{}:AddressObject-{}".format(self.namespace_prefix, attribute.uuid)
|
||||||
if '|' in attribute.value:
|
if '|' in attribute.value:
|
||||||
|
@ -405,37 +428,30 @@ class StixBuilder(object):
|
||||||
else:
|
else:
|
||||||
return address_object
|
return address_object
|
||||||
|
|
||||||
def generate_observable(self, indicator, attribute):
|
def generate_observable(self, attribute):
|
||||||
attribute_type = attribute.type
|
attribute_type = attribute.type
|
||||||
if attribute_type in ('snort', 'yara'):
|
try:
|
||||||
self.generate_TM(indicator, attribute)
|
observable_property = self.simple_type_to_method[attribute_type](attribute)
|
||||||
else:
|
except KeyError:
|
||||||
observable = None
|
return False
|
||||||
try:
|
if isinstance(observable_property, Observable):
|
||||||
observable_property = self.simple_type_to_method[attribute_type](indicator, attribute)
|
return observable_property
|
||||||
except KeyError:
|
observable_property.condition = "Equals"
|
||||||
return False
|
observable_object = Object(observable_property)
|
||||||
if isinstance(observable_property, Observable):
|
observable_object.id_ = "{}:{}-{}".format(self.namespace_prefix, observable_property.__class__.__name__, attribute.uuid)
|
||||||
return indicator.add_observable(observable_property)
|
observable = Observable(observable_object)
|
||||||
observable_property.condition = "Equals"
|
observable.id_ = "{}:observable-{}".format(self.namespace_prefix, attribute.uuid)
|
||||||
observable_object = Object(observable_property)
|
return observable
|
||||||
observable_object.id_ = "{}:{}-{}".format(self.namespace_prefix, observable_property.__class__.__name__, attribute.uuid)
|
|
||||||
observable = Observable(observable_object)
|
|
||||||
observable.id_ = "{}:observable-{}".format(self.namespace_prefix, attribute.uuid)
|
|
||||||
indicator.add_observable(observable)
|
|
||||||
|
|
||||||
def generate_port_observable(self, indicator, attribute):
|
def generate_port_observable(self, attribute):
|
||||||
port_object = Port()
|
port_object = Port()
|
||||||
port_object.port_value = attribute.value
|
port_object.port_value = attribute.value
|
||||||
port_object.port_value.condition = "Equals"
|
port_object.port_value.condition = "Equals"
|
||||||
port_object.parent.id_ = "{}:PortObject-{}".format(self.namespace_prefix, attribute.uuid)
|
port_object.parent.id_ = "{}:PortObject-{}".format(self.namespace_prefix, attribute.uuid)
|
||||||
return port_object
|
return port_object
|
||||||
|
|
||||||
def generate_regkey_observable(self, indicator, attribute):
|
def generate_regkey_observable(self, attribute, value=None):
|
||||||
attribute_type = attribute.type
|
if attribute.type == "regkey|value":
|
||||||
indicator.add_indicator_type("Host Characteristics")
|
|
||||||
value = ""
|
|
||||||
if attribute_type == "regkey|value":
|
|
||||||
regkey, value = attribute.value.split('|')
|
regkey, value = attribute.value.split('|')
|
||||||
else:
|
else:
|
||||||
regkey = attribute.value
|
regkey = attribute.value
|
||||||
|
@ -454,15 +470,12 @@ class StixBuilder(object):
|
||||||
return reg_object
|
return reg_object
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def generate_simple_observable(indicator, attribute):
|
def generate_simple_observable(attribute):
|
||||||
cybox_name = misp_cybox_name[attribute.type]
|
cybox_name = misp_cybox_name[attribute.type]
|
||||||
if cybox_name == "AutonomousSystem":
|
if cybox_name == "AutonomousSystem":
|
||||||
if not attribute.value.isdigit():
|
if not attribute.value.isdigit():
|
||||||
return False
|
return False
|
||||||
constructor = getattr(this_module, cybox_name, None)
|
constructor = getattr(this_module, cybox_name, None)
|
||||||
indicator_type = misp_indicator_type[attribute.type]
|
|
||||||
if indicator_type:
|
|
||||||
indicator.add_indicator_type(indicator_type)
|
|
||||||
new_object = constructor()
|
new_object = constructor()
|
||||||
setattr(new_object, cybox_name_attribute[cybox_name], attribute.value)
|
setattr(new_object, cybox_name_attribute[cybox_name], attribute.value)
|
||||||
setattr(getattr(new_object, cybox_name_attribute[cybox_name]), "condition", "Equals")
|
setattr(getattr(new_object, cybox_name_attribute[cybox_name]), "condition", "Equals")
|
||||||
|
@ -479,24 +492,30 @@ class StixBuilder(object):
|
||||||
ta.description = description
|
ta.description = description
|
||||||
return ta
|
return ta
|
||||||
|
|
||||||
@staticmethod
|
def generate_TM(self, incident, attribute, tags):
|
||||||
def generate_TM(indicator, attribute):
|
if attribute.to_ids:
|
||||||
if attribute.type == "snort":
|
|
||||||
tm = SnortTestMechanism()
|
tm = SnortTestMechanism()
|
||||||
value = attribute.value.encode('utf-8')
|
value = attribute.value.encode('utf-8')
|
||||||
tm.rules = [value]
|
tm.rule = value
|
||||||
|
indicator = self.generate_indicator(attribute, tags)
|
||||||
|
indicator.add_indicator_type("Malware Artifacts")
|
||||||
|
indicator.add_valid_time_position(ValidTime())
|
||||||
indicator.test_mechanisms = [tm]
|
indicator.test_mechanisms = [tm]
|
||||||
|
related_indicator = RelatedIndicator(indicator, relationship=category)
|
||||||
|
incident.related_indicators.append(related_indicator)
|
||||||
|
|
||||||
def generate_ttp(self, incident, tags, attribute):
|
def generate_ttp(self, attribute, tags):
|
||||||
ttp = self.create_ttp(tags, attribute)
|
ttp = self.create_ttp(attribute, tags)
|
||||||
mmalware = MalwareInstance()
|
malware = MalwareInstance()
|
||||||
malware.add_name(attribute.value)
|
malware.add_name(attribute.value)
|
||||||
ttp.behavior = Behavior()
|
ttp.behavior = Behavior()
|
||||||
ttp.behavior.add_malware_instance(malware)
|
ttp.behavior.add_malware_instance(malware)
|
||||||
self.append_ttp(incident, attribute, ttp)
|
if attribute.comment:
|
||||||
|
ttp.description = attribute.comment
|
||||||
|
return ttp
|
||||||
|
|
||||||
def generate_vulnerability(self, incident, tags, attribute):
|
def generate_vulnerability(self, attribute, tags):
|
||||||
ttp = self.create_ttp(tags, attribute)
|
ttp = self.create_ttp(attribute, tags)
|
||||||
vulnerability = Vulnerability()
|
vulnerability = Vulnerability()
|
||||||
vulnerability.cve_id = attribute.value
|
vulnerability.cve_id = attribute.value
|
||||||
ET = ExploitTarget(timestamp=attribute.timestamp)
|
ET = ExploitTarget(timestamp=attribute.timestamp)
|
||||||
|
@ -507,12 +526,11 @@ class StixBuilder(object):
|
||||||
ET.title = "Vulnerability {}".format(attribute.value)
|
ET.title = "Vulnerability {}".format(attribute.value)
|
||||||
ET.add_vulnerability(vulnerability)
|
ET.add_vulnerability(vulnerability)
|
||||||
ttp.exploit_targets.append(ET)
|
ttp.exploit_targets.append(ET)
|
||||||
self.append_ttp(incident, attribute, ttp)
|
return ttp
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def resolve_email_observable(indicator, attribute):
|
def resolve_email_observable(attribute):
|
||||||
attribute_type = attribute.type
|
attribute_type = attribute.type
|
||||||
indicator.add_indicator_type("Malicious E-mail")
|
|
||||||
new_object = EmailMessage()
|
new_object = EmailMessage()
|
||||||
email_header = EmailHeader()
|
email_header = EmailHeader()
|
||||||
if attribute_type == 'email-src':
|
if attribute_type == 'email-src':
|
||||||
|
@ -530,13 +548,12 @@ class StixBuilder(object):
|
||||||
new_object.header = email_header
|
new_object.header = email_header
|
||||||
return new_object
|
return new_object
|
||||||
|
|
||||||
def resolve_file_observable(self, indicator, attribute):
|
def resolve_file_observable(self, attribute):
|
||||||
fuzzy = False
|
fuzzy = False
|
||||||
f, h = [""] * 2
|
f, h = [""] * 2
|
||||||
attribute_type = attribute.type
|
attribute_type = attribute.type
|
||||||
if attribute_type in hash_type_attributes['composite']:
|
if attribute_type in hash_type_attributes['composite']:
|
||||||
f, h = attribute.value.split('|')
|
f, h = attribute.value.split('|')
|
||||||
indicator.add_indicator_type("File Hash Watchlist")
|
|
||||||
composite = attribute_type.split('|')
|
composite = attribute_type.split('|')
|
||||||
if len(composite) > 1 and composite[1] == "ssdeep":
|
if len(composite) > 1 and composite[1] == "ssdeep":
|
||||||
fuzzy = True
|
fuzzy = True
|
||||||
|
@ -545,13 +562,12 @@ class StixBuilder(object):
|
||||||
f = attribute.value
|
f = attribute.value
|
||||||
else:
|
else:
|
||||||
h = attribute.value
|
h = attribute.value
|
||||||
indicator.add_indicator_type("File Hash Watchlist")
|
if attribute_type == "ssdeep":
|
||||||
if attribute_type == "ssdeep":
|
|
||||||
fuzzy = True
|
fuzzy = True
|
||||||
return self.generate_file_observable(f, h, fuzzy)
|
return self.generate_file_observable(f, h, fuzzy)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def resolve_http_observable(indicator, attribute):
|
def resolve_http_observable(attribute):
|
||||||
request_response = HTTPRequestResponse()
|
request_response = HTTPRequestResponse()
|
||||||
client_request = HTTPClientRequest()
|
client_request = HTTPClientRequest()
|
||||||
if attribute.type == 'user-agent':
|
if attribute.type == 'user-agent':
|
||||||
|
@ -572,7 +588,7 @@ class StixBuilder(object):
|
||||||
return new_object
|
return new_object
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def resolve_identity_attribute(incident, attribute):
|
def resolve_identity_attribute(attribute):
|
||||||
attribute_type = attribute.type
|
attribute_type = attribute.type
|
||||||
ciq_identity = CIQIdentity3_0Instance()
|
ciq_identity = CIQIdentity3_0Instance()
|
||||||
identity_spec = STIXCIQIdentity3_0()
|
identity_spec = STIXCIQIdentity3_0()
|
||||||
|
@ -591,10 +607,10 @@ class StixBuilder(object):
|
||||||
ciq_identity.id_ = "{}:Identity-{}".format(namespace[1], attribute.uuid)
|
ciq_identity.id_ = "{}:Identity-{}".format(namespace[1], attribute.uuid)
|
||||||
# is this a good idea?
|
# is this a good idea?
|
||||||
ciq_identity.name = "{}: {} (MISP Attribute #{})".format(attribute_type, attribute.value, attribute.id)
|
ciq_identity.name = "{}: {} (MISP Attribute #{})".format(attribute_type, attribute.value, attribute.id)
|
||||||
incident.add_victim(ciq_identity)
|
return ciq_identity
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def resolve_pattern_observable(indicator, attribute):
|
def resolve_pattern_observable(attribute):
|
||||||
if attribute.type == "pattern-in-file":
|
if attribute.type == "pattern-in-file":
|
||||||
byte_run = ByteRun()
|
byte_run = ByteRun()
|
||||||
byte_run.byte_run_data = attribute.value
|
byte_run.byte_run_data = attribute.value
|
||||||
|
@ -604,7 +620,7 @@ class StixBuilder(object):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def resolve_system_observable(incident, attribute):
|
def resolve_system_observable(attribute):
|
||||||
system_object = System()
|
system_object = System()
|
||||||
network_interface = NetworkInterface()
|
network_interface = NetworkInterface()
|
||||||
network_interface.mac = attribute.value
|
network_interface.mac = attribute.value
|
||||||
|
@ -631,17 +647,12 @@ class StixBuilder(object):
|
||||||
observable.description = attribute.comment
|
observable.description = attribute.comment
|
||||||
return observable
|
return observable
|
||||||
|
|
||||||
def set_rep(self, target):
|
def set_rep(self):
|
||||||
identity = Identity(name=self.orgc_name)
|
identity = Identity(name=self.orgc_name)
|
||||||
information_source = InformationSource(identity=identity)
|
information_source = InformationSource(identity=identity)
|
||||||
target.reporter = information_source
|
return information_source
|
||||||
|
|
||||||
def set_tag(self, target, tags):
|
def set_tlp(self, distribution, tags):
|
||||||
for tag in tags:
|
|
||||||
tag_name = "MISP Tag: {}".format(tag['name'])
|
|
||||||
self.add_journal_entry(target, tag_name)
|
|
||||||
|
|
||||||
def set_tlp(self, target, distribution, tags):
|
|
||||||
marking_specification = MarkingSpecification()
|
marking_specification = MarkingSpecification()
|
||||||
marking_specification.controlled_structure = "../../../descendant-or-self::node()"
|
marking_specification.controlled_structure = "../../../descendant-or-self::node()"
|
||||||
tlp = TLPMarkingStructure()
|
tlp = TLPMarkingStructure()
|
||||||
|
@ -653,15 +664,14 @@ class StixBuilder(object):
|
||||||
if event_colors:
|
if event_colors:
|
||||||
color = self.set_color(event_colors)
|
color = self.set_color(event_colors)
|
||||||
else:
|
else:
|
||||||
color = TLP_mapping.get(str(distribution), None)
|
color = TLP_mapping.get(str(self.misp_event.distribution), None)
|
||||||
if color is None:
|
if color is None:
|
||||||
return target
|
return
|
||||||
tlp.color = color
|
tlp.color = color
|
||||||
marking_specification.marking_structures.append(tlp)
|
marking_specification.marking_structures.append(tlp)
|
||||||
handling = Marking()
|
handling = Marking()
|
||||||
handling.add_marking(marking_specification)
|
handling.add_marking(marking_specification)
|
||||||
target.handling = handling
|
return handling
|
||||||
return target
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def add_journal_entry(incident, entry_line):
|
def add_journal_entry(incident, entry_line):
|
||||||
|
@ -680,18 +690,19 @@ class StixBuilder(object):
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
target.information_source.references = [reference]
|
target.information_source.references = [reference]
|
||||||
|
|
||||||
def append_ttp(self, incident, attribute, ttp):
|
def append_ttp(self, category, ttp):
|
||||||
if attribute.comment:
|
|
||||||
ttp.description = attribute.comment
|
|
||||||
self.ttps.append(ttp)
|
self.ttps.append(ttp)
|
||||||
rttp = TTP(idref=ttp.id_, timestamp=ttp.timestamp)
|
rttp = TTP(idref=ttp.id_, timestamp=ttp.timestamp)
|
||||||
related_ttp = RelatedTTP(rttp, relationship=attribute.category)
|
related_ttp = RelatedTTP(rttp, relationship=category)
|
||||||
incident.leveraged_ttps.append(related_ttp)
|
return related_ttp
|
||||||
|
|
||||||
def create_ttp(self, tags, attribute):
|
def create_ttp(self, attribute, tags):
|
||||||
ttp = TTP(timestamp=attribute.timestamp)
|
ttp = TTP(timestamp=attribute.timestamp)
|
||||||
ttp.id_ = "{}:ttp-{}".format(namespace[1], attribute.uuid)
|
ttp.id_ = "{}:ttp-{}".format(namespace[1], attribute.uuid)
|
||||||
self.set_tlp(ttp, attribute.distribution, self.merge_tags(tags, attribute))
|
try:
|
||||||
|
ttp.handling = self.set_tlp(attribute.distribution, self.merge_tags(tags, attribute))
|
||||||
|
except:
|
||||||
|
pass
|
||||||
ttp.title = "{}: {} (MISP Attribute #{})".format(attribute.category, attribute.value, attribute.id)
|
ttp.title = "{}: {} (MISP Attribute #{})".format(attribute.category, attribute.value, attribute.id)
|
||||||
return ttp
|
return ttp
|
||||||
|
|
||||||
|
@ -719,6 +730,17 @@ class StixBuilder(object):
|
||||||
result['attributes'] = attribute.Tag
|
result['attributes'] = attribute.Tag
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def resolve_filename(file_object, filename):
|
||||||
|
if '/' in filename or '\\' in filename:
|
||||||
|
file_object.file_path = ntpath.dirname(filename)
|
||||||
|
file_object.file_path.condition = "Equals"
|
||||||
|
file_object.file_name = ntpath.basename(filename)
|
||||||
|
file_object.file_name.condition = "Equals"
|
||||||
|
else:
|
||||||
|
file_object.file_name = filename
|
||||||
|
file_object.file_name.condition = "Equals"
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def resolve_ip_type(attribute_type, attribute_value):
|
def resolve_ip_type(attribute_type, attribute_value):
|
||||||
address_object = Address()
|
address_object = Address()
|
||||||
|
@ -766,16 +788,15 @@ class StixBuilder(object):
|
||||||
return color_value
|
return color_value
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def set_prod(target, org):
|
def set_prod(org):
|
||||||
identity = Identity(name=org)
|
identity = Identity(name=org)
|
||||||
information_source = InformationSource(identity=identity)
|
information_source = InformationSource(identity=identity)
|
||||||
target.producer = information_source
|
return information_source
|
||||||
|
|
||||||
@staticmethod
|
def set_src(self):
|
||||||
def set_src(target, org):
|
identity = Identity(name=self.misp_event.Org.get('name'))
|
||||||
identity = Identity(name=org)
|
|
||||||
information_source = InformationSource(identity=identity)
|
information_source = InformationSource(identity=identity)
|
||||||
target.information_source = information_source
|
return information_source
|
||||||
|
|
||||||
def main(args):
|
def main(args):
|
||||||
stix_builder = StixBuilder(args)
|
stix_builder = StixBuilder(args)
|
||||||
|
|
Loading…
Reference in New Issue