diff --git a/pymisp/__init__.py b/pymisp/__init__.py index 5654681..161427d 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -1,3 +1,3 @@ -__version__ = '2.1' +__version__ = '2.1.1' from .api import PyMISP, PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey diff --git a/pymisp/api.py b/pymisp/api.py index 2e1dacc..f739e1e 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -107,8 +107,11 @@ class PyMISP(object): self.types = ['md5', 'sha1', 'sha256', 'filename', 'filename|md5', 'filename|sha1', 'filename|sha256', 'ip-src', 'ip-dst', 'hostname', 'domain', 'url', 'user-agent', 'http-method', 'regkey', 'regkey|value', 'AS', 'snort', - 'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe', 'mutex', - 'vulnerability', 'attachment', 'malware-sample', 'link', 'comment', 'text', 'other'] + 'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe', + 'mutex', 'vulnerability', 'attachment', 'malware-sample', 'link', 'comment', + 'text', 'email-src', 'email-dst', 'email-subject', 'email-attachment', + 'yara', 'target-user', 'target-email', 'target-machine', 'target-org', + 'target-location', 'target-external', 'other'] try: # Make sure the MISP instance is working and the URL is valid @@ -310,16 +313,19 @@ class PyMISP(object): # ##### File attributes ##### - def _send_attributes(self, event, attributes): - event = self._prepare_update(event) - for a in attributes: - if a.get('distribution') is None: - a['distribution'] = event['Event']['distribution'] - event['Event']['Attribute'] = attributes - response = self.update_event(event['Event']['id'], event, 'json') + def _send_attributes(self, event, attributes, proposal=False): + if proposal: + response = self.proposal_add(event['Event']['id'], attributes) + else: + event = self._prepare_update(event) + for a in attributes: + if a.get('distribution') is None: + a['distribution'] = event['Event']['distribution'] + event['Event']['Attribute'] = attributes + response = self.update_event(event['Event']['id'], event, 'json') return self._check_response(response) - def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None): + def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None, proposal=False): categories = ['Payload delivery', 'Artifacts dropped', 'Payload installation', 'External analysis'] if category not in categories: raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories)))) @@ -340,9 +346,9 @@ class PyMISP(object): attributes.append(self._prepare_full_attribute(category, type_value.format('sha256'), value.format(sha256), to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None): + def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): type_value = '{}' value = '{}' if rvalue: @@ -354,67 +360,129 @@ class PyMISP(object): attributes = [] attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None): + def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] if in_file: attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution)) if in_memory: attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None): + def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] if not named_pipe.startswith('\\.\\pipe\\'): named_pipe = '\\.\\pipe\\{}'.format(named_pipe) attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None): + def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] if not mutex.startswith('\\BaseNamedObjects\\'): mutex = '\\BaseNamedObjects\\{}'.format(mutex) attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) # ##### Network attributes ##### - def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_ipsrc(self, event, ipsrc, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute(category, 'ip-src', ipsrc, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) + + # ##### Email attributes ##### + + def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): + categories = ['Payload delivery', 'Network activity'] + if category not in categories: + raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories)))) + attributes = [] + attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + # ##### Target attributes ##### + + def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) + + def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): + attributes = [] + attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution)) + return self._send_attributes(event, attributes, proposal) # ################################################## # ######### Upload samples through the API ######### @@ -486,6 +554,51 @@ class PyMISP(object): response = session.post(url, data=json.dumps(to_post)) return self._check_response(response) + # ############################ + # ######## Proposals ######### + # ############################ + + def __query_proposal(self, session, path, id, attribute=None): + path = path.strip('/') + url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id)) + query = None + if path in ['add', 'edit']: + query = {'request': {'ShadowAttribute': attribute}} + if path == 'view': + response = session.get(url) + else: + if query is not None: + response = session.post(url, data=json.dumps(query)) + else: + response = session.post(url) + return self._check_response(response) + + def proposal_view(self, event_id=None, proposal_id=None): + session = self.__prepare_session('json') + if proposal_id is not None and event_id is not None: + return {'error': 'You can only view an event ID or a proposal ID'} + if event_id is not None: + id = event_id + else: + id = proposal_id + return self.__query_proposal(session, 'view', id) + + def proposal_add(self, event_id, attribute): + session = self.__prepare_session('json') + return self.__query_proposal(session, 'add', event_id, attribute) + + def proposal_edit(self, attribute_id, attribute): + session = self.__prepare_session('json') + return self.__query_proposal(session, 'edit', attribute_id, attribute) + + def proposal_accept(self, proposal_id): + session = self.__prepare_session('json') + return self.__query_proposal(session, 'accept', proposal_id) + + def proposal_discard(self, proposal_id): + session = self.__prepare_session('json') + return self.__query_proposal(session, 'discard', proposal_id) + # ############################## # ######## REST Search ######### # ############################## diff --git a/tests/test.py b/tests/test.py index a0653c2..de5a4da 100755 --- a/tests/test.py +++ b/tests/test.py @@ -33,11 +33,12 @@ class TestBasic(unittest.TestCase): def new_event(self): event = self.misp.new_event(0, 1, 0, "This is a test") event_id = self._clean_event(event) - self.assertEqual(event, {u'Event': {u'info': u'This is a test', u'locked': False, - u'attribute_count': u'0', u'analysis': u'0', - u'ShadowAttribute': [], u'published': False, - u'distribution': u'0', u'Attribute': [], u'proposal_email_lock': False, - u'threat_level_id': u'1'}}) + to_check = {u'Event': {u'info': u'This is a test', u'locked': False, + u'attribute_count': u'0', u'analysis': u'0', + u'ShadowAttribute': [], u'published': False, + u'distribution': u'0', u'Attribute': [], u'proposal_email_lock': False, + u'threat_level_id': u'1'}}, + self.assertEqual(event, to_check, 'Failed at creating a new Event') return int(event_id) def add_hashes(self, eventid): @@ -59,7 +60,7 @@ class TestBasic(unittest.TestCase): u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9', u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}], u'proposal_email_lock': False, u'threat_level_id': u'1'}} - self.assertEqual(event, to_check) + self.assertEqual(event, to_check, 'Failed at adding hashes') def publish(self, eventid): r = self.misp.get_event(eventid) @@ -80,19 +81,62 @@ class TestBasic(unittest.TestCase): u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9', u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}], u'proposal_email_lock': False, u'threat_level_id': u'1'}} - self.assertEqual(event, to_check) + self.assertEqual(event, to_check, 'Failed at publishing event') def delete(self, eventid): event = self.misp.delete_event(eventid) - event.json() + print event.json() - def test_all(self): + def delete_attr(self, attrid): + event = self.misp.delete_attribute(attrid) + print event.json() + + def get(self, eventid): + event = self.misp.get_event(eventid) + print event.json() + + def add(self): + event = {u'Event': {u'info': u'This is a test', u'locked': False, + u'attribute_count': u'3', u'analysis': u'0', + u'ShadowAttribute': [], u'published': False, u'distribution': u'0', + u'Attribute': [ + {u'category': u'Payload installation', u'comment': u'Fanny modules', + u'to_ids': False, u'value': u'dll_installer.dll|0a209ac0de4ac033f31d6ba9191a8f7a', + u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|md5'}, + {u'category': u'Payload installation', u'comment': u'Fanny modules', + u'to_ids': False, u'value': u'dll_installer.dll|1f0ae54ac3f10d533013f74f48849de4e65817a7', + u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha1'}, + {u'category': u'Payload installation', u'comment': u'Fanny modules', + u'to_ids': False, u'value': u'dll_installer.dll|003315b0aea2fcb9f77d29223dd8947d0e6792b3a0227e054be8eb2a11f443d9', + u'ShadowAttribute': [], u'distribution': u'2', u'type': u'filename|sha256'}], + u'proposal_email_lock': False, u'threat_level_id': u'1'}} + event = self.misp.add_event(event) + print event.json() + + def test_create_event(self): eventid = self.new_event() time.sleep(1) - self.add_hashes(eventid) - time.sleep(1) - self.publish(eventid) self.delete(eventid) + def test_get_event(self): + eventid = self.new_event() + time.sleep(1) + self.get(eventid) + time.sleep(1) + self.delete(eventid) + + def test_add_event(self): + self.add() + time.sleep(1) + self.delete(1) + + def test_del_attr(self): + eventid = self.new_event() + time.sleep(1) + self.delete_attr(1) + time.sleep(1) + self.delete(eventid) + + if __name__ == '__main__': unittest.main()