mirror of https://github.com/MISP/PyMISP
Add basic support for the proposal API
parent
305d1949eb
commit
588a1abc00
176
pymisp/api.py
176
pymisp/api.py
|
@ -107,10 +107,10 @@ class PyMISP(object):
|
||||||
self.types = ['md5', 'sha1', 'sha256', 'filename', 'filename|md5', 'filename|sha1',
|
self.types = ['md5', 'sha1', 'sha256', 'filename', 'filename|md5', 'filename|sha1',
|
||||||
'filename|sha256', 'ip-src', 'ip-dst', 'hostname', 'domain', 'url',
|
'filename|sha256', 'ip-src', 'ip-dst', 'hostname', 'domain', 'url',
|
||||||
'user-agent', 'http-method', 'regkey', 'regkey|value', 'AS', 'snort',
|
'user-agent', 'http-method', 'regkey', 'regkey|value', 'AS', 'snort',
|
||||||
'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe',
|
'pattern-in-file', 'pattern-in-traffic', 'pattern-in-memory', 'named pipe',
|
||||||
'mutex', 'vulnerability', 'attachment', 'malware-sample', 'link', 'comment',
|
'mutex', 'vulnerability', 'attachment', 'malware-sample', 'link', 'comment',
|
||||||
'text', 'email-src', 'email-dst', 'email-subject', 'email-attachment',
|
'text', 'email-src', 'email-dst', 'email-subject', 'email-attachment',
|
||||||
'yara', 'target-user', 'target-email', 'target-machine', 'target-org',
|
'yara', 'target-user', 'target-email', 'target-machine', 'target-org',
|
||||||
'target-location', 'target-external', 'other']
|
'target-location', 'target-external', 'other']
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -313,16 +313,19 @@ class PyMISP(object):
|
||||||
|
|
||||||
# ##### File attributes #####
|
# ##### File attributes #####
|
||||||
|
|
||||||
def _send_attributes(self, event, attributes):
|
def _send_attributes(self, event, attributes, proposal=False):
|
||||||
event = self._prepare_update(event)
|
if proposal:
|
||||||
for a in attributes:
|
response = self.proposal_add(event['Event']['id'], attributes)
|
||||||
if a.get('distribution') is None:
|
else:
|
||||||
a['distribution'] = event['Event']['distribution']
|
event = self._prepare_update(event)
|
||||||
event['Event']['Attribute'] = attributes
|
for a in attributes:
|
||||||
response = self.update_event(event['Event']['id'], event, 'json')
|
if a.get('distribution') is None:
|
||||||
|
a['distribution'] = event['Event']['distribution']
|
||||||
|
event['Event']['Attribute'] = attributes
|
||||||
|
response = self.update_event(event['Event']['id'], event, 'json')
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None):
|
def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None, proposal=False):
|
||||||
categories = ['Payload delivery', 'Artifacts dropped', 'Payload installation', 'External analysis']
|
categories = ['Payload delivery', 'Artifacts dropped', 'Payload installation', 'External analysis']
|
||||||
if category not in categories:
|
if category not in categories:
|
||||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories))))
|
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories))))
|
||||||
|
@ -343,9 +346,9 @@ class PyMISP(object):
|
||||||
attributes.append(self._prepare_full_attribute(category, type_value.format('sha256'), value.format(sha256),
|
attributes.append(self._prepare_full_attribute(category, type_value.format('sha256'), value.format(sha256),
|
||||||
to_ids, comment, distribution))
|
to_ids, comment, distribution))
|
||||||
|
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None):
|
def add_regkey(self, event, regkey, rvalue=None, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
type_value = '{}'
|
type_value = '{}'
|
||||||
value = '{}'
|
value = '{}'
|
||||||
if rvalue:
|
if rvalue:
|
||||||
|
@ -357,125 +360,125 @@ class PyMISP(object):
|
||||||
|
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, type_value, value, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None):
|
def add_pattern(self, event, pattern, in_file=True, in_memory=False, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
if in_file:
|
if in_file:
|
||||||
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'pattern-in-file', pattern, to_ids, comment, distribution))
|
||||||
if in_memory:
|
if in_memory:
|
||||||
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'pattern-in-memory', pattern, to_ids, comment, distribution))
|
||||||
|
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None):
|
def add_pipe(self, event, named_pipe, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
if not named_pipe.startswith('\\.\\pipe\\'):
|
if not named_pipe.startswith('\\.\\pipe\\'):
|
||||||
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
|
named_pipe = '\\.\\pipe\\{}'.format(named_pipe)
|
||||||
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'named pipe', named_pipe, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None):
|
def add_mutex(self, event, mutex, category='Artifacts dropped', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
if not mutex.startswith('\\BaseNamedObjects\\'):
|
if not mutex.startswith('\\BaseNamedObjects\\'):
|
||||||
mutex = '\\BaseNamedObjects\\{}'.format(mutex)
|
mutex = '\\BaseNamedObjects\\{}'.format(mutex)
|
||||||
attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'mutex', mutex, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
# ##### Network attributes #####
|
# ##### Network attributes #####
|
||||||
|
|
||||||
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None):
|
def add_ipdst(self, event, ipdst, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'ip-dst', ipdst, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None):
|
def add_hostname(self, event, hostname, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'hostname', hostname, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None):
|
def add_domain(self, event, domain, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'domain', domain, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None):
|
def add_url(self, event, url, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'url', url, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None):
|
def add_useragent(self, event, useragent, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'user-agent', useragent, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None):
|
def add_traffic_pattern(self, event, pattern, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'pattern-in-traffic', pattern, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None):
|
def add_snort(self, event, snort, category='Network activity', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'snort', snort, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
# ##### Email attributes #####
|
# ##### Email attributes #####
|
||||||
|
|
||||||
def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None):
|
def add_email_src(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-src', email, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None):
|
def add_email_dst(self, event, email, category='Payload delivery', to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
categories = ['Payload delivery', 'Network activity']
|
categories = ['Payload delivery', 'Network activity']
|
||||||
if category not in categories:
|
if category not in categories:
|
||||||
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories))))
|
raise NewAttributeError('{} is invalid, category has to be in {}'.format(category, (', '.join(categories))))
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute(category, 'email-dst', email, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None):
|
def add_email_subject(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-subject', email, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None):
|
def add_email_attachment(self, event, email, to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute('Payload delivery', 'email-attachment', email, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
# ##### Target attributes #####
|
# ##### Target attributes #####
|
||||||
|
|
||||||
def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None):
|
def add_target_email(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute('Targeting data', 'target-email', target, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None):
|
def add_target_user(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute('Targeting data', 'target-user', target, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None):
|
def add_target_machine(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute('Targeting data', 'target-machine', target, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None):
|
def add_target_org(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute('Targeting data', 'target-org', target, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None):
|
def add_target_location(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute('Targeting data', 'target-location', target, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None):
|
def add_target_external(self, event, target, to_ids=True, comment=None, distribution=None, proposal=False):
|
||||||
attributes = []
|
attributes = []
|
||||||
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
|
attributes.append(self._prepare_full_attribute('Targeting data', 'target-external', target, to_ids, comment, distribution))
|
||||||
return self._send_attributes(event, attributes)
|
return self._send_attributes(event, attributes, proposal)
|
||||||
|
|
||||||
# ##################################################
|
# ##################################################
|
||||||
# ######### Upload samples through the API #########
|
# ######### Upload samples through the API #########
|
||||||
# ##################################################
|
# ##################################################
|
||||||
|
@ -546,6 +549,51 @@ class PyMISP(object):
|
||||||
response = session.post(url, data=json.dumps(to_post))
|
response = session.post(url, data=json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
|
# ############################
|
||||||
|
# ######## Proposals #########
|
||||||
|
# ############################
|
||||||
|
|
||||||
|
def __query_proposal(self, session, path, id, attribute=None):
|
||||||
|
path = path.strip('/')
|
||||||
|
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
|
||||||
|
query = None
|
||||||
|
if path in ['add', 'edit']:
|
||||||
|
query = {'request': {'ShadowAttribute': attribute}}
|
||||||
|
if path == 'view':
|
||||||
|
response = session.get(url)
|
||||||
|
else:
|
||||||
|
if query is not None:
|
||||||
|
response = session.post(url, data=json.dumps(query))
|
||||||
|
else:
|
||||||
|
response = session.post(url)
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
|
def proposal_view(self, event_id=None, proposal_id=None):
|
||||||
|
session = self.__prepare_session('json')
|
||||||
|
if proposal_id is not None and event_id is not None:
|
||||||
|
return {'error': 'You can only view an event ID or a proposal ID'}
|
||||||
|
if event_id is not None:
|
||||||
|
id = event_id
|
||||||
|
else:
|
||||||
|
id = proposal_id
|
||||||
|
return self.__query_proposal(session, 'view', id)
|
||||||
|
|
||||||
|
def proposal_add(self, event_id, attribute):
|
||||||
|
session = self.__prepare_session('json')
|
||||||
|
return self.__query_proposal(session, 'add', event_id, attribute)
|
||||||
|
|
||||||
|
def proposal_edit(self, attribute_id, attribute):
|
||||||
|
session = self.__prepare_session('json')
|
||||||
|
return self.__query_proposal(session, 'edit', attribute_id, attribute)
|
||||||
|
|
||||||
|
def proposal_accept(self, proposal_id):
|
||||||
|
session = self.__prepare_session('json')
|
||||||
|
return self.__query_proposal(session, 'accept', proposal_id)
|
||||||
|
|
||||||
|
def proposal_discard(self, proposal_id):
|
||||||
|
session = self.__prepare_session('json')
|
||||||
|
return self.__query_proposal(session, 'discard', proposal_id)
|
||||||
|
|
||||||
# ##############################
|
# ##############################
|
||||||
# ######## REST Search #########
|
# ######## REST Search #########
|
||||||
# ##############################
|
# ##############################
|
||||||
|
|
Loading…
Reference in New Issue