new: Proper debug system

Make it easy to investigate the json blobs sent to the server.
pull/141/head
Raphaël Vinot 2017-11-08 17:33:55 -08:00
parent 4512a4eaca
commit f54a029e2a
4 changed files with 141 additions and 212 deletions

View File

@ -64,7 +64,7 @@ class AbstractMISP(collections.MutableMapping):
return self.to_dict() return self.to_dict()
def to_json(self): def to_json(self):
return json.dumps(self.to_dict(), cls=MISPEncode) return json.dumps(self, cls=MISPEncode)
def __getitem__(self, key): def __getitem__(self, key):
return getattr(self, key) return getattr(self, key)

View File

@ -11,6 +11,14 @@ import base64
import re import re
import functools import functools
import logging import logging
import warnings
from io import BytesIO, open
import zipfile
from . import __version__
from .exceptions import PyMISPError, SearchError, NoURL, NoKey
from .mispevent import MISPEvent, MISPAttribute
from .abstract import MISPEncode
logger = logging.getLogger('pymisp') logger = logging.getLogger('pymisp')
@ -22,8 +30,6 @@ try:
except ImportError: except ImportError:
from urlparse import urljoin from urlparse import urljoin
logger.warning("You're using python 2, it is strongly recommended to use python >=3.5") logger.warning("You're using python 2, it is strongly recommended to use python >=3.5")
from io import BytesIO, open
import zipfile
try: try:
import requests import requests
@ -38,10 +44,6 @@ try:
except ImportError: except ImportError:
ASYNC_OK = False ASYNC_OK = False
from . import __version__
from .exceptions import PyMISPError, SearchError, MissingDependency, NoURL, NoKey
from .mispevent import MISPEvent, MISPAttribute
from .abstract import MISPEncode
def deprecated(func): def deprecated(func):
'''This is a decorator which can be used to mark functions '''This is a decorator which can be used to mark functions
@ -89,7 +91,7 @@ class PyMISP(object):
self.cert = cert self.cert = cert
self.asynch = asynch self.asynch = asynch
if asynch and not ASYNC_OK: if asynch and not ASYNC_OK:
logger.warning("You turned on Async, but don't have requests_futures installed") logger.critical("You turned on Async, but don't have requests_futures installed")
self.asynch = False self.asynch = False
self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data') self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
@ -118,8 +120,7 @@ class PyMISP(object):
raise PyMISPError('Unable to connect to MISP ({}). Please make sure the API key and the URL are correct (http/https is required): {}'.format(self.root_url, e)) raise PyMISPError('Unable to connect to MISP ({}). Please make sure the API key and the URL are correct (http/https is required): {}'.format(self.root_url, e))
try: try:
session = self.__prepare_session() response = self.__prepare_request('GET', urljoin(self.root_url, 'attributes/describeTypes.json'))
response = session.get(urljoin(self.root_url, 'attributes/describeTypes.json'))
describe_types = self._check_response(response) describe_types = self._check_response(response)
if describe_types.get('error'): if describe_types.get('error'):
for e in describe_types.get('error'): for e in describe_types.get('error'):
@ -137,26 +138,32 @@ class PyMISP(object):
self.category_type_mapping = self.describe_types['category_type_mappings'] self.category_type_mapping = self.describe_types['category_type_mappings']
self.sane_default = self.describe_types['sane_defaults'] self.sane_default = self.describe_types['sane_defaults']
def __prepare_session(self, output='json', async_implemented=False): def __prepare_request(self, request_type, url, data=None,
"""Prepare the session headers""" background_callback=None, output_type='json'):
if logger.isEnabledFor(logging.DEBUG):
if not HAVE_REQUESTS: logger.debug('{} - {}'.format(request_type, url))
raise MissingDependency('Missing dependency, install requests (`pip install requests`)') if data is not None:
if self.asynch and async_implemented: logger.debug(data)
session = FuturesSession() if data is None:
req = requests.Request(request_type, url)
else: else:
session = requests.Session() req = requests.Request(request_type, url, data=data)
session.verify = self.ssl if self.asynch and background_callback is not None:
session.proxies = self.proxies s = FuturesSession()
session.cert = self.cert else:
session.headers.update( s = requests.Session()
prepped = s.prepare_request(req)
prepped.headers.update(
{'Authorization': self.key, {'Authorization': self.key,
'Accept': 'application/{}'.format(output), 'Accept': 'application/{}'.format(output_type),
'content-type': 'application/{}'.format(output), 'content-type': 'application/{}'.format(output_type),
'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)}) 'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)})
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug(session.headers) logger.debug(prepped.headers)
return session if self.asynch and background_callback is not None:
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert, background_callback=background_callback)
else:
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert)
# ##################### # #####################
# ### Core helpers #### # ### Core helpers ####
@ -294,13 +301,11 @@ class PyMISP(object):
Warning, there's a limit on the number of results Warning, there's a limit on the number of results
""" """
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/index') url = urljoin(self.root_url, 'events/index')
if filters is not None: if filters is None:
filters = json.dumps(filters) response = self.__prepare_request('GET', url)
response = session.post(url, data=filters)
else: else:
response = session.get(url) response = self.__prepare_request('POST', url, json.dumps(filters))
return self._check_response(response) return self._check_response(response)
def get_event(self, event_id): def get_event(self, event_id):
@ -308,9 +313,8 @@ class PyMISP(object):
:param event_id: Event id to get :param event_id: Event id to get
""" """
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/{}'.format(event_id)) url = urljoin(self.root_url, 'events/{}'.format(event_id))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def add_event(self, event): def add_event(self, event):
@ -318,14 +322,12 @@ class PyMISP(object):
:param event: Event as JSON object / string to add :param event: Event as JSON object / string to add
""" """
session = self.__prepare_session()
url = urljoin(self.root_url, 'events') url = urljoin(self.root_url, 'events')
if isinstance(event, MISPEvent): if isinstance(event, MISPEvent):
event = json.dumps(event, cls=MISPEncode) event = event.to_json()
if isinstance(event, basestring): elif not isinstance(event, basestring):
response = session.post(url, data=event) event = json.dumps(event)
else: response = self.__prepare_request('POST', url, event)
response = session.post(url, data=json.dumps(event))
return self._check_response(response) return self._check_response(response)
def update_event(self, event_id, event): def update_event(self, event_id, event):
@ -334,14 +336,12 @@ class PyMISP(object):
:param event_id: Event id to update :param event_id: Event id to update
:param event: Event as JSON object / string to add :param event: Event as JSON object / string to add
""" """
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/{}'.format(event_id)) url = urljoin(self.root_url, 'events/{}'.format(event_id))
if isinstance(event, MISPEvent): if isinstance(event, MISPEvent):
event = json.dumps(event, cls=MISPEncode) event = event.to_json()
if isinstance(event, basestring): elif not isinstance(event, basestring):
response = session.post(url, data=event) event = json.dumps(event)
else: response = self.__prepare_request('POST', url, event)
response = session.post(url, data=json.dumps(event))
return self._check_response(response) return self._check_response(response)
def delete_event(self, event_id): def delete_event(self, event_id):
@ -349,26 +349,23 @@ class PyMISP(object):
:param event_id: Event id to delete :param event_id: Event id to delete
""" """
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/{}'.format(event_id)) url = urljoin(self.root_url, 'events/{}'.format(event_id))
response = session.delete(url) response = self.__prepare_request('DELETE', url)
return self._check_response(response) return self._check_response(response)
def delete_attribute(self, attribute_id, hard_delete=False): def delete_attribute(self, attribute_id, hard_delete=False):
"""Delete an attribute by ID""" """Delete an attribute by ID"""
session = self.__prepare_session()
if hard_delete: if hard_delete:
url = urljoin(self.root_url, 'attributes/delete/{}/1'.format(attribute_id)) url = urljoin(self.root_url, 'attributes/delete/{}/1'.format(attribute_id))
else: else:
url = urljoin(self.root_url, 'attributes/delete/{}'.format(attribute_id)) url = urljoin(self.root_url, 'attributes/delete/{}'.format(attribute_id))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def pushEventToZMQ(self, event_id): def pushEventToZMQ(self, event_id):
"""Force push an event on ZMQ""" """Force push an event on ZMQ"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'events/pushEventToZMQ/{}.json'.format(event_id)) url = urljoin(self.root_url, 'events/pushEventToZMQ/{}.json'.format(event_id))
response = session.post(url) response = self.__prepare_request('POST', url)
return self._check_response(response) return self._check_response(response)
# ############################################## # ##############################################
@ -401,12 +398,11 @@ class PyMISP(object):
event_id = full_event.id event_id = full_event.id
if full_event.published: if full_event.published:
return {'error': 'Already published'} return {'error': 'Already published'}
session = self.__prepare_session()
if not alert: if not alert:
url = urljoin(self.root_url, 'events/publish/{}'.format(event_id)) url = urljoin(self.root_url, 'events/publish/{}'.format(event_id))
else: else:
url = urljoin(self.root_url, 'events/alert/{}'.format(event_id)) url = urljoin(self.root_url, 'events/alert/{}'.format(event_id))
response = session.post(url) response = self.__prepare_request('POST', url)
return self._check_response(response) return self._check_response(response)
def change_threat_level(self, event, threat_level_id): def change_threat_level(self, event, threat_level_id):
@ -431,20 +427,18 @@ class PyMISP(object):
"""Tag an event or an attribute""" """Tag an event or an attribute"""
if not self._valid_uuid(uuid): if not self._valid_uuid(uuid):
raise PyMISPError('Invalid UUID') raise PyMISPError('Invalid UUID')
session = self.__prepare_session() url = urljoin(self.root_url, 'tags/attachTagToObject')
to_post = {'uuid': uuid, 'tag': tag} to_post = {'uuid': uuid, 'tag': tag}
path = 'tags/attachTagToObject' response = self.__prepare_request('POST', url, json.dumps(to_post))
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
return self._check_response(response) return self._check_response(response)
def untag(self, uuid, tag): def untag(self, uuid, tag):
"""Untag an event or an attribute""" """Untag an event or an attribute"""
if not self._valid_uuid(uuid): if not self._valid_uuid(uuid):
raise PyMISPError('Invalid UUID') raise PyMISPError('Invalid UUID')
session = self.__prepare_session() url = urljoin(self.root_url, 'tags/removeTagFromObject')
to_post = {'uuid': uuid, 'tag': tag} to_post = {'uuid': uuid, 'tag': tag}
path = 'tags/removeTagFromObject' response = self.__prepare_request('POST', url, json.dumps(to_post))
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
return self._check_response(response) return self._check_response(response)
# ##### File attributes ##### # ##### File attributes #####
@ -474,9 +468,8 @@ class PyMISP(object):
if proposal: if proposal:
response = self.proposal_add(eventID_to_update, a) response = self.proposal_add(eventID_to_update, a)
else: else:
session = self.__prepare_session()
url = urljoin(self.root_url, 'attributes/add/{}'.format(eventID_to_update)) url = urljoin(self.root_url, 'attributes/add/{}'.format(eventID_to_update))
response = self._check_response(session.post(url, data=json.dumps(a, cls=MISPEncode))) response = self.__prepare_request('POST', url, a.to_json())
return response return response
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs): def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs):
@ -803,61 +796,54 @@ class PyMISP(object):
def _upload_sample(self, to_post, event_id=None): def _upload_sample(self, to_post, event_id=None):
"""Helper to upload a sample""" """Helper to upload a sample"""
session = self.__prepare_session()
if event_id is None: if event_id is None:
url = urljoin(self.root_url, 'events/upload_sample') url = urljoin(self.root_url, 'events/upload_sample')
else: else:
url = urljoin(self.root_url, 'events/upload_sample/{}'.format(event_id)) url = urljoin(self.root_url, 'events/upload_sample/{}'.format(event_id))
logger.info(to_post) response = self.__prepare_request('POST', url, json.dumps(to_post))
response = session.post(url, data=json.dumps(to_post))
return self._check_response(response) return self._check_response(response)
# ############################ # ############################
# ######## Proposals ######### # ######## Proposals #########
# ############################ # ############################
def __query_proposal(self, session, path, id, attribute=None): def __query_proposal(self, path, id, attribute=None):
"""Helper to prepare a query to handle proposals""" """Helper to prepare a query to handle proposals"""
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id)) url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
if path in ['add', 'edit']: if path in ['add', 'edit']:
query = {'request': {'ShadowAttribute': attribute}} query = {'request': {'ShadowAttribute': attribute}}
response = session.post(url, data=json.dumps(query, cls=MISPEncode)) response = self.__prepare_request('POST', url, json.dumps(query, cls=MISPEncode))
elif path == 'view': elif path == 'view':
response = session.get(url) response = self.__prepare_request('GET', url)
else: # accept or discard else: # accept or discard
response = session.post(url) response = self.__prepare_request('POST', url)
return self._check_response(response) return self._check_response(response)
def proposal_view(self, event_id=None, proposal_id=None): def proposal_view(self, event_id=None, proposal_id=None):
"""View a proposal""" """View a proposal"""
session = self.__prepare_session()
if proposal_id is not None and event_id is not None: if proposal_id is not None and event_id is not None:
return {'error': 'You can only view an event ID or a proposal ID'} return {'error': 'You can only view an event ID or a proposal ID'}
if event_id is not None: if event_id is not None:
id = event_id id = event_id
else: else:
id = proposal_id id = proposal_id
return self.__query_proposal(session, 'view', id) return self.__query_proposal('view', id)
def proposal_add(self, event_id, attribute): def proposal_add(self, event_id, attribute):
"""Add a proposal""" """Add a proposal"""
session = self.__prepare_session() return self.__query_proposal('add', event_id, attribute)
return self.__query_proposal(session, 'add', event_id, attribute)
def proposal_edit(self, attribute_id, attribute): def proposal_edit(self, attribute_id, attribute):
"""Edit a proposal""" """Edit a proposal"""
session = self.__prepare_session() return self.__query_proposal('edit', attribute_id, attribute)
return self.__query_proposal(session, 'edit', attribute_id, attribute)
def proposal_accept(self, proposal_id): def proposal_accept(self, proposal_id):
"""Accept a proposal""" """Accept a proposal"""
session = self.__prepare_session() return self.__query_proposal('accept', proposal_id)
return self.__query_proposal(session, 'accept', proposal_id)
def proposal_discard(self, proposal_id): def proposal_discard(self, proposal_id):
"""Discard a proposal""" """Discard a proposal"""
session = self.__prepare_session() return self.__query_proposal('discard', proposal_id)
return self.__query_proposal(session, 'discard', proposal_id)
# ############################## # ##############################
# ###### Attribute update ###### # ###### Attribute update ######
@ -868,8 +854,7 @@ class PyMISP(object):
if to_ids not in [0, 1]: if to_ids not in [0, 1]:
raise Exception('to_ids can only be 0 or 1') raise Exception('to_ids can only be 0 or 1')
query = {"to_ids": to_ids} query = {"to_ids": to_ids}
session = self.__prepare_session() return self.__query('edit/{}'.format(attribute_uuid), query, controller='attributes')
return self.__query(session, 'edit/{}'.format(attribute_uuid), query, controller='attributes')
# ############################## # ##############################
# ###### Attribute update ###### # ###### Attribute update ######
@ -885,28 +870,24 @@ class PyMISP(object):
query['adhereToWarninglists'] = adhereToWarninglists query['adhereToWarninglists'] = adhereToWarninglists
if distribution is not None: if distribution is not None:
query['distribution'] = distribution query['distribution'] = distribution
session = self.__prepare_session() return self.__query('freeTextImport/{}'.format(event_id), query, controller='events')
return self.__query(session, 'freeTextImport/{}'.format(event_id), query, controller='events')
# ############################## # ##############################
# ######## REST Search ######### # ######## REST Search #########
# ############################## # ##############################
def __query(self, session, path, query, controller='events', async_callback=None): def __query(self, path, query, controller='events', async_callback=None):
"""Helper to prepare a search query""" """Helper to prepare a search query"""
if query.get('error') is not None: if query.get('error') is not None:
return query return query
if controller not in ['events', 'attributes']: if controller not in ['events', 'attributes']:
raise Exception('Invalid controller. Can only be {}'.format(', '.join(['events', 'attributes']))) raise Exception('Invalid controller. Can only be {}'.format(', '.join(['events', 'attributes'])))
url = urljoin(self.root_url, '{}/{}'.format(controller, path.lstrip('/'))) url = urljoin(self.root_url, '{}/{}'.format(controller, path.lstrip('/')))
if logger.isEnabledFor(logging.DEBUG):
logger.debug('URL: %s', url)
logger.debug('Query: %s', query)
if ASYNC_OK and isinstance(session, FuturesSession) and async_callback: if ASYNC_OK and async_callback:
response = session.post(url, data=json.dumps(query), background_callback=async_callback) response = self.__prepare_request('POST', url, json.dumps(query), async_callback)
else: else:
response = session.post(url, data=json.dumps(query)) response = self.__prepare_request('POST', url, json.dumps(query))
return self._check_response(response) return self._check_response(response)
def search_index(self, published=None, eventid=None, tag=None, datefrom=None, def search_index(self, published=None, eventid=None, tag=None, datefrom=None,
@ -952,13 +933,12 @@ class PyMISP(object):
if not set(param).issubset(rule_levels[rule]): if not set(param).issubset(rule_levels[rule]):
raise SearchError('Values in your {} are invalid, has to be in {}'.format(rule, ', '.join(str(x) for x in rule_levels[rule]))) raise SearchError('Values in your {} are invalid, has to be in {}'.format(rule, ', '.join(str(x) for x in rule_levels[rule])))
to_post[rule] = '|'.join(str(x) for x in param) to_post[rule] = '|'.join(str(x) for x in param)
session = self.__prepare_session(async_implemented=(async_callback is not None))
url = urljoin(self.root_url, buildup_url) url = urljoin(self.root_url, buildup_url)
if self.asynch and async_callback: if self.asynch and async_callback:
response = session.post(url, data=json.dumps(to_post), background_callback=async_callback) response = self.__prepare_request('POST', url, json.dumps(to_post), async_callback)
else: else:
response = session.post(url, data=json.dumps(to_post)) response = self.__prepare_request('POST', url, json.dumps(to_post))
res = self._check_response(response) res = self._check_response(response)
if normalize: if normalize:
to_return = {'response': []} to_return = {'response': []}
@ -971,8 +951,7 @@ class PyMISP(object):
def search_all(self, value): def search_all(self, value):
"""Search a value in the whole database""" """Search a value in the whole database"""
query = {'value': value, 'searchall': 1} query = {'value': value, 'searchall': 1}
session = self.__prepare_session() return self.__query('restSearch/download', query)
return self.__query(session, 'restSearch/download', query)
def __prepare_rest_search(self, values, not_values): def __prepare_rest_search(self, values, not_values):
"""Prepare a search, generate the chain processed by the server """Prepare a search, generate the chain processed by the server
@ -1082,8 +1061,7 @@ class PyMISP(object):
raise SearchError('Unused parameter: {}'.format(', '.join(kwargs.keys()))) raise SearchError('Unused parameter: {}'.format(', '.join(kwargs.keys())))
# Create a session, make it async if and only if we have a callback # Create a session, make it async if and only if we have a callback
session = self.__prepare_session(async_implemented=(async_callback is not None)) return self.__query('restSearch/download', query, controller, async_callback)
return self.__query(session, 'restSearch/download', query, controller, async_callback)
def get_attachment(self, attribute_id): def get_attachment(self, attribute_id):
"""Get an attachement (not a malware sample) by attribute ID. """Get an attachement (not a malware sample) by attribute ID.
@ -1091,9 +1069,8 @@ class PyMISP(object):
:param attribute_id: Attribute ID to fetched :param attribute_id: Attribute ID to fetched
""" """
attach = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(attribute_id)) url = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(attribute_id))
session = self.__prepare_session() response = self.__prepare_request('GET', url)
response = session.get(attach)
try: try:
response.json() response.json()
# The query fails, response contains a json blob # The query fails, response contains a json blob
@ -1104,9 +1081,9 @@ class PyMISP(object):
def get_yara(self, event_id): def get_yara(self, event_id):
"""Get the yara rules from an event""" """Get the yara rules from an event"""
url = urljoin(self.root_url, 'attributes/restSearch')
to_post = {'request': {'eventid': event_id, 'type': 'yara'}} to_post = {'request': {'eventid': event_id, 'type': 'yara'}}
session = self.__prepare_session() response = self.__prepare_request('POST', url, data=json.dumps(to_post))
response = session.post(urljoin(self.root_url, 'attributes/restSearch'), data=json.dumps(to_post))
result = self._check_response(response) result = self._check_response(response)
if result.get('error') is not None: if result.get('error') is not None:
return False, result.get('error') return False, result.get('error')
@ -1117,9 +1094,9 @@ class PyMISP(object):
def download_samples(self, sample_hash=None, event_id=None, all_samples=False): def download_samples(self, sample_hash=None, event_id=None, all_samples=False):
"""Download samples, by hash or event ID. If there are multiple samples in one event, use the all_samples switch""" """Download samples, by hash or event ID. If there are multiple samples in one event, use the all_samples switch"""
url = urljoin(self.root_url, 'attributes/downloadSample')
to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}} to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}}
session = self.__prepare_session() response = self.__prepare_request('POST', url, data=json.dumps(to_post))
response = session.post(urljoin(self.root_url, 'attributes/downloadSample'), data=json.dumps(to_post))
result = self._check_response(response) result = self._check_response(response)
if result.get('error') is not None: if result.get('error') is not None:
return False, result.get('error') return False, result.get('error')
@ -1155,9 +1132,8 @@ class PyMISP(object):
def get_all_tags(self, quiet=False): def get_all_tags(self, quiet=False):
"""Get all the tags used on the instance""" """Get all the tags used on the instance"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'tags') url = urljoin(self.root_url, 'tags')
response = session.get(url) response = self.__prepare_request('GET', url)
r = self._check_response(response) r = self._check_response(response)
if not quiet or r.get('errors'): if not quiet or r.get('errors'):
return r return r
@ -1170,9 +1146,8 @@ class PyMISP(object):
def new_tag(self, name=None, colour="#00ace6", exportable=False): def new_tag(self, name=None, colour="#00ace6", exportable=False):
"""Create a new tag""" """Create a new tag"""
to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable}} to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable}}
session = self.__prepare_session()
url = urljoin(self.root_url, 'tags/add') url = urljoin(self.root_url, 'tags/add')
response = session.post(url, data=json.dumps(to_post)) response = self.__prepare_request('POST', url, json.dumps(to_post))
return self._check_response(response) return self._check_response(response)
# ########## Version ########## # ########## Version ##########
@ -1192,16 +1167,14 @@ class PyMISP(object):
def get_recommended_api_version(self): def get_recommended_api_version(self):
"""Returns the recommended API version from the server""" """Returns the recommended API version from the server"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'servers/getPyMISPVersion.json') url = urljoin(self.root_url, 'servers/getPyMISPVersion.json')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def get_version(self): def get_version(self):
"""Returns the version of the instance.""" """Returns the version of the instance."""
session = self.__prepare_session()
url = urljoin(self.root_url, 'servers/getVersion.json') url = urljoin(self.root_url, 'servers/getVersion.json')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def get_version_master(self): def get_version_master(self):
@ -1217,19 +1190,17 @@ class PyMISP(object):
def get_attributes_statistics(self, context='type', percentage=None): def get_attributes_statistics(self, context='type', percentage=None):
"""Get attributes statistics from the MISP instance""" """Get attributes statistics from the MISP instance"""
session = self.__prepare_session()
if (context != 'category'): if (context != 'category'):
context = 'type' context = 'type'
if percentage is not None: if percentage is not None:
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}/{}'.format(context, percentage)) url = urljoin(self.root_url, 'attributes/attributeStatistics/{}/{}'.format(context, percentage))
else: else:
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}'.format(context)) url = urljoin(self.root_url, 'attributes/attributeStatistics/{}'.format(context))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def get_tags_statistics(self, percentage=None, name_sort=None): def get_tags_statistics(self, percentage=None, name_sort=None):
"""Get tags statistics from the MISP instance""" """Get tags statistics from the MISP instance"""
session = self.__prepare_session()
if percentage is not None: if percentage is not None:
percentage = 'true' percentage = 'true'
else: else:
@ -1239,32 +1210,29 @@ class PyMISP(object):
else: else:
name_sort = 'false' name_sort = 'false'
url = urljoin(self.root_url, 'tags/tagStatistics/{}/{}'.format(percentage, name_sort)) url = urljoin(self.root_url, 'tags/tagStatistics/{}/{}'.format(percentage, name_sort))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
# ############## Sightings ################## # ############## Sightings ##################
def sighting_per_id(self, attribute_id): def sighting_per_id(self, attribute_id):
"""Add a sighting to an attribute (by attribute ID)""" """Add a sighting to an attribute (by attribute ID)"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_id)) url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_id))
response = session.post(url) response = self.__prepare_request('POST', url)
return self._check_response(response) return self._check_response(response)
def sighting_per_uuid(self, attribute_uuid): def sighting_per_uuid(self, attribute_uuid):
"""Add a sighting to an attribute (by attribute UUID)""" """Add a sighting to an attribute (by attribute UUID)"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_uuid)) url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_uuid))
response = session.post(url) response = self.__prepare_request('POST', url)
return self._check_response(response) return self._check_response(response)
def set_sightings(self, sightings): def set_sightings(self, sightings):
"""Push a sighting (python dictionary)""" """Push a sighting (python dictionary)"""
if isinstance(sightings, dict): if isinstance(sightings, dict):
sightings = json.dumps(sightings) sightings = json.dumps(sightings)
session = self.__prepare_session()
url = urljoin(self.root_url, 'sightings/add/') url = urljoin(self.root_url, 'sightings/add/')
response = session.post(url, data=sightings) response = self.__prepare_request('POST', url, sightings)
return self._check_response(response) return self._check_response(response)
def sighting_per_json(self, json_file): def sighting_per_json(self, json_file):
@ -1277,9 +1245,8 @@ class PyMISP(object):
def get_sharing_groups(self): def get_sharing_groups(self):
"""Get the existing sharing groups""" """Get the existing sharing groups"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'sharing_groups.json') url = urljoin(self.root_url, 'sharing_groups.json')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response)['response'] return self._check_response(response)['response']
# ############## Users ################## # ############## Users ##################
@ -1325,57 +1292,49 @@ class PyMISP(object):
return user return user
def get_users_list(self): def get_users_list(self):
session = self.__prepare_session()
url = urljoin(self.root_url, 'admin/users') url = urljoin(self.root_url, 'admin/users')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response)['response'] return self._check_response(response)['response']
def get_user(self, user_id): def get_user(self, user_id):
session = self.__prepare_session()
url = urljoin(self.root_url, 'admin/users/view/{}'.format(user_id)) url = urljoin(self.root_url, 'admin/users/view/{}'.format(user_id))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def add_user(self, email, org_id, role_id, **kwargs): def add_user(self, email, org_id, role_id, **kwargs):
new_user = self._set_user_parameters(**dict(email=email, org_id=org_id, role_id=role_id, **kwargs))
session = self.__prepare_session()
url = urljoin(self.root_url, 'admin/users/add/') url = urljoin(self.root_url, 'admin/users/add/')
response = session.post(url, data=json.dumps(new_user)) new_user = self._set_user_parameters(**dict(email=email, org_id=org_id, role_id=role_id, **kwargs))
response = self.__prepare_request('POST', url, json.dumps(new_user))
return self._check_response(response) return self._check_response(response)
def add_user_json(self, json_file): def add_user_json(self, json_file):
session = self.__prepare_session()
with open(json_file, 'r') as f: with open(json_file, 'r') as f:
jdata = json.load(f) jdata = json.load(f)
url = urljoin(self.root_url, 'admin/users/add/') url = urljoin(self.root_url, 'admin/users/add/')
response = session.post(url, data=json.dumps(jdata)) response = self.__prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response) return self._check_response(response)
def get_user_fields_list(self): def get_user_fields_list(self):
session = self.__prepare_session()
url = urljoin(self.root_url, 'admin/users/add/') url = urljoin(self.root_url, 'admin/users/add/')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def edit_user(self, user_id, **kwargs): def edit_user(self, user_id, **kwargs):
edit_user = self._set_user_parameters(**kwargs) edit_user = self._set_user_parameters(**kwargs)
session = self.__prepare_session()
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id)) url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
response = session.post(url, data=json.dumps(edit_user)) response = self.__prepare_request('POST', url, json.dumps(edit_user))
return self._check_response(response) return self._check_response(response)
def edit_user_json(self, json_file, user_id): def edit_user_json(self, json_file, user_id):
session = self.__prepare_session()
with open(json_file, 'r') as f: with open(json_file, 'r') as f:
jdata = json.load(f) jdata = json.load(f)
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id)) url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
response = session.post(url, data=json.dumps(jdata)) response = self.__prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response) return self._check_response(response)
def delete_user(self, user_id): def delete_user(self, user_id):
session = self.__prepare_session()
url = urljoin(self.root_url, 'admin/users/delete/{}'.format(user_id)) url = urljoin(self.root_url, 'admin/users/delete/{}'.format(user_id))
response = session.post(url) response = self.__prepare_request('POST', url)
return self._check_response(response) return self._check_response(response)
# ############## Organisations ################## # ############## Organisations ##################
@ -1401,64 +1360,56 @@ class PyMISP(object):
return organisation return organisation
def get_organisations_list(self, scope="local"): def get_organisations_list(self, scope="local"):
session = self.__prepare_session()
scope = scope.lower() scope = scope.lower()
if scope not in ["local", "external", "all"]: if scope not in ["local", "external", "all"]:
raise ValueError("Authorized fields are 'local','external' or 'all'") raise ValueError("Authorized fields are 'local','external' or 'all'")
url = urljoin(self.root_url, 'organisations/index/scope:{}'.format(scope)) url = urljoin(self.root_url, 'organisations/index/scope:{}'.format(scope))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response)['response'] return self._check_response(response)['response']
def get_organisation(self, organisation_id): def get_organisation(self, organisation_id):
session = self.__prepare_session()
url = urljoin(self.root_url, 'organisations/view/{}'.format(organisation_id)) url = urljoin(self.root_url, 'organisations/view/{}'.format(organisation_id))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def add_organisation(self, name, **kwargs): def add_organisation(self, name, **kwargs):
new_org = self._set_organisation_parameters(**dict(name=name, **kwargs)) new_org = self._set_organisation_parameters(**dict(name=name, **kwargs))
session = self.__prepare_session()
if 'local' in new_org: if 'local' in new_org:
if new_org.get('local') is False: if new_org.get('local') is False:
if 'uuid' not in new_org: if 'uuid' not in new_org:
raise PyMISPError('A remote org MUST have a valid uuid') raise PyMISPError('A remote org MUST have a valid uuid')
url = urljoin(self.root_url, 'admin/organisations/add/') url = urljoin(self.root_url, 'admin/organisations/add/')
response = session.post(url, data=json.dumps(new_org)) response = self.__prepare_request('POST', url, json.dumps(new_org))
return self._check_response(response) return self._check_response(response)
def add_organisation_json(self, json_file): def add_organisation_json(self, json_file):
session = self.__prepare_session()
with open(json_file, 'r') as f: with open(json_file, 'r') as f:
jdata = json.load(f) jdata = json.load(f)
url = urljoin(self.root_url, 'admin/organisations/add/') url = urljoin(self.root_url, 'admin/organisations/add/')
response = session.post(url, data=json.dumps(jdata)) response = self.__prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response) return self._check_response(response)
def get_organisation_fields_list(self): def get_organisation_fields_list(self):
session = self.__prepare_session()
url = urljoin(self.root_url, 'admin/organisations/add/') url = urljoin(self.root_url, 'admin/organisations/add/')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def edit_organisation(self, org_id, **kwargs): def edit_organisation(self, org_id, **kwargs):
edit_org = self._set_organisation_parameters(**kwargs) edit_org = self._set_organisation_parameters(**kwargs)
session = self.__prepare_session()
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id)) url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
response = session.post(url, data=json.dumps(edit_org)) response = self.__prepare_request('POST', url, json.dumps(edit_org))
return self._check_response(response) return self._check_response(response)
def edit_organisation_json(self, json_file, org_id): def edit_organisation_json(self, json_file, org_id):
session = self.__prepare_session()
with open(json_file, 'r') as f: with open(json_file, 'r') as f:
jdata = json.load(f) jdata = json.load(f)
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id)) url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
response = session.post(url, data=json.dumps(jdata)) response = self.__prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response) return self._check_response(response)
def delete_organisation(self, org_id): def delete_organisation(self, org_id):
session = self.__prepare_session()
url = urljoin(self.root_url, 'admin/organisations/delete/{}'.format(org_id)) url = urljoin(self.root_url, 'admin/organisations/delete/{}'.format(org_id))
response = session.post(url) response = self.__prepare_request('POST', url)
return self._check_response(response) return self._check_response(response)
# ############## Servers ################## # ############## Servers ##################
@ -1521,17 +1472,15 @@ class PyMISP(object):
new_server = self._set_server_parameters(url, name, authkey, organisation, internal, new_server = self._set_server_parameters(url, name, authkey, organisation, internal,
push, pull, self_signed, push_rules, pull_rules, submitted_cert, push, pull, self_signed, push_rules, pull_rules, submitted_cert,
submitted_client_cert, None, None) submitted_client_cert, None, None)
session = self.__prepare_session()
url = urljoin(self.root_url, 'servers/add') url = urljoin(self.root_url, 'servers/add')
response = session.post(url, data=json.dumps(new_server)) response = self.__prepare_request('POST', url, json.dumps(new_server))
return self._check_response(response) return self._check_response(response)
def add_server_json(self, json_file): def add_server_json(self, json_file):
session = self.__prepare_session()
with open(json_file, 'r') as f: with open(json_file, 'r') as f:
jdata = json.load(f) jdata = json.load(f)
url = urljoin(self.root_url, 'servers/add') url = urljoin(self.root_url, 'servers/add')
response = session.post(url, data=json.dumps(jdata)) response = self.__prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response) return self._check_response(response)
def edit_server(self, server_id, url=None, name=None, authkey=None, organisation=None, internal=None, push=False, def edit_server(self, server_id, url=None, name=None, authkey=None, organisation=None, internal=None, push=False,
@ -1540,35 +1489,31 @@ class PyMISP(object):
new_server = self._set_server_parameters(url, name, authkey, organisation, internal, new_server = self._set_server_parameters(url, name, authkey, organisation, internal,
push, pull, self_signed, push_rules, pull_rules, submitted_cert, push, pull, self_signed, push_rules, pull_rules, submitted_cert,
submitted_client_cert, delete_cert, delete_client_cert) submitted_client_cert, delete_cert, delete_client_cert)
session = self.__prepare_session()
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id)) url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
response = session.post(url, data=json.dumps(new_server)) response = self.__prepare_request('POST', url, json.dumps(new_server))
return self._check_response(response) return self._check_response(response)
def edit_server_json(self, json_file, server_id): def edit_server_json(self, json_file, server_id):
session = self.__prepare_session()
with open(json_file, 'r') as f: with open(json_file, 'r') as f:
jdata = json.load(f) jdata = json.load(f)
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id)) url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
response = session.post(url, data=json.dumps(jdata)) response = self.__prepare_request('POST', url, json.dumps(jdata))
return self._check_response(response) return self._check_response(response)
# ############## Roles ################## # ############## Roles ##################
def get_roles_list(self): def get_roles_list(self):
"""Get the list of existing roles""" """Get the list of existing roles"""
session = self.__prepare_session()
url = urljoin(self.root_url, '/roles') url = urljoin(self.root_url, '/roles')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response)['response'] return self._check_response(response)['response']
# ############## Tags ################## # ############## Tags ##################
def get_tags_list(self): def get_tags_list(self):
"""Get the list of existing tags""" """Get the list of existing tags"""
session = self.__prepare_session()
url = urljoin(self.root_url, '/tags') url = urljoin(self.root_url, '/tags')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response)['Tag'] return self._check_response(response)['Tag']
# ############################################## # ##############################################
@ -1579,9 +1524,8 @@ class PyMISP(object):
def download_all_suricata(self): def download_all_suricata(self):
"""Download all suricata rules events.""" """Download all suricata rules events."""
suricata_rules = urljoin(self.root_url, 'events/nids/suricata/download') url = urljoin(self.root_url, 'events/nids/suricata/download')
session = self.__prepare_session('rules') response = self.__prepare_request('GET', url, output_type='rules')
response = session.get(suricata_rules)
return response return response
def download_suricata_rule_event(self, event_id): def download_suricata_rule_event(self, event_id):
@ -1589,18 +1533,16 @@ class PyMISP(object):
:param event_id: ID of the event to download (same as get) :param event_id: ID of the event to download (same as get)
""" """
template = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id)) url = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id))
session = self.__prepare_session('rules') response = self.__prepare_request('GET', url, output_type='rules')
response = session.get(template)
return response return response
# ############## Text ############### # ############## Text ###############
def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False): def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False):
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise.""" """Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise."""
session = self.__prepare_session('txt')
url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished)) url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished))
response = session.get(url) response = self.__prepare_request('GET', url, output_type='txt')
return response return response
# ############## STIX ############## # ############## STIX ##############
@ -1610,12 +1552,10 @@ class PyMISP(object):
if tags: if tags:
if isinstance(tags, list): if isinstance(tags, list):
tags = "&&".join(tags) tags = "&&".join(tags)
session = self.__prepare_session()
url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format( url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format(
event_id, with_attachments, tags, from_date, to_date)) event_id, with_attachments, tags, from_date, to_date))
logger.debug("Getting STIX event from %s", url) logger.debug("Getting STIX event from %s", url)
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def get_stix(self, **kwargs): def get_stix(self, **kwargs):
@ -1627,58 +1567,50 @@ class PyMISP(object):
def fetch_feed(self, feed_id): def fetch_feed(self, feed_id):
"""Fetch one single feed""" """Fetch one single feed"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'feeds/fetchFromFeed/{}'.format(feed_id)) url = urljoin(self.root_url, 'feeds/fetchFromFeed/{}'.format(feed_id))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def view_feeds(self): def view_feeds(self):
"""Get the content of all the feeds""" """Get the content of all the feeds"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'feeds') url = urljoin(self.root_url, 'feeds')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def view_feed(self, feed_ids): def view_feed(self, feed_ids):
"""Get the content of a single feed""" """Get the content of a single feed"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'feeds/view/{}'.format(feed_ids)) url = urljoin(self.root_url, 'feeds/view/{}'.format(feed_ids))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def cache_feeds_all(self): def cache_feeds_all(self):
""" Cache all the feeds""" """ Cache all the feeds"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'feeds/cacheFeeds/all') url = urljoin(self.root_url, 'feeds/cacheFeeds/all')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def cache_feed(self, feed_id): def cache_feed(self, feed_id):
"""Cache a specific feed""" """Cache a specific feed"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'feeds/cacheFeeds/{}'.format(feed_id)) url = urljoin(self.root_url, 'feeds/cacheFeeds/{}'.format(feed_id))
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def cache_feeds_freetext(self): def cache_feeds_freetext(self):
"""Cache all the freetext feeds""" """Cache all the freetext feeds"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'feeds/cacheFeeds/freetext') url = urljoin(self.root_url, 'feeds/cacheFeeds/freetext')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def cache_feeds_misp(self): def cache_feeds_misp(self):
"""Cache all the MISP feeds""" """Cache all the MISP feeds"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'feeds/cacheFeeds/misp') url = urljoin(self.root_url, 'feeds/cacheFeeds/misp')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def compare_feeds(self): def compare_feeds(self):
"""Generate the comparison matrix for all the MISP feeds""" """Generate the comparison matrix for all the MISP feeds"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'feeds/compareFeeds') url = urljoin(self.root_url, 'feeds/compareFeeds')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response) return self._check_response(response)
def cache_all_feeds(self): def cache_all_feeds(self):
@ -1692,25 +1624,20 @@ class PyMISP(object):
def add_object(self, event_id, template_id, misp_object): def add_object(self, event_id, template_id, misp_object):
"""Add an object""" """Add an object"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'objects/add/{}/{}'.format(event_id, template_id)) url = urljoin(self.root_url, 'objects/add/{}/{}'.format(event_id, template_id))
if logger.isEnabledFor(logging.DEBUG): response = self.__prepare_request('POST', url, misp_object.to_json())
logger.debug(url)
response = session.post(url, data=misp_object.to_json())
return self._check_response(response) return self._check_response(response)
def add_object_reference(self, misp_object_reference): def add_object_reference(self, misp_object_reference):
"""Add a reference to an object""" """Add a reference to an object"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'object_references/add') url = urljoin(self.root_url, 'object_references/add')
response = session.post(url, data=misp_object_reference.to_json()) response = self.__prepare_request('POST', url, misp_object_reference.to_json())
return self._check_response(response) return self._check_response(response)
def get_object_templates_list(self): def get_object_templates_list(self):
"""Returns the list of Object templates available on the MISP instance""" """Returns the list of Object templates available on the MISP instance"""
session = self.__prepare_session()
url = urljoin(self.root_url, 'objectTemplates') url = urljoin(self.root_url, 'objectTemplates')
response = session.get(url) response = self.__prepare_request('GET', url)
return self._check_response(response)['response'] return self._check_response(response)['response']
def get_object_template_id(self, object_uuid): def get_object_template_id(self, object_uuid):
@ -1727,7 +1654,6 @@ class PyMISP(object):
@deprecated @deprecated
def add_tag(self, event, tag, attribute=False): def add_tag(self, event, tag, attribute=False):
session = self.__prepare_session()
if attribute: if attribute:
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}} to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
path = 'attributes/addTag' path = 'attributes/addTag'
@ -1737,17 +1663,18 @@ class PyMISP(object):
event = event["Event"] event = event["Event"]
to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}} to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}}
path = 'events/addTag' path = 'events/addTag'
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post)) url = urljoin(self.root_url, path)
response = self.__prepare_request('POST', url, json.dumps(to_post))
return self._check_response(response) return self._check_response(response)
@deprecated @deprecated
def remove_tag(self, event, tag, attribute=False): def remove_tag(self, event, tag, attribute=False):
session = self.__prepare_session()
if attribute: if attribute:
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}} to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
path = 'attributes/removeTag' path = 'attributes/removeTag'
else: else:
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}} to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
path = 'events/removeTag' path = 'events/removeTag'
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post)) url = urljoin(self.root_url, path)
response = self.__prepare_request('POST', url, json.dumps(to_post))
return self._check_response(response) return self._check_response(response)

View File

@ -40,6 +40,7 @@ class InvalidMISPObject(MISPObjectException):
"""Exception raised when an object doesn't respect the contrains in the definition""" """Exception raised when an object doesn't respect the contrains in the definition"""
pass pass
class UnknownMISPObjectTemplate(MISPObjectException): class UnknownMISPObjectTemplate(MISPObjectException):
"""Exception raised when the template is unknown""" """Exception raised when the template is unknown"""
pass pass

View File

@ -16,10 +16,11 @@ from collections import Counter
from .abstract import AbstractMISP from .abstract import AbstractMISP
from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError from .exceptions import UnknownMISPObjectTemplate, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError
import six # Remove that import when discarding python2 support.
import logging import logging
logger = logging.getLogger('pymisp') logger = logging.getLogger('pymisp')
import six # Remove that import when discarding python2 support.
if six.PY2: if six.PY2:
logger.warning("You're using python 2, it is strongly recommended to use python >=3.5") logger.warning("You're using python 2, it is strongly recommended to use python >=3.5")
@ -139,7 +140,7 @@ class MISPAttribute(AbstractMISP):
try: try:
c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1]) c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1])
return {self.uuid: True} return {self.uuid: True}
except: except Exception:
return {self.uuid: False} return {self.uuid: False}
def set_all_values(self, **kwargs): def set_all_values(self, **kwargs):
@ -252,7 +253,7 @@ class MISPAttribute(AbstractMISP):
else: else:
with f.open(name, pwd=b'infected') as unpacked: with f.open(name, pwd=b'infected') as unpacked:
self._malware_binary = BytesIO(unpacked.read()) self._malware_binary = BytesIO(unpacked.read())
except: except Exception:
# not a encrypted zip file, assuming it is a new malware sample # not a encrypted zip file, assuming it is a new malware sample
self._prepare_new_malware_sample() self._prepare_new_malware_sample()
@ -383,7 +384,7 @@ class MISPEvent(AbstractMISP):
try: try:
c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1]) c.verify(signed_data, signature=base64.b64decode(self.sig), verify=keys[:1])
to_return[self.uuid] = True to_return[self.uuid] = True
except: except Exception:
to_return[self.uuid] = False to_return[self.uuid] = False
for a in self.attributes: for a in self.attributes:
to_return.update(a.verify(gpg_uid)) to_return.update(a.verify(gpg_uid))
@ -393,7 +394,7 @@ class MISPEvent(AbstractMISP):
try: try:
c.verify(to_verify_global, signature=base64.b64decode(self.global_sig), verify=keys[:1]) c.verify(to_verify_global, signature=base64.b64decode(self.global_sig), verify=keys[:1])
to_return['global'] = True to_return['global'] = True
except: except Exception:
to_return['global'] = False to_return['global'] = False
return to_return return to_return