mirror of https://github.com/MISP/PyMISP
Merge branch 'nbareil-master'
commit
676d013905
215
pymisp/api.py
215
pymisp/api.py
|
@ -399,22 +399,13 @@ class PyMISP(object):
|
||||||
return self._send_attributes(event, attributes, proposal)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def av_detection_link(self, event, link, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
|
def av_detection_link(self, event, link, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'link', link, category, to_ids, comment, distribution, proposal)
|
||||||
for link in self._one_or_more(link):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'link', link, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_detection_name(self, event, name, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
|
def add_detection_name(self, event, name, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'text', name, category, to_ids, comment, distribution, proposal)
|
||||||
for name in self._one_or_more(name):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'text', name, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
|
def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'filename', filename, category, to_ids, comment, distribution, proposal)
|
||||||
for filename in self._one_or_more(filename):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'filename', filename, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
if rvalue:
|
if rvalue:
|
||||||
|
@ -432,7 +423,7 @@ class PyMISP(object):
|
||||||
attributes = []
|
attributes = []
|
||||||
|
|
||||||
for regkey, rvalue in regkeys_values.items():
|
for regkey, rvalue in regkeys_values.items():
|
||||||
if rvalue:
|
if rvalue is not None:
|
||||||
type_value = 'regkey|value'
|
type_value = 'regkey|value'
|
||||||
value = '{}|{}'.format(regkey, rvalue)
|
value = '{}|{}'.format(regkey, rvalue)
|
||||||
else:
|
else:
|
||||||
|
@ -443,199 +434,119 @@ class PyMISP(object):
|
||||||
return self._send_attributes(event, attributes, proposal)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
if not (in_file or in_memory):
|
||||||
for pattern in self._one_or_more(pattern):
|
raise PyMISPError('Invalid pattern type: please use in_memory=True or in_file=True')
|
||||||
if in_file:
|
itemtype = 'pattern-in-file' if in_file else 'pattern-in-memory'
|
||||||
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
|
return self.add_named_attribute(event, itemtype, pattern, category, to_ids, comment, distribution, proposal)
|
||||||
if in_memory:
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
|
|
||||||
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
def scrub(s):
|
||||||
for named_pipe in self._one_or_more(named_pipe):
|
if not s.startswith('\\.\\pipe\\'):
|
||||||
if not named_pipe.startswith('\\.\\pipe\\'):
|
s = '\\.\\pipe\\{}'.format(s)
|
||||||
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
|
return s
|
||||||
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
|
attributes = list(map(scrub, self._one_or_more(named_pipe)))
|
||||||
return self._send_attributes(event, attributes, proposal)
|
return self.add_named_attribute(event, 'named pipe', attributes, category, to_ids, comment, distribution, proposal)
|
||||||
|
|
||||||
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
def scrub(s):
|
||||||
if not mutex.startswith('\\BaseNamedObjects\\'):
|
if not s.startswith('\\BaseNamedObjects\\'):
|
||||||
mutex = '\\BaseNamedObjects\\{}'.format(mutex)
|
s = '\\BaseNamedObjects\\{}'.format(s)
|
||||||
attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution))
|
return self
|
||||||
return self._send_attributes(event, attributes, proposal)
|
attributes = list(map(scrub, self._one_or_more(mutex)))
|
||||||
|
return self.add_named_attribute(event, 'mutex', attributes, category, to_ids, comment, distribution, proposal)
|
||||||
|
|
||||||
def add_yara(self, event, yara, category='Payload delivery', to_ids=False, comment=None, distribution=None, proposal=False):
|
def add_yara(self, event, yara, category='Payload delivery', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'yara', yara, category, to_ids, comment, distribution, proposal)
|
||||||
for yara in self._one_or_more(yara):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'yara', yara, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
# ##### Network attributes #####
|
# ##### Network attributes #####
|
||||||
|
|
||||||
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'ip-dst', ipdst, category, to_ids, comment, distribution, proposal)
|
||||||
for ipdst in self._one_or_more(ipdst):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'ip-src', ipsrc, category, to_ids, comment, distribution, proposal)
|
||||||
for ipsrc in self._one_or_more(ipsrc):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'hostname', hostname, category, to_ids, comment, distribution, proposal)
|
||||||
for hostname in self._one_or_more(hostname):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'domain', domain, category, to_ids, comment, distribution, proposal)
|
||||||
for domain in self._one_or_more(domain):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_domain_ip(self, event, domain, ip, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_domain_ip(self, event, domain, ip, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
composed = list(map(lambda x: '%s|%s' % (domain, x), ip))
|
||||||
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
|
return self.add_named_attribute(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal)
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
composed = list(map(lambda x: '%s|%s' % (x[0], x[1]), domain_ips.items()))
|
||||||
for domain, ip in domain_ips.items():
|
return self.add_named_attribute(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal)
|
||||||
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'url', url, category, to_ids, comment, distribution, proposal)
|
||||||
for url in self._one_or_more(url):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'user-agent', useragent, category, to_ids, comment, distribution, proposal)
|
||||||
for useragent in self._one_or_more(useragent):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'pattern-in-traffic', pattern, category, to_ids, comment, distribution, proposal)
|
||||||
for pattern in self._one_or_more(pattern):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'snort', snort, category, to_ids, comment, distribution, proposal)
|
||||||
for snort in self._one_or_more(snort):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_net_other(self, event, netother, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_net_other(self, event, netother, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'other', netother, category, to_ids, comment, distribution, proposal)
|
||||||
attributes.append(self._prepare_full_attribute(category, 'other', netother, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
# ##### Email attributes #####
|
# ##### Email attributes #####
|
||||||
|
|
||||||
def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_email_src(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'email-src', email, category, to_ids, comment, distribution, proposal)
|
||||||
for email in self._one_or_more(email):
|
|
||||||
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'email-dst', email, category, to_ids, comment, distribution, proposal)
|
||||||
for email in self._one_or_more(email):
|
|
||||||
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_email_subject(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'email-subject', email, category, to_ids, comment, distribution, proposal)
|
||||||
for email in self._one_or_more(email):
|
|
||||||
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_email_attachment(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'email-attachment', email, category, to_ids, comment, distribution, proposal)
|
||||||
for email in self._one_or_more(email):
|
|
||||||
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
# ##### Target attributes #####
|
# ##### Target attributes #####
|
||||||
|
|
||||||
def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_target_email(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'target-email', target, category, to_ids, comment, distribution, proposal)
|
||||||
for target in self._one_or_more(target):
|
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_target_user(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'target-user', target, category, to_ids, comment, distribution, proposal)
|
||||||
for target in self._one_or_more(target):
|
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_target_machine(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'target-machine', target, category, to_ids, comment, distribution, proposal)
|
||||||
for target in self._one_or_more(target):
|
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_target_org(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'target-org', target, category, to_ids, comment, distribution, proposal)
|
||||||
for target in self._one_or_more(target):
|
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_target_location(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'target-location', target, category, to_ids, comment, distribution, proposal)
|
||||||
for target in self._one_or_more(target):
|
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_target_external(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'target-external', target, category, to_ids, comment, distribution, proposal)
|
||||||
for target in self._one_or_more(target):
|
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
# ##### Attribution attributes #####
|
# ##### Attribution attributes #####
|
||||||
|
|
||||||
def add_threat_actor(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
def add_threat_actor(self, event, target, category='Attribution', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'threat-actor', target, category, to_ids, comment, distribution, proposal)
|
||||||
for target in self._one_or_more(target):
|
|
||||||
attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
# ##### Internal reference attributes #####
|
# ##### Internal reference attributes #####
|
||||||
|
|
||||||
def add_internal_link(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
|
def add_internal_link(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'link', reference, category, to_ids, comment, distribution, proposal)
|
||||||
for reference in self._one_or_more(reference):
|
|
||||||
attributes.append(self._prepare_full_attribute('Internal reference', 'link', reference, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_internal_comment(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
|
def add_internal_comment(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'comment', reference, category, to_ids, comment, distribution, proposal)
|
||||||
for reference in self._one_or_more(reference):
|
|
||||||
attributes.append(self._prepare_full_attribute('Internal reference', 'comment', reference, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_internal_text(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
|
def add_internal_text(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'text', reference, category, to_ids, comment, distribution, proposal)
|
||||||
for reference in self._one_or_more(reference):
|
|
||||||
attributes.append(self._prepare_full_attribute('Internal reference', 'text', reference, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
def add_internal_other(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
|
def add_internal_other(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
return self.add_named_attribute(event, 'other', reference, category, to_ids, comment, distribution, proposal)
|
||||||
for reference in self._one_or_more(reference):
|
|
||||||
attributes.append(self._prepare_full_attribute('Internal reference', 'other', reference, to_ids, comment, distribution))
|
|
||||||
return self._send_attributes(event, attributes, proposal)
|
|
||||||
|
|
||||||
# ##################################################
|
# ##################################################
|
||||||
# ######### Upload samples through the API #########
|
# ######### Upload samples through the API #########
|
||||||
|
|
|
@ -104,7 +104,7 @@ class MISPAttribute(object):
|
||||||
def set_all_values(self, **kwargs):
|
def set_all_values(self, **kwargs):
|
||||||
if kwargs.get('type') and kwargs.get('category'):
|
if kwargs.get('type') and kwargs.get('category'):
|
||||||
if kwargs['type'] not in self.category_type_mapping[kwargs['category']]:
|
if kwargs['type'] not in self.category_type_mapping[kwargs['category']]:
|
||||||
raise NewAttributeError('{} and {} is an invalid combinaison, type for this category has to be in {}'.capitalizeformat(self.type, self.category, (', '.join(self.category_type_mapping[self.category]))))
|
raise NewAttributeError('{} and {} is an invalid combinaison, type for this category has to be in {}'.format(self.type, self.category, (', '.join(self.category_type_mapping[kwargs['category']]))))
|
||||||
# Required
|
# Required
|
||||||
if kwargs.get('type'):
|
if kwargs.get('type'):
|
||||||
self.type = kwargs['type']
|
self.type = kwargs['type']
|
||||||
|
|
|
@ -8,7 +8,7 @@ import os
|
||||||
|
|
||||||
import pymisp as pm
|
import pymisp as pm
|
||||||
from pymisp import PyMISP
|
from pymisp import PyMISP
|
||||||
from pymisp import NewEventError
|
# from pymisp import NewEventError
|
||||||
from pymisp import MISPEvent
|
from pymisp import MISPEvent
|
||||||
from pymisp import EncodeUpdate
|
from pymisp import EncodeUpdate
|
||||||
from pymisp import EncodeFull
|
from pymisp import EncodeFull
|
||||||
|
@ -84,7 +84,7 @@ class TestOffline(unittest.TestCase):
|
||||||
def test_publish(self, m):
|
def test_publish(self, m):
|
||||||
self.initURI(m)
|
self.initURI(m)
|
||||||
pymisp = PyMISP(self.domain, self.key)
|
pymisp = PyMISP(self.domain, self.key)
|
||||||
e = pymisp.publish(self.event) # requests-mock always return the non-published event
|
e = pymisp.publish(self.event) # requests-mock always return the non-published event
|
||||||
pub = self.event
|
pub = self.event
|
||||||
pub['Event']['published'] = True
|
pub['Event']['published'] = True
|
||||||
# self.assertEqual(e, pub) FIXME: broken test, not-published event returned
|
# self.assertEqual(e, pub) FIXME: broken test, not-published event returned
|
||||||
|
@ -135,17 +135,89 @@ class TestOffline(unittest.TestCase):
|
||||||
json.dumps(misp_event, cls=EncodeUpdate)
|
json.dumps(misp_event, cls=EncodeUpdate)
|
||||||
json.dumps(misp_event, cls=EncodeFull)
|
json.dumps(misp_event, cls=EncodeFull)
|
||||||
|
|
||||||
def test_searchIndexByTagId (self, m):
|
def test_searchIndexByTagId(self, m):
|
||||||
self.initURI(m)
|
self.initURI(m)
|
||||||
pymisp = PyMISP(self.domain, self.key)
|
pymisp = PyMISP(self.domain, self.key)
|
||||||
response = pymisp.search_index(tag="1")
|
response = pymisp.search_index(tag="1")
|
||||||
self.assertEqual(response['response'],self.search_index_result)
|
self.assertEqual(response['response'], self.search_index_result)
|
||||||
|
|
||||||
def test_searchIndexByTagName (self, m):
|
def test_searchIndexByTagName(self, m):
|
||||||
self.initURI(m)
|
self.initURI(m)
|
||||||
pymisp = PyMISP(self.domain, self.key)
|
pymisp = PyMISP(self.domain, self.key)
|
||||||
response = pymisp.search_index(tag='ecsirt:malicious-code="ransomware"')
|
response = pymisp.search_index(tag='ecsirt:malicious-code="ransomware"')
|
||||||
self.assertEqual(response['response'],self.search_index_result)
|
self.assertEqual(response['response'], self.search_index_result)
|
||||||
|
|
||||||
|
def test_addAttributes(self, m):
|
||||||
|
class MockPyMISP(PyMISP):
|
||||||
|
def _send_attributes(self, event, attributes, proposal=False):
|
||||||
|
return len(attributes)
|
||||||
|
self.initURI(m)
|
||||||
|
p = MockPyMISP(self.domain, self.key)
|
||||||
|
evt = p.get(1)
|
||||||
|
self.assertEquals(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940',
|
||||||
|
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
|
||||||
|
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b',
|
||||||
|
filename='foobar.exe'))
|
||||||
|
self.assertEquals(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940',
|
||||||
|
sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
|
||||||
|
sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b'))
|
||||||
|
p.av_detection_link(evt, 'https://foocorp.com')
|
||||||
|
p.add_detection_name(evt, 'WATERMELON')
|
||||||
|
p.add_filename(evt, 'foobar.exe')
|
||||||
|
p.add_regkey(evt, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar')
|
||||||
|
p.add_regkey(evt, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar', rvalue='foobar')
|
||||||
|
regkeys = {
|
||||||
|
'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foo': None,
|
||||||
|
'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bar': 'baz',
|
||||||
|
'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bae': 0,
|
||||||
|
}
|
||||||
|
self.assertEqual(3, p.add_regkeys(evt, regkeys))
|
||||||
|
p.add_pattern(evt, '.*foobar.*', in_memory=True)
|
||||||
|
p.add_pattern(evt, '.*foobar.*', in_file=True)
|
||||||
|
self.assertRaises(pm.PyMISPError, p.add_pattern, evt, '.*foobar.*', in_memory=False, in_file=False)
|
||||||
|
p.add_pipe(evt, 'foo')
|
||||||
|
p.add_pipe(evt, '\\.\\pipe\\foo')
|
||||||
|
self.assertEquals(3, p.add_pipe(evt, ['foo', 'bar', 'baz']))
|
||||||
|
self.assertEquals(3, p.add_pipe(evt, ['foo', 'bar', '\\.\\pipe\\baz']))
|
||||||
|
p.add_mutex(evt, 'foo')
|
||||||
|
self.assertEquals(1, p.add_mutex(evt, '\\BaseNamedObjects\\foo'))
|
||||||
|
self.assertEquals(3, p.add_mutex(evt, ['foo', 'bar', 'baz']))
|
||||||
|
self.assertEquals(3, p.add_mutex(evt, ['foo', 'bar', '\\BaseNamedObjects\\baz']))
|
||||||
|
p.add_yara(evt, 'rule Foo {}')
|
||||||
|
self.assertEquals(2, p.add_yara(evt, ['rule Foo {}', 'rule Bar {}']))
|
||||||
|
p.add_ipdst(evt, '1.2.3.4')
|
||||||
|
self.assertEquals(2, p.add_ipdst(evt, ['1.2.3.4', '5.6.7.8']))
|
||||||
|
p.add_ipsrc(evt, '1.2.3.4')
|
||||||
|
self.assertEquals(2, p.add_ipsrc(evt, ['1.2.3.4', '5.6.7.8']))
|
||||||
|
p.add_hostname(evt, 'a.foobar.com')
|
||||||
|
self.assertEquals(2, p.add_hostname(evt, ['a.foobar.com', 'a.foobaz.com']))
|
||||||
|
p.add_domain(evt, 'foobar.com')
|
||||||
|
self.assertEquals(2, p.add_domain(evt, ['foobar.com', 'foobaz.com']))
|
||||||
|
p.add_domain_ip(evt, 'foo.com', '1.2.3.4')
|
||||||
|
self.assertEquals(2, p.add_domain_ip(evt, 'foo.com', ['1.2.3.4', '5.6.7.8']))
|
||||||
|
self.assertEquals(2, p.add_domains_ips(evt, {'foo.com': '1.2.3.4', 'bar.com': '4.5.6.7'}))
|
||||||
|
p.add_url(evt, 'https://example.com')
|
||||||
|
self.assertEquals(2, p.add_url(evt, ['https://example.com', 'http://foo.com']))
|
||||||
|
p.add_useragent(evt, 'Mozilla')
|
||||||
|
self.assertEquals(2, p.add_useragent(evt, ['Mozilla', 'Godzilla']))
|
||||||
|
p.add_traffic_pattern(evt, 'blabla')
|
||||||
|
p.add_snort(evt, 'blaba')
|
||||||
|
p.add_net_other(evt, 'blabla')
|
||||||
|
p.add_email_src(evt, 'foo@bar.com')
|
||||||
|
p.add_email_dst(evt, 'foo@bar.com')
|
||||||
|
p.add_email_subject(evt, 'you won the lottery')
|
||||||
|
p.add_email_attachment(evt, 'foo.doc')
|
||||||
|
p.add_target_email(evt, 'foo@bar.com')
|
||||||
|
p.add_target_user(evt, 'foo')
|
||||||
|
p.add_target_machine(evt, 'foobar')
|
||||||
|
p.add_target_org(evt, 'foobar')
|
||||||
|
p.add_target_location(evt, 'foobar')
|
||||||
|
p.add_target_external(evt, 'foobar')
|
||||||
|
p.add_threat_actor(evt, 'WATERMELON')
|
||||||
|
p.add_internal_link(evt, 'foobar')
|
||||||
|
p.add_internal_comment(evt, 'foobar')
|
||||||
|
p.add_internal_text(evt, 'foobar')
|
||||||
|
p.add_internal_other(evt, 'foobar')
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Reference in New Issue