mirror of https://github.com/MISP/PyMISP
parent
2a0d6566ee
commit
ab09c0a1dc
|
@ -7,7 +7,7 @@ import argparse
|
||||||
|
|
||||||
|
|
||||||
def init(url, key):
|
def init(url, key):
|
||||||
return PyMISP(url, key, True, 'json')
|
return PyMISP(url, key, True)
|
||||||
|
|
||||||
|
|
||||||
def fetch(m, all_events, event):
|
def fetch(m, all_events, event):
|
||||||
|
|
|
@ -126,7 +126,7 @@ class PyMISP(object):
|
||||||
self.types = self.describe_types['result']['types']
|
self.types = self.describe_types['result']['types']
|
||||||
self.category_type_mapping = self.describe_types['result']['category_type_mappings']
|
self.category_type_mapping = self.describe_types['result']['category_type_mappings']
|
||||||
|
|
||||||
def __prepare_session(self):
|
def __prepare_session(self, output='json'):
|
||||||
"""
|
"""
|
||||||
Prepare the headers of the session
|
Prepare the headers of the session
|
||||||
"""
|
"""
|
||||||
|
@ -137,8 +137,8 @@ class PyMISP(object):
|
||||||
session.proxies = self.proxies
|
session.proxies = self.proxies
|
||||||
session.headers.update(
|
session.headers.update(
|
||||||
{'Authorization': self.key,
|
{'Authorization': self.key,
|
||||||
'Accept': 'application/json',
|
'Accept': 'application/{}'.format(output),
|
||||||
'content-type': 'application/json'})
|
'content-type': 'application/{}'.format(output)})
|
||||||
return session
|
return session
|
||||||
|
|
||||||
def flatten_error_messages(self, response):
|
def flatten_error_messages(self, response):
|
||||||
|
@ -378,13 +378,13 @@ class PyMISP(object):
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def add_tag(self, event, tag):
|
def add_tag(self, event, tag):
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
||||||
response = session.post(urljoin(self.root_url, 'events/addTag'), data=json.dumps(to_post))
|
response = session.post(urljoin(self.root_url, 'events/addTag'), data=json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def remove_tag(self, event, tag):
|
def remove_tag(self, event, tag):
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
||||||
response = session.post(urljoin(self.root_url, 'events/removeTag'), data=json.dumps(to_post))
|
response = session.post(urljoin(self.root_url, 'events/removeTag'), data=json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
@ -694,7 +694,7 @@ class PyMISP(object):
|
||||||
return self._upload_sample(to_post)
|
return self._upload_sample(to_post)
|
||||||
|
|
||||||
def _upload_sample(self, to_post):
|
def _upload_sample(self, to_post):
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, 'events/upload_sample')
|
url = urljoin(self.root_url, 'events/upload_sample')
|
||||||
response = session.post(url, data=json.dumps(to_post))
|
response = session.post(url, data=json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
@ -719,7 +719,7 @@ class PyMISP(object):
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def proposal_view(self, event_id=None, proposal_id=None):
|
def proposal_view(self, event_id=None, proposal_id=None):
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
if proposal_id is not None and event_id is not None:
|
if proposal_id is not None and event_id is not None:
|
||||||
return {'error': 'You can only view an event ID or a proposal ID'}
|
return {'error': 'You can only view an event ID or a proposal ID'}
|
||||||
if event_id is not None:
|
if event_id is not None:
|
||||||
|
@ -729,19 +729,19 @@ class PyMISP(object):
|
||||||
return self.__query_proposal(session, 'view', id)
|
return self.__query_proposal(session, 'view', id)
|
||||||
|
|
||||||
def proposal_add(self, event_id, attribute):
|
def proposal_add(self, event_id, attribute):
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
return self.__query_proposal(session, 'add', event_id, attribute)
|
return self.__query_proposal(session, 'add', event_id, attribute)
|
||||||
|
|
||||||
def proposal_edit(self, attribute_id, attribute):
|
def proposal_edit(self, attribute_id, attribute):
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
return self.__query_proposal(session, 'edit', attribute_id, attribute)
|
return self.__query_proposal(session, 'edit', attribute_id, attribute)
|
||||||
|
|
||||||
def proposal_accept(self, proposal_id):
|
def proposal_accept(self, proposal_id):
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
return self.__query_proposal(session, 'accept', proposal_id)
|
return self.__query_proposal(session, 'accept', proposal_id)
|
||||||
|
|
||||||
def proposal_discard(self, proposal_id):
|
def proposal_discard(self, proposal_id):
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
return self.__query_proposal(session, 'discard', proposal_id)
|
return self.__query_proposal(session, 'discard', proposal_id)
|
||||||
|
|
||||||
# ##############################
|
# ##############################
|
||||||
|
@ -798,14 +798,14 @@ class PyMISP(object):
|
||||||
buildup_url += '/search{}:{}'.format(rule, joined)
|
buildup_url += '/search{}:{}'.format(rule, joined)
|
||||||
else:
|
else:
|
||||||
buildup_url += '/search{}:{}'.format(rule, allowed[rule])
|
buildup_url += '/search{}:{}'.format(rule, allowed[rule])
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, buildup_url)
|
url = urljoin(self.root_url, buildup_url)
|
||||||
response = session.get(url)
|
response = session.get(url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def search_all(self, value):
|
def search_all(self, value):
|
||||||
query = {'value': value, 'searchall': 1}
|
query = {'value': value, 'searchall': 1}
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
return self.__query(session, 'restSearch/download', query)
|
return self.__query(session, 'restSearch/download', query)
|
||||||
|
|
||||||
def __prepare_rest_search(self, values, not_values):
|
def __prepare_rest_search(self, values, not_values):
|
||||||
|
@ -876,7 +876,7 @@ class PyMISP(object):
|
||||||
if last is not None:
|
if last is not None:
|
||||||
query['last'] = last
|
query['last'] = last
|
||||||
|
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
return self.__query(session, 'restSearch/download', query, controller)
|
return self.__query(session, 'restSearch/download', query, controller)
|
||||||
|
|
||||||
def get_attachement(self, event_id):
|
def get_attachement(self, event_id):
|
||||||
|
@ -887,13 +887,13 @@ class PyMISP(object):
|
||||||
be fetched
|
be fetched
|
||||||
"""
|
"""
|
||||||
attach = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(event_id))
|
attach = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(event_id))
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
response = session.get(attach)
|
response = session.get(attach)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_yara(self, event_id):
|
def get_yara(self, event_id):
|
||||||
to_post = {'request': {'eventid': event_id, 'type': 'yara'}}
|
to_post = {'request': {'eventid': event_id, 'type': 'yara'}}
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
response = session.post(urljoin(self.root_url, 'attributes/restSearch'), data=json.dumps(to_post))
|
response = session.post(urljoin(self.root_url, 'attributes/restSearch'), data=json.dumps(to_post))
|
||||||
result = self._check_response(response)
|
result = self._check_response(response)
|
||||||
if result.get('error') is not None:
|
if result.get('error') is not None:
|
||||||
|
@ -905,7 +905,7 @@ class PyMISP(object):
|
||||||
|
|
||||||
def download_samples(self, sample_hash=None, event_id=None, all_samples=False):
|
def download_samples(self, sample_hash=None, event_id=None, all_samples=False):
|
||||||
to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}}
|
to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}}
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
response = session.post(urljoin(self.root_url, 'attributes/downloadSample'), data=json.dumps(to_post))
|
response = session.post(urljoin(self.root_url, 'attributes/downloadSample'), data=json.dumps(to_post))
|
||||||
result = self._check_response(response)
|
result = self._check_response(response)
|
||||||
if result.get('error') is not None:
|
if result.get('error') is not None:
|
||||||
|
@ -964,7 +964,7 @@ class PyMISP(object):
|
||||||
# ########## Tags ##########
|
# ########## Tags ##########
|
||||||
|
|
||||||
def get_all_tags(self, quiet=False):
|
def get_all_tags(self, quiet=False):
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, 'tags')
|
url = urljoin(self.root_url, 'tags')
|
||||||
response = session.get(url)
|
response = session.get(url)
|
||||||
r = self._check_response(response)
|
r = self._check_response(response)
|
||||||
|
@ -978,7 +978,7 @@ class PyMISP(object):
|
||||||
|
|
||||||
def new_tag(self, name=None, colour="#00ace6", exportable=False):
|
def new_tag(self, name=None, colour="#00ace6", exportable=False):
|
||||||
to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable}}
|
to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable}}
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, 'tags/add')
|
url = urljoin(self.root_url, 'tags/add')
|
||||||
response = session.post(url, data=json.dumps(to_post))
|
response = session.post(url, data=json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
@ -1006,7 +1006,7 @@ class PyMISP(object):
|
||||||
"""
|
"""
|
||||||
Returns the version of the instance.
|
Returns the version of the instance.
|
||||||
"""
|
"""
|
||||||
session = self.__prepare_session('json')
|
session = self.__prepare_session()
|
||||||
url = urljoin(self.root_url, 'servers/getVersion')
|
url = urljoin(self.root_url, 'servers/getVersion')
|
||||||
response = session.get(url)
|
response = session.get(url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
Loading…
Reference in New Issue