Factorize all add_* in favor of add_named_attribute()

Not tested...
pull/30/head
Nicolas Bareil 2016-11-29 09:14:18 +01:00
parent 7b5b45146c
commit d3d7bccf0b
1 changed files with 62 additions and 151 deletions

View File

@ -399,22 +399,13 @@ class PyMISP(object):
return self._send_attributes(event, attributes, proposal)
def av_detection_link(self, event, link, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
for link in self._one_or_more(link):
attributes.append(self._prepare_full_attribute(category, 'link', link, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'link', link, category, to_ids, comment, distribution, proposal)
def add_detection_name(self, event, name, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
for name in self._one_or_more(name):
attributes.append(self._prepare_full_attribute(category, 'text', name, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'text', name, category, to_ids, comment, distribution, proposal)
def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
for filename in self._one_or_more(filename):
attributes.append(self._prepare_full_attribute(category, 'filename', filename, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'filename', filename, category, to_ids, comment, distribution, proposal)
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
if rvalue:
@ -443,199 +434,119 @@ class PyMISP(object):
return self._send_attributes(event, attributes, proposal)
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for pattern in self._one_or_more(pattern):
if in_file:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
if in_memory:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
if not (in_file or in_memory):
raise PyMISPError('Invalid pattern type: please use in_memory=True or in_file=True')
itemtype = 'pattern-in-file' if in_file else 'pattern-in-memory'
return self._add_named_attributes(event, itemtype, pattern, category, to_ids, comment, distribution, proposal)
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for named_pipe in self._one_or_more(named_pipe):
if not named_pipe.startswith('\\.\\pipe\\'):
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def scrub(s):
if not s.startswith('\\.\\pipe\\'):
s = '\\.\\pipe\\{}'.format(s)
return s
attributes = map(scrub, self._one_or_more(named_pipe))
return self._add_named_attributes(event, 'named pipe', value, category, to_ids, comment, distribution, proposal)
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if not mutex.startswith('\\BaseNamedObjects\\'):
mutex = '\\BaseNamedObjects\\{}'.format(mutex)
attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def scrub(s):
if not s.startswith('\\BaseNamedObjects\\'):
s = '\\BaseNamedObjects\\{}'.format(s)
return self
attributes = map(scrub, self._one_or_more(mutex))
return self._add_named_attributes(event, 'mutex', attributes, category, to_ids, comment, distribution, proposal)
def add_yara(self, event, yara, category='Payload delivery', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
for yara in self._one_or_more(yara):
attributes.append(self._prepare_full_attribute(category, 'yara', yara, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'yara', yara, category, to_ids, comment, distribution, proposal)
# ##### Network attributes #####
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for ipdst in self._one_or_more(ipdst):
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'ip-dst', ipdst, category, to_ids, comment, distribution, proposal)
def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for ipsrc in self._one_or_more(ipsrc):
attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'ip-src', ipsrc, category, to_ids, comment, distribution, proposal)
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for hostname in self._one_or_more(hostname):
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'hostname', hostname, category, to_ids, comment, distribution, proposal)
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for domain in self._one_or_more(domain):
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'domain', domain, category, to_ids, comment, distribution, proposal)
def add_domain_ip(self, event, domain, ip, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
composed = map(lambda x,y: '%s|%s' % (x, z), zip(domain, ip))
return self._add_named_attributes(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal)
def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for domain, ip in domain_ips.items():
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
composed = map(lambda x,y: '%s|%s' % (x, z), domain_ips.items())
return self._add_named_attributes(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal)
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for url in self._one_or_more(url):
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'url', url, category, to_ids, comment, distribution, proposal)
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for useragent in self._one_or_more(useragent):
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'user-agent', useragent, category, to_ids, comment, distribution, proposal)
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for pattern in self._one_or_more(pattern):
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'pattern-in-traffic', pattern, category, to_ids, comment, distribution, proposal)
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for snort in self._one_or_more(snort):
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'snort', snort, category, to_ids, comment, distribution, proposal)
def add_net_other(self, event, netother, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'other', netother, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'other', netother, category, to_ids, comment, distribution, proposal)
# ##### Email attributes #####
def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_src(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'email-src', email, category, to_ids, comment, distribution, proposal)
def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
return self._add_named_attributes(event, 'email-dst', email, category, to_ids, comment, distribution, proposal)
def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_subject(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'email-subject', email, category, to_ids, comment, distribution, proposal)
def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_attachment(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'email-attachement', email, category, to_ids, comment, distribution, proposal)
# ##### Target attributes #####
def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_email(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'target-email', target, category, to_ids, comment, distribution, proposal)
def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_user(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'target-user', target, category, to_ids, comment, distribution, proposal)
def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_machine(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'target-machine', target, category, to_ids, comment, distribution, proposal)
def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_org(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'target-orge', target, category, to_ids, comment, distribution, proposal)
def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_location(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'target-location', target, category, to_ids, comment, distribution, proposal)
def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_external(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'target-external', target, category, to_ids, comment, distribution, proposal)
# ##### Attribution attributes #####
def add_threat_actor(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_threat_actor(self, event, target, category='Attribution', to_ids=True, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'threat-actor', target, category, to_ids, comment, distribution, proposal)
# ##### Internal reference attributes #####
def add_internal_link(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'link', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_link(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'link', reference, category, to_ids, comment, distribution, proposal)
def add_internal_comment(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'comment', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_comment(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'comment', reference, category, to_ids, comment, distribution, proposal)
def add_internal_text(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'text', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_text(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'text', reference, category, to_ids, comment, distribution, proposal)
def add_internal_other(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'other', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_other(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False):
return self._add_named_attributes(event, 'other', reference, category, to_ids, comment, distribution, proposal)
# ##################################################
# ######### Upload samples through the API #########