fix: [stix2 export] Exporting only TLP tags as MarkingDefinition

- The other tags are (as before a recent change)
  exported as labels
pull/4037/head
chrisr3d 2019-01-19 23:07:38 +01:00
parent 157f20c757
commit ced9145824
No known key found for this signature in database
GPG Key ID: 6BBED1B63A6D639F
1 changed files with 32 additions and 34 deletions

View File

@ -68,21 +68,16 @@ class StixBuilder():
external_refs = [self.__parse_link(link) for link in self.links] external_refs = [self.__parse_link(link) for link in self.links]
report_args = {'type': 'report', 'id': self.report_id, 'name': self.misp_event['info'], report_args = {'type': 'report', 'id': self.report_id, 'name': self.misp_event['info'],
'created_by_ref': self.identity_id, 'created': self.misp_event['date'], 'created_by_ref': self.identity_id, 'created': self.misp_event['date'],
'published': self.get_datetime_from_timestamp(self.misp_event['publish_timestamp']), 'published': self.get_datetime_from_timestamp(self.misp_event['publish_timestamp'])}
'labels': []} labels = _MISP_event_tags
if self.misp_event.get('Tag'): if self.misp_event.get('Tag'):
markings = [] markings = []
for tag in self.misp_event['Tag']: for tag in self.misp_event['Tag']:
tag_name = tag['name'] name = tag['name']
if tag_name in _MISP_event_tags: markings.append(name) if name.startswith('tlp:') else labels.append(name)
report_args['labels'].append(tag_name)
else:
markings.append(self.markings[tag_name]['id'] if tag_name in self.markings else self.create_marking(tag_name))
if markings: if markings:
report_args['object_marking_refs'] = markings report_args['object_marking_refs'] = self.handle_tags(markings)
for label in _MISP_event_tags: report_args['labels'] = labels
if label not in report_args['labels']:
report_args['labels'].append(label)
if external_refs: if external_refs:
report_args['external_references'] = external_refs report_args['external_references'] = external_refs
self.add_all_markings() self.add_all_markings()
@ -417,15 +412,14 @@ class StixBuilder():
def add_custom(self, attribute): def add_custom(self, attribute):
custom_object_id = "x-misp-object--{}".format(attribute['uuid']) custom_object_id = "x-misp-object--{}".format(attribute['uuid'])
custom_object_type = "x-misp-object-{}".format(attribute['type'].replace('|', '-').lower()) custom_object_type = "x-misp-object-{}".format(attribute['type'].replace('|', '-').lower())
labels = self.create_labels(attribute) labels, markings = self.create_labels(attribute)
custom_object_args = {'id': custom_object_id, 'x_misp_category': attribute['category'], 'labels': labels, custom_object_args = {'id': custom_object_id, 'x_misp_category': attribute['category'], 'labels': labels,
'x_misp_timestamp': self.get_datetime_from_timestamp(attribute['timestamp']), 'x_misp_timestamp': self.get_datetime_from_timestamp(attribute['timestamp']),
'x_misp_value': attribute['value'], 'created_by_ref': self.identity_id} 'x_misp_value': attribute['value'], 'created_by_ref': self.identity_id}
if attribute.get('comment'): if attribute.get('comment'):
custom_object_args['x_misp_comment'] = attribute['comment'] custom_object_args['x_misp_comment'] = attribute['comment']
markings = [] if markings:
if attribute.get('Tag'): markings = self.handle_tags(markings)
markings = self.handle_tags(attribute['Tag'])
custom_object_args['object_marking_refs'] = markings custom_object_args['object_marking_refs'] = markings
@CustomObject(custom_object_type, [('id', properties.StringProperty(required=True)), @CustomObject(custom_object_type, [('id', properties.StringProperty(required=True)),
('x_misp_timestamp', properties.StringProperty(required=True)), ('x_misp_timestamp', properties.StringProperty(required=True)),
@ -445,13 +439,13 @@ class StixBuilder():
def add_identity(self, attribute): def add_identity(self, attribute):
identity_id = "identity--{}".format(attribute['uuid']) identity_id = "identity--{}".format(attribute['uuid'])
name = attribute['value'] name = attribute['value']
labels = self.create_labels(attribute) labels, markings = self.create_labels(attribute)
identity_args = {'id': identity_id, 'type': identity, 'name': name, 'labels': labels, identity_args = {'id': identity_id, 'type': identity, 'name': name, 'labels': labels,
'identity_class': 'individual', 'created_by_ref': self.identity_id} 'identity_class': 'individual', 'created_by_ref': self.identity_id}
if attribute.get('comment'): if attribute.get('comment'):
identity_args['description'] = attribute['comment'] identity_args['description'] = attribute['comment']
if attribute.get('Tag'): if markings:
identity_args['object_marking_refs'] = self.handle_tags(attribute['Tag']) identity_args['object_marking_refs'] = self.handle_tags(markings)
identity = Identity(**identity_args, interoperability=True) identity = Identity(**identity_args, interoperability=True)
self.append_object(identity, identity_id) self.append_object(identity, identity_id)
@ -461,7 +455,7 @@ class StixBuilder():
self.parse_galaxies(attribute['Galaxy'], indicator_id) self.parse_galaxies(attribute['Galaxy'], indicator_id)
category = attribute['category'] category = attribute['category']
killchain = self.create_killchain(category) killchain = self.create_killchain(category)
labels = self.create_labels(attribute) labels, markings = self.create_labels(attribute)
attribute_value = attribute['value'] if attribute_type != "AS" else self.define_attribute_value(attribute['value'], attribute['comment']) attribute_value = attribute['value'] if attribute_type != "AS" else self.define_attribute_value(attribute['value'], attribute['comment'])
pattern = mispTypesMapping[attribute_type]['pattern'](attribute_type, attribute_value, attribute['data']) if attribute.get('data') else self.define_pattern(attribute_type, attribute_value) pattern = mispTypesMapping[attribute_type]['pattern'](attribute_type, attribute_value, attribute['data']) if attribute.get('data') else self.define_pattern(attribute_type, attribute_value)
indicator_args = {'id': indicator_id, 'type': 'indicator', 'labels': labels, 'kill_chain_phases': killchain, indicator_args = {'id': indicator_id, 'type': 'indicator', 'labels': labels, 'kill_chain_phases': killchain,
@ -473,8 +467,8 @@ class StixBuilder():
break break
if attribute.get('comment'): if attribute.get('comment'):
indicator_args['description'] = attribute['comment'] indicator_args['description'] = attribute['comment']
if attribute.get('Tag'): if markings:
indicator_args['object_marking_refs'] = self.handle_tags(attribute['Tag']) indicator_args['object_marking_refs'] = self.handle_tags(markings)
indicator = Indicator(**indicator_args, interoperability=True) indicator = Indicator(**indicator_args, interoperability=True)
self.append_object(indicator, indicator_id) self.append_object(indicator, indicator_id)
@ -495,14 +489,14 @@ class StixBuilder():
observed_data_id = "observed-data--{}".format(attribute['uuid']) observed_data_id = "observed-data--{}".format(attribute['uuid'])
self.parse_galaxies(attribute['Galaxy'], observed_data_id) self.parse_galaxies(attribute['Galaxy'], observed_data_id)
timestamp = self.get_datetime_from_timestamp(attribute['timestamp']) timestamp = self.get_datetime_from_timestamp(attribute['timestamp'])
labels = self.create_labels(attribute) labels, markings = self.create_labels(attribute)
attribute_value = attribute['value'] if attribute_type != "AS" else self.define_attribute_value(attribute['value'], attribute['comment']) attribute_value = attribute['value'] if attribute_type != "AS" else self.define_attribute_value(attribute['value'], attribute['comment'])
observable = mispTypesMapping[attribute_type]['observable'](attribute_type, attribute_value, attribute['data']) if attribute.get('data') else self.define_observable(attribute_type, attribute_value) observable = mispTypesMapping[attribute_type]['observable'](attribute_type, attribute_value, attribute['data']) if attribute.get('data') else self.define_observable(attribute_type, attribute_value)
observed_data_args = {'id': observed_data_id, 'type': 'observed-data', 'number_observed': 1, observed_data_args = {'id': observed_data_id, 'type': 'observed-data', 'number_observed': 1,
'first_observed': timestamp, 'last_observed': timestamp, 'labels': labels, 'first_observed': timestamp, 'last_observed': timestamp, 'labels': labels,
'created_by_ref': self.identity_id, 'objects': observable} 'created_by_ref': self.identity_id, 'objects': observable}
if attribute.get('Tag'): if markings:
observed_data_args['object_marking_refs'] = self.handle_tags(attribute['Tag']) observed_data_args['object_marking_refs'] = self.handle_tags(markings)
observed_data = ObservedData(**observed_data_args, interoperability=True) observed_data = ObservedData(**observed_data_args, interoperability=True)
self.append_object(observed_data, observed_data_id) self.append_object(observed_data, observed_data_id)
@ -522,12 +516,12 @@ class StixBuilder():
vulnerability_id = "vulnerability--{}".format(attribute['uuid']) vulnerability_id = "vulnerability--{}".format(attribute['uuid'])
name = attribute['value'] name = attribute['value']
vulnerability_data = [mispTypesMapping['vulnerability']['vulnerability_args'](name)] vulnerability_data = [mispTypesMapping['vulnerability']['vulnerability_args'](name)]
labels = self.create_labels(attribute) labels, markings = self.create_labels(attribute)
vulnerability_args = {'id': vulnerability_id, 'type': 'vulnerability', vulnerability_args = {'id': vulnerability_id, 'type': 'vulnerability',
'name': name, 'external_references': vulnerability_data, 'name': name, 'external_references': vulnerability_data,
'created_by_ref': self.identity_id, 'labels': labels} 'created_by_ref': self.identity_id, 'labels': labels}
if attribute.get('Tag'): if markings:
vulnerability_args['object_marking_refs'] = self.handle_tags(attribute['Tag']) vulnerability_args['object_marking_refs'] = self.handle_tags(markings)
vulnerability = Vulnerability(**vulnerability_args, interoperability=True) vulnerability = Vulnerability(**vulnerability_args, interoperability=True)
self.append_object(vulnerability, vulnerability_id) self.append_object(vulnerability, vulnerability_id)
@ -654,9 +648,15 @@ class StixBuilder():
@staticmethod @staticmethod
def create_labels(attribute): def create_labels(attribute):
return ['misp:type="{}"'.format(attribute['type']), labels = ['misp:type="{}"'.format(attribute['type']),
'misp:category="{}"'.format(attribute['category']), 'misp:category="{}"'.format(attribute['category']),
'misp:to_ids="{}"'.format(attribute['to_ids'])] 'misp:to_ids="{}"'.format(attribute['to_ids'])]
markings = []
if attribute.get('Tag'):
for tag in attribute['Tag']:
name = tag['name']
markings.append(name) if name.startswith('tlp:') else labels.append(name)
return labels, markings
@staticmethod @staticmethod
def create_object_labels(name, category, to_ids): def create_object_labels(name, category, to_ids):
@ -667,11 +667,9 @@ class StixBuilder():
def create_marking(self, tag): def create_marking(self, tag):
id = 'marking-definition--%s' % uuid.uuid4() id = 'marking-definition--%s' % uuid.uuid4()
tag_list = tag.split(':') definition_type, definition = tag.split(':')
namespace, predicate = tag_list if len(tag_list) == 2 else (tag_list[0], ":".join(tag_list[1:]))
definition_type, marking = (namespace, predicate) if namespace == 'tlp' else ('statement', self._parse_tag(namespace, predicate))
self.markings[tag] = {'type': 'marking-definition', 'id': id, 'definition_type': definition_type, self.markings[tag] = {'type': 'marking-definition', 'id': id, 'definition_type': definition_type,
'definition': {definition_type: marking}} 'definition': {definition_type: definition}}
return id return id
@staticmethod @staticmethod