From 483f7839c0ec3cd92cfe9794112c79ef2495df1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Fri, 11 Apr 2014 18:45:52 +0200 Subject: [PATCH] make the API a class --- pymisp/api.py | 273 +++++++++++++++++++++++++------------------------- 1 file changed, 135 insertions(+), 138 deletions(-) diff --git a/pymisp/api.py b/pymisp/api.py index 8491474..63c3d97 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -6,147 +6,144 @@ import requests -from apikey import key +class PyMISP(object): -URL=None -URL_TMPL = None -URL_XML_DOWNLOAD = None -URL_XML_DOWNLOAD_TMPL = None -OUTPUT_TYPE = 'json' - -def init_server(url, key): - global URL - global URL_TMPL - global URL_XML_DOWNLOAD - global URL_XML_DOWNLOAD_TMPL - URL = 'https://misp.circl.lu/events' - URL_TMPL = URL + '/{}' - URL_XML_DOWNLOAD = URL + '/xml/download' - URL_XML_DOWNLOAD_TMPL = URL_XML_DOWNLOAD + '/{}' + def __init__(self, url, key, out_type = 'json'): + self.url = url + self.key = key + self.out_type = out_type + self.rest = self.url + '/{}' -def __prepare_session(output_type=OUTPUT_TYPE): - """ - Prepare the headers of the session - """ - session = requests.Session() - session.headers.update({'Authorization': key, - 'Accept': 'application/' + output_type}) - return session - -################ REST API ################ - -# supports JSON and XML output - -def get_index(): - """ - Return the index. - - Warning, there's a limit on the number of results - """ - session = __prepare_session() - return session.get(URL, verify=False) - -def get_event(event_id): - """ - Get an event - """ - session = __prepare_session() - return session.get(URL_TMPL.format(event_id), verify=False) - -def add_event(event): - """ - Add a new event - """ - session = __prepare_session() - return session.post(URL, data=event, verify=False) - -def update_event(event_id, event): - """ - Update an event - """ - session = __prepare_session() - return session.post(URL_TMPL.format(event_id), data=event, verify=False) - -def delete_event(event_id): - """ - Delete an event - """ - session = __prepare_session() - return session.delete(URL_TMPL.format(event_id), verify=False) - -########################################## - -############### Export ############### - -# XML and Json - - -def download_all(): - """ - Download all event from the instance - """ - session = __prepare_session() - return session.get(URL_XML_DOWNLOAD, verify=False) - -def download(event_id): - """ - Download one event in XML - """ - session = __prepare_session() - return session.get(URL_XML_DOWNLOAD_TMPL.format(event_id), verify=False) - -######### REST Search ######### - -def __prepare_rest_search(values, not_values): - """ - Prepare a search - """ - to_return = '' - if values is not None: - if type(values) != type([]): - to_return += values + def __prepare_session(self, force_out=None): + """ + Prepare the headers of the session + """ + if force_out is not None: + out = force_out else: - to_return += '&&'.join(values) - if not_values is not None: - if len(to_return) > 0 : - to_return += '&&!' + out = self.out_type + session = requests.Session() + session.headers.update({'Authorization': self.key, + 'Accept': 'application/' + out}) + return session + + + ################ REST API ################ + + def get_index(self): + """ + Return the index. + + Warning, there's a limit on the number of results + """ + session = self.__prepare_session() + return session.get(self.url, verify=False) + + def get_event(self, event_id): + """ + Get an event + """ + session = self.__prepare_session() + return session.get(self.rest.format(event_id), verify=False) + + def add_event(self, event): + """ + Add a new event + """ + session = self.__prepare_session() + return session.post(self.url, data=event, verify=False) + + def update_event(self, event_id, event): + """ + Update an event + """ + session = self.__prepare_session() + return session.post(self.rest.format(event_id), data=event, + verify=False) + + def delete_event(self, event_id): + """ + Delete an event + """ + session = self.__prepare_session() + return session.delete(self.rest.format(event_id), verify=False) + + ######### REST Search ######### + + def __prepare_rest_search(self, values, not_values): + """ + Prepare a search + """ + to_return = '' + if values is not None: + if type(values) != type([]): + to_return += values + else: + to_return += '&&'.join(values) + if not_values is not None: + if len(to_return) > 0 : + to_return += '&&!' + else: + to_return += '!' + if type(values) != type([]): + to_return += not_values + else: + to_return += '&&!'.join(not_values) + return to_return + + def search(self, values=None, not_values=None, type_attribute=None, + category=None, org=None, tags=None, not_tags=None): + """ + Search via the Rest API + """ + search = self.url + '/events/restSearch/download/{}/{}/{}/{}/{}' + val = self.__prepare_rest_search(values, not_values).replace('/', '|') + tag = self.__prepare_rest_search(tags, not_tags).replace(':', ';') + if len(val) == 0: + val = 'null' + if len(tag) == 0: + tag = 'null' + if type_attribute is None: + type_attribute = 'null' + if category is None: + category = 'null' + if org is None: + org = 'null' + + session = self.__prepare_session() + return session.get(search.format(val, type_attribute, + category, org, tag), verify=False) + + def get_attachement(self, event_id): + """ + Get attachement of an event (not sample) + """ + attach = self.url + '/attributes/downloadAttachment/download/{}' + session = self.__prepare_session() + return session.get(attach.format(event_id), verify=False) + + + ############### Export ############### + + def download_all(self): + """ + Download all event from the instance + """ + xml = self.url + '/xml/download' + session = self.__prepare_session('xml') + return session.get(xml, verify=False) + + def download(self, event_id, with_attachement=False): + """ + Download one event in XML + """ + template = self.url + '/events/xml/download/{}/{}' + if with_attachement: + a = 'true' else: - to_return += '!' - if type(values) != type([]): - to_return += not_values - else: - to_return += '&&!'.join(not_values) - return to_return + a = 'false' + session = self.__prepare_session('xml') + return session.get(template.format(event_id, a), verify=False) -URL_SEARCH_TMPL = 'https://misp.circl.lu/events/restSearch/download/{}/{}/{}/{}/{}' - - -# NOTE: searching by tags works, not the other options -def search(values=None, not_values=None, type_attribute=None, - category=None, org=None, tags=None, not_tags=None): - v = __prepare_rest_search(values, not_values).replace('/', '|') - t = __prepare_rest_search(tags, not_tags).replace(':', ';') - if len(v) == 0: - v = 'null' - if len(t) == 0: - t = 'null' - if type_attribute is None: - type_attribute = 'null' - if category is None: - category = 'null' - if org is None: - org = 'null' - - session = __prepare_session() - return session.get(URL_SEARCH_TMPL.format(v, type_attribute, - category, org, t), verify=False) - -########################################## - -if __name__ == '__main__': - r = search(tags='OSINT') - print unicode(r.json()) - - #r = get_index() - #print r.text + ##########################################