Force json if nothing else is supported.

pull/2/merge
Raphaël Vinot 2015-09-21 11:52:26 +02:00
parent f4bf57cc46
commit 35423ebf2e
1 changed files with 30 additions and 30 deletions

View File

@ -132,45 +132,37 @@ class PyMISP(object):
'content-type': 'application/' + out}) 'content-type': 'application/' + out})
return session return session
def __query(self, session, path, query):
if query.get('error') is not None:
return query
url = urljoin(self.root_url, 'events/{}'.format(path.lstrip('/')))
query = {'request': query}
r = session.post(url, data=json.dumps(query))
return r.json()
# ################################################ # ################################################
# ############### Simple REST API ################ # ############### Simple REST API ################
# ################################################ # ################################################
def get_index(self): def get_index(self, force_out=None):
""" """
Return the index. Return the index.
Warning, there's a limit on the number of results Warning, there's a limit on the number of results
""" """
session = self.__prepare_session() session = self.__prepare_session(force_out)
url = urljoin(self.root_url, 'events') url = urljoin(self.root_url, 'events')
return session.get(url) return session.get(url)
def get_event(self, event_id): def get_event(self, event_id, force_out=None):
""" """
Get an event Get an event
:param event_id: Event id to get :param event_id: Event id to get
""" """
session = self.__prepare_session() session = self.__prepare_session(force_out)
url = urljoin(self.root_url, 'events/{}'.format(event_id)) url = urljoin(self.root_url, 'events/{}'.format(event_id))
return session.get(url) return session.get(url)
def add_event(self, event): def add_event(self, event, force_out=None):
""" """
Add a new event Add a new event
:param event: Event as JSON object / string or XML to add :param event: Event as JSON object / string or XML to add
""" """
session = self.__prepare_session() session = self.__prepare_session(force_out)
url = urljoin(self.root_url, 'events') url = urljoin(self.root_url, 'events')
if self.out_type == 'json': if self.out_type == 'json':
if isinstance(event, basestring): if isinstance(event, basestring):
@ -180,14 +172,14 @@ class PyMISP(object):
else: else:
return session.post(url, data=event) return session.post(url, data=event)
def update_event(self, event_id, event): def update_event(self, event_id, event, force_out=None):
""" """
Update an event Update an event
:param event_id: Event id to update :param event_id: Event id to update
:param event: Event as JSON object / string or XML to add :param event: Event as JSON object / string or XML to add
""" """
session = self.__prepare_session() session = self.__prepare_session(force_out)
url = urljoin(self.root_url, 'events/{}'.format(event_id)) url = urljoin(self.root_url, 'events/{}'.format(event_id))
if self.out_type == 'json': if self.out_type == 'json':
if isinstance(event, basestring): if isinstance(event, basestring):
@ -197,18 +189,18 @@ class PyMISP(object):
else: else:
return session.post(url, data=event) return session.post(url, data=event)
def delete_event(self, event_id): def delete_event(self, event_id, force_out=None):
""" """
Delete an event Delete an event
:param event_id: Event id to delete :param event_id: Event id to delete
""" """
session = self.__prepare_session() session = self.__prepare_session(force_out)
url = urljoin(self.root_url, 'events/{}'.format(event_id)) url = urljoin(self.root_url, 'events/{}'.format(event_id))
return session.delete(url) return session.delete(url)
def delete_attribute(self, attribute_id): def delete_attribute(self, attribute_id, force_out=None):
session = self.__prepare_session() session = self.__prepare_session(force_out)
url = urljoin(self.root_url, 'attributes/{}'.format(attribute_id)) url = urljoin(self.root_url, 'attributes/{}'.format(attribute_id))
return session.delete(url) return session.delete(url)
@ -281,7 +273,7 @@ class PyMISP(object):
def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False): def new_event(self, distribution=None, threat_level_id=None, analysis=None, info=None, date=None, published=False):
data = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published) data = self._prepare_full_event(distribution, threat_level_id, analysis, info, date, published)
response = self.add_event(data) response = self.add_event(data, 'json')
return response.json() return response.json()
def publish(self, event): def publish(self, event):
@ -289,7 +281,7 @@ class PyMISP(object):
return {'message': 'Already published'} return {'message': 'Already published'}
event = self._prepare_update(event) event = self._prepare_update(event)
event['Event']['published'] = True event['Event']['published'] = True
response = self.update_event(event['Event']['id'], event) response = self.update_event(event['Event']['id'], event, 'json')
return response.json() return response.json()
# ##### File attributes ##### # ##### File attributes #####
@ -300,7 +292,7 @@ class PyMISP(object):
if a.get('distribution') is None: if a.get('distribution') is None:
a['distribution'] = event['Event']['distribution'] a['distribution'] = event['Event']['distribution']
event['Event']['Attribute'] = attributes event['Event']['Attribute'] = attributes
response = self.update_event(event['Event']['id'], event) response = self.update_event(event['Event']['id'], event, 'json')
return response.json() return response.json()
def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None): def add_hashes(self, event, category='Artifacts dropped', filename=None, md5=None, sha1=None, sha256=None, comment=None, to_ids=True, distribution=None):
@ -465,7 +457,7 @@ class PyMISP(object):
return self._upload_sample(to_post) return self._upload_sample(to_post)
def _upload_sample(self, to_post): def _upload_sample(self, to_post):
session = self.__prepare_session() session = self.__prepare_session('json')
url = urljoin(self.root_url, 'events/upload_sample') url = urljoin(self.root_url, 'events/upload_sample')
return session.post(url, data=json.dumps(to_post)) return session.post(url, data=json.dumps(to_post))
@ -473,9 +465,17 @@ class PyMISP(object):
# ######## REST Search ######### # ######## REST Search #########
# ############################## # ##############################
def __query(self, session, path, query):
if query.get('error') is not None:
return query
url = urljoin(self.root_url, 'events/{}'.format(path.lstrip('/')))
query = {'request': query}
r = session.post(url, data=json.dumps(query))
return r.json()
def search_all(self, value): def search_all(self, value):
query = {'value': value, 'searchall': 1} query = {'value': value, 'searchall': 1}
session = self.__prepare_session() session = self.__prepare_session('json')
return self.__query(session, 'restSearch/download', query) return self.__query(session, 'restSearch/download', query)
def __prepare_rest_search(self, values, not_values): def __prepare_rest_search(self, values, not_values):
@ -546,7 +546,7 @@ class PyMISP(object):
if last is not None: if last is not None:
query['last'] = last query['last'] = last
session = self.__prepare_session() session = self.__prepare_session('json')
return self.__query(session, 'restSearch/download', query) return self.__query(session, 'restSearch/download', query)
def get_attachement(self, event_id): def get_attachement(self, event_id):
@ -557,12 +557,12 @@ class PyMISP(object):
be fetched be fetched
""" """
attach = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(event_id)) attach = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(event_id))
session = self.__prepare_session() session = self.__prepare_session('json')
return session.get(attach) return session.get(attach)
def get_yara(self, event_id): def get_yara(self, event_id):
to_post = {'request': {'eventid': event_id, 'type': 'yara'}} to_post = {'request': {'eventid': event_id, 'type': 'yara'}}
session = self.__prepare_session() session = self.__prepare_session('json')
response = session.post(urljoin(self.root_url, 'attributes/restSearch'), data=json.dumps(to_post)) response = session.post(urljoin(self.root_url, 'attributes/restSearch'), data=json.dumps(to_post))
result = response.json() result = response.json()
if response.status_code != 200: if response.status_code != 200:
@ -574,7 +574,7 @@ class PyMISP(object):
def download_samples(self, sample_hash=None, event_id=None, all_samples=False): def download_samples(self, sample_hash=None, event_id=None, all_samples=False):
to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}} to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}}
session = self.__prepare_session() session = self.__prepare_session('json')
response = session.post(urljoin(self.root_url, 'attributes/downloadSample'), data=json.dumps(to_post)) response = session.post(urljoin(self.root_url, 'attributes/downloadSample'), data=json.dumps(to_post))
result = response.json() result = response.json()
if response.status_code != 200: if response.status_code != 200:
@ -651,7 +651,7 @@ class PyMISP(object):
""" """
Returns the version of the instance. Returns the version of the instance.
""" """
session = self.__prepare_session() session = self.__prepare_session('json')
url = urljoin(self.root_url, 'servers/getVersion') url = urljoin(self.root_url, 'servers/getVersion')
return session.get(url).json() return session.get(url).json()