From 588a1abc00dd74ddccf4a1c702b1f1e65cea0495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Fri, 30 Oct 2015 17:23:25 +0100 Subject: [PATCH] Add basic support for the proposal API --- pymisp/api.py | 176 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 112 insertions(+), 64 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 76fbb62..73866bb 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -107,10 +107,10 @@ class PyMISP(object): self.types = ['md5', 'sha1', 'sha256', 'filename', 'filename|md5', 'filename|sha1', 'filename|sha256', 'ip-src', 'ip-dst', 'hostname', 'domain', 'url', 'user-agent', 'http-method', 'regkey', 'regkey|value', 'AS', 'snort', - 'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe', - 'mutex', 'vulnerability', 'attachment', 'malware-sample', 'link', 'comment', - 'text', 'email-src', 'email-dst', 'email-subject', 'email-attachment', - 'yara', 'target-user', 'target-email', 'target-machine', 'target-org', + 'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe', + 'mutex', 'vulnerability', 'attachment', 'malware-sample', 'link', 'comment', + 'text', 'email-src', 'email-dst', 'email-subject', 'email-attachment', + 'yara', 'target-user', 'target-email', 'target-machine', 'target-org', 'target-location', 'target-external', 'other'] try: @@ -313,16 +313,19 @@ class PyMISP(object): # ##### File attributes ##### - def _send_attributes(self, event, attributes): - event = self._prepare_update(event) - for a in attributes: - if a.get('distribution') is None: - a['distribution'] = event['Event']['distribution'] - event['Event']['Attribute'] = attributes - response = self.update_event(event['Event']['id'], event, 'json') + def _send_attributes(self, event, attributes, proposal=False): + if proposal: + response = self.proposal_add(event['Event']['id'], attributes) + else: + event = self._prepare_update(event) + for a in attributes: + if a.get('distribution') is None: + a['distribution'] = event['Event']['distribution'] + event['Event']['Attribute'] = attributes + response = self.update_event(event['Event']['id'], event, 'json') return self._check_response(response) - def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None): + def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None, proposal=False): categories = ['Payload delivery', 'Artifacts dropped', 'Payload installation', 'External analysis'] if category not in categories: raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories)))) @@ -343,9 +346,9 @@ class PyMISP(object): attributes.append(self._prepare_full_attribute(category, type_value.format('sha256'), value.format(sha256), to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None): + def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): type_value = '{}' value = '{}' if rvalue: @@ -357,125 +360,125 @@ class PyMISP(object): attributes = [] attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None): + def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] if in_file: attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution)) if in_memory: attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None): + def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] if not named_pipe.startswith('\\.\\pipe\\'): named_pipe = '\\.\\pipe\\{}'.format(named_pipe) attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None): + def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] if not mutex.startswith('\\BaseNamedObjects\\'): mutex = '\\BaseNamedObjects\\{}'.format(mutex) attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) # ##### Network attributes ##### - def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None): + def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) # ##### Email attributes ##### - - def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None): + + def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None): + def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False): categories = ['Payload delivery', 'Network activity'] if category not in categories: raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories)))) attributes = [] attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None): + def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) + return self._send_attributes(event, attributes, proposal) - def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None): + def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) - + return self._send_attributes(event, attributes, proposal) + # ##### Target attributes ##### - - def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None): + + def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) - - def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None): + return self._send_attributes(event, attributes, proposal) + + def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) - - def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None): + return self._send_attributes(event, attributes, proposal) + + def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) - - def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None): + return self._send_attributes(event, attributes, proposal) + + def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) - - def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None): + return self._send_attributes(event, attributes, proposal) + + def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) - - def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None): + return self._send_attributes(event, attributes, proposal) + + def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False): attributes = [] attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution)) - return self._send_attributes(event, attributes) - + return self._send_attributes(event, attributes, proposal) + # ################################################## # ######### Upload samples through the API ######### # ################################################## @@ -546,6 +549,51 @@ class PyMISP(object): response = session.post(url, data=json.dumps(to_post)) return self._check_response(response) + # ############################ + # ######## Proposals ######### + # ############################ + + def __query_proposal(self, session, path, id, attribute=None): + path = path.strip('/') + url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id)) + query = None + if path in ['add', 'edit']: + query = {'request': {'ShadowAttribute': attribute}} + if path == 'view': + response = session.get(url) + else: + if query is not None: + response = session.post(url, data=json.dumps(query)) + else: + response = session.post(url) + return self._check_response(response) + + def proposal_view(self, event_id=None, proposal_id=None): + session = self.__prepare_session('json') + if proposal_id is not None and event_id is not None: + return {'error': 'You can only view an event ID or a proposal ID'} + if event_id is not None: + id = event_id + else: + id = proposal_id + return self.__query_proposal(session, 'view', id) + + def proposal_add(self, event_id, attribute): + session = self.__prepare_session('json') + return self.__query_proposal(session, 'add', event_id, attribute) + + def proposal_edit(self, attribute_id, attribute): + session = self.__prepare_session('json') + return self.__query_proposal(session, 'edit', attribute_id, attribute) + + def proposal_accept(self, proposal_id): + session = self.__prepare_session('json') + return self.__query_proposal(session, 'accept', proposal_id) + + def proposal_discard(self, proposal_id): + session = self.__prepare_session('json') + return self.__query_proposal(session, 'discard', proposal_id) + # ############################## # ######## REST Search ######### # ##############################