|
|
|
@ -9,18 +9,27 @@ import datetime
|
|
|
|
|
import os
|
|
|
|
|
import base64
|
|
|
|
|
import re
|
|
|
|
|
import warnings
|
|
|
|
|
import functools
|
|
|
|
|
import logging
|
|
|
|
|
import warnings
|
|
|
|
|
from io import BytesIO, open
|
|
|
|
|
import zipfile
|
|
|
|
|
|
|
|
|
|
from . import __version__
|
|
|
|
|
from .exceptions import PyMISPError, SearchError, NoURL, NoKey
|
|
|
|
|
from .mispevent import MISPEvent, MISPAttribute
|
|
|
|
|
from .abstract import MISPEncode
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger('pymisp')
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
from urllib.parse import urljoin
|
|
|
|
|
# Least dirty way to support python 2 and 3
|
|
|
|
|
basestring = str
|
|
|
|
|
unicode = str
|
|
|
|
|
except ImportError:
|
|
|
|
|
from urlparse import urljoin
|
|
|
|
|
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.5")
|
|
|
|
|
from io import BytesIO, open
|
|
|
|
|
import zipfile
|
|
|
|
|
logger.warning("You're using python 2, it is strongly recommended to use python >=3.5")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
import requests
|
|
|
|
@ -35,23 +44,6 @@ try:
|
|
|
|
|
except ImportError:
|
|
|
|
|
ASYNC_OK = False
|
|
|
|
|
|
|
|
|
|
from . import __version__
|
|
|
|
|
from .exceptions import PyMISPError, SearchError, MissingDependency, NoURL, NoKey
|
|
|
|
|
from .mispevent import MISPEvent, MISPAttribute
|
|
|
|
|
from .abstract import MISPEncode
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Least dirty way to support python 2 and 3
|
|
|
|
|
try:
|
|
|
|
|
basestring
|
|
|
|
|
unicode
|
|
|
|
|
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.4")
|
|
|
|
|
except NameError:
|
|
|
|
|
basestring = str
|
|
|
|
|
unicode = str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def deprecated(func):
|
|
|
|
|
'''This is a decorator which can be used to mark functions
|
|
|
|
@ -99,14 +91,15 @@ class PyMISP(object):
|
|
|
|
|
self.cert = cert
|
|
|
|
|
self.asynch = asynch
|
|
|
|
|
if asynch and not ASYNC_OK:
|
|
|
|
|
warnings.warn("You turned on Async, but don't have requests_futures installed")
|
|
|
|
|
logger.critical("You turned on Async, but don't have requests_futures installed")
|
|
|
|
|
self.asynch = False
|
|
|
|
|
|
|
|
|
|
self.resources_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'data')
|
|
|
|
|
if out_type != 'json':
|
|
|
|
|
raise PyMISPError('The only output type supported by PyMISP is JSON. If you still rely on XML, use PyMISP v2.4.49')
|
|
|
|
|
if debug is not None:
|
|
|
|
|
warnings.warn('debug is deprecated, configure logging in api client')
|
|
|
|
|
if debug:
|
|
|
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
|
logger.info('To configure logging in your script, leave it to None and use the following: import logging; logging.getLogger(\'pymisp\').setLevel(logging.DEBUG)')
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# Make sure the MISP instance is working and the URL is valid
|
|
|
|
@ -127,8 +120,7 @@ class PyMISP(object):
|
|
|
|
|
raise PyMISPError('Unable to connect to MISP ({}). Please make sure the API key and the URL are correct (http/https is required): {}'.format(self.root_url, e))
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
response = session.get(urljoin(self.root_url, 'attributes/describeTypes.json'))
|
|
|
|
|
response = self.__prepare_request('GET', urljoin(self.root_url, 'attributes/describeTypes.json'))
|
|
|
|
|
describe_types = self._check_response(response)
|
|
|
|
|
if describe_types.get('error'):
|
|
|
|
|
for e in describe_types.get('error'):
|
|
|
|
@ -146,24 +138,32 @@ class PyMISP(object):
|
|
|
|
|
self.category_type_mapping = self.describe_types['category_type_mappings']
|
|
|
|
|
self.sane_default = self.describe_types['sane_defaults']
|
|
|
|
|
|
|
|
|
|
def __prepare_session(self, output='json', async_implemented=False):
|
|
|
|
|
"""Prepare the session headers"""
|
|
|
|
|
|
|
|
|
|
if not HAVE_REQUESTS:
|
|
|
|
|
raise MissingDependency('Missing dependency, install requests (`pip install requests`)')
|
|
|
|
|
if self.asynch and async_implemented:
|
|
|
|
|
session = FuturesSession()
|
|
|
|
|
def __prepare_request(self, request_type, url, data=None,
|
|
|
|
|
background_callback=None, output_type='json'):
|
|
|
|
|
if logger.isEnabledFor(logging.DEBUG):
|
|
|
|
|
logger.debug('{} - {}'.format(request_type, url))
|
|
|
|
|
if data is not None:
|
|
|
|
|
logger.debug(data)
|
|
|
|
|
if data is None:
|
|
|
|
|
req = requests.Request(request_type, url)
|
|
|
|
|
else:
|
|
|
|
|
session = requests.Session()
|
|
|
|
|
session.verify = self.ssl
|
|
|
|
|
session.proxies = self.proxies
|
|
|
|
|
session.cert = self.cert
|
|
|
|
|
session.headers.update(
|
|
|
|
|
req = requests.Request(request_type, url, data=data)
|
|
|
|
|
if self.asynch and background_callback is not None:
|
|
|
|
|
s = FuturesSession()
|
|
|
|
|
else:
|
|
|
|
|
s = requests.Session()
|
|
|
|
|
prepped = s.prepare_request(req)
|
|
|
|
|
prepped.headers.update(
|
|
|
|
|
{'Authorization': self.key,
|
|
|
|
|
'Accept': 'application/{}'.format(output),
|
|
|
|
|
'content-type': 'application/{}'.format(output),
|
|
|
|
|
'Accept': 'application/{}'.format(output_type),
|
|
|
|
|
'content-type': 'application/{}'.format(output_type),
|
|
|
|
|
'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)})
|
|
|
|
|
return session
|
|
|
|
|
if logger.isEnabledFor(logging.DEBUG):
|
|
|
|
|
logger.debug(prepped.headers)
|
|
|
|
|
if self.asynch and background_callback is not None:
|
|
|
|
|
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert, background_callback=background_callback)
|
|
|
|
|
else:
|
|
|
|
|
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert)
|
|
|
|
|
|
|
|
|
|
# #####################
|
|
|
|
|
# ### Core helpers ####
|
|
|
|
@ -209,15 +209,16 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def _check_response(self, response):
|
|
|
|
|
"""Check if the response from the server is not an unexpected error"""
|
|
|
|
|
errors = []
|
|
|
|
|
if response.status_code >= 500:
|
|
|
|
|
response.raise_for_status()
|
|
|
|
|
errors.append(response.json())
|
|
|
|
|
logger.critical('Something bad happened on the server-side: {}'.format(response.json()))
|
|
|
|
|
try:
|
|
|
|
|
to_return = response.json()
|
|
|
|
|
except ValueError:
|
|
|
|
|
logger.debug(response.text)
|
|
|
|
|
raise PyMISPError('Unknown error: {}'.format(response.text))
|
|
|
|
|
# It the server didn't return a JSON blob, we've a problem.
|
|
|
|
|
raise PyMISPError('Unknown error (something is very broken server-side: {}'.format(response.text))
|
|
|
|
|
|
|
|
|
|
errors = []
|
|
|
|
|
if isinstance(to_return, (list, str)):
|
|
|
|
|
to_return = {'response': to_return}
|
|
|
|
|
if to_return.get('error'):
|
|
|
|
@ -300,13 +301,11 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
Warning, there's a limit on the number of results
|
|
|
|
|
"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'events/index')
|
|
|
|
|
if filters is not None:
|
|
|
|
|
filters = json.dumps(filters)
|
|
|
|
|
response = session.post(url, data=filters)
|
|
|
|
|
if filters is None:
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
else:
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(filters))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def get_event(self, event_id):
|
|
|
|
@ -314,9 +313,8 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
:param event_id: Event id to get
|
|
|
|
|
"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'events/{}'.format(event_id))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def add_event(self, event):
|
|
|
|
@ -324,14 +322,12 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
:param event: Event as JSON object / string to add
|
|
|
|
|
"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'events')
|
|
|
|
|
if isinstance(event, MISPEvent):
|
|
|
|
|
event = json.dumps(event, cls=MISPEncode)
|
|
|
|
|
if isinstance(event, basestring):
|
|
|
|
|
response = session.post(url, data=event)
|
|
|
|
|
else:
|
|
|
|
|
response = session.post(url, data=json.dumps(event))
|
|
|
|
|
event = event.to_json()
|
|
|
|
|
elif not isinstance(event, basestring):
|
|
|
|
|
event = json.dumps(event)
|
|
|
|
|
response = self.__prepare_request('POST', url, event)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def update_event(self, event_id, event):
|
|
|
|
@ -340,14 +336,12 @@ class PyMISP(object):
|
|
|
|
|
:param event_id: Event id to update
|
|
|
|
|
:param event: Event as JSON object / string to add
|
|
|
|
|
"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'events/{}'.format(event_id))
|
|
|
|
|
if isinstance(event, MISPEvent):
|
|
|
|
|
event = json.dumps(event, cls=MISPEncode)
|
|
|
|
|
if isinstance(event, basestring):
|
|
|
|
|
response = session.post(url, data=event)
|
|
|
|
|
else:
|
|
|
|
|
response = session.post(url, data=json.dumps(event))
|
|
|
|
|
event = event.to_json()
|
|
|
|
|
elif not isinstance(event, basestring):
|
|
|
|
|
event = json.dumps(event)
|
|
|
|
|
response = self.__prepare_request('POST', url, event)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def delete_event(self, event_id):
|
|
|
|
@ -355,26 +349,23 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
:param event_id: Event id to delete
|
|
|
|
|
"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'events/{}'.format(event_id))
|
|
|
|
|
response = session.delete(url)
|
|
|
|
|
response = self.__prepare_request('DELETE', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def delete_attribute(self, attribute_id, hard_delete=False):
|
|
|
|
|
"""Delete an attribute by ID"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
if hard_delete:
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/delete/{}/1'.format(attribute_id))
|
|
|
|
|
else:
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/delete/{}'.format(attribute_id))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def pushEventToZMQ(self, event_id):
|
|
|
|
|
"""Force push an event on ZMQ"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'events/pushEventToZMQ/{}.json'.format(event_id))
|
|
|
|
|
response = session.post(url)
|
|
|
|
|
response = self.__prepare_request('POST', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ##############################################
|
|
|
|
@ -407,12 +398,11 @@ class PyMISP(object):
|
|
|
|
|
event_id = full_event.id
|
|
|
|
|
if full_event.published:
|
|
|
|
|
return {'error': 'Already published'}
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
if not alert:
|
|
|
|
|
url = urljoin(self.root_url, 'events/publish/{}'.format(event_id))
|
|
|
|
|
else:
|
|
|
|
|
url = urljoin(self.root_url, 'events/alert/{}'.format(event_id))
|
|
|
|
|
response = session.post(url)
|
|
|
|
|
response = self.__prepare_request('POST', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def change_threat_level(self, event, threat_level_id):
|
|
|
|
@ -437,20 +427,18 @@ class PyMISP(object):
|
|
|
|
|
"""Tag an event or an attribute"""
|
|
|
|
|
if not self._valid_uuid(uuid):
|
|
|
|
|
raise PyMISPError('Invalid UUID')
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'tags/attachTagToObject')
|
|
|
|
|
to_post = {'uuid': uuid, 'tag': tag}
|
|
|
|
|
path = 'tags/attachTagToObject'
|
|
|
|
|
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def untag(self, uuid, tag):
|
|
|
|
|
"""Untag an event or an attribute"""
|
|
|
|
|
if not self._valid_uuid(uuid):
|
|
|
|
|
raise PyMISPError('Invalid UUID')
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'tags/removeTagFromObject')
|
|
|
|
|
to_post = {'uuid': uuid, 'tag': tag}
|
|
|
|
|
path = 'tags/removeTagFromObject'
|
|
|
|
|
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ##### File attributes #####
|
|
|
|
@ -480,9 +468,8 @@ class PyMISP(object):
|
|
|
|
|
if proposal:
|
|
|
|
|
response = self.proposal_add(eventID_to_update, a)
|
|
|
|
|
else:
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/add/{}'.format(eventID_to_update))
|
|
|
|
|
response = self._check_response(session.post(url, data=json.dumps(a, cls=MISPEncode)))
|
|
|
|
|
response = self.__prepare_request('POST', url, a.to_json())
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
def add_named_attribute(self, event, type_value, value, category=None, to_ids=False, comment=None, distribution=None, proposal=False, **kwargs):
|
|
|
|
@ -809,61 +796,54 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def _upload_sample(self, to_post, event_id=None):
|
|
|
|
|
"""Helper to upload a sample"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
if event_id is None:
|
|
|
|
|
url = urljoin(self.root_url, 'events/upload_sample')
|
|
|
|
|
else:
|
|
|
|
|
url = urljoin(self.root_url, 'events/upload_sample/{}'.format(event_id))
|
|
|
|
|
logger.info(to_post)
|
|
|
|
|
response = session.post(url, data=json.dumps(to_post))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ############################
|
|
|
|
|
# ######## Proposals #########
|
|
|
|
|
# ############################
|
|
|
|
|
|
|
|
|
|
def __query_proposal(self, session, path, id, attribute=None):
|
|
|
|
|
def __query_proposal(self, path, id, attribute=None):
|
|
|
|
|
"""Helper to prepare a query to handle proposals"""
|
|
|
|
|
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
|
|
|
|
|
if path in ['add', 'edit']:
|
|
|
|
|
query = {'request': {'ShadowAttribute': attribute}}
|
|
|
|
|
response = session.post(url, data=json.dumps(query, cls=MISPEncode))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(query, cls=MISPEncode))
|
|
|
|
|
elif path == 'view':
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
else: # accept or discard
|
|
|
|
|
response = session.post(url)
|
|
|
|
|
response = self.__prepare_request('POST', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def proposal_view(self, event_id=None, proposal_id=None):
|
|
|
|
|
"""View a proposal"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
if proposal_id is not None and event_id is not None:
|
|
|
|
|
return {'error': 'You can only view an event ID or a proposal ID'}
|
|
|
|
|
if event_id is not None:
|
|
|
|
|
id = event_id
|
|
|
|
|
else:
|
|
|
|
|
id = proposal_id
|
|
|
|
|
return self.__query_proposal(session, 'view', id)
|
|
|
|
|
return self.__query_proposal('view', id)
|
|
|
|
|
|
|
|
|
|
def proposal_add(self, event_id, attribute):
|
|
|
|
|
"""Add a proposal"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
return self.__query_proposal(session, 'add', event_id, attribute)
|
|
|
|
|
return self.__query_proposal('add', event_id, attribute)
|
|
|
|
|
|
|
|
|
|
def proposal_edit(self, attribute_id, attribute):
|
|
|
|
|
"""Edit a proposal"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
return self.__query_proposal(session, 'edit', attribute_id, attribute)
|
|
|
|
|
return self.__query_proposal('edit', attribute_id, attribute)
|
|
|
|
|
|
|
|
|
|
def proposal_accept(self, proposal_id):
|
|
|
|
|
"""Accept a proposal"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
return self.__query_proposal(session, 'accept', proposal_id)
|
|
|
|
|
return self.__query_proposal('accept', proposal_id)
|
|
|
|
|
|
|
|
|
|
def proposal_discard(self, proposal_id):
|
|
|
|
|
"""Discard a proposal"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
return self.__query_proposal(session, 'discard', proposal_id)
|
|
|
|
|
return self.__query_proposal('discard', proposal_id)
|
|
|
|
|
|
|
|
|
|
# ##############################
|
|
|
|
|
# ###### Attribute update ######
|
|
|
|
@ -874,8 +854,7 @@ class PyMISP(object):
|
|
|
|
|
if to_ids not in [0, 1]:
|
|
|
|
|
raise Exception('to_ids can only be 0 or 1')
|
|
|
|
|
query = {"to_ids": to_ids}
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
return self.__query(session, 'edit/{}'.format(attribute_uuid), query, controller='attributes')
|
|
|
|
|
return self.__query('edit/{}'.format(attribute_uuid), query, controller='attributes')
|
|
|
|
|
|
|
|
|
|
# ##############################
|
|
|
|
|
# ###### Attribute update ######
|
|
|
|
@ -891,27 +870,24 @@ class PyMISP(object):
|
|
|
|
|
query['adhereToWarninglists'] = adhereToWarninglists
|
|
|
|
|
if distribution is not None:
|
|
|
|
|
query['distribution'] = distribution
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
return self.__query(session, 'freeTextImport/{}'.format(event_id), query, controller='events')
|
|
|
|
|
return self.__query('freeTextImport/{}'.format(event_id), query, controller='events')
|
|
|
|
|
|
|
|
|
|
# ##############################
|
|
|
|
|
# ######## REST Search #########
|
|
|
|
|
# ##############################
|
|
|
|
|
|
|
|
|
|
def __query(self, session, path, query, controller='events', async_callback=None):
|
|
|
|
|
def __query(self, path, query, controller='events', async_callback=None):
|
|
|
|
|
"""Helper to prepare a search query"""
|
|
|
|
|
if query.get('error') is not None:
|
|
|
|
|
return query
|
|
|
|
|
if controller not in ['events', 'attributes']:
|
|
|
|
|
raise Exception('Invalid controller. Can only be {}'.format(', '.join(['events', 'attributes'])))
|
|
|
|
|
url = urljoin(self.root_url, '{}/{}'.format(controller, path.lstrip('/')))
|
|
|
|
|
logger.debug('URL: %s', url)
|
|
|
|
|
logger.debug('Query: %s', query)
|
|
|
|
|
|
|
|
|
|
if ASYNC_OK and isinstance(session, FuturesSession) and async_callback:
|
|
|
|
|
response = session.post(url, data=json.dumps(query), background_callback=async_callback)
|
|
|
|
|
if ASYNC_OK and async_callback:
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(query), async_callback)
|
|
|
|
|
else:
|
|
|
|
|
response = session.post(url, data=json.dumps(query))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(query))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def search_index(self, published=None, eventid=None, tag=None, datefrom=None,
|
|
|
|
@ -957,13 +933,12 @@ class PyMISP(object):
|
|
|
|
|
if not set(param).issubset(rule_levels[rule]):
|
|
|
|
|
raise SearchError('Values in your {} are invalid, has to be in {}'.format(rule, ', '.join(str(x) for x in rule_levels[rule])))
|
|
|
|
|
to_post[rule] = '|'.join(str(x) for x in param)
|
|
|
|
|
session = self.__prepare_session(async_implemented=(async_callback is not None))
|
|
|
|
|
url = urljoin(self.root_url, buildup_url)
|
|
|
|
|
|
|
|
|
|
if self.asynch and async_callback:
|
|
|
|
|
response = session.post(url, data=json.dumps(to_post), background_callback=async_callback)
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(to_post), async_callback)
|
|
|
|
|
else:
|
|
|
|
|
response = session.post(url, data=json.dumps(to_post))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
|
|
|
|
res = self._check_response(response)
|
|
|
|
|
if normalize:
|
|
|
|
|
to_return = {'response': []}
|
|
|
|
@ -976,8 +951,7 @@ class PyMISP(object):
|
|
|
|
|
def search_all(self, value):
|
|
|
|
|
"""Search a value in the whole database"""
|
|
|
|
|
query = {'value': value, 'searchall': 1}
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
return self.__query(session, 'restSearch/download', query)
|
|
|
|
|
return self.__query('restSearch/download', query)
|
|
|
|
|
|
|
|
|
|
def __prepare_rest_search(self, values, not_values):
|
|
|
|
|
"""Prepare a search, generate the chain processed by the server
|
|
|
|
@ -1087,8 +1061,7 @@ class PyMISP(object):
|
|
|
|
|
raise SearchError('Unused parameter: {}'.format(', '.join(kwargs.keys())))
|
|
|
|
|
|
|
|
|
|
# Create a session, make it async if and only if we have a callback
|
|
|
|
|
session = self.__prepare_session(async_implemented=(async_callback is not None))
|
|
|
|
|
return self.__query(session, 'restSearch/download', query, controller, async_callback)
|
|
|
|
|
return self.__query('restSearch/download', query, controller, async_callback)
|
|
|
|
|
|
|
|
|
|
def get_attachment(self, attribute_id):
|
|
|
|
|
"""Get an attachement (not a malware sample) by attribute ID.
|
|
|
|
@ -1096,9 +1069,8 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
:param attribute_id: Attribute ID to fetched
|
|
|
|
|
"""
|
|
|
|
|
attach = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(attribute_id))
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
response = session.get(attach)
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/downloadAttachment/download/{}'.format(attribute_id))
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
try:
|
|
|
|
|
response.json()
|
|
|
|
|
# The query fails, response contains a json blob
|
|
|
|
@ -1109,9 +1081,9 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def get_yara(self, event_id):
|
|
|
|
|
"""Get the yara rules from an event"""
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/restSearch')
|
|
|
|
|
to_post = {'request': {'eventid': event_id, 'type': 'yara'}}
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
response = session.post(urljoin(self.root_url, 'attributes/restSearch'), data=json.dumps(to_post))
|
|
|
|
|
response = self.__prepare_request('POST', url, data=json.dumps(to_post))
|
|
|
|
|
result = self._check_response(response)
|
|
|
|
|
if result.get('error') is not None:
|
|
|
|
|
return False, result.get('error')
|
|
|
|
@ -1122,9 +1094,9 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def download_samples(self, sample_hash=None, event_id=None, all_samples=False):
|
|
|
|
|
"""Download samples, by hash or event ID. If there are multiple samples in one event, use the all_samples switch"""
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/downloadSample')
|
|
|
|
|
to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}}
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
response = session.post(urljoin(self.root_url, 'attributes/downloadSample'), data=json.dumps(to_post))
|
|
|
|
|
response = self.__prepare_request('POST', url, data=json.dumps(to_post))
|
|
|
|
|
result = self._check_response(response)
|
|
|
|
|
if result.get('error') is not None:
|
|
|
|
|
return False, result.get('error')
|
|
|
|
@ -1160,9 +1132,8 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def get_all_tags(self, quiet=False):
|
|
|
|
|
"""Get all the tags used on the instance"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'tags')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
r = self._check_response(response)
|
|
|
|
|
if not quiet or r.get('errors'):
|
|
|
|
|
return r
|
|
|
|
@ -1175,9 +1146,8 @@ class PyMISP(object):
|
|
|
|
|
def new_tag(self, name=None, colour="#00ace6", exportable=False):
|
|
|
|
|
"""Create a new tag"""
|
|
|
|
|
to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable}}
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'tags/add')
|
|
|
|
|
response = session.post(url, data=json.dumps(to_post))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ########## Version ##########
|
|
|
|
@ -1197,16 +1167,14 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def get_recommended_api_version(self):
|
|
|
|
|
"""Returns the recommended API version from the server"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'servers/getPyMISPVersion.json')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def get_version(self):
|
|
|
|
|
"""Returns the version of the instance."""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'servers/getVersion.json')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def get_version_master(self):
|
|
|
|
@ -1222,19 +1190,17 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def get_attributes_statistics(self, context='type', percentage=None):
|
|
|
|
|
"""Get attributes statistics from the MISP instance"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
if (context != 'category'):
|
|
|
|
|
context = 'type'
|
|
|
|
|
if percentage is not None:
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}/{}'.format(context, percentage))
|
|
|
|
|
else:
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}'.format(context))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def get_tags_statistics(self, percentage=None, name_sort=None):
|
|
|
|
|
"""Get tags statistics from the MISP instance"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
if percentage is not None:
|
|
|
|
|
percentage = 'true'
|
|
|
|
|
else:
|
|
|
|
@ -1244,32 +1210,29 @@ class PyMISP(object):
|
|
|
|
|
else:
|
|
|
|
|
name_sort = 'false'
|
|
|
|
|
url = urljoin(self.root_url, 'tags/tagStatistics/{}/{}'.format(percentage, name_sort))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ############## Sightings ##################
|
|
|
|
|
|
|
|
|
|
def sighting_per_id(self, attribute_id):
|
|
|
|
|
"""Add a sighting to an attribute (by attribute ID)"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_id))
|
|
|
|
|
response = session.post(url)
|
|
|
|
|
response = self.__prepare_request('POST', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def sighting_per_uuid(self, attribute_uuid):
|
|
|
|
|
"""Add a sighting to an attribute (by attribute UUID)"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_uuid))
|
|
|
|
|
response = session.post(url)
|
|
|
|
|
response = self.__prepare_request('POST', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def set_sightings(self, sightings):
|
|
|
|
|
"""Push a sighting (python dictionary)"""
|
|
|
|
|
if isinstance(sightings, dict):
|
|
|
|
|
sightings = json.dumps(sightings)
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'sightings/add/')
|
|
|
|
|
response = session.post(url, data=sightings)
|
|
|
|
|
response = self.__prepare_request('POST', url, sightings)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def sighting_per_json(self, json_file):
|
|
|
|
@ -1282,9 +1245,8 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def get_sharing_groups(self):
|
|
|
|
|
"""Get the existing sharing groups"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'sharing_groups.json')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)['response']
|
|
|
|
|
|
|
|
|
|
# ############## Users ##################
|
|
|
|
@ -1330,57 +1292,49 @@ class PyMISP(object):
|
|
|
|
|
return user
|
|
|
|
|
|
|
|
|
|
def get_users_list(self):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'admin/users')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)['response']
|
|
|
|
|
|
|
|
|
|
def get_user(self, user_id):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'admin/users/view/{}'.format(user_id))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def add_user(self, email, org_id, role_id, **kwargs):
|
|
|
|
|
new_user = self._set_user_parameters(**dict(email=email, org_id=org_id, role_id=role_id, **kwargs))
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'admin/users/add/')
|
|
|
|
|
response = session.post(url, data=json.dumps(new_user))
|
|
|
|
|
new_user = self._set_user_parameters(**dict(email=email, org_id=org_id, role_id=role_id, **kwargs))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(new_user))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def add_user_json(self, json_file):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
with open(json_file, 'r') as f:
|
|
|
|
|
jdata = json.load(f)
|
|
|
|
|
url = urljoin(self.root_url, 'admin/users/add/')
|
|
|
|
|
response = session.post(url, data=json.dumps(jdata))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def get_user_fields_list(self):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'admin/users/add/')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def edit_user(self, user_id, **kwargs):
|
|
|
|
|
edit_user = self._set_user_parameters(**kwargs)
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
|
|
|
|
response = session.post(url, data=json.dumps(edit_user))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(edit_user))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def edit_user_json(self, json_file, user_id):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
with open(json_file, 'r') as f:
|
|
|
|
|
jdata = json.load(f)
|
|
|
|
|
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
|
|
|
|
response = session.post(url, data=json.dumps(jdata))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def delete_user(self, user_id):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'admin/users/delete/{}'.format(user_id))
|
|
|
|
|
response = session.post(url)
|
|
|
|
|
response = self.__prepare_request('POST', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ############## Organisations ##################
|
|
|
|
@ -1406,64 +1360,56 @@ class PyMISP(object):
|
|
|
|
|
return organisation
|
|
|
|
|
|
|
|
|
|
def get_organisations_list(self, scope="local"):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
scope = scope.lower()
|
|
|
|
|
if scope not in ["local", "external", "all"]:
|
|
|
|
|
raise ValueError("Authorized fields are 'local','external' or 'all'")
|
|
|
|
|
url = urljoin(self.root_url, 'organisations/index/scope:{}'.format(scope))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)['response']
|
|
|
|
|
|
|
|
|
|
def get_organisation(self, organisation_id):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'organisations/view/{}'.format(organisation_id))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def add_organisation(self, name, **kwargs):
|
|
|
|
|
new_org = self._set_organisation_parameters(**dict(name=name, **kwargs))
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
if 'local' in new_org:
|
|
|
|
|
if new_org.get('local') is False:
|
|
|
|
|
if 'uuid' not in new_org:
|
|
|
|
|
raise PyMISPError('A remote org MUST have a valid uuid')
|
|
|
|
|
url = urljoin(self.root_url, 'admin/organisations/add/')
|
|
|
|
|
response = session.post(url, data=json.dumps(new_org))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(new_org))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def add_organisation_json(self, json_file):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
with open(json_file, 'r') as f:
|
|
|
|
|
jdata = json.load(f)
|
|
|
|
|
url = urljoin(self.root_url, 'admin/organisations/add/')
|
|
|
|
|
response = session.post(url, data=json.dumps(jdata))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def get_organisation_fields_list(self):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'admin/organisations/add/')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def edit_organisation(self, org_id, **kwargs):
|
|
|
|
|
edit_org = self._set_organisation_parameters(**kwargs)
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
|
|
|
|
response = session.post(url, data=json.dumps(edit_org))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(edit_org))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def edit_organisation_json(self, json_file, org_id):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
with open(json_file, 'r') as f:
|
|
|
|
|
jdata = json.load(f)
|
|
|
|
|
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
|
|
|
|
response = session.post(url, data=json.dumps(jdata))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def delete_organisation(self, org_id):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'admin/organisations/delete/{}'.format(org_id))
|
|
|
|
|
response = session.post(url)
|
|
|
|
|
response = self.__prepare_request('POST', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ############## Servers ##################
|
|
|
|
@ -1526,17 +1472,15 @@ class PyMISP(object):
|
|
|
|
|
new_server = self._set_server_parameters(url, name, authkey, organisation, internal,
|
|
|
|
|
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
|
|
|
|
|
submitted_client_cert, None, None)
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'servers/add')
|
|
|
|
|
response = session.post(url, data=json.dumps(new_server))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(new_server))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def add_server_json(self, json_file):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
with open(json_file, 'r') as f:
|
|
|
|
|
jdata = json.load(f)
|
|
|
|
|
url = urljoin(self.root_url, 'servers/add')
|
|
|
|
|
response = session.post(url, data=json.dumps(jdata))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def edit_server(self, server_id, url=None, name=None, authkey=None, organisation=None, internal=None, push=False,
|
|
|
|
@ -1545,35 +1489,31 @@ class PyMISP(object):
|
|
|
|
|
new_server = self._set_server_parameters(url, name, authkey, organisation, internal,
|
|
|
|
|
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
|
|
|
|
|
submitted_client_cert, delete_cert, delete_client_cert)
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
|
|
|
|
|
response = session.post(url, data=json.dumps(new_server))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(new_server))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def edit_server_json(self, json_file, server_id):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
with open(json_file, 'r') as f:
|
|
|
|
|
jdata = json.load(f)
|
|
|
|
|
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
|
|
|
|
|
response = session.post(url, data=json.dumps(jdata))
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
# ############## Roles ##################
|
|
|
|
|
|
|
|
|
|
def get_roles_list(self):
|
|
|
|
|
"""Get the list of existing roles"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, '/roles')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)['response']
|
|
|
|
|
|
|
|
|
|
# ############## Tags ##################
|
|
|
|
|
|
|
|
|
|
def get_tags_list(self):
|
|
|
|
|
"""Get the list of existing tags"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, '/tags')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)['Tag']
|
|
|
|
|
|
|
|
|
|
# ##############################################
|
|
|
|
@ -1584,9 +1524,8 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def download_all_suricata(self):
|
|
|
|
|
"""Download all suricata rules events."""
|
|
|
|
|
suricata_rules = urljoin(self.root_url, 'events/nids/suricata/download')
|
|
|
|
|
session = self.__prepare_session('rules')
|
|
|
|
|
response = session.get(suricata_rules)
|
|
|
|
|
url = urljoin(self.root_url, 'events/nids/suricata/download')
|
|
|
|
|
response = self.__prepare_request('GET', url, output_type='rules')
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
def download_suricata_rule_event(self, event_id):
|
|
|
|
@ -1594,18 +1533,16 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
:param event_id: ID of the event to download (same as get)
|
|
|
|
|
"""
|
|
|
|
|
template = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id))
|
|
|
|
|
session = self.__prepare_session('rules')
|
|
|
|
|
response = session.get(template)
|
|
|
|
|
url = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id))
|
|
|
|
|
response = self.__prepare_request('GET', url, output_type='rules')
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
# ############## Text ###############
|
|
|
|
|
|
|
|
|
|
def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False):
|
|
|
|
|
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise."""
|
|
|
|
|
session = self.__prepare_session('txt')
|
|
|
|
|
url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url, output_type='txt')
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
# ############## STIX ##############
|
|
|
|
@ -1615,12 +1552,10 @@ class PyMISP(object):
|
|
|
|
|
if tags:
|
|
|
|
|
if isinstance(tags, list):
|
|
|
|
|
tags = "&&".join(tags)
|
|
|
|
|
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format(
|
|
|
|
|
event_id, with_attachments, tags, from_date, to_date))
|
|
|
|
|
logger.debug("Getting STIX event from %s", url)
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def get_stix(self, **kwargs):
|
|
|
|
@ -1632,58 +1567,50 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def fetch_feed(self, feed_id):
|
|
|
|
|
"""Fetch one single feed"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'feeds/fetchFromFeed/{}'.format(feed_id))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def view_feeds(self):
|
|
|
|
|
"""Get the content of all the feeds"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'feeds')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def view_feed(self, feed_ids):
|
|
|
|
|
"""Get the content of a single feed"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'feeds/view/{}'.format(feed_ids))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def cache_feeds_all(self):
|
|
|
|
|
""" Cache all the feeds"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'feeds/cacheFeeds/all')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def cache_feed(self, feed_id):
|
|
|
|
|
"""Cache a specific feed"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'feeds/cacheFeeds/{}'.format(feed_id))
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def cache_feeds_freetext(self):
|
|
|
|
|
"""Cache all the freetext feeds"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'feeds/cacheFeeds/freetext')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def cache_feeds_misp(self):
|
|
|
|
|
"""Cache all the MISP feeds"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'feeds/cacheFeeds/misp')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def compare_feeds(self):
|
|
|
|
|
"""Generate the comparison matrix for all the MISP feeds"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'feeds/compareFeeds')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def cache_all_feeds(self):
|
|
|
|
@ -1697,23 +1624,20 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
def add_object(self, event_id, template_id, misp_object):
|
|
|
|
|
"""Add an object"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'objects/add/{}/{}'.format(event_id, template_id))
|
|
|
|
|
response = session.post(url, data=misp_object.to_json())
|
|
|
|
|
response = self.__prepare_request('POST', url, misp_object.to_json())
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def add_object_reference(self, misp_object_reference):
|
|
|
|
|
"""Add a reference to an object"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'object_references/add')
|
|
|
|
|
response = session.post(url, data=misp_object_reference.to_json())
|
|
|
|
|
response = self.__prepare_request('POST', url, misp_object_reference.to_json())
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
def get_object_templates_list(self):
|
|
|
|
|
"""Returns the list of Object templates available on the MISP instance"""
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
url = urljoin(self.root_url, 'objectTemplates')
|
|
|
|
|
response = session.get(url)
|
|
|
|
|
response = self.__prepare_request('GET', url)
|
|
|
|
|
return self._check_response(response)['response']
|
|
|
|
|
|
|
|
|
|
def get_object_template_id(self, object_uuid):
|
|
|
|
@ -1730,7 +1654,6 @@ class PyMISP(object):
|
|
|
|
|
|
|
|
|
|
@deprecated
|
|
|
|
|
def add_tag(self, event, tag, attribute=False):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
if attribute:
|
|
|
|
|
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
|
|
|
|
|
path = 'attributes/addTag'
|
|
|
|
@ -1740,17 +1663,18 @@ class PyMISP(object):
|
|
|
|
|
event = event["Event"]
|
|
|
|
|
to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}}
|
|
|
|
|
path = 'events/addTag'
|
|
|
|
|
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
|
|
|
|
url = urljoin(self.root_url, path)
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|
|
|
|
|
|
@deprecated
|
|
|
|
|
def remove_tag(self, event, tag, attribute=False):
|
|
|
|
|
session = self.__prepare_session()
|
|
|
|
|
if attribute:
|
|
|
|
|
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
|
|
|
|
|
path = 'attributes/removeTag'
|
|
|
|
|
else:
|
|
|
|
|
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
|
|
|
|
path = 'events/removeTag'
|
|
|
|
|
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
|
|
|
|
url = urljoin(self.root_url, path)
|
|
|
|
|
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
|
|
|
|
return self._check_response(response)
|
|
|
|
|