From d3d7bccf0b78e3e7226a0dfd1681b31500552e07 Mon Sep 17 00:00:00 2001 From: Nicolas Bareil Date: Tue, 29 Nov 2016 09:14:18 +0100 Subject: [PATCH 1/5] Factorize all add_* in favor of add_named_attribute() Not tested... --- pymisp/api.py | 213 +++++++++++++++----------------------------------- 1 file changed, 62 insertions(+), 151 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index a457ca2..92ecd1d 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -399,22 +399,13 @@ class PyMISP(object): return self._send_attributes(event, attributes, proposal) def av_detection_link(self, event, link, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False): - attributes = [] - for link in self._one_or_more(link): - attributes.append(self._prepare_full_attribute(category, 'link', link, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'link', link, category, to_ids, comment, distribution, proposal) def add_detection_name(self, event, name, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False): - attributes = [] - for name in self._one_or_more(name): - attributes.append(self._prepare_full_attribute(category, 'text', name, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'text', name, category, to_ids, comment, distribution, proposal) def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False): - attributes = [] - for filename in self._one_or_more(filename): - attributes.append(self._prepare_full_attribute(category, 'filename', filename, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'filename', filename, category, to_ids, comment, distribution, proposal) def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): if rvalue: @@ -443,199 +434,119 @@ class PyMISP(object): return self._send_attributes(event, attributes, proposal) def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for pattern in self._one_or_more(pattern): - if in_file: - attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution)) - if in_memory: - attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution)) - - return self._send_attributes(event, attributes, proposal) + if not (in_file or in_memory): + raise PyMISPError('Invalid pattern type: please use in_memory=True or in_file=True') + itemtype = 'pattern-in-file' if in_file else 'pattern-in-memory' + return self._add_named_attributes(event, itemtype, pattern, category, to_ids, comment, distribution, proposal) def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for named_pipe in self._one_or_more(named_pipe): - if not named_pipe.startswith('\\.\\pipe\\'): - named_pipe = '\\.\\pipe\\{}'.format(named_pipe) - attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def scrub(s): + if not s.startswith('\\.\\pipe\\'): + s = '\\.\\pipe\\{}'.format(s) + return s + attributes = map(scrub, self._one_or_more(named_pipe)) + return self._add_named_attributes(event, 'named pipe', value, category, to_ids, comment, distribution, proposal) def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - if not mutex.startswith('\\BaseNamedObjects\\'): - mutex = '\\BaseNamedObjects\\{}'.format(mutex) - attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def scrub(s): + if not s.startswith('\\BaseNamedObjects\\'): + s = '\\BaseNamedObjects\\{}'.format(s) + return self + attributes = map(scrub, self._one_or_more(mutex)) + return self._add_named_attributes(event, 'mutex', attributes, category, to_ids, comment, distribution, proposal) def add_yara(self, event, yara, category='Payload delivery', to_ids=False, comment=None, distribution=None, proposal=False): - attributes = [] - for yara in self._one_or_more(yara): - attributes.append(self._prepare_full_attribute(category, 'yara', yara, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'yara', yara, category, to_ids, comment, distribution, proposal) # ##### Network attributes ##### def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for ipdst in self._one_or_more(ipdst): - attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'ip-dst', ipdst, category, to_ids, comment, distribution, proposal) def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for ipsrc in self._one_or_more(ipsrc): - attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'ip-src', ipsrc, category, to_ids, comment, distribution, proposal) def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for hostname in self._one_or_more(hostname): - attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'hostname', hostname, category, to_ids, comment, distribution, proposal) def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for domain in self._one_or_more(domain): - attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'domain', domain, category, to_ids, comment, distribution, proposal) def add_domain_ip(self, event, domain, ip, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + composed = map(lambda x,y: '%s|%s' % (x, z), zip(domain, ip)) + return self._add_named_attributes(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for domain, ip in domain_ips.items(): - attributes.append(self._prepare_full_attribute(category, 'domain|ip', "%s|%s" % (domain, ip), to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + composed = map(lambda x,y: '%s|%s' % (x, z), domain_ips.items()) + return self._add_named_attributes(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for url in self._one_or_more(url): - attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'url', url, category, to_ids, comment, distribution, proposal) def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for useragent in self._one_or_more(useragent): - attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'user-agent', useragent, category, to_ids, comment, distribution, proposal) def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for pattern in self._one_or_more(pattern): - attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'pattern-in-traffic', pattern, category, to_ids, comment, distribution, proposal) def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for snort in self._one_or_more(snort): - attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'snort', snort, category, to_ids, comment, distribution, proposal) def add_net_other(self, event, netother, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - attributes.append(self._prepare_full_attribute(category, 'other', netother, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'other', netother, category, to_ids, comment, distribution, proposal) # ##### Email attributes ##### - def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for email in self._one_or_more(email): - attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_email_src(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'email-src', email, category, to_ids, comment, distribution, proposal) def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for email in self._one_or_more(email): - attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + return self._add_named_attributes(event, 'email-dst', email, category, to_ids, comment, distribution, proposal) - def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for email in self._one_or_more(email): - attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_email_subject(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'email-subject', email, category, to_ids, comment, distribution, proposal) - def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for email in self._one_or_more(email): - attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_email_attachment(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'email-attachement', email, category, to_ids, comment, distribution, proposal) # ##### Target attributes ##### - def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for target in self._one_or_more(target): - attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_target_email(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'target-email', target, category, to_ids, comment, distribution, proposal) - def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for target in self._one_or_more(target): - attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_target_user(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'target-user', target, category, to_ids, comment, distribution, proposal) - def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for target in self._one_or_more(target): - attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_target_machine(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'target-machine', target, category, to_ids, comment, distribution, proposal) - def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for target in self._one_or_more(target): - attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_target_org(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'target-orge', target, category, to_ids, comment, distribution, proposal) - def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for target in self._one_or_more(target): - attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_target_location(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'target-location', target, category, to_ids, comment, distribution, proposal) - def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for target in self._one_or_more(target): - attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_target_external(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'target-external', target, category, to_ids, comment, distribution, proposal) # ##### Attribution attributes ##### - def add_threat_actor(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): - attributes = [] - for target in self._one_or_more(target): - attributes.append(self._prepare_full_attribute('Attribution', 'threat-actor', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_threat_actor(self, event, target, category='Attribution', to_ids=True, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'threat-actor', target, category, to_ids, comment, distribution, proposal) # ##### Internal reference attributes ##### - def add_internal_link(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False): - attributes = [] - for reference in self._one_or_more(reference): - attributes.append(self._prepare_full_attribute('Internal reference', 'link', reference, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_internal_link(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'link', reference, category, to_ids, comment, distribution, proposal) - def add_internal_comment(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False): - attributes = [] - for reference in self._one_or_more(reference): - attributes.append(self._prepare_full_attribute('Internal reference', 'comment', reference, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_internal_comment(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'comment', reference, category, to_ids, comment, distribution, proposal) - def add_internal_text(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False): - attributes = [] - for reference in self._one_or_more(reference): - attributes.append(self._prepare_full_attribute('Internal reference', 'text', reference, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_internal_text(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'text', reference, category, to_ids, comment, distribution, proposal) - def add_internal_other(self, event, reference, to_ids=False, comment=None, distribution=None, proposal=False): - attributes = [] - for reference in self._one_or_more(reference): - attributes.append(self._prepare_full_attribute('Internal reference', 'other', reference, to_ids, comment, distribution)) - return self._send_attributes(event, attributes, proposal) + def add_internal_other(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False): + return self._add_named_attributes(event, 'other', reference, category, to_ids, comment, distribution, proposal) # ################################################## # ######### Upload samples through the API ######### From e44e33fe903d0d3af551e4895f72d784269b2def Mon Sep 17 00:00:00 2001 From: Nicolas Bareil Date: Thu, 1 Dec 2016 10:48:33 +0100 Subject: [PATCH 2/5] capitalizeformat() does not exist on Python2 and fix category variables --- pymisp/mispevent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 1fdd0cf..2223868 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -104,7 +104,7 @@ class MISPAttribute(object): def set_all_values(self, **kwargs): if kwargs.get('type') and kwargs.get('category'): if kwargs['type'] not in self.category_type_mapping[kwargs['category']]: - raise NewAttributeError('{} and {} is an invalid combinaison, type for this category has to be in {}'.capitalizeformat(self.type, self.category, (', '.join(self.category_type_mapping[self.category])))) + raise NewAttributeError('{} and {} is an invalid combinaison, type for this category has to be in {}'.format(self.type, self.category, (', '.join(self.category_type_mapping[kwargs['category']])))) # Required if kwargs.get('type'): self.type = kwargs['type'] From fff3a66d09d2c8eb19d3ed7cdc920b6de9a33892 Mon Sep 17 00:00:00 2001 From: Nicolas Bareil Date: Thu, 1 Dec 2016 10:49:12 +0100 Subject: [PATCH 3/5] Unit-tests --- pymisp/api.py | 72 +++++++++++++++++++++---------------------- tests/test_offline.py | 72 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 36 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 92ecd1d..55d8852 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -399,13 +399,13 @@ class PyMISP(object): return self._send_attributes(event, attributes, proposal) def av_detection_link(self, event, link, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'link', link, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'link', link, category, to_ids, comment, distribution, proposal) def add_detection_name(self, event, name, category='Antivirus detection', to_ids=False, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'text', name, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'text', name, category, to_ids, comment, distribution, proposal) def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'filename', filename, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'filename', filename, category, to_ids, comment, distribution, proposal) def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): if rvalue: @@ -423,7 +423,7 @@ class PyMISP(object): attributes = [] for regkey, rvalue in regkeys_values.items(): - if rvalue: + if rvalue is not None: type_value = 'regkey|value' value = '{}|{}'.format(regkey, rvalue) else: @@ -437,7 +437,7 @@ class PyMISP(object): if not (in_file or in_memory): raise PyMISPError('Invalid pattern type: please use in_memory=True or in_file=True') itemtype = 'pattern-in-file' if in_file else 'pattern-in-memory' - return self._add_named_attributes(event, itemtype, pattern, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, itemtype, pattern, category, to_ids, comment, distribution, proposal) def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): def scrub(s): @@ -445,7 +445,7 @@ class PyMISP(object): s = '\\.\\pipe\\{}'.format(s) return s attributes = map(scrub, self._one_or_more(named_pipe)) - return self._add_named_attributes(event, 'named pipe', value, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'named pipe', attributes, category, to_ids, comment, distribution, proposal) def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): def scrub(s): @@ -453,100 +453,100 @@ class PyMISP(object): s = '\\BaseNamedObjects\\{}'.format(s) return self attributes = map(scrub, self._one_or_more(mutex)) - return self._add_named_attributes(event, 'mutex', attributes, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'mutex', attributes, category, to_ids, comment, distribution, proposal) def add_yara(self, event, yara, category='Payload delivery', to_ids=False, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'yara', yara, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'yara', yara, category, to_ids, comment, distribution, proposal) # ##### Network attributes ##### def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'ip-dst', ipdst, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'ip-dst', ipdst, category, to_ids, comment, distribution, proposal) def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'ip-src', ipsrc, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'ip-src', ipsrc, category, to_ids, comment, distribution, proposal) def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'hostname', hostname, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'hostname', hostname, category, to_ids, comment, distribution, proposal) def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'domain', domain, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'domain', domain, category, to_ids, comment, distribution, proposal) def add_domain_ip(self, event, domain, ip, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - composed = map(lambda x,y: '%s|%s' % (x, z), zip(domain, ip)) - return self._add_named_attributes(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) + composed = map(lambda x: '%s|%s' % (domain, x), ip) + return self.add_named_attribute(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - composed = map(lambda x,y: '%s|%s' % (x, z), domain_ips.items()) - return self._add_named_attributes(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) + composed = map(lambda (x,y): '%s|%s' % (x, y), domain_ips.items()) + return self.add_named_attribute(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'url', url, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'url', url, category, to_ids, comment, distribution, proposal) def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'user-agent', useragent, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'user-agent', useragent, category, to_ids, comment, distribution, proposal) def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'pattern-in-traffic', pattern, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'pattern-in-traffic', pattern, category, to_ids, comment, distribution, proposal) def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'snort', snort, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'snort', snort, category, to_ids, comment, distribution, proposal) def add_net_other(self, event, netother, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'other', netother, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'other', netother, category, to_ids, comment, distribution, proposal) # ##### Email attributes ##### def add_email_src(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'email-src', email, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'email-src', email, category, to_ids, comment, distribution, proposal) def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'email-dst', email, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'email-dst', email, category, to_ids, comment, distribution, proposal) def add_email_subject(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'email-subject', email, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'email-subject', email, category, to_ids, comment, distribution, proposal) def add_email_attachment(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'email-attachement', email, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'email-attachment', email, category, to_ids, comment, distribution, proposal) # ##### Target attributes ##### def add_target_email(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'target-email', target, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'target-email', target, category, to_ids, comment, distribution, proposal) def add_target_user(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'target-user', target, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'target-user', target, category, to_ids, comment, distribution, proposal) def add_target_machine(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'target-machine', target, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'target-machine', target, category, to_ids, comment, distribution, proposal) def add_target_org(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'target-orge', target, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'target-org', target, category, to_ids, comment, distribution, proposal) def add_target_location(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'target-location', target, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'target-location', target, category, to_ids, comment, distribution, proposal) def add_target_external(self, event, target, category='Targeting data', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'target-external', target, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'target-external', target, category, to_ids, comment, distribution, proposal) # ##### Attribution attributes ##### def add_threat_actor(self, event, target, category='Attribution', to_ids=True, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'threat-actor', target, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'threat-actor', target, category, to_ids, comment, distribution, proposal) # ##### Internal reference attributes ##### def add_internal_link(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'link', reference, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'link', reference, category, to_ids, comment, distribution, proposal) def add_internal_comment(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'comment', reference, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'comment', reference, category, to_ids, comment, distribution, proposal) def add_internal_text(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'text', reference, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'text', reference, category, to_ids, comment, distribution, proposal) def add_internal_other(self, event, reference, category='Internal reference', to_ids=False, comment=None, distribution=None, proposal=False): - return self._add_named_attributes(event, 'other', reference, category, to_ids, comment, distribution, proposal) + return self.add_named_attribute(event, 'other', reference, category, to_ids, comment, distribution, proposal) # ################################################## # ######### Upload samples through the API ######### diff --git a/tests/test_offline.py b/tests/test_offline.py index b4854af..c4a8a1a 100644 --- a/tests/test_offline.py +++ b/tests/test_offline.py @@ -126,5 +126,77 @@ class TestOffline(unittest.TestCase): json.dumps(misp_event, cls=EncodeUpdate) json.dumps(misp_event, cls=EncodeFull) + def test_addAttributes(self, m): + class MockPyMISP(PyMISP): + def _send_attributes(self, event, attributes, proposal=False): + return len(attributes) + self.initURI(m) + p = MockPyMISP(self.domain, self.key) + evt = p.get(1) + self.assertEquals(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940', + sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', + sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b', + filename='foobar.exe')) + self.assertEquals(3, p.add_hashes(evt, md5='68b329da9893e34099c7d8ad5cb9c940', + sha1='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', + sha256='01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b')) + p.av_detection_link(evt, 'https://foocorp.com') + p.add_detection_name(evt, 'WATERMELON') + p.add_filename(evt, 'foobar.exe') + p.add_regkey(evt, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar') + p.add_regkey(evt, 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foobar', rvalue='foobar') + regkeys = { + 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\foo': None, + 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bar': 'baz', + 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bae': 0, + } + self.assertEquals(3, p.add_regkeys(evt, regkeys)) + p.add_pattern(evt, '.*foobar.*', in_memory=True) + p.add_pattern(evt, '.*foobar.*', in_file=True) + self.assertRaises(pm.PyMISPError, p.add_pattern, evt, '.*foobar.*', in_memory=False, in_file=False) + p.add_pipe(evt, 'foo') + p.add_pipe(evt, '\\.\\pipe\\foo') + self.assertEquals(3, p.add_pipe(evt, ['foo', 'bar', 'baz'])) + self.assertEquals(3, p.add_pipe(evt, ['foo', 'bar', '\\.\\pipe\\baz'])) + p.add_mutex(evt, 'foo') + self.assertEquals(1, p.add_mutex(evt, '\\BaseNamedObjects\\foo')) + self.assertEquals(3, p.add_mutex(evt, ['foo', 'bar', 'baz'])) + self.assertEquals(3, p.add_mutex(evt, ['foo', 'bar', '\\BaseNamedObjects\\baz'])) + p.add_yara(evt, 'rule Foo {}') + self.assertEquals(2, p.add_yara(evt, ['rule Foo {}', 'rule Bar {}'])) + p.add_ipdst(evt, '1.2.3.4') + self.assertEquals(2, p.add_ipdst(evt, ['1.2.3.4', '5.6.7.8'])) + p.add_ipsrc(evt, '1.2.3.4') + self.assertEquals(2, p.add_ipsrc(evt, ['1.2.3.4', '5.6.7.8'])) + p.add_hostname(evt, 'a.foobar.com') + self.assertEquals(2, p.add_hostname(evt, ['a.foobar.com', 'a.foobaz.com'])) + p.add_domain(evt, 'foobar.com') + self.assertEquals(2, p.add_domain(evt, ['foobar.com', 'foobaz.com'])) + p.add_domain_ip(evt, 'foo.com', '1.2.3.4') + self.assertEquals(2, p.add_domain_ip(evt, 'foo.com', ['1.2.3.4', '5.6.7.8'])) + self.assertEquals(2, p.add_domains_ips(evt, {'foo.com': '1.2.3.4', 'bar.com': '4.5.6.7'})) + p.add_url(evt, 'https://example.com') + self.assertEquals(2, p.add_url(evt, ['https://example.com', 'http://foo.com'])) + p.add_useragent(evt, 'Mozilla') + self.assertEquals(2, p.add_useragent(evt, ['Mozilla', 'Godzilla'])) + p.add_traffic_pattern(evt, 'blabla') + p.add_snort(evt, 'blaba') + p.add_net_other(evt, 'blabla') + p.add_email_src(evt, 'foo@bar.com') + p.add_email_dst(evt, 'foo@bar.com') + p.add_email_subject(evt, 'you won the lottery') + p.add_email_attachment(evt, 'foo.doc') + p.add_target_email(evt, 'foo@bar.com') + p.add_target_user(evt, 'foo') + p.add_target_machine(evt, 'foobar') + p.add_target_org(evt, 'foobar') + p.add_target_location(evt, 'foobar') + p.add_target_external(evt, 'foobar') + p.add_threat_actor(evt, 'WATERMELON') + p.add_internal_link(evt, 'foobar') + p.add_internal_comment(evt, 'foobar') + p.add_internal_text(evt, 'foobar') + p.add_internal_other(evt, 'foobar') + if __name__ == '__main__': unittest.main() From 802fc0f20ebbabc655765d40918590e5f170c9f1 Mon Sep 17 00:00:00 2001 From: Nicolas Bareil Date: Thu, 1 Dec 2016 10:59:30 +0100 Subject: [PATCH 4/5] python3 does not like lambda (x,y) syntax --- pymisp/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymisp/api.py b/pymisp/api.py index 55d8852..77acb0a 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -477,7 +477,7 @@ class PyMISP(object): return self.add_named_attribute(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - composed = map(lambda (x,y): '%s|%s' % (x, y), domain_ips.items()) + composed = map(lambda x: '%s|%s' % (x[0], x[1]), domain_ips.items()) return self.add_named_attribute(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): From 59b7d199701abc61b28c7c084d2a9a337d115fcc Mon Sep 17 00:00:00 2001 From: Nicolas Bareil Date: Thu, 1 Dec 2016 14:26:59 +0100 Subject: [PATCH 5/5] map() is a generator in Python3 --- pymisp/api.py | 8 ++++---- tests/test_offline.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 77acb0a..6c3e391 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -444,7 +444,7 @@ class PyMISP(object): if not s.startswith('\\.\\pipe\\'): s = '\\.\\pipe\\{}'.format(s) return s - attributes = map(scrub, self._one_or_more(named_pipe)) + attributes = list(map(scrub, self._one_or_more(named_pipe))) return self.add_named_attribute(event, 'named pipe', attributes, category, to_ids, comment, distribution, proposal) def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): @@ -452,7 +452,7 @@ class PyMISP(object): if not s.startswith('\\BaseNamedObjects\\'): s = '\\BaseNamedObjects\\{}'.format(s) return self - attributes = map(scrub, self._one_or_more(mutex)) + attributes = list(map(scrub, self._one_or_more(mutex))) return self.add_named_attribute(event, 'mutex', attributes, category, to_ids, comment, distribution, proposal) def add_yara(self, event, yara, category='Payload delivery', to_ids=False, comment=None, distribution=None, proposal=False): @@ -473,11 +473,11 @@ class PyMISP(object): return self.add_named_attribute(event, 'domain', domain, category, to_ids, comment, distribution, proposal) def add_domain_ip(self, event, domain, ip, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - composed = map(lambda x: '%s|%s' % (domain, x), ip) + composed = list(map(lambda x: '%s|%s' % (domain, x), ip)) return self.add_named_attribute(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) def add_domains_ips(self, event, domain_ips, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): - composed = map(lambda x: '%s|%s' % (x[0], x[1]), domain_ips.items()) + composed = list(map(lambda x: '%s|%s' % (x[0], x[1]), domain_ips.items())) return self.add_named_attribute(event, 'domain|ip', composed, category, to_ids, comment, distribution, proposal) def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): diff --git a/tests/test_offline.py b/tests/test_offline.py index c4a8a1a..f32eeea 100644 --- a/tests/test_offline.py +++ b/tests/test_offline.py @@ -150,7 +150,7 @@ class TestOffline(unittest.TestCase): 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bar': 'baz', 'HKLM\\Software\\Microsoft\\Outlook\\Addins\\bae': 0, } - self.assertEquals(3, p.add_regkeys(evt, regkeys)) + self.assertEqual(3, p.add_regkeys(evt, regkeys)) p.add_pattern(evt, '.*foobar.*', in_memory=True) p.add_pattern(evt, '.*foobar.*', in_file=True) self.assertRaises(pm.PyMISPError, p.add_pattern, evt, '.*foobar.*', in_memory=False, in_file=False)