allow multiple attributes to be sent off at once

Slightly worked out version of the suggestion by doomedraven in #42.
pull/25/head
Jurriaan Bremer 2016-08-16 11:44:08 +02:00
parent 2722540322
commit f23a7c3357
2 changed files with 90 additions and 36 deletions

View File

@ -349,6 +349,10 @@ class PyMISP(object):
event['Event']['id'] = int(event['Event']['id'])
return event
def _one_or_more(self, value):
"""Returns a list/tuple of one or more items, regardless of input."""
return value if isinstance(value, (tuple, list)) else (value,)
# ########## Helpers ##########
def get(self, eid):
@ -443,22 +447,23 @@ class PyMISP(object):
def av_detection_link(self, event, link, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'link', link, to_ids, comment, distribution))
for link in self._one_or_more(link):
attributes.append(self._prepare_full_attribute(category, 'link', link, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_detection_name(self, event, name, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'text', name, to_ids, comment, distribution))
for name in self._one_or_more(name):
attributes.append(self._prepare_full_attribute(category, 'text', name, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'filename', filename, to_ids, comment, distribution))
for filename in self._one_or_more(filename):
attributes.append(self._prepare_full_attribute(category, 'filename', filename, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
type_value = '{}'
value = '{}'
if rvalue:
type_value = 'regkey|value'
value = '{}|{}'.format(regkey, rvalue)
@ -470,20 +475,36 @@ class PyMISP(object):
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_regkeys(self, event, regkeys_values, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for regkey, rvalue in regkeys_values.items():
if rvalue:
type_value = 'regkey|value'
value = '{}|{}'.format(regkey, rvalue)
else:
type_value = 'regkey'
value = regkey
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if in_file:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
if in_memory:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
for pattern in self._one_or_more(pattern):
if in_file:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
if in_memory:
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
if not named_pipe.startswith('\\.\\pipe\\'):
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
for named_pipe in self._one_or_more(named_pipe):
if not named_pipe.startswith('\\.\\pipe\\'):
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
@ -495,29 +516,34 @@ class PyMISP(object):
def add_yara(self, event, yara, category='Payload delivery', to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'yara', yara, to_ids, comment, distribution))
for yara in self._one_or_more(yara):
attributes.append(self._prepare_full_attribute(category, 'yara', yara, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Network attributes #####
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
for ipdst in self._one_or_more(ipdst):
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution))
for ipsrc in self._one_or_more(ipsrc):
attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
for hostname in self._one_or_more(hostname):
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
for domain in self._one_or_more(domain):
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_domain_ip(self, event, domain, ip, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
@ -525,107 +551,132 @@ class PyMISP(object):
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
for domain, ip in domain_ips.items():
attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
for url in self._one_or_more(url):
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
for useragent in self._one_or_more(useragent):
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
for pattern in self._one_or_more(pattern):
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
for snort in self._one_or_more(snort):
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Email attributes #####
def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
for email in self._one_or_more(email):
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Target attributes #####
def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Attribution attributes #####
def add_threat_actor(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution))
for target in self._one_or_more(target):
attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##### Internal reference attributes #####
def add_internal_link(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'link', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'link', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_comment(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'comment', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'comment', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_text(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'text', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'text', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
def add_internal_other(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False):
attributes = []
attributes.append(self._prepare_full_attribute('Internal reference', 'other', reference, to_ids, comment, distribution))
for reference in self._one_or_more(reference):
attributes.append(self._prepare_full_attribute('Internal reference', 'other', reference, to_ids, comment, distribution))
return self._send_attributes(event, attributes, proposal)
# ##################################################

View File

@ -156,6 +156,9 @@ class TestBasic(unittest.TestCase):
time.sleep(1)
self.delete(eventid)
def test_one_or_more(self):
self.assertEqual(self.misp._one_or_more(1), (1,))
self.assertEqual(self.misp._one_or_more([1]), [1])
if __name__ == '__main__':
unittest.main()