mirror of https://github.com/MISP/PyMISP
new: Reworking the REST API (WiP)
parent
785423558b
commit
fcb83f7318
|
@ -2,6 +2,7 @@ __version__ = '2.4.93'
|
||||||
import logging
|
import logging
|
||||||
import functools
|
import functools
|
||||||
import warnings
|
import warnings
|
||||||
|
import sys
|
||||||
|
|
||||||
FORMAT = "%(levelname)s [%(filename)s:%(lineno)s - %(funcName)s() ] %(message)s"
|
FORMAT = "%(levelname)s [%(filename)s:%(lineno)s - %(funcName)s() ] %(message)s"
|
||||||
formatter = logging.Formatter(FORMAT)
|
formatter = logging.Formatter(FORMAT)
|
||||||
|
@ -31,9 +32,9 @@ def deprecated(func):
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat # noqa
|
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError # noqa
|
||||||
from .api import PyMISP # noqa
|
from .api import PyMISP # noqa
|
||||||
from .abstract import AbstractMISP, MISPEncode, MISPTag # noqa
|
from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa
|
||||||
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting # noqa
|
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting # noqa
|
||||||
from .tools import AbstractMISPObjectGenerator # noqa
|
from .tools import AbstractMISPObjectGenerator # noqa
|
||||||
from .tools import Neo4j # noqa
|
from .tools import Neo4j # noqa
|
||||||
|
@ -41,6 +42,8 @@ try:
|
||||||
from .tools import openioc # noqa
|
from .tools import openioc # noqa
|
||||||
from .tools import load_warninglists # noqa
|
from .tools import load_warninglists # noqa
|
||||||
from .tools import ext_lookups # noqa
|
from .tools import ext_lookups # noqa
|
||||||
|
if sys.version_info >= (3, 6):
|
||||||
|
from .aping import ExpandedPyMISP # noqa
|
||||||
logger.debug('pymisp loaded properly')
|
logger.debug('pymisp loaded properly')
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
logger.warning('Unable to load pymisp properly: {}'.format(e))
|
logger.warning('Unable to load pymisp properly: {}'.format(e))
|
||||||
|
|
|
@ -9,6 +9,7 @@ from json import JSONEncoder
|
||||||
import collections
|
import collections
|
||||||
import six # Remove that import when discarding python2 support.
|
import six # Remove that import when discarding python2 support.
|
||||||
import logging
|
import logging
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
from .exceptions import PyMISPInvalidFormat
|
from .exceptions import PyMISPInvalidFormat
|
||||||
|
|
||||||
|
@ -34,6 +35,28 @@ if six.PY2:
|
||||||
return timedelta(0)
|
return timedelta(0)
|
||||||
|
|
||||||
|
|
||||||
|
class Distribution(Enum):
|
||||||
|
your_organisation_only = 0
|
||||||
|
this_community_only = 1
|
||||||
|
connected_communities = 2
|
||||||
|
all_communities = 3
|
||||||
|
sharing_group = 4
|
||||||
|
inherit = 5
|
||||||
|
|
||||||
|
|
||||||
|
class ThreatLevel(Enum):
|
||||||
|
high = 1
|
||||||
|
medium = 2
|
||||||
|
low = 3
|
||||||
|
undefined = 4
|
||||||
|
|
||||||
|
|
||||||
|
class Analysis(Enum):
|
||||||
|
initial = 0
|
||||||
|
ongoing = 1
|
||||||
|
completed = 2
|
||||||
|
|
||||||
|
|
||||||
class MISPEncode(JSONEncoder):
|
class MISPEncode(JSONEncoder):
|
||||||
|
|
||||||
def default(self, obj):
|
def default(self, obj):
|
||||||
|
@ -41,6 +64,8 @@ class MISPEncode(JSONEncoder):
|
||||||
return obj.jsonable()
|
return obj.jsonable()
|
||||||
elif isinstance(obj, datetime.datetime):
|
elif isinstance(obj, datetime.datetime):
|
||||||
return obj.isoformat()
|
return obj.isoformat()
|
||||||
|
elif isinstance(obj, Enum):
|
||||||
|
return obj.value
|
||||||
return JSONEncoder.default(self, obj)
|
return JSONEncoder.default(self, obj)
|
||||||
|
|
||||||
|
|
||||||
|
|
193
pymisp/api.py
193
pymisp/api.py
|
@ -122,7 +122,7 @@ class PyMISP(object):
|
||||||
|
|
||||||
def get_live_query_acl(self):
|
def get_live_query_acl(self):
|
||||||
"""This should return an empty list, unless the ACL is outdated."""
|
"""This should return an empty list, unless the ACL is outdated."""
|
||||||
response = self.__prepare_request('GET', urljoin(self.root_url, 'events/queryACL.json'))
|
response = self._prepare_request('GET', urljoin(self.root_url, 'events/queryACL.json'))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_local_describe_types(self):
|
def get_local_describe_types(self):
|
||||||
|
@ -131,7 +131,7 @@ class PyMISP(object):
|
||||||
return describe_types['result']
|
return describe_types['result']
|
||||||
|
|
||||||
def get_live_describe_types(self):
|
def get_live_describe_types(self):
|
||||||
response = self.__prepare_request('GET', urljoin(self.root_url, 'attributes/describeTypes.json'))
|
response = self._prepare_request('GET', urljoin(self.root_url, 'attributes/describeTypes.json'))
|
||||||
describe_types = self._check_response(response)
|
describe_types = self._check_response(response)
|
||||||
if describe_types.get('error'):
|
if describe_types.get('error'):
|
||||||
for e in describe_types.get('error'):
|
for e in describe_types.get('error'):
|
||||||
|
@ -141,8 +141,8 @@ class PyMISP(object):
|
||||||
raise PyMISPError('The MISP server your are trying to reach is outdated (<2.4.52). Please use PyMISP v2.4.51.1 (pip install -I PyMISP==v2.4.51.1) and/or contact your administrator.')
|
raise PyMISPError('The MISP server your are trying to reach is outdated (<2.4.52). Please use PyMISP v2.4.51.1 (pip install -I PyMISP==v2.4.51.1) and/or contact your administrator.')
|
||||||
return describe_types
|
return describe_types
|
||||||
|
|
||||||
def __prepare_request(self, request_type, url, data=None,
|
def _prepare_request(self, request_type, url, data=None,
|
||||||
background_callback=None, output_type='json'):
|
background_callback=None, output_type='json'):
|
||||||
if logger.isEnabledFor(logging.DEBUG):
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
logger.debug('{} - {}'.format(request_type, url))
|
logger.debug('{} - {}'.format(request_type, url))
|
||||||
if data is not None:
|
if data is not None:
|
||||||
|
@ -152,21 +152,22 @@ class PyMISP(object):
|
||||||
else:
|
else:
|
||||||
req = requests.Request(request_type, url, data=data)
|
req = requests.Request(request_type, url, data=data)
|
||||||
if self.asynch and background_callback is not None:
|
if self.asynch and background_callback is not None:
|
||||||
s = FuturesSession()
|
local_session = FuturesSession
|
||||||
else:
|
else:
|
||||||
s = requests.Session()
|
local_session = requests.Session
|
||||||
prepped = s.prepare_request(req)
|
with local_session() as s:
|
||||||
prepped.headers.update(
|
prepped = s.prepare_request(req)
|
||||||
{'Authorization': self.key,
|
prepped.headers.update(
|
||||||
'Accept': 'application/{}'.format(output_type),
|
{'Authorization': self.key,
|
||||||
'content-type': 'application/{}'.format(output_type),
|
'Accept': 'application/{}'.format(output_type),
|
||||||
'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)})
|
'content-type': 'application/{}'.format(output_type),
|
||||||
if logger.isEnabledFor(logging.DEBUG):
|
'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)})
|
||||||
logger.debug(prepped.headers)
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
if self.asynch and background_callback is not None:
|
logger.debug(prepped.headers)
|
||||||
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert, background_callback=background_callback)
|
if self.asynch and background_callback is not None:
|
||||||
else:
|
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert, background_callback=background_callback)
|
||||||
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert)
|
else:
|
||||||
|
return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert)
|
||||||
|
|
||||||
# #####################
|
# #####################
|
||||||
# ### Core helpers ####
|
# ### Core helpers ####
|
||||||
|
@ -314,9 +315,9 @@ class PyMISP(object):
|
||||||
"""
|
"""
|
||||||
url = urljoin(self.root_url, 'events/index')
|
url = urljoin(self.root_url, 'events/index')
|
||||||
if filters is None:
|
if filters is None:
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
else:
|
else:
|
||||||
response = self.__prepare_request('POST', url, json.dumps(filters))
|
response = self._prepare_request('POST', url, json.dumps(filters))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_event(self, event_id):
|
def get_event(self, event_id):
|
||||||
|
@ -325,7 +326,7 @@ class PyMISP(object):
|
||||||
:param event_id: Event id to get
|
:param event_id: Event id to get
|
||||||
"""
|
"""
|
||||||
url = urljoin(self.root_url, 'events/{}'.format(event_id))
|
url = urljoin(self.root_url, 'events/{}'.format(event_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def add_event(self, event):
|
def add_event(self, event):
|
||||||
|
@ -338,7 +339,7 @@ class PyMISP(object):
|
||||||
event = event.to_json()
|
event = event.to_json()
|
||||||
elif not isinstance(event, basestring):
|
elif not isinstance(event, basestring):
|
||||||
event = json.dumps(event)
|
event = json.dumps(event)
|
||||||
response = self.__prepare_request('POST', url, event)
|
response = self._prepare_request('POST', url, event)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def update_attribute(self, attribute_id, attribute):
|
def update_attribute(self, attribute_id, attribute):
|
||||||
|
@ -352,7 +353,7 @@ class PyMISP(object):
|
||||||
attribute = attribute.to_json()
|
attribute = attribute.to_json()
|
||||||
elif not isinstance(attribute, basestring):
|
elif not isinstance(attribute, basestring):
|
||||||
attribute = json.dumps(attribute)
|
attribute = json.dumps(attribute)
|
||||||
response = self.__prepare_request('POST', url, attribute)
|
response = self._prepare_request('POST', url, attribute)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def update_event(self, event_id, event):
|
def update_event(self, event_id, event):
|
||||||
|
@ -366,7 +367,7 @@ class PyMISP(object):
|
||||||
event = event.to_json()
|
event = event.to_json()
|
||||||
elif not isinstance(event, basestring):
|
elif not isinstance(event, basestring):
|
||||||
event = json.dumps(event)
|
event = json.dumps(event)
|
||||||
response = self.__prepare_request('POST', url, event)
|
response = self._prepare_request('POST', url, event)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def delete_event(self, event_id):
|
def delete_event(self, event_id):
|
||||||
|
@ -375,7 +376,7 @@ class PyMISP(object):
|
||||||
:param event_id: Event id to delete
|
:param event_id: Event id to delete
|
||||||
"""
|
"""
|
||||||
url = urljoin(self.root_url, 'events/{}'.format(event_id))
|
url = urljoin(self.root_url, 'events/{}'.format(event_id))
|
||||||
response = self.__prepare_request('DELETE', url)
|
response = self._prepare_request('DELETE', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def delete_attribute(self, attribute_id, hard_delete=False):
|
def delete_attribute(self, attribute_id, hard_delete=False):
|
||||||
|
@ -384,13 +385,13 @@ class PyMISP(object):
|
||||||
url = urljoin(self.root_url, 'attributes/delete/{}/1'.format(attribute_id))
|
url = urljoin(self.root_url, 'attributes/delete/{}/1'.format(attribute_id))
|
||||||
else:
|
else:
|
||||||
url = urljoin(self.root_url, 'attributes/delete/{}'.format(attribute_id))
|
url = urljoin(self.root_url, 'attributes/delete/{}'.format(attribute_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def pushEventToZMQ(self, event_id):
|
def pushEventToZMQ(self, event_id):
|
||||||
"""Force push an event on ZMQ"""
|
"""Force push an event on ZMQ"""
|
||||||
url = urljoin(self.root_url, 'events/pushEventToZMQ/{}.json'.format(event_id))
|
url = urljoin(self.root_url, 'events/pushEventToZMQ/{}.json'.format(event_id))
|
||||||
response = self.__prepare_request('POST', url)
|
response = self._prepare_request('POST', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ##############################################
|
# ##############################################
|
||||||
|
@ -419,7 +420,7 @@ class PyMISP(object):
|
||||||
url = urljoin(self.root_url, 'events/publish/{}'.format(event_id))
|
url = urljoin(self.root_url, 'events/publish/{}'.format(event_id))
|
||||||
else:
|
else:
|
||||||
url = urljoin(self.root_url, 'events/alert/{}'.format(event_id))
|
url = urljoin(self.root_url, 'events/alert/{}'.format(event_id))
|
||||||
response = self.__prepare_request('POST', url)
|
response = self._prepare_request('POST', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def publish(self, event, alert=True):
|
def publish(self, event, alert=True):
|
||||||
|
@ -467,7 +468,7 @@ class PyMISP(object):
|
||||||
raise PyMISPError('Invalid UUID')
|
raise PyMISPError('Invalid UUID')
|
||||||
url = urljoin(self.root_url, 'tags/attachTagToObject')
|
url = urljoin(self.root_url, 'tags/attachTagToObject')
|
||||||
to_post = {'uuid': uuid, 'tag': tag}
|
to_post = {'uuid': uuid, 'tag': tag}
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
response = self._prepare_request('POST', url, json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def untag(self, uuid, tag):
|
def untag(self, uuid, tag):
|
||||||
|
@ -476,7 +477,7 @@ class PyMISP(object):
|
||||||
raise PyMISPError('Invalid UUID')
|
raise PyMISPError('Invalid UUID')
|
||||||
url = urljoin(self.root_url, 'tags/removeTagFromObject')
|
url = urljoin(self.root_url, 'tags/removeTagFromObject')
|
||||||
to_post = {'uuid': uuid, 'tag': tag}
|
to_post = {'uuid': uuid, 'tag': tag}
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
response = self._prepare_request('POST', url, json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ##### File attributes #####
|
# ##### File attributes #####
|
||||||
|
@ -519,8 +520,8 @@ class PyMISP(object):
|
||||||
data = attributes[0].to_json()
|
data = attributes[0].to_json()
|
||||||
else:
|
else:
|
||||||
data = attributes.to_json()
|
data = attributes.to_json()
|
||||||
# __prepare_request(...) returns a requests.Response Object
|
# _prepare_request(...) returns a requests.Response Object
|
||||||
resp = self.__prepare_request('POST', url, json.dumps(data, cls=MISPEncode))
|
resp = self._prepare_request('POST', url, json.dumps(data, cls=MISPEncode))
|
||||||
try:
|
try:
|
||||||
responses.append(resp.json())
|
responses.append(resp.json())
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -908,7 +909,7 @@ class PyMISP(object):
|
||||||
url = urljoin(self.root_url, 'events/upload_sample')
|
url = urljoin(self.root_url, 'events/upload_sample')
|
||||||
else:
|
else:
|
||||||
url = urljoin(self.root_url, 'events/upload_sample/{}'.format(event_id))
|
url = urljoin(self.root_url, 'events/upload_sample/{}'.format(event_id))
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
response = self._prepare_request('POST', url, json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ############################
|
# ############################
|
||||||
|
@ -920,11 +921,11 @@ class PyMISP(object):
|
||||||
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
|
url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id))
|
||||||
if path in ['add', 'edit']:
|
if path in ['add', 'edit']:
|
||||||
query = {'request': {'ShadowAttribute': attribute}}
|
query = {'request': {'ShadowAttribute': attribute}}
|
||||||
response = self.__prepare_request('POST', url, json.dumps(query, cls=MISPEncode))
|
response = self._prepare_request('POST', url, json.dumps(query, cls=MISPEncode))
|
||||||
elif path == 'view':
|
elif path == 'view':
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
else: # accept or discard
|
else: # accept or discard
|
||||||
response = self.__prepare_request('POST', url)
|
response = self._prepare_request('POST', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def proposal_view(self, event_id=None, proposal_id=None):
|
def proposal_view(self, event_id=None, proposal_id=None):
|
||||||
|
@ -1000,9 +1001,9 @@ class PyMISP(object):
|
||||||
url = urljoin(self.root_url, '{}/{}'.format(controller, path.lstrip('/')))
|
url = urljoin(self.root_url, '{}/{}'.format(controller, path.lstrip('/')))
|
||||||
|
|
||||||
if ASYNC_OK and async_callback:
|
if ASYNC_OK and async_callback:
|
||||||
response = self.__prepare_request('POST', url, json.dumps(query), async_callback)
|
response = self._prepare_request('POST', url, json.dumps(query), async_callback)
|
||||||
else:
|
else:
|
||||||
response = self.__prepare_request('POST', url, json.dumps(query))
|
response = self._prepare_request('POST', url, json.dumps(query))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def search_index(self, published=None, eventid=None, tag=None, datefrom=None,
|
def search_index(self, published=None, eventid=None, tag=None, datefrom=None,
|
||||||
|
@ -1055,9 +1056,9 @@ class PyMISP(object):
|
||||||
url = urljoin(self.root_url, buildup_url)
|
url = urljoin(self.root_url, buildup_url)
|
||||||
|
|
||||||
if self.asynch and async_callback:
|
if self.asynch and async_callback:
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_post), async_callback)
|
response = self._prepare_request('POST', url, json.dumps(to_post), async_callback)
|
||||||
else:
|
else:
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
response = self._prepare_request('POST', url, json.dumps(to_post))
|
||||||
res = self._check_response(response)
|
res = self._check_response(response)
|
||||||
if normalize:
|
if normalize:
|
||||||
to_return = {'response': []}
|
to_return = {'response': []}
|
||||||
|
@ -1188,7 +1189,7 @@ class PyMISP(object):
|
||||||
:param attribute_id: Attribute ID to fetched
|
:param attribute_id: Attribute ID to fetched
|
||||||
"""
|
"""
|
||||||
url = urljoin(self.root_url, 'attributes/download/{}'.format(attribute_id))
|
url = urljoin(self.root_url, 'attributes/download/{}'.format(attribute_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
try:
|
try:
|
||||||
response.json()
|
response.json()
|
||||||
# The query fails, response contains a json blob
|
# The query fails, response contains a json blob
|
||||||
|
@ -1201,7 +1202,7 @@ class PyMISP(object):
|
||||||
"""Get the yara rules from an event"""
|
"""Get the yara rules from an event"""
|
||||||
url = urljoin(self.root_url, 'attributes/restSearch')
|
url = urljoin(self.root_url, 'attributes/restSearch')
|
||||||
to_post = {'request': {'eventid': event_id, 'type': 'yara'}}
|
to_post = {'request': {'eventid': event_id, 'type': 'yara'}}
|
||||||
response = self.__prepare_request('POST', url, data=json.dumps(to_post))
|
response = self._prepare_request('POST', url, data=json.dumps(to_post))
|
||||||
result = self._check_response(response)
|
result = self._check_response(response)
|
||||||
if result.get('error') is not None:
|
if result.get('error') is not None:
|
||||||
return False, result.get('error')
|
return False, result.get('error')
|
||||||
|
@ -1214,7 +1215,7 @@ class PyMISP(object):
|
||||||
"""Download samples, by hash or event ID. If there are multiple samples in one event, use the all_samples switch"""
|
"""Download samples, by hash or event ID. If there are multiple samples in one event, use the all_samples switch"""
|
||||||
url = urljoin(self.root_url, 'attributes/downloadSample')
|
url = urljoin(self.root_url, 'attributes/downloadSample')
|
||||||
to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}}
|
to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}}
|
||||||
response = self.__prepare_request('POST', url, data=json.dumps(to_post))
|
response = self._prepare_request('POST', url, data=json.dumps(to_post))
|
||||||
result = self._check_response(response)
|
result = self._check_response(response)
|
||||||
if result.get('error') is not None:
|
if result.get('error') is not None:
|
||||||
return False, result.get('error')
|
return False, result.get('error')
|
||||||
|
@ -1281,7 +1282,7 @@ class PyMISP(object):
|
||||||
def get_all_tags(self, quiet=False):
|
def get_all_tags(self, quiet=False):
|
||||||
"""Get all the tags used on the instance"""
|
"""Get all the tags used on the instance"""
|
||||||
url = urljoin(self.root_url, 'tags')
|
url = urljoin(self.root_url, 'tags')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
r = self._check_response(response)
|
r = self._check_response(response)
|
||||||
if not quiet or r.get('errors'):
|
if not quiet or r.get('errors'):
|
||||||
return r
|
return r
|
||||||
|
@ -1295,7 +1296,7 @@ class PyMISP(object):
|
||||||
"""Create a new tag"""
|
"""Create a new tag"""
|
||||||
to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable, 'hide_tag': hide_tag}}
|
to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable, 'hide_tag': hide_tag}}
|
||||||
url = urljoin(self.root_url, 'tags/add')
|
url = urljoin(self.root_url, 'tags/add')
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
response = self._prepare_request('POST', url, json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ########## Version ##########
|
# ########## Version ##########
|
||||||
|
@ -1316,13 +1317,13 @@ class PyMISP(object):
|
||||||
def get_recommended_api_version(self):
|
def get_recommended_api_version(self):
|
||||||
"""Returns the recommended API version from the server"""
|
"""Returns the recommended API version from the server"""
|
||||||
url = urljoin(self.root_url, 'servers/getPyMISPVersion.json')
|
url = urljoin(self.root_url, 'servers/getPyMISPVersion.json')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_version(self):
|
def get_version(self):
|
||||||
"""Returns the version of the instance."""
|
"""Returns the version of the instance."""
|
||||||
url = urljoin(self.root_url, 'servers/getVersion.json')
|
url = urljoin(self.root_url, 'servers/getVersion.json')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_version_master(self):
|
def get_version_master(self):
|
||||||
|
@ -1344,7 +1345,7 @@ class PyMISP(object):
|
||||||
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}/{}'.format(context, percentage))
|
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}/{}'.format(context, percentage))
|
||||||
else:
|
else:
|
||||||
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}'.format(context))
|
url = urljoin(self.root_url, 'attributes/attributeStatistics/{}'.format(context))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_tags_statistics(self, percentage=None, name_sort=None):
|
def get_tags_statistics(self, percentage=None, name_sort=None):
|
||||||
|
@ -1358,7 +1359,7 @@ class PyMISP(object):
|
||||||
else:
|
else:
|
||||||
name_sort = 'false'
|
name_sort = 'false'
|
||||||
url = urljoin(self.root_url, 'tags/tagStatistics/{}/{}'.format(percentage, name_sort))
|
url = urljoin(self.root_url, 'tags/tagStatistics/{}/{}'.format(percentage, name_sort))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ############## Sightings ##################
|
# ############## Sightings ##################
|
||||||
|
@ -1366,13 +1367,13 @@ class PyMISP(object):
|
||||||
def sighting_per_id(self, attribute_id):
|
def sighting_per_id(self, attribute_id):
|
||||||
"""Add a sighting to an attribute (by attribute ID)"""
|
"""Add a sighting to an attribute (by attribute ID)"""
|
||||||
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_id))
|
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_id))
|
||||||
response = self.__prepare_request('POST', url)
|
response = self._prepare_request('POST', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def sighting_per_uuid(self, attribute_uuid):
|
def sighting_per_uuid(self, attribute_uuid):
|
||||||
"""Add a sighting to an attribute (by attribute UUID)"""
|
"""Add a sighting to an attribute (by attribute UUID)"""
|
||||||
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_uuid))
|
url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_uuid))
|
||||||
response = self.__prepare_request('POST', url)
|
response = self._prepare_request('POST', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def set_sightings(self, sightings):
|
def set_sightings(self, sightings):
|
||||||
|
@ -1385,7 +1386,7 @@ class PyMISP(object):
|
||||||
elif isinstance(sighting, dict):
|
elif isinstance(sighting, dict):
|
||||||
to_post = json.dumps(sighting)
|
to_post = json.dumps(sighting)
|
||||||
url = urljoin(self.root_url, 'sightings/add/')
|
url = urljoin(self.root_url, 'sightings/add/')
|
||||||
response = self.__prepare_request('POST', url, to_post)
|
response = self._prepare_request('POST', url, to_post)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def sighting_per_json(self, json_file):
|
def sighting_per_json(self, json_file):
|
||||||
|
@ -1435,7 +1436,7 @@ class PyMISP(object):
|
||||||
org_id = ""
|
org_id = ""
|
||||||
uri = 'sightings/listSightings/{}/{}/{}'.format(element_id, scope, org_id)
|
uri = 'sightings/listSightings/{}/{}/{}'.format(element_id, scope, org_id)
|
||||||
url = urljoin(self.root_url, uri)
|
url = urljoin(self.root_url, uri)
|
||||||
response = self.__prepare_request('POST', url)
|
response = self._prepare_request('POST', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ############## Sharing Groups ##################
|
# ############## Sharing Groups ##################
|
||||||
|
@ -1443,7 +1444,7 @@ class PyMISP(object):
|
||||||
def get_sharing_groups(self):
|
def get_sharing_groups(self):
|
||||||
"""Get the existing sharing groups"""
|
"""Get the existing sharing groups"""
|
||||||
url = urljoin(self.root_url, 'sharing_groups.json')
|
url = urljoin(self.root_url, 'sharing_groups.json')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)['response']
|
return self._check_response(response)['response']
|
||||||
|
|
||||||
# ############## Users ##################
|
# ############## Users ##################
|
||||||
|
@ -1590,14 +1591,14 @@ class PyMISP(object):
|
||||||
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
|
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
|
||||||
submitted_client_cert, None, None)
|
submitted_client_cert, None, None)
|
||||||
url = urljoin(self.root_url, 'servers/add')
|
url = urljoin(self.root_url, 'servers/add')
|
||||||
response = self.__prepare_request('POST', url, json.dumps(new_server))
|
response = self._prepare_request('POST', url, json.dumps(new_server))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def add_server_json(self, json_file):
|
def add_server_json(self, json_file):
|
||||||
with open(json_file, 'rb') as f:
|
with open(json_file, 'rb') as f:
|
||||||
jdata = json.load(f)
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'servers/add')
|
url = urljoin(self.root_url, 'servers/add')
|
||||||
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
response = self._prepare_request('POST', url, json.dumps(jdata))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def edit_server(self, server_id, url=None, name=None, authkey=None, organisation=None, internal=None, push=False,
|
def edit_server(self, server_id, url=None, name=None, authkey=None, organisation=None, internal=None, push=False,
|
||||||
|
@ -1607,14 +1608,14 @@ class PyMISP(object):
|
||||||
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
|
push, pull, self_signed, push_rules, pull_rules, submitted_cert,
|
||||||
submitted_client_cert, delete_cert, delete_client_cert)
|
submitted_client_cert, delete_cert, delete_client_cert)
|
||||||
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
|
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
|
||||||
response = self.__prepare_request('POST', url, json.dumps(new_server))
|
response = self._prepare_request('POST', url, json.dumps(new_server))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def edit_server_json(self, json_file, server_id):
|
def edit_server_json(self, json_file, server_id):
|
||||||
with open(json_file, 'rb') as f:
|
with open(json_file, 'rb') as f:
|
||||||
jdata = json.load(f)
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
|
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
|
||||||
response = self.__prepare_request('POST', url, json.dumps(jdata))
|
response = self._prepare_request('POST', url, json.dumps(jdata))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ############## Roles ##################
|
# ############## Roles ##################
|
||||||
|
@ -1622,7 +1623,7 @@ class PyMISP(object):
|
||||||
def get_roles_list(self):
|
def get_roles_list(self):
|
||||||
"""Get the list of existing roles"""
|
"""Get the list of existing roles"""
|
||||||
url = urljoin(self.root_url, '/roles')
|
url = urljoin(self.root_url, '/roles')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)['response']
|
return self._check_response(response)['response']
|
||||||
|
|
||||||
# ############## Tags ##################
|
# ############## Tags ##################
|
||||||
|
@ -1630,43 +1631,43 @@ class PyMISP(object):
|
||||||
def get_tags_list(self):
|
def get_tags_list(self):
|
||||||
"""Get the list of existing tags"""
|
"""Get the list of existing tags"""
|
||||||
url = urljoin(self.root_url, '/tags')
|
url = urljoin(self.root_url, '/tags')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)['Tag']
|
return self._check_response(response)['Tag']
|
||||||
|
|
||||||
# ############## Taxonomies ##################
|
# ############## Taxonomies ##################
|
||||||
|
|
||||||
def get_taxonomies_list(self):
|
def get_taxonomies_list(self):
|
||||||
url = urljoin(self.root_url, '/taxonomies')
|
url = urljoin(self.root_url, '/taxonomies')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_taxonomy(self, taxonomy_id):
|
def get_taxonomy(self, taxonomy_id):
|
||||||
url = urljoin(self.root_url, '/taxonomies/view/{}'.format(taxonomy_id))
|
url = urljoin(self.root_url, '/taxonomies/view/{}'.format(taxonomy_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ############## WarningLists ##################
|
# ############## WarningLists ##################
|
||||||
|
|
||||||
def get_warninglists(self):
|
def get_warninglists(self):
|
||||||
url = urljoin(self.root_url, '/warninglists')
|
url = urljoin(self.root_url, '/warninglists')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_warninglist(self, warninglist_id):
|
def get_warninglist(self, warninglist_id):
|
||||||
url = urljoin(self.root_url, '/warninglists/view/{}'.format(warninglist_id))
|
url = urljoin(self.root_url, '/warninglists/view/{}'.format(warninglist_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ############## Galaxies/Clusters ##################
|
# ############## Galaxies/Clusters ##################
|
||||||
|
|
||||||
def get_galaxies(self):
|
def get_galaxies(self):
|
||||||
url = urljoin(self.root_url, '/galaxies')
|
url = urljoin(self.root_url, '/galaxies')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_galaxy(self, galaxy_id):
|
def get_galaxy(self, galaxy_id):
|
||||||
url = urljoin(self.root_url, '/galaxies/view/{}'.format(galaxy_id))
|
url = urljoin(self.root_url, '/galaxies/view/{}'.format(galaxy_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ##############################################
|
# ##############################################
|
||||||
|
@ -1678,7 +1679,7 @@ class PyMISP(object):
|
||||||
def download_all_suricata(self):
|
def download_all_suricata(self):
|
||||||
"""Download all suricata rules events."""
|
"""Download all suricata rules events."""
|
||||||
url = urljoin(self.root_url, 'events/nids/suricata/download')
|
url = urljoin(self.root_url, 'events/nids/suricata/download')
|
||||||
response = self.__prepare_request('GET', url, output_type='rules')
|
response = self._prepare_request('GET', url, output_type='rules')
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def download_suricata_rule_event(self, event_id):
|
def download_suricata_rule_event(self, event_id):
|
||||||
|
@ -1687,7 +1688,7 @@ class PyMISP(object):
|
||||||
:param event_id: ID of the event to download (same as get)
|
:param event_id: ID of the event to download (same as get)
|
||||||
"""
|
"""
|
||||||
url = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id))
|
url = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id))
|
||||||
response = self.__prepare_request('GET', url, output_type='rules')
|
response = self._prepare_request('GET', url, output_type='rules')
|
||||||
return response
|
return response
|
||||||
|
|
||||||
# ############## Text ###############
|
# ############## Text ###############
|
||||||
|
@ -1695,7 +1696,7 @@ class PyMISP(object):
|
||||||
def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False):
|
def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False):
|
||||||
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise."""
|
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise."""
|
||||||
url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished))
|
url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished))
|
||||||
response = self.__prepare_request('GET', url, output_type='txt')
|
response = self._prepare_request('GET', url, output_type='txt')
|
||||||
return response
|
return response
|
||||||
|
|
||||||
# ############## STIX ##############
|
# ############## STIX ##############
|
||||||
|
@ -1708,7 +1709,7 @@ class PyMISP(object):
|
||||||
url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format(
|
url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format(
|
||||||
event_id, with_attachments, tags, from_date, to_date))
|
event_id, with_attachments, tags, from_date, to_date))
|
||||||
logger.debug("Getting STIX event from %s", url)
|
logger.debug("Getting STIX event from %s", url)
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_stix(self, **kwargs):
|
def get_stix(self, **kwargs):
|
||||||
|
@ -1743,9 +1744,9 @@ class PyMISP(object):
|
||||||
if last:
|
if last:
|
||||||
to_post['last'] = last
|
to_post['last'] = last
|
||||||
if to_post:
|
if to_post:
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_post), output_type='json')
|
response = self._prepare_request('POST', url, json.dumps(to_post), output_type='json')
|
||||||
else:
|
else:
|
||||||
response = self.__prepare_request('POST', url, output_type='json')
|
response = self._prepare_request('POST', url, output_type='json')
|
||||||
return response.text
|
return response.text
|
||||||
|
|
||||||
# #######################################
|
# #######################################
|
||||||
|
@ -1754,32 +1755,32 @@ class PyMISP(object):
|
||||||
|
|
||||||
def _rest_list(self, urlpath):
|
def _rest_list(self, urlpath):
|
||||||
url = urljoin(self.root_url, urlpath)
|
url = urljoin(self.root_url, urlpath)
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def _rest_get_parameters(self, urlpath):
|
def _rest_get_parameters(self, urlpath):
|
||||||
url = urljoin(self.root_url, '{}/add'.format(urlpath))
|
url = urljoin(self.root_url, '{}/add'.format(urlpath))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def _rest_view(self, urlpath, rest_id):
|
def _rest_view(self, urlpath, rest_id):
|
||||||
url = urljoin(self.root_url, '{}/view/{}'.format(urlpath, rest_id))
|
url = urljoin(self.root_url, '{}/view/{}'.format(urlpath, rest_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def _rest_add(self, urlpath, obj):
|
def _rest_add(self, urlpath, obj):
|
||||||
url = urljoin(self.root_url, '{}/add'.format(urlpath))
|
url = urljoin(self.root_url, '{}/add'.format(urlpath))
|
||||||
response = self.__prepare_request('POST', url, obj.to_json())
|
response = self._prepare_request('POST', url, obj.to_json())
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def _rest_edit(self, urlpath, obj, rest_id):
|
def _rest_edit(self, urlpath, obj, rest_id):
|
||||||
url = urljoin(self.root_url, '{}/edit/{}'.format(urlpath, rest_id))
|
url = urljoin(self.root_url, '{}/edit/{}'.format(urlpath, rest_id))
|
||||||
response = self.__prepare_request('POST', url, obj.to_json())
|
response = self._prepare_request('POST', url, obj.to_json())
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def _rest_delete(self, urlpath, rest_id):
|
def _rest_delete(self, urlpath, rest_id):
|
||||||
url = urljoin(self.root_url, '{}/delete/{}'.format(urlpath, rest_id))
|
url = urljoin(self.root_url, '{}/delete/{}'.format(urlpath, rest_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('POST', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ###########################
|
# ###########################
|
||||||
|
@ -1817,37 +1818,37 @@ class PyMISP(object):
|
||||||
def fetch_feed(self, feed_id):
|
def fetch_feed(self, feed_id):
|
||||||
"""Fetch one single feed"""
|
"""Fetch one single feed"""
|
||||||
url = urljoin(self.root_url, 'feeds/fetchFromFeed/{}'.format(feed_id))
|
url = urljoin(self.root_url, 'feeds/fetchFromFeed/{}'.format(feed_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def cache_feeds_all(self):
|
def cache_feeds_all(self):
|
||||||
""" Cache all the feeds"""
|
""" Cache all the feeds"""
|
||||||
url = urljoin(self.root_url, 'feeds/cacheFeeds/all')
|
url = urljoin(self.root_url, 'feeds/cacheFeeds/all')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def cache_feed(self, feed_id):
|
def cache_feed(self, feed_id):
|
||||||
"""Cache a specific feed"""
|
"""Cache a specific feed"""
|
||||||
url = urljoin(self.root_url, 'feeds/cacheFeeds/{}'.format(feed_id))
|
url = urljoin(self.root_url, 'feeds/cacheFeeds/{}'.format(feed_id))
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def cache_feeds_freetext(self):
|
def cache_feeds_freetext(self):
|
||||||
"""Cache all the freetext feeds"""
|
"""Cache all the freetext feeds"""
|
||||||
url = urljoin(self.root_url, 'feeds/cacheFeeds/freetext')
|
url = urljoin(self.root_url, 'feeds/cacheFeeds/freetext')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def cache_feeds_misp(self):
|
def cache_feeds_misp(self):
|
||||||
"""Cache all the MISP feeds"""
|
"""Cache all the MISP feeds"""
|
||||||
url = urljoin(self.root_url, 'feeds/cacheFeeds/misp')
|
url = urljoin(self.root_url, 'feeds/cacheFeeds/misp')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def compare_feeds(self):
|
def compare_feeds(self):
|
||||||
"""Generate the comparison matrix for all the MISP feeds"""
|
"""Generate the comparison matrix for all the MISP feeds"""
|
||||||
url = urljoin(self.root_url, 'feeds/compareFeeds')
|
url = urljoin(self.root_url, 'feeds/compareFeeds')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
@deprecated
|
@deprecated
|
||||||
|
@ -1877,7 +1878,7 @@ class PyMISP(object):
|
||||||
'''
|
'''
|
||||||
to_jsonify = {'sg_id': sharing_group, 'org_id': organisation, 'extend': extend}
|
to_jsonify = {'sg_id': sharing_group, 'org_id': organisation, 'extend': extend}
|
||||||
url = urljoin(self.root_url, '/sharingGroups/addOrg')
|
url = urljoin(self.root_url, '/sharingGroups/addOrg')
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_jsonify))
|
response = self._prepare_request('POST', url, json.dumps(to_jsonify))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def sharing_group_org_remove(self, sharing_group, organisation):
|
def sharing_group_org_remove(self, sharing_group, organisation):
|
||||||
|
@ -1887,7 +1888,7 @@ class PyMISP(object):
|
||||||
'''
|
'''
|
||||||
to_jsonify = {'sg_id': sharing_group, 'org_id': organisation}
|
to_jsonify = {'sg_id': sharing_group, 'org_id': organisation}
|
||||||
url = urljoin(self.root_url, '/sharingGroups/removeOrg')
|
url = urljoin(self.root_url, '/sharingGroups/removeOrg')
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_jsonify))
|
response = self._prepare_request('POST', url, json.dumps(to_jsonify))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def sharing_group_server_add(self, sharing_group, server, all_orgs=False):
|
def sharing_group_server_add(self, sharing_group, server, all_orgs=False):
|
||||||
|
@ -1898,7 +1899,7 @@ class PyMISP(object):
|
||||||
'''
|
'''
|
||||||
to_jsonify = {'sg_id': sharing_group, 'server_id': server, 'all_orgs': all_orgs}
|
to_jsonify = {'sg_id': sharing_group, 'server_id': server, 'all_orgs': all_orgs}
|
||||||
url = urljoin(self.root_url, '/sharingGroups/addServer')
|
url = urljoin(self.root_url, '/sharingGroups/addServer')
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_jsonify))
|
response = self._prepare_request('POST', url, json.dumps(to_jsonify))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def sharing_group_server_remove(self, sharing_group, server):
|
def sharing_group_server_remove(self, sharing_group, server):
|
||||||
|
@ -1908,7 +1909,7 @@ class PyMISP(object):
|
||||||
'''
|
'''
|
||||||
to_jsonify = {'sg_id': sharing_group, 'server_id': server}
|
to_jsonify = {'sg_id': sharing_group, 'server_id': server}
|
||||||
url = urljoin(self.root_url, '/sharingGroups/removeServer')
|
url = urljoin(self.root_url, '/sharingGroups/removeServer')
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_jsonify))
|
response = self._prepare_request('POST', url, json.dumps(to_jsonify))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ###################
|
# ###################
|
||||||
|
@ -1935,7 +1936,7 @@ class PyMISP(object):
|
||||||
url = urljoin(self.root_url, 'objects/add/{}/{}'.format(event_id, template_id))
|
url = urljoin(self.root_url, 'objects/add/{}/{}'.format(event_id, template_id))
|
||||||
else:
|
else:
|
||||||
url = urljoin(self.root_url, 'objects/add/{}'.format(event_id))
|
url = urljoin(self.root_url, 'objects/add/{}'.format(event_id))
|
||||||
response = self.__prepare_request('POST', url, misp_object.to_json())
|
response = self._prepare_request('POST', url, misp_object.to_json())
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def edit_object(self, misp_object, object_id=None):
|
def edit_object(self, misp_object, object_id=None):
|
||||||
|
@ -1949,31 +1950,31 @@ class PyMISP(object):
|
||||||
else:
|
else:
|
||||||
raise PyMISPError('In order to update an object, you have to provide an object ID (either in the misp_object, or as a parameter)')
|
raise PyMISPError('In order to update an object, you have to provide an object ID (either in the misp_object, or as a parameter)')
|
||||||
url = urljoin(self.root_url, 'objects/edit/{}'.format(param))
|
url = urljoin(self.root_url, 'objects/edit/{}'.format(param))
|
||||||
response = self.__prepare_request('POST', url, misp_object.to_json())
|
response = self._prepare_request('POST', url, misp_object.to_json())
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def delete_object(self, id):
|
def delete_object(self, id):
|
||||||
"""Deletes an object"""
|
"""Deletes an object"""
|
||||||
url = urljoin(self.root_url, 'objects/delete/{}'.format(id))
|
url = urljoin(self.root_url, 'objects/delete/{}'.format(id))
|
||||||
response = self.__prepare_request('POST', url)
|
response = self._prepare_request('POST', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def add_object_reference(self, misp_object_reference):
|
def add_object_reference(self, misp_object_reference):
|
||||||
"""Add a reference to an object"""
|
"""Add a reference to an object"""
|
||||||
url = urljoin(self.root_url, 'object_references/add')
|
url = urljoin(self.root_url, 'object_references/add')
|
||||||
response = self.__prepare_request('POST', url, misp_object_reference.to_json())
|
response = self._prepare_request('POST', url, misp_object_reference.to_json())
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def delete_object_reference(self, id):
|
def delete_object_reference(self, id):
|
||||||
"""Deletes a reference to an object"""
|
"""Deletes a reference to an object"""
|
||||||
url = urljoin(self.root_url, 'object_references/delete/{}'.format(id))
|
url = urljoin(self.root_url, 'object_references/delete/{}'.format(id))
|
||||||
response = self.__prepare_request('POST', url)
|
response = self._prepare_request('POST', url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_object_templates_list(self):
|
def get_object_templates_list(self):
|
||||||
"""Returns the list of Object templates available on the MISP instance"""
|
"""Returns the list of Object templates available on the MISP instance"""
|
||||||
url = urljoin(self.root_url, 'objectTemplates')
|
url = urljoin(self.root_url, 'objectTemplates')
|
||||||
response = self.__prepare_request('GET', url)
|
response = self._prepare_request('GET', url)
|
||||||
return self._check_response(response)['response']
|
return self._check_response(response)['response']
|
||||||
|
|
||||||
def get_object_template_id(self, object_uuid):
|
def get_object_template_id(self, object_uuid):
|
||||||
|
@ -2000,7 +2001,7 @@ class PyMISP(object):
|
||||||
to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}}
|
to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}}
|
||||||
path = 'events/addTag'
|
path = 'events/addTag'
|
||||||
url = urljoin(self.root_url, path)
|
url = urljoin(self.root_url, path)
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
response = self._prepare_request('POST', url, json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
@deprecated
|
@deprecated
|
||||||
|
@ -2012,5 +2013,5 @@ class PyMISP(object):
|
||||||
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
||||||
path = 'events/removeTag'
|
path = 'events/removeTag'
|
||||||
url = urljoin(self.root_url, path)
|
url = urljoin(self.root_url, path)
|
||||||
response = self.__prepare_request('POST', url, json.dumps(to_post))
|
response = self._prepare_request('POST', url, json.dumps(to_post))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
|
@ -0,0 +1,153 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from .exceptions import MISPServerError
|
||||||
|
from .api import PyMISP, everything_broken
|
||||||
|
from typing import TypeVar, Optional, Tuple, List, Dict
|
||||||
|
from datetime import date, datetime
|
||||||
|
import json
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from urllib.parse import urljoin
|
||||||
|
|
||||||
|
SearchType = TypeVar('SearchType', str, int)
|
||||||
|
# str: string to search / list: values to search (OR) / dict: {'OR': [list], 'NOT': [list], 'AND': [list]}
|
||||||
|
SearchParameterTypes = TypeVar('SearchParameterTypes', str, List[SearchType], Dict[str, SearchType])
|
||||||
|
DateTypes = TypeVar('DateTypes', datetime, date, SearchType, float)
|
||||||
|
DateInterval = TypeVar('DateInterval', DateTypes, Tuple[DateTypes, DateTypes])
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger('pymisp')
|
||||||
|
|
||||||
|
|
||||||
|
class ExpandedPyMISP(PyMISP):
|
||||||
|
|
||||||
|
def build_complex_query(self, or_parameters: Optional[List[SearchType]]=None,
|
||||||
|
and_parameters: Optional[List[SearchType]]=None,
|
||||||
|
not_parameters: Optional[List[SearchType]]=None):
|
||||||
|
to_return = {}
|
||||||
|
if and_parameters:
|
||||||
|
to_return['AND'] = and_parameters
|
||||||
|
if not_parameters:
|
||||||
|
to_return['NOT'] = not_parameters
|
||||||
|
if or_parameters:
|
||||||
|
to_return['OR'] = or_parameters
|
||||||
|
return to_return
|
||||||
|
|
||||||
|
def make_timestamp(self, value: DateTypes):
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return datetime.timestamp()
|
||||||
|
elif isinstance(value, date):
|
||||||
|
return datetime.combine(value, datetime.max.time()).timestamp()
|
||||||
|
elif isinstance(value, str):
|
||||||
|
if value.isdigit():
|
||||||
|
return value
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
float(value)
|
||||||
|
return value
|
||||||
|
except ValueError:
|
||||||
|
# The value can also be '1d', '10h', ...
|
||||||
|
return value
|
||||||
|
else:
|
||||||
|
return value
|
||||||
|
|
||||||
|
def _check_response(self, response):
|
||||||
|
"""Check if the response from the server is not an unexpected error"""
|
||||||
|
if response.status_code >= 500:
|
||||||
|
logger.critical(everything_broken.format(response.request.headers, response.request.body, response.text))
|
||||||
|
raise MISPServerError('Error code 500:\n{}'.format(response.text))
|
||||||
|
elif 400 <= response.status_code < 500:
|
||||||
|
# The server returns a json message with the error details
|
||||||
|
error_message = response.json()
|
||||||
|
logger.error(f'Something went wrong ({response.status_code}): {error_message}')
|
||||||
|
return {'errors': [(response.status_code, error_message)]}
|
||||||
|
|
||||||
|
# At this point, we had no error.
|
||||||
|
if logger.isEnabledFor(logging.DEBUG):
|
||||||
|
logger.debug(response)
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = response.json()
|
||||||
|
if response.get('response') is not None:
|
||||||
|
# Cleanup.
|
||||||
|
return response.get('response')
|
||||||
|
return response
|
||||||
|
except Exception:
|
||||||
|
return response.text
|
||||||
|
|
||||||
|
# TODO: Make that thing async & test it.
|
||||||
|
def search(self, controller: str='events', return_format: str='json',
|
||||||
|
value: Optional[SearchParameterTypes]=None,
|
||||||
|
type_attribute: Optional[SearchParameterTypes]=None,
|
||||||
|
category: Optional[SearchParameterTypes]=None,
|
||||||
|
org: Optional[SearchParameterTypes]=None,
|
||||||
|
tags: Optional[SearchParameterTypes]=None,
|
||||||
|
date_from: Optional[DateTypes]=None, date_to: Optional[DateTypes]=None,
|
||||||
|
eventid: Optional[SearchType]=None,
|
||||||
|
with_attachment: Optional[bool]=None,
|
||||||
|
metadata: Optional[bool]=None,
|
||||||
|
uuid: Optional[str]=None,
|
||||||
|
published: Optional[bool]=None,
|
||||||
|
searchall: Optional[bool]=None,
|
||||||
|
enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None,
|
||||||
|
sg_reference_only: Optional[bool]=None,
|
||||||
|
publish_timestamp: Optional[DateInterval]=None,
|
||||||
|
timestamp: Optional[DateInterval]=None,
|
||||||
|
**kwargs):
|
||||||
|
|
||||||
|
if controller not in ['events', 'attributes', 'objects']:
|
||||||
|
raise ValueError('controller has to be in {}'.format(', '.join(['events', 'attributes', 'objects'])))
|
||||||
|
|
||||||
|
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
|
||||||
|
# They are passed as-is.
|
||||||
|
query = kwargs
|
||||||
|
if return_format is not None:
|
||||||
|
query['returnFormat'] = return_format
|
||||||
|
if value is not None:
|
||||||
|
query['value'] = value
|
||||||
|
if type_attribute is not None:
|
||||||
|
query['type'] = type_attribute
|
||||||
|
if category is not None:
|
||||||
|
query['category'] = category
|
||||||
|
if org is not None:
|
||||||
|
query['org'] = org
|
||||||
|
if tags is not None:
|
||||||
|
query['tags'] = tags
|
||||||
|
if date_from is not None:
|
||||||
|
query['from'] = self.make_timestamp(date_from)
|
||||||
|
if date_to is not None:
|
||||||
|
query['to'] = self.make_timestamp(date_to)
|
||||||
|
if eventid is not None:
|
||||||
|
query['eventid'] = eventid
|
||||||
|
if with_attachment is not None:
|
||||||
|
query['withAttachments'] = with_attachment
|
||||||
|
if metadata is not None:
|
||||||
|
query['metadata'] = metadata
|
||||||
|
if uuid is not None:
|
||||||
|
query['uuid'] = uuid
|
||||||
|
if published is not None:
|
||||||
|
query['published'] = published
|
||||||
|
if searchall is not None:
|
||||||
|
query['searchall'] = searchall
|
||||||
|
if enforce_warninglist is not None:
|
||||||
|
query['enforceWarninglist'] = enforce_warninglist
|
||||||
|
if enforceWarninglist is not None:
|
||||||
|
# Alias for enforce_warninglist
|
||||||
|
query['enforceWarninglist'] = enforceWarninglist
|
||||||
|
if sg_reference_only is not None:
|
||||||
|
query['sgReferenceOnly'] = sg_reference_only
|
||||||
|
if publish_timestamp is not None:
|
||||||
|
if isinstance(publish_timestamp, (list, tuple)):
|
||||||
|
query['publish_timestamp'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1]))
|
||||||
|
else:
|
||||||
|
query['publish_timestamp'] = self.make_timestamp(publish_timestamp)
|
||||||
|
if timestamp is not None:
|
||||||
|
if isinstance(timestamp, (list, tuple)):
|
||||||
|
query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1]))
|
||||||
|
else:
|
||||||
|
query['timestamp'] = self.make_timestamp(timestamp)
|
||||||
|
|
||||||
|
url = urljoin(self.root_url, f'{controller}/restSearch')
|
||||||
|
response = self._prepare_request('POST', url, data=json.dumps(query))
|
||||||
|
return self._check_response(response)
|
|
@ -47,3 +47,6 @@ class UnknownMISPObjectTemplate(MISPObjectException):
|
||||||
|
|
||||||
class PyMISPInvalidFormat(PyMISPError):
|
class PyMISPInvalidFormat(PyMISPError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class MISPServerError(PyMISPError):
|
||||||
|
pass
|
||||||
|
|
|
@ -798,12 +798,22 @@ class MISPUser(AbstractMISP):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(MISPUser, self).__init__()
|
super(MISPUser, self).__init__()
|
||||||
|
|
||||||
|
def from_dict(self, **kwargs):
|
||||||
|
if kwargs.get('User'):
|
||||||
|
kwargs = kwargs.get('User')
|
||||||
|
super(MISPUser, self).from_dict(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
class MISPOrganisation(AbstractMISP):
|
class MISPOrganisation(AbstractMISP):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(MISPOrganisation, self).__init__()
|
super(MISPOrganisation, self).__init__()
|
||||||
|
|
||||||
|
def from_dict(self, **kwargs):
|
||||||
|
if kwargs.get('Organisation'):
|
||||||
|
kwargs = kwargs.get('Organisation')
|
||||||
|
super(MISPOrganisation, self).from_dict(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
class MISPFeed(AbstractMISP):
|
class MISPFeed(AbstractMISP):
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis
|
||||||
|
|
||||||
|
# from keys import url, key_admin
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
|
||||||
|
url = 'http://localhost:8080'
|
||||||
|
key_admin = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'
|
||||||
|
|
||||||
|
|
||||||
|
class TestComprehensive(unittest.TestCase):
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
cls.maxDiff = None
|
||||||
|
# Connect as admin
|
||||||
|
cls.admin_misp_connector = ExpandedPyMISP(url, key_admin)
|
||||||
|
# Creates an org
|
||||||
|
org = cls.admin_misp_connector.add_organisation(name='Test Org')
|
||||||
|
cls.test_org = MISPOrganisation()
|
||||||
|
cls.test_org.from_dict(**org)
|
||||||
|
# Creates a user
|
||||||
|
usr = cls.admin_misp_connector.add_user(email='testusr@user.local', org_id=cls.test_org.id, role_id=3)
|
||||||
|
cls.test_usr = MISPUser()
|
||||||
|
cls.test_usr.from_dict(**usr)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
# Delete user
|
||||||
|
cls.admin_misp_connector.delete_user(user_id=cls.test_usr.id)
|
||||||
|
# Delete org
|
||||||
|
cls.admin_misp_connector.delete_organisation(org_id=cls.test_org.id)
|
||||||
|
|
||||||
|
def create_event_org_only(self):
|
||||||
|
mispevent = MISPEvent()
|
||||||
|
mispevent.info = 'This is a test'
|
||||||
|
mispevent.distribution = Distribution.your_organisation_only
|
||||||
|
mispevent.threat_level_id = ThreatLevel.low
|
||||||
|
mispevent.analysis = Analysis.completed
|
||||||
|
mispevent.set_date("2017-12-31") # test the set date method
|
||||||
|
mispevent.add_attribute('text', str(uuid4()))
|
||||||
|
return mispevent
|
||||||
|
|
||||||
|
def create_event_with_tags(self):
|
||||||
|
mispevent = self.create_event_org_only()
|
||||||
|
mispevent.add_tag('tlp:white___test')
|
||||||
|
mispevent.attributes[0].add_tag('tlp:amber___test')
|
||||||
|
mispevent.add_attribute('text', str(uuid4()))
|
||||||
|
return mispevent
|
||||||
|
|
||||||
|
def test_search_value_event(self):
|
||||||
|
me = self.create_event_org_only()
|
||||||
|
# Create event
|
||||||
|
created_event = self.admin_misp_connector.add_event(me)
|
||||||
|
c_me = MISPEvent()
|
||||||
|
c_me.load(created_event)
|
||||||
|
# Search as admin
|
||||||
|
response = self.admin_misp_connector.search(value=me.attributes[0].value)
|
||||||
|
self.assertEqual(len(response), 1)
|
||||||
|
# Connect as user
|
||||||
|
user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey)
|
||||||
|
# Search as user
|
||||||
|
response = user_misp_connector.search(value=me.attributes[0].value)
|
||||||
|
self.assertEqual(response, [])
|
||||||
|
# Delete event
|
||||||
|
self.admin_misp_connector.delete_event(c_me.id)
|
||||||
|
|
||||||
|
def test_search_value_attribute(self):
|
||||||
|
me = self.create_event_org_only()
|
||||||
|
# Create event
|
||||||
|
created_event = self.admin_misp_connector.add_event(me)
|
||||||
|
c_me = MISPEvent()
|
||||||
|
c_me.load(created_event)
|
||||||
|
# Search as admin
|
||||||
|
response = self.admin_misp_connector.search(controller='attributes', value=me.attributes[0].value)
|
||||||
|
self.assertEqual(len(response), 1)
|
||||||
|
# Connect as user
|
||||||
|
user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey)
|
||||||
|
# Search as user
|
||||||
|
response = user_misp_connector.search(controller='attributes', value=me.attributes[0].value)
|
||||||
|
self.assertEqual(response, [])
|
||||||
|
# Delete event
|
||||||
|
self.admin_misp_connector.delete_event(c_me.id)
|
||||||
|
|
||||||
|
def test_search_tag_event(self):
|
||||||
|
me = self.create_event_with_tags()
|
||||||
|
# Create event
|
||||||
|
created_event = self.admin_misp_connector.add_event(me)
|
||||||
|
c_me = MISPEvent()
|
||||||
|
c_me.load(created_event)
|
||||||
|
# Search as admin
|
||||||
|
response = self.admin_misp_connector.search(tags='tlp:white___test')
|
||||||
|
self.assertEqual(len(response), 1)
|
||||||
|
# Connect as user
|
||||||
|
user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey)
|
||||||
|
# Search as user
|
||||||
|
response = user_misp_connector.search(value='tlp:white___test')
|
||||||
|
self.assertEqual(response, [])
|
||||||
|
# Delete event
|
||||||
|
self.admin_misp_connector.delete_event(c_me.id)
|
||||||
|
|
||||||
|
def test_search_tag_event_fancy(self):
|
||||||
|
# Create event
|
||||||
|
me = self.create_event_with_tags()
|
||||||
|
# Connect as user
|
||||||
|
user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey)
|
||||||
|
created_event = user_misp_connector.add_event(me)
|
||||||
|
to_delete = MISPEvent()
|
||||||
|
to_delete.load(created_event)
|
||||||
|
complex_query = user_misp_connector.build_complex_query(or_parameters=['tlp:white___test'], not_parameters=['tlp:amber___test'])
|
||||||
|
# Search as user
|
||||||
|
response = user_misp_connector.search(tags=complex_query)
|
||||||
|
for e in response:
|
||||||
|
to_validate = MISPEvent()
|
||||||
|
to_validate.load(e)
|
||||||
|
# FIXME Expected event without the tlp:amber attribute, broken for now
|
||||||
|
for a in to_validate.attributes:
|
||||||
|
print([t for t in a.tags if t.name == 'tlp:amber___test'])
|
||||||
|
# self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], [])
|
||||||
|
# Delete event
|
||||||
|
self.admin_misp_connector.delete_event(to_delete.id)
|
||||||
|
|
||||||
|
# def test_search_tag_attribute(self):
|
||||||
|
# me = self.create_event_with_tags()
|
||||||
|
# # Create event
|
||||||
|
# created_event = self.admin_misp_connector.add_event(me)
|
||||||
|
# c_me = MISPEvent()
|
||||||
|
# c_me.load(created_event)
|
||||||
|
# # Search as admin
|
||||||
|
# response = self.admin_misp_connector.search(controller='attributes', tags='tlp:white__test')
|
||||||
|
# print(response)
|
||||||
|
# self.assertEqual(len(response), 1)
|
||||||
|
# Connect as user
|
||||||
|
# user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey)
|
||||||
|
# Search as user
|
||||||
|
# response = user_misp_connector.search(controller='attributes', value='tlp:white__test')
|
||||||
|
# self.assertEqual(response, [])
|
||||||
|
# Delete event
|
||||||
|
# self.admin_misp_connector.delete_event(c_me.id)
|
Loading…
Reference in New Issue