mirror of https://github.com/MISP/MISP
add: [stix2 import] Importing external User Account objects
- Reusing parsing functionspull/4861/head
parent
fb05bc7ab0
commit
d4e4423622
|
@ -300,6 +300,38 @@ class StixParser():
|
|||
'value': value.dst_port, 'to_ids': False})
|
||||
return attributes
|
||||
|
||||
def attributes_from_user_account_observable(self, observable):
|
||||
observable = observable['0']
|
||||
attributes = self.fill_user_account_observable_attributes(observable)
|
||||
if 'extensions' in observable and 'unix-account-ext' in observable['extensions']:
|
||||
extension = observable['extensions']['unix-account-ext']
|
||||
if 'groups' in extension:
|
||||
for group in extension['groups']:
|
||||
attributes.append({'type': 'text', 'object_relation': 'group',
|
||||
'to_ids': False, 'disable_correlation': True,
|
||||
'value': group})
|
||||
attributes.extend(self.fill_user_account_observable_attributes(extension))
|
||||
return attributes
|
||||
|
||||
def attributes_from_user_account_pattern(self, pattern):
|
||||
attributes = []
|
||||
for p in pattern:
|
||||
p_type, p_value = p.split(' = ')
|
||||
p_value = p_value[1:-1]
|
||||
if "extensions.'unix-account-ext'" in p_type:
|
||||
relation = p_type.split('.')[-1]
|
||||
if 'groups' in relation:
|
||||
attributes.append({'type': 'text', 'object_relation': 'group',
|
||||
'disable_correlation': True, 'value': p_value})
|
||||
continue
|
||||
else:
|
||||
relation = p_type.split(':')[1]
|
||||
if relation in user_account_mapping:
|
||||
attribute = {'value': p_value}
|
||||
attribute.update(user_account_mapping[relation])
|
||||
attributes.append(attribute)
|
||||
return attributes
|
||||
|
||||
def attributes_from_x509_observable(self, objects):
|
||||
_object = objects['0']
|
||||
attributes = self.fill_observable_attributes(_object.hashes, x509_mapping) if (hasattr(_object, 'hashes') and _object.hashes) else []
|
||||
|
@ -488,7 +520,8 @@ class StixFromMISPParser(StixParser):
|
|||
'process': {'observable': self.attributes_from_process_observable, 'pattern': self.pattern_process},
|
||||
'registry-key': {'observable': self.attributes_from_regkey_observable, 'pattern': self.pattern_regkey},
|
||||
'url': {'observable': self.attributes_from_url_observable, 'pattern': self.pattern_url},
|
||||
'user-account': {'observable': self.observable_user_account, 'pattern': self.pattern_user_account},
|
||||
'user-account': {'observable': self.attributes_from_user_account_observable,
|
||||
'pattern': self.attributes_from_user_account_pattern},
|
||||
'WindowsPEBinaryFile': {'observable': self.observable_pe, 'pattern': self.pattern_pe},
|
||||
'x509': {'observable': self.attributes_from_x509_observable, 'pattern': self.pattern_x509}}
|
||||
self.object_from_refs = {'course-of-action': self.parse_MISP_course_of_action, 'vulnerability': self.parse_vulnerability,
|
||||
|
@ -893,38 +926,6 @@ class StixFromMISPParser(StixParser):
|
|||
def pattern_url(self, pattern):
|
||||
return self.fill_pattern_attributes(pattern, url_mapping)
|
||||
|
||||
def observable_user_account(self, observable):
|
||||
observable = observable['0']
|
||||
attributes = self.fill_user_account_observable_attributes(observable)
|
||||
if 'extensions' in observable and 'unix-account-ext' in observable['extensions']:
|
||||
extension = observable['extensions']['unix-account-ext']
|
||||
if 'groups' in extension:
|
||||
for group in extension['groups']:
|
||||
attributes.append({'type': 'text', 'object_relation': 'group',
|
||||
'to_ids': False, 'disable_correlation': True,
|
||||
'value': group})
|
||||
attributes.extend(self.fill_user_account_observable_attributes(extension))
|
||||
return attributes
|
||||
|
||||
def pattern_user_account(self, pattern):
|
||||
attributes = []
|
||||
for p in pattern:
|
||||
p_type, p_value = p.split(' = ')
|
||||
p_value = p_value[1:-1]
|
||||
if "extensions.'unix-account-ext'" in p_type:
|
||||
relation = p_type.split('.')[-1]
|
||||
if 'groups' in relation:
|
||||
attributes.append({'type': 'text', 'object_relation': 'group',
|
||||
'disable_correlation': True, 'value': p_value})
|
||||
continue
|
||||
else:
|
||||
relation = p_type.split(':')[1]
|
||||
if relation in user_account_mapping:
|
||||
attribute = {'value': p_value}
|
||||
attribute.update(user_account_mapping[relation])
|
||||
attributes.append(attribute)
|
||||
return attributes
|
||||
|
||||
def pattern_x509(self, pattern):
|
||||
return self.fill_pattern_attributes(pattern, x509_mapping)
|
||||
|
||||
|
@ -1053,6 +1054,7 @@ class ExternalStixParser(StixParser):
|
|||
('process',): self.parse_process_observable,
|
||||
('x509-certificate',): self.parse_x509_observable,
|
||||
('url',): self.parse_url_observable,
|
||||
('user-account',): self.parse_user_account_observable,
|
||||
('windows-registry-key',): self.parse_regkey_observable}
|
||||
self.pattern_mapping = {('domain-name',): self.parse_domain_ip_port_pattern,
|
||||
('domain-name', 'ipv4-addr', 'url'): self.parse_domain_ip_port_pattern,
|
||||
|
@ -1063,6 +1065,7 @@ class ExternalStixParser(StixParser):
|
|||
('network-traffic',): self.parse_network_traffic_pattern,
|
||||
('process',): self.parse_process_pattern,
|
||||
('url',): self.parse_url_pattern,
|
||||
('user-account',): self.parse_user_account_pattern,
|
||||
('windows-registry-key',): self.parse_regkey_pattern,
|
||||
('x509-certificate',): self.parse_x509_pattern}
|
||||
self.pattern_forbidden_relations = (' LIKE ', ' FOLLOWEDBY ', ' MATCHES ', ' ISSUBSET ', ' ISSUPERSET ', ' REPEATS ')
|
||||
|
@ -1337,6 +1340,16 @@ class ExternalStixParser(StixParser):
|
|||
attributes = self.attributes_from_url_observable(objects)
|
||||
self.handle_import_case(attributes, 'url', marking, uuid)
|
||||
|
||||
def parse_user_account_observable(self, observable, marking, uuid):
|
||||
attributes = self.attributes_from_user_account_observable(observable)
|
||||
name = self.__define_user_account_name(attributes)
|
||||
self.handle_import_case(attributes, name, marking, uuid)
|
||||
|
||||
def parse_user_account_pattern(self, pattern, marking, uuid):
|
||||
attributes = self.attributes_from_user_account_pattern(pattern)
|
||||
name = self.__define_user_account_name(attributes)
|
||||
self.handle_import_case(attributes, name, marking, uuid)
|
||||
|
||||
def parse_x509_observable(self, objects, marking, uuid):
|
||||
attributes = self.attributes_from_x509_observable(objects)
|
||||
self.handle_import_case(attributes, 'x509', marking, uuid)
|
||||
|
@ -1359,6 +1372,14 @@ class ExternalStixParser(StixParser):
|
|||
misp_object.add_attribute(**attribute)
|
||||
return misp_object
|
||||
|
||||
@staticmethod
|
||||
def __define_user_account_name(attributes):
|
||||
if len(attributes) == 2:
|
||||
relations = (attribute['type'] for attribute in attributes)
|
||||
if 'user_id' in relations and 'credential' in relations:
|
||||
return 'credential'
|
||||
return 'user-account'
|
||||
|
||||
@staticmethod
|
||||
def fill_observable_attributes(stix_object, object_mapping):
|
||||
attributes = []
|
||||
|
|
Loading…
Reference in New Issue