chg: [stix2 import] Observable single attributes parsing functions are now in the main script

- Also update of the mapping dictionary with the
  latest updated functions moved from the mapping
  script to the main script
pull/6022/head
chrisr3d 2020-05-20 01:16:50 +02:00
parent 29245f7338
commit d53d3bfb52
No known key found for this signature in database
GPG Key ID: 6BBED1B63A6D639F
2 changed files with 84 additions and 5 deletions

View File

@ -436,7 +436,7 @@ class StixFromMISPParser(StixParser):
attribute = self.create_attribute_dict(observable)
attribute['to_ids'] = False
objects = observable.objects
value = stix2misp_mapping.misp_types_mapping[attribute['type']](objects, attribute['type'])
value = self.parse_single_attribute_observable(objects, attribute['type'])
if isinstance(value, tuple):
value, data = value
attribute['data'] = io.BytesIO(data.encode())
@ -480,6 +480,12 @@ class StixFromMISPParser(StixParser):
## OBSERVABLE PARSING FUNCTIONS ##
################################################################################
@staticmethod
def _define_hash_type(hash_type):
if 'sha' in hash_type:
return f'SHA-{hash_type.split("sha")[1]}'
return hash_type.upper() if hash_type == 'md5' else hash_type
@staticmethod
def _fetch_file_observable(observable_objects):
for key, observable in observable_objects.items():
@ -546,9 +552,17 @@ class StixFromMISPParser(StixParser):
attributes.append(attribute)
return attributes
def _parse_attachment(self, observable):
if len(observable) > 1:
return self._parse_name(observable, index='1'), self._parse_payload(observable)
return self._parse_name(observable)
def parse_credential_observable(self, observable):
return self.fill_observable_attributes(observable['0'], 'credential_mapping')
def _parse_domain_ip_attribute(self, observable):
return f'{self._parse_value(observable)}|{self._parse_value(observable, index="1")}'
@staticmethod
def parse_domain_ip_observable(observable):
attributes = []
@ -558,6 +572,10 @@ class StixFromMISPParser(StixParser):
attributes.append(attribute)
return attributes
@staticmethod
def _parse_email_message(observable, attribute_type):
return observable['0'].get(attribute_type.split('-')[1])
def parse_email_observable(self, observable):
email, references = self.filter_main_object(observable, 'EmailMessage')
attributes = self.fill_observable_attributes(email, 'email_mapping')
@ -615,6 +633,16 @@ class StixFromMISPParser(StixParser):
})
return attributes
def _parse_filename_hash(self, observable, attribute_type, index='0'):
hash_type = attribute_type.split('|')[1]
filename = self._parse_name(observable, index=index)
hash_value = self._parse_hash(observable, hash_type, index=index)
return f'{filename}|{hash_value}'
def _parse_hash(self, observable, attribute_type, index='0'):
hash_type = self._define_hash_type(attribute_type)
return observable[index]['hashes'].get(hash_type)
def parse_ip_port_observable(self, observable):
network_traffic, references = self.filter_main_object(observable, 'NetworkTraffic')
references = {key: {'object': value, 'used': False} for key, value in references.items()}
@ -635,6 +663,20 @@ class StixFromMISPParser(StixParser):
attributes.append(attribute)
return attributes
def _parse_malware_sample(self, observable):
if len(observable) > 1:
value = self._parse_filename_hash(observable, 'filename|md5', '1')
return value, self._parse_payload(observable)
return self._parse_filename_hash(observable, 'filename|md5')
@staticmethod
def _parse_name(observable, index='0'):
return observable[index].get('name')
def _parse_network_attribute(self, observable):
port = self._parse_port(observable, index='1')
return f'{self._parse_value(observable)}|{port}'
def parse_network_connection_observable(self, observable):
network_traffic, references = self.filter_main_object(observable, 'NetworkTraffic')
references = {key: {'object': value, 'used': False} for key, value in references.items()}
@ -693,6 +735,14 @@ class StixFromMISPParser(StixParser):
attributes.append(attribute)
return attributes
@staticmethod
def _parse_number(observable):
return observable['0'].get('number')
@staticmethod
def _parse_payload(observable):
return observable['0'].get('payload_bin')
def parse_pe_observable(self, observable):
pe_object = MISPObject('pe', misp_objects_path_custom=self._misp_objects_path)
key = self._fetch_file_observable(observable)
@ -708,6 +758,11 @@ class StixFromMISPParser(StixParser):
self.misp_event.add_object(**pe_object)
return self.parse_file_observable(observable), pe_object.uuid
@staticmethod
def _parse_port(observable, index='0'):
port_observable = observable[index]
return port_observable['src_port'] if 'src_port' in port_observable else port_observable['dst_port']
def parse_process_observable(self, observable):
references = {}
for key, value in observable.items():
@ -734,6 +789,10 @@ class StixFromMISPParser(StixParser):
attribute.update({'object_relation': f'{feature}-pid', 'value': reference.pid, 'to_ids': False})
return attribute
@staticmethod
def _parse_regkey_attribute(observable):
return observable['0'].get('key')
def parse_regkey_observable(self, observable):
attributes = []
for key, value in observable['0'].items():
@ -745,6 +804,15 @@ class StixFromMISPParser(StixParser):
attributes.extend(self.fill_observable_attributes(observable['0']['values'][0], 'regkey_mapping'))
return attributes
def _parse_regkey_value(self, observable):
regkey = self._parse_regkey_attribute(observable)
return f'{regkey}|{observable["0"]["values"][0].get("name")}'
def parse_single_attribute_observable(self, observable, attribute_type):
if attribute_type in stix2misp_mapping.attributes_type_mapping:
return getattr(self, stix2misp_mapping.attributes_type_mapping[attribute_type])(observable, attribute_type)
return getattr(self, stix2misp_mapping.attributes_mapping[attribute_type])(observable)
def _parse_socket_extension(self, extension):
attributes = []
extension = {key: value for key, value in extension.items()}
@ -787,6 +855,14 @@ class StixFromMISPParser(StixParser):
attributes.extend(self.fill_observable_attributes(extension, 'user_account_mapping'))
return attributes
@staticmethod
def _parse_value(observable, index='0'):
return observable[index].get('value')
def _parse_x509_attribute(self, observable, attribute_type):
hash_type = attribute_type.split('-')[-1]
return self._parse_hash(observable, hash_type)
def parse_x509_observable(self, observable):
attributes = self.fill_observable_attributes(observable['0'], 'x509_mapping')
if hasattr(observable['0'], 'hashes') and observable['0'].hashes:

View File

@ -15,9 +15,9 @@ attributes_mapping = {
'mutex': '_parse_name',
'uri': '_parse_value',
'port': '_parse_port',
'ip-dst|port': '_parse_ip_port',
'ip-src|port': '_parse_ip_port',
'hostname|port': '_parse_hostname_port',
'ip-dst|port': '_parse_network_attribute',
'ip-src|port': '_parse_network_attribute',
'hostname|port': '_parse_network_attribute',
'email-reply-to': '_parse_value',
'attachment': '_parse_attachment',
'mac-address': '_parse_value',
@ -55,7 +55,10 @@ attributes_type_mapping = {
'filename|sha512': '_parse_filename_hash',
'filename|sha512/224': '_parse_filename_hash',
'filename|sha512/256': '_parse_filename_hash',
'filename|tlsh': '_parse_filename_hash'
'filename|tlsh': '_parse_filename_hash',
'x509-fingerprint-md5': '_parse_x509_attribute',
'x509-fingerprint-sha1': '_parse_x509_attribute',
'x509-fingerprint-sha256': '_parse_x509_attribute'
}
address_family_attribute_mapping = {'type': 'text','object_relation': 'address-family'}