mirror of https://github.com/MISP/MISP
Merge branch '2.4' of github.com:MISP/MISP into 2.4
commit
2a64c09035
|
@ -3528,7 +3528,7 @@ class Server extends AppModel
|
|||
public function stixDiagnostics(&$diagnostic_errors, &$stixVersion, &$cyboxVersion, &$mixboxVersion, &$maecVersion, &$pymispVersion)
|
||||
{
|
||||
$result = array();
|
||||
$expected = array('stix' => '1.2.0.6', 'cybox' => '2.1.0.17', 'mixbox' => '1.0.3', 'maec' => '4.1.0.14', 'pymisp' => '>2.4.93');
|
||||
$expected = array('stix' => '1.2.0.6', 'cybox' => '2.1.0.18.dev0', 'mixbox' => '1.0.3', 'maec' => '4.1.0.14', 'pymisp' => '>2.4.93');
|
||||
// check if the STIX and Cybox libraries are working using the test script stixtest.py
|
||||
$scriptResult = shell_exec('python3 ' . APP . 'files' . DS . 'scripts' . DS . 'stixtest.py');
|
||||
$scriptResult = json_decode($scriptResult, true);
|
||||
|
|
|
@ -363,16 +363,14 @@ class StixBuilder():
|
|||
|
||||
@staticmethod
|
||||
def generate_galaxy_args(galaxy, b_killchain, b_alias, sdo_type):
|
||||
galaxy_type = galaxy.get('type')
|
||||
name = galaxy.get('name')
|
||||
cluster = galaxy['GalaxyCluster'][0]
|
||||
sdo_id = "{}--{}".format(sdo_type, cluster.get('uuid'))
|
||||
description = "{} | {}".format(galaxy.get('description'), cluster.get('description'))
|
||||
labels = ['misp:type=\"{}\"'.format(galaxy_type)]
|
||||
sdo_args = {'id': sdo_id, 'type': sdo_type, 'name': name, 'description': description}
|
||||
description = "{} | {}".format(galaxy['description'], cluster['description'])
|
||||
labels = ['misp:name=\"{}\"'.format(galaxy['name'])]
|
||||
sdo_args = {'id': sdo_id, 'type': sdo_type, 'name': cluster['value'], 'description': description}
|
||||
if b_killchain:
|
||||
killchain = [{'kill_chain_name': 'misp-category',
|
||||
'phase_name': galaxy_type}]
|
||||
'phase_name': galaxy['type']}]
|
||||
sdo_args['kill_chain_phases'] = killchain
|
||||
if cluster['tag_name']:
|
||||
labels.append(cluster.get('tag_name'))
|
||||
|
|
|
@ -35,46 +35,30 @@ with open(os.path.join(__path__[0], 'data/describeTypes.json'), 'r') as f:
|
|||
|
||||
class StixParser():
|
||||
def __init__(self):
|
||||
super(StixParser, self).__init__()
|
||||
self.misp_event = MISPEvent()
|
||||
self.event = defaultdict(dict)
|
||||
self.misp_event['Galaxy'] = []
|
||||
|
||||
def loadEvent(self, args):
|
||||
filename = os.path.join(os.path.dirname(args[0]), args[1])
|
||||
with open(filename, 'r', encoding='utf-8') as f:
|
||||
event = json.loads(f.read())
|
||||
def load_data(self, filename, version, event, args):
|
||||
self.filename = filename
|
||||
self.stix_version = 'STIX {}'.format(event.get('spec_version'))
|
||||
for o in event.get('objects'):
|
||||
parsed_object = stix2.parse(o, allow_custom=True)
|
||||
try:
|
||||
object_type = parsed_object._type
|
||||
except AttributeError:
|
||||
object_type = parsed_object['type']
|
||||
object_uuid = parsed_object['id'].split('--')[1]
|
||||
if object_type.startswith('x-misp-object'):
|
||||
object_type = 'x-misp-object'
|
||||
self.event[object_type][object_uuid] = parsed_object
|
||||
if not self.event:
|
||||
print(json.dumps({'success': 0, 'message': 'There is no valid STIX object to import'}))
|
||||
sys.exit(1)
|
||||
if args[2] is not None:
|
||||
self.add_original_file(args[2])
|
||||
self.stix_version = version
|
||||
self.event = event
|
||||
if args and args[0] is not None:
|
||||
self.add_original_file(args[0])
|
||||
try:
|
||||
event_distribution = args[3]
|
||||
event_distribution = args[1]
|
||||
if not isinstance(event_distribution, int):
|
||||
event_distribution = int(event_distribution) if event_distribution.isdigit() else 5
|
||||
except IndexError:
|
||||
event_distribution = 5
|
||||
try:
|
||||
attribute_distribution = args[4]
|
||||
attribute_distribution = args[2]
|
||||
if attribute_distribution != 'event' and not isinstance(attribute_distribution, int):
|
||||
attribute_distribution = int(attribute_distribution) if attribute_distribution.isdigit() else 5
|
||||
except IndexError:
|
||||
attribute_distribution = 5
|
||||
self.misp_event.distribution = event_distribution
|
||||
self.__attribute_distribution = event_distribution if attribute_distribution == 'event' else attribute_distribution
|
||||
self.load_mapping()
|
||||
self._attribute_distribution = event_distribution if attribute_distribution == 'event' else attribute_distribution
|
||||
|
||||
def add_original_file(self, original_filename):
|
||||
with open(self.filename, 'rb') as f:
|
||||
|
@ -86,72 +70,53 @@ class StixParser():
|
|||
'value': self.stix_version})
|
||||
self.misp_event.add_object(**original_file)
|
||||
|
||||
def load_mapping(self):
|
||||
self.objects_mapping = {'asn': {'observable': observable_asn, 'pattern': pattern_asn},
|
||||
'domain-ip': {'observable': observable_domain_ip, 'pattern': pattern_domain_ip},
|
||||
'email': {'observable': self.observable_email, 'pattern': self.pattern_email},
|
||||
'file': {'observable': observable_file, 'pattern': self.pattern_file},
|
||||
'ip-port': {'observable': observable_ip_port, 'pattern': pattern_ip_port},
|
||||
'network-socket': {'observable': observable_socket, 'pattern': pattern_socket},
|
||||
'process': {'observable': observable_process, 'pattern': pattern_process},
|
||||
'registry-key': {'observable': observable_regkey, 'pattern': pattern_regkey},
|
||||
'url': {'observable': observable_url, 'pattern': pattern_url},
|
||||
'WindowsPEBinaryFile': {'observable': self.observable_pe, 'pattern': self.pattern_pe},
|
||||
'x509': {'observable': observable_x509, 'pattern': pattern_x509}}
|
||||
self.object_from_refs = {'course-of-action': self.parse_course_of_action, 'vulnerability': self.parse_vulnerability,
|
||||
'x-misp-object': self.parse_custom}
|
||||
self.object_from_refs.update(dict.fromkeys(list(galaxy_types.keys()), self.parse_galaxy))
|
||||
self.object_from_refs.update(dict.fromkeys(['indicator', 'observed-data'], self.parse_usual_object))
|
||||
|
||||
def handler(self):
|
||||
self.outputname = '{}.stix2'.format(self.filename)
|
||||
if self.from_misp():
|
||||
self.buildMispDict()
|
||||
else:
|
||||
self.version_attribute = {'type': 'text', 'object_relation': 'version', 'value': self.stix_version}
|
||||
self.buildExternalDict()
|
||||
self.buildMISPDict()
|
||||
self.set_distribution()
|
||||
|
||||
def from_misp(self):
|
||||
for _, o in self.event['report'].items():
|
||||
if o._type == 'report' and 'misp:tool="misp2stix2"' in o.get('labels'):
|
||||
return True
|
||||
return False
|
||||
|
||||
def buildMispDict(self):
|
||||
report_attributes = defaultdict(list)
|
||||
orgs = []
|
||||
def buildMISPDict(self):
|
||||
report_attributes = defaultdict(set)
|
||||
for _, report in self.event['report'].items():
|
||||
org_uuid = report['created_by_ref'].split('--')[1]
|
||||
if org_uuid not in orgs:
|
||||
orgs.append(org_uuid)
|
||||
report_name = report['name']
|
||||
if report_name not in orgs:
|
||||
report_attributes['name'].append(report_name)
|
||||
report_attributes['orgs'].add(report['created_by_ref'].split('--')[1])
|
||||
report_attributes['name'].add(report['name'])
|
||||
if report.get('published'):
|
||||
report_attributes['published'].append(report['published'])
|
||||
report_attributes['published'].add(report['published'])
|
||||
if 'labels' in report:
|
||||
report_attributes['labels'].extend([l for l in report['labels'] if l not in report_attributes['labels']])
|
||||
report_attributes['labels'].update([l for l in report['labels']])
|
||||
if 'external_references' in report:
|
||||
self.add_links(report['external_references'])
|
||||
for ref in report['object_refs']:
|
||||
if 'relationship' not in ref:
|
||||
object_type, uuid = ref.split('--')
|
||||
object2parse = self.event[object_type][uuid]
|
||||
labels = object2parse.get('labels')
|
||||
self.object_from_refs[object_type](object2parse, labels)
|
||||
if len(orgs) == 1:
|
||||
identity = self.event['identity'][orgs[0]]
|
||||
self.process_parsing(object2parse, object_type)
|
||||
if len(report_attributes['orgs']) == 1:
|
||||
identity = self.event['identity'][report_attributes['orgs'].pop()]
|
||||
self.misp_event['Org'] = {'name': identity['name']}
|
||||
if len(report_attributes['published']) == 1:
|
||||
self.misp_event.publish_timestamp = self.getTimestampfromDate(report_attributes['published'][0])
|
||||
self.misp_event.publish_timestamp = self.getTimestampfromDate(report_attributes['published'].pop())
|
||||
if len(report_attributes['name']) == 1:
|
||||
self.misp_event.info = report_attributes['name'][0]
|
||||
self.misp_event.info = report_attributes['name'].pop()
|
||||
else:
|
||||
self.misp_event.info = "Imported from MISP import for STIX 2.0 script."
|
||||
self.misp_event.info = "Imported with MISP import script for {}.".format(self.stix_version)
|
||||
for l in report_attributes['labels']:
|
||||
self.misp_event.add_tag(l)
|
||||
|
||||
def set_distribution(self):
|
||||
for attribute in self.misp_event.attributes:
|
||||
attribute.distribution = self._attribute_distribution
|
||||
for misp_object in self.misp_event.objects:
|
||||
misp_object.distribution = self._attribute_distribution
|
||||
for attribute in misp_object.attributes:
|
||||
attribute.distribution = self._attribute_distribution
|
||||
|
||||
def saveFile(self):
|
||||
eventDict = self.misp_event.to_json()
|
||||
outputfile = '{}.stix2'.format(self.filename)
|
||||
with open(outputfile, 'w') as f:
|
||||
f.write(eventDict)
|
||||
|
||||
def add_links(self, refs):
|
||||
for e in refs:
|
||||
link = {"type": "link"}
|
||||
|
@ -165,24 +130,22 @@ class StixParser():
|
|||
link['value'] = e.get('url')
|
||||
self.misp_event.add_attribute(**link)
|
||||
|
||||
def parse_usual_object(self, o, labels):
|
||||
if 'from_object' in labels:
|
||||
self.parse_object(o, labels)
|
||||
else:
|
||||
self.parse_attribute(o, labels)
|
||||
@staticmethod
|
||||
def getTimestampfromDate(stix_date):
|
||||
try:
|
||||
return int(stix_date.timestamp())
|
||||
except AttributeError:
|
||||
return int(time.mktime(time.strptime(stix_date.split('+')[0], "%Y-%m-%d %H:%M:%S")))
|
||||
|
||||
def parse_galaxy(self, o, labels):
|
||||
galaxy_type = self.get_misp_type(labels)
|
||||
tag = labels[1]
|
||||
value = tag.split(':')[1].split('=')[1]
|
||||
galaxy_description, cluster_description = o.get('description').split('|')
|
||||
_, uuid = o.get('id').split('--')
|
||||
galaxy = {'type': galaxy_type, 'name': o.get('name'), 'description': galaxy_description,
|
||||
'GalaxyCluster': [{'type': galaxy_type, 'value':value, 'tag_name': tag,
|
||||
'description': cluster_description, 'uuid': uuid}]}
|
||||
self.misp_event['Galaxy'].append(galaxy)
|
||||
@staticmethod
|
||||
def get_misp_type(labels):
|
||||
return labels[0].split('=')[1][1:-1]
|
||||
|
||||
def parse_course_of_action(self, o, _):
|
||||
@staticmethod
|
||||
def get_misp_category(labels):
|
||||
return labels[1].split('=')[1][1:-1]
|
||||
|
||||
def parse_course_of_action(self, o):
|
||||
misp_object = MISPObject('course-of-action')
|
||||
if 'name' in o:
|
||||
attribute = {'type': 'text', 'object_relation': 'name', 'value': o.get('name')}
|
||||
|
@ -194,6 +157,81 @@ class StixParser():
|
|||
misp_object.add_attribute(**attribute)
|
||||
self.misp_event.add_object(**misp_object)
|
||||
|
||||
@staticmethod
|
||||
def parse_observable(observable, attribute_type):
|
||||
return misp_types_mapping[attribute_type](observable, attribute_type)
|
||||
|
||||
@staticmethod
|
||||
def parse_pattern(pattern):
|
||||
if ' AND ' in pattern:
|
||||
pattern_parts = pattern.split(' AND ')
|
||||
if len(pattern_parts) == 3:
|
||||
_, value1 = pattern_parts[2].split(' = ')
|
||||
_, value2 = pattern_parts[0].split(' = ')
|
||||
return '{}|{}'.format(value1[1:-2], value2[1:-1])
|
||||
else:
|
||||
_, value1 = pattern_parts[0].split(' = ')
|
||||
_, value2 = pattern_parts[1].split(' = ')
|
||||
if value1 in ("'ipv4-addr'", "'ipv6-addr'"):
|
||||
return value2[1:-2]
|
||||
return '{}|{}'.format(value1[1:-1], value2[1:-2])
|
||||
else:
|
||||
return pattern.split(' = ')[1][1:-2]
|
||||
|
||||
def parse_pattern_with_data(self, pattern):
|
||||
if 'artifact:payload_bin' not in pattern:
|
||||
return self.parse_pattern(pattern)
|
||||
pattern_parts = pattern.split(' AND ')
|
||||
if len(pattern_parts) == 3:
|
||||
filename = pattern_parts[0].split(' = ')[1]
|
||||
md5 = pattern_parts[1].split(' = ')[1]
|
||||
return "{}|{}".format(filename[1:-1], md5[1:-1]), pattern_parts[2].split(' = ')[1][1:-2]
|
||||
return pattern_parts[0].split(' = ')[1][1:-1], pattern_parts[1].split(' = ')[1][1:-2]
|
||||
|
||||
|
||||
class StixFromMISPParser(StixParser):
|
||||
def __init__(self):
|
||||
super(StixFromMISPParser, self).__init__()
|
||||
self.objects_mapping = {'asn': {'observable': observable_asn, 'pattern': pattern_asn},
|
||||
'domain-ip': {'observable': observable_domain_ip, 'pattern': pattern_domain_ip},
|
||||
'email': {'observable': self.observable_email, 'pattern': self.pattern_email},
|
||||
'file': {'observable': observable_file, 'pattern': self.pattern_file},
|
||||
'ip-port': {'observable': observable_ip_port, 'pattern': pattern_ip_port},
|
||||
'network-socket': {'observable': observable_socket, 'pattern': pattern_socket},
|
||||
'process': {'observable': observable_process, 'pattern': pattern_process},
|
||||
'registry-key': {'observable': observable_regkey, 'pattern': pattern_regkey},
|
||||
'url': {'observable': observable_url, 'pattern': pattern_url},
|
||||
'WindowsPEBinaryFile': {'observable': self.observable_pe, 'pattern': self.pattern_pe},
|
||||
'x509': {'observable': observable_x509, 'pattern': pattern_x509}}
|
||||
self.object_from_refs = {'course-of-action': self.parse_MISP_course_of_action, 'vulnerability': self.parse_vulnerability,
|
||||
'x-misp-object': self.parse_custom}
|
||||
self.object_from_refs.update(dict.fromkeys(list(galaxy_types.keys()), self.parse_galaxy))
|
||||
self.object_from_refs.update(dict.fromkeys(['indicator', 'observed-data'], self.parse_usual_object))
|
||||
|
||||
def process_parsing(self, object2parse, object_type):
|
||||
labels = object2parse.get('labels')
|
||||
self.object_from_refs[object_type](object2parse, labels)
|
||||
|
||||
def parse_usual_object(self, o, labels):
|
||||
if 'from_object' in labels:
|
||||
self.parse_object(o, labels)
|
||||
else:
|
||||
self.parse_attribute(o, labels)
|
||||
|
||||
def parse_galaxy(self, o, labels):
|
||||
name = self.get_misp_type(labels)
|
||||
tag = labels[1]
|
||||
galaxy_type, value = tag.split(':')[1].split('=')
|
||||
galaxy_description, cluster_description = o.get('description').split('|')
|
||||
_, uuid = o.get('id').split('--')
|
||||
galaxy = {'type': galaxy_type, 'name': name, 'description': galaxy_description,
|
||||
'GalaxyCluster': [{'type': galaxy_type, 'value':value, 'tag_name': tag,
|
||||
'description': cluster_description, 'uuid': uuid}]}
|
||||
self.misp_event['Galaxy'].append(galaxy)
|
||||
|
||||
def parse_MISP_course_of_action(self, o, _):
|
||||
self.parse_course_of_action(o)
|
||||
|
||||
def parse_custom(self, o, labels):
|
||||
if 'from_object' in labels:
|
||||
self.parse_custom_object(o, labels)
|
||||
|
@ -499,45 +537,27 @@ class StixParser():
|
|||
self.misp_event.add_object(**pe)
|
||||
return attributes, pe_uuid
|
||||
|
||||
def buildExternalDict(self):
|
||||
self.fetch_report()
|
||||
for o in self.event:
|
||||
object_type = o._type
|
||||
if object_type in ('relationship', 'report'):
|
||||
continue
|
||||
if object_type in galaxy_types:
|
||||
self.parse_external_galaxy(o)
|
||||
elif object_type == 'vulnerability':
|
||||
attribute = {'type': 'vulnerability', 'value': o.get('name')}
|
||||
if 'description' in o:
|
||||
attribute['comment'] = o.get('description')
|
||||
self.misp_event.add_attribute(**attribute)
|
||||
elif object_type == 'course-of-action':
|
||||
self.parse_course_of_action(o)
|
||||
elif object_type == 'indicator':
|
||||
pattern = o.get('pattern')
|
||||
self.parse_external_pattern(pattern)
|
||||
attribute = {'type': 'stix2-pattern', 'object_relation': 'stix2-pattern', 'value': pattern}
|
||||
misp_object = {'name': 'stix2-pattern', 'meta-category': 'stix2-pattern',
|
||||
'Attribute': [self.version_attribute, attribute]}
|
||||
self.misp_event.add_object(**misp_object)
|
||||
|
||||
def fetch_report(self):
|
||||
reports = []
|
||||
for o in self.event:
|
||||
if o._type == 'report':
|
||||
reports.append(o)
|
||||
if len(reports) == 1:
|
||||
self.report = reports[0]
|
||||
self.parse_report()
|
||||
class ExternalStixParser(StixParser):
|
||||
def __init__(self):
|
||||
super(ExternalStixParser, self).__init__()
|
||||
self.version_attribute = {'type': 'text', 'object_relation': 'version', 'value': self.stix_version}
|
||||
self.object_from_refs = {'course-of-action': self.parse_course_of_action, 'vulnerability': self.parse_external_vulnerability,
|
||||
'indicator': self.parse_external_indicator, 'observed-data': self.parse_external_observable}
|
||||
self.object_from_refs.update(dict.fromkeys(list(galaxy_types.keys()), self.parse_external_galaxy))
|
||||
|
||||
def process_parsing(self, object2parse, object_type):
|
||||
self.object_from_refs[object_type](object2parse)
|
||||
|
||||
def parse_external_galaxy(self, o):
|
||||
galaxy = {'name': galaxy_types[o._type]}
|
||||
if 'kill_chain_phases' in o:
|
||||
galaxy['type'] = o['kill_chain_phases'][0].get('phase_name')
|
||||
galaxy = {'name': galaxy_types[o._type].replace('-', ' ').title()}
|
||||
cluster = defaultdict(dict)
|
||||
cluster['value'] = o.get('name')
|
||||
cluster['description'] = o.get('description')
|
||||
if 'kill_chain_phases' in o:
|
||||
galaxy_type = o['kill_chain_phases'][0].get('phase_name')
|
||||
galaxy['type'] = galaxy_type
|
||||
cluster['type'] = galaxy_type
|
||||
if 'aliases' in o:
|
||||
aliases = []
|
||||
for a in o.get('aliases'):
|
||||
|
@ -546,6 +566,18 @@ class StixParser():
|
|||
galaxy['GalaxyCluster'] = [cluster]
|
||||
self.misp_event['Galaxy'].append(galaxy)
|
||||
|
||||
def parse_external_indicator(self, o):
|
||||
pattern = o.get('pattern')
|
||||
self.parse_external_pattern(pattern)
|
||||
attribute = {'type': 'stix2-pattern', 'object_relation': 'stix2-pattern', 'value': pattern}
|
||||
misp_object = {'name': 'stix2-pattern', 'meta-category': 'stix2-pattern',
|
||||
'Attribute': [self.version_attribute, attribute]}
|
||||
self.misp_event.add_object(**misp_object)
|
||||
|
||||
def parse_external_observable(self, o):
|
||||
observable = o.get('objects')
|
||||
# deeper analyse to come, as well as for indicators
|
||||
|
||||
def parse_external_pattern(self, pattern):
|
||||
if ' OR ' in pattern and ' AND ' not in pattern:
|
||||
pattern = pattern.split('OR')
|
||||
|
@ -558,6 +590,12 @@ class StixParser():
|
|||
attribute = self.attribute_from_external_pattern(pattern[0])
|
||||
self.misp_event.add_attribute(**attribute)
|
||||
|
||||
def parse_external_vulnerability(self, o):
|
||||
attribute = {'type': 'vulnerability', 'value': o.get('name')}
|
||||
if 'description' in o:
|
||||
attribute['comment'] = o.get('description')
|
||||
self.misp_event.add_attribute(**attribute)
|
||||
|
||||
@staticmethod
|
||||
def attribute_from_external_pattern(pattern):
|
||||
pattern_type, pattern_value = pattern.split(' = ')
|
||||
|
@ -569,69 +607,33 @@ class StixParser():
|
|||
# Might cause some issues, need more examples to test
|
||||
return {'type': external_pattern_mapping[stix_type][value_type].get('type'), 'value': pattern_value}
|
||||
|
||||
def set_distribution(self):
|
||||
for attribute in self.misp_event.attributes:
|
||||
attribute.distribution = self.__attribute_distribution
|
||||
for misp_object in self.misp_event.objects:
|
||||
misp_object.distribution = self.__attribute_distribution
|
||||
for attribute in misp_object.attributes:
|
||||
attribute.distribution = self.__attribute_distribution
|
||||
|
||||
def saveFile(self):
|
||||
eventDict = self.misp_event.to_json()
|
||||
outputfile = '{}.stix2'.format(self.filename)
|
||||
with open(outputfile, 'w') as f:
|
||||
f.write(eventDict)
|
||||
|
||||
@staticmethod
|
||||
def getTimestampfromDate(stix_date):
|
||||
try:
|
||||
return int(stix_date.timestamp())
|
||||
except AttributeError:
|
||||
return int(time.mktime(time.strptime(stix_date.split('+')[0], "%Y-%m-%d %H:%M:%S")))
|
||||
|
||||
@staticmethod
|
||||
def get_misp_type(labels):
|
||||
return labels[0].split('=')[1][1:-1]
|
||||
|
||||
@staticmethod
|
||||
def get_misp_category(labels):
|
||||
return labels[1].split('=')[1][1:-1]
|
||||
|
||||
@staticmethod
|
||||
def parse_observable(observable, attribute_type):
|
||||
return misp_types_mapping[attribute_type](observable, attribute_type)
|
||||
|
||||
@staticmethod
|
||||
def parse_pattern(pattern):
|
||||
if ' AND ' in pattern:
|
||||
pattern_parts = pattern.split(' AND ')
|
||||
if len(pattern_parts) == 3:
|
||||
_, value1 = pattern_parts[2].split(' = ')
|
||||
_, value2 = pattern_parts[0].split(' = ')
|
||||
return '{}|{}'.format(value1[1:-2], value2[1:-1])
|
||||
else:
|
||||
_, value1 = pattern_parts[0].split(' = ')
|
||||
_, value2 = pattern_parts[1].split(' = ')
|
||||
if value1 in ("'ipv4-addr'", "'ipv6-addr'"):
|
||||
return value2[1:-2]
|
||||
return '{}|{}'.format(value1[1:-1], value2[1:-2])
|
||||
else:
|
||||
return pattern.split(' = ')[1][1:-2]
|
||||
|
||||
def parse_pattern_with_data(self, pattern):
|
||||
if 'artifact:payload_bin' not in pattern:
|
||||
return self.parse_pattern(pattern)
|
||||
pattern_parts = pattern.split(' AND ')
|
||||
if len(pattern_parts) == 3:
|
||||
filename = pattern_parts[0].split(' = ')[1]
|
||||
md5 = pattern_parts[1].split(' = ')[1]
|
||||
return "{}|{}".format(filename[1:-1], md5[1:-1]), pattern_parts[2].split(' = ')[1][1:-2]
|
||||
return pattern_parts[0].split(' = ')[1][1:-1], pattern_parts[1].split(' = ')[1][1:-2]
|
||||
def from_misp(reports):
|
||||
for _, o in reports.items():
|
||||
if 'misp:tool="misp2stix2"' in o.get('labels'):
|
||||
return True
|
||||
return False
|
||||
|
||||
def main(args):
|
||||
stix_parser = StixParser()
|
||||
stix_parser.loadEvent(args)
|
||||
stix_event = defaultdict(dict)
|
||||
filename = os.path.join(os.path.dirname(args[0]), args[1])
|
||||
with open(filename, 'rb') as f:
|
||||
event = json.loads(f.read().decode('utf-8'))
|
||||
for o in event.get('objects'):
|
||||
parsed_object = stix2.parse(o, allow_custom=True)
|
||||
try:
|
||||
object_type = parsed_object._type
|
||||
except AttributeError:
|
||||
object_type = parsed_object['type']
|
||||
object_uuid = parsed_object['id'].split('--')[1]
|
||||
if object_type.startswith('x-misp-object'):
|
||||
object_type = 'x-misp-object'
|
||||
stix_event[object_type][object_uuid] = parsed_object
|
||||
if not stix_event:
|
||||
print(json.dumps({'success': 0, 'message': 'There is no valid STIX object to import'}))
|
||||
sys.exit(1)
|
||||
stix_version = 'STIX {}'.format(event.get('spec_version'))
|
||||
stix_parser = StixFromMISPParser() if from_misp(stix_event['report']) else ExternalStixParser()
|
||||
stix_parser.load_data(filename, stix_version, stix_event, args[2:])
|
||||
stix_parser.handler()
|
||||
stix_parser.saveFile()
|
||||
print(1)
|
||||
|
|
Loading…
Reference in New Issue