diff --git a/pymisp/__init__.py b/pymisp/__init__.py index ea284ca..d0ec0a7 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -2,6 +2,7 @@ __version__ = '2.4.93' import logging import functools import warnings +import sys FORMAT = "%(levelname)s [%(filename)s:%(lineno)s - %(funcName)s() ] %(message)s" formatter = logging.Formatter(FORMAT) @@ -31,9 +32,9 @@ def deprecated(func): try: - from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat # noqa + from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError # noqa from .api import PyMISP # noqa - from .abstract import AbstractMISP, MISPEncode, MISPTag # noqa + from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting # noqa from .tools import AbstractMISPObjectGenerator # noqa from .tools import Neo4j # noqa @@ -41,6 +42,8 @@ try: from .tools import openioc # noqa from .tools import load_warninglists # noqa from .tools import ext_lookups # noqa + if sys.version_info >= (3, 6): + from .aping import ExpandedPyMISP # noqa logger.debug('pymisp loaded properly') except ImportError as e: logger.warning('Unable to load pymisp properly: {}'.format(e)) diff --git a/pymisp/abstract.py b/pymisp/abstract.py index 66bbd97..ba90aa0 100644 --- a/pymisp/abstract.py +++ b/pymisp/abstract.py @@ -9,6 +9,7 @@ from json import JSONEncoder import collections import six # Remove that import when discarding python2 support. import logging +from enum import Enum from .exceptions import PyMISPInvalidFormat @@ -34,6 +35,28 @@ if six.PY2: return timedelta(0) +class Distribution(Enum): + your_organisation_only = 0 + this_community_only = 1 + connected_communities = 2 + all_communities = 3 + sharing_group = 4 + inherit = 5 + + +class ThreatLevel(Enum): + high = 1 + medium = 2 + low = 3 + undefined = 4 + + +class Analysis(Enum): + initial = 0 + ongoing = 1 + completed = 2 + + class MISPEncode(JSONEncoder): def default(self, obj): @@ -41,6 +64,8 @@ class MISPEncode(JSONEncoder): return obj.jsonable() elif isinstance(obj, datetime.datetime): return obj.isoformat() + elif isinstance(obj, Enum): + return obj.value return JSONEncoder.default(self, obj) diff --git a/pymisp/api.py b/pymisp/api.py index 981779f..7cdf757 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -122,7 +122,7 @@ class PyMISP(object): def get_live_query_acl(self): """This should return an empty list, unless the ACL is outdated.""" - response = self.__prepare_request('GET', urljoin(self.root_url, 'events/queryACL.json')) + response = self._prepare_request('GET', urljoin(self.root_url, 'events/queryACL.json')) return self._check_response(response) def get_local_describe_types(self): @@ -131,7 +131,7 @@ class PyMISP(object): return describe_types['result'] def get_live_describe_types(self): - response = self.__prepare_request('GET', urljoin(self.root_url, 'attributes/describeTypes.json')) + response = self._prepare_request('GET', urljoin(self.root_url, 'attributes/describeTypes.json')) describe_types = self._check_response(response) if describe_types.get('error'): for e in describe_types.get('error'): @@ -141,8 +141,8 @@ class PyMISP(object): raise PyMISPError('The MISP server your are trying to reach is outdated (<2.4.52). Please use PyMISP v2.4.51.1 (pip install -I PyMISP==v2.4.51.1) and/or contact your administrator.') return describe_types - def __prepare_request(self, request_type, url, data=None, - background_callback=None, output_type='json'): + def _prepare_request(self, request_type, url, data=None, + background_callback=None, output_type='json'): if logger.isEnabledFor(logging.DEBUG): logger.debug('{} - {}'.format(request_type, url)) if data is not None: @@ -152,21 +152,22 @@ class PyMISP(object): else: req = requests.Request(request_type, url, data=data) if self.asynch and background_callback is not None: - s = FuturesSession() + local_session = FuturesSession else: - s = requests.Session() - prepped = s.prepare_request(req) - prepped.headers.update( - {'Authorization': self.key, - 'Accept': 'application/{}'.format(output_type), - 'content-type': 'application/{}'.format(output_type), - 'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)}) - if logger.isEnabledFor(logging.DEBUG): - logger.debug(prepped.headers) - if self.asynch and background_callback is not None: - return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert, background_callback=background_callback) - else: - return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert) + local_session = requests.Session + with local_session() as s: + prepped = s.prepare_request(req) + prepped.headers.update( + {'Authorization': self.key, + 'Accept': 'application/{}'.format(output_type), + 'content-type': 'application/{}'.format(output_type), + 'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)}) + if logger.isEnabledFor(logging.DEBUG): + logger.debug(prepped.headers) + if self.asynch and background_callback is not None: + return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert, background_callback=background_callback) + else: + return s.send(prepped, verify=self.ssl, proxies=self.proxies, cert=self.cert) # ##################### # ### Core helpers #### @@ -314,9 +315,9 @@ class PyMISP(object): """ url = urljoin(self.root_url, 'events/index') if filters is None: - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) else: - response = self.__prepare_request('POST', url, json.dumps(filters)) + response = self._prepare_request('POST', url, json.dumps(filters)) return self._check_response(response) def get_event(self, event_id): @@ -325,7 +326,7 @@ class PyMISP(object): :param event_id: Event id to get """ url = urljoin(self.root_url, 'events/{}'.format(event_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def add_event(self, event): @@ -338,7 +339,7 @@ class PyMISP(object): event = event.to_json() elif not isinstance(event, basestring): event = json.dumps(event) - response = self.__prepare_request('POST', url, event) + response = self._prepare_request('POST', url, event) return self._check_response(response) def update_attribute(self, attribute_id, attribute): @@ -352,7 +353,7 @@ class PyMISP(object): attribute = attribute.to_json() elif not isinstance(attribute, basestring): attribute = json.dumps(attribute) - response = self.__prepare_request('POST', url, attribute) + response = self._prepare_request('POST', url, attribute) return self._check_response(response) def update_event(self, event_id, event): @@ -366,7 +367,7 @@ class PyMISP(object): event = event.to_json() elif not isinstance(event, basestring): event = json.dumps(event) - response = self.__prepare_request('POST', url, event) + response = self._prepare_request('POST', url, event) return self._check_response(response) def delete_event(self, event_id): @@ -375,7 +376,7 @@ class PyMISP(object): :param event_id: Event id to delete """ url = urljoin(self.root_url, 'events/{}'.format(event_id)) - response = self.__prepare_request('DELETE', url) + response = self._prepare_request('DELETE', url) return self._check_response(response) def delete_attribute(self, attribute_id, hard_delete=False): @@ -384,13 +385,13 @@ class PyMISP(object): url = urljoin(self.root_url, 'attributes/delete/{}/1'.format(attribute_id)) else: url = urljoin(self.root_url, 'attributes/delete/{}'.format(attribute_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def pushEventToZMQ(self, event_id): """Force push an event on ZMQ""" url = urljoin(self.root_url, 'events/pushEventToZMQ/{}.json'.format(event_id)) - response = self.__prepare_request('POST', url) + response = self._prepare_request('POST', url) return self._check_response(response) # ############################################## @@ -419,7 +420,7 @@ class PyMISP(object): url = urljoin(self.root_url, 'events/publish/{}'.format(event_id)) else: url = urljoin(self.root_url, 'events/alert/{}'.format(event_id)) - response = self.__prepare_request('POST', url) + response = self._prepare_request('POST', url) return self._check_response(response) def publish(self, event, alert=True): @@ -467,7 +468,7 @@ class PyMISP(object): raise PyMISPError('Invalid UUID') url = urljoin(self.root_url, 'tags/attachTagToObject') to_post = {'uuid': uuid, 'tag': tag} - response = self.__prepare_request('POST', url, json.dumps(to_post)) + response = self._prepare_request('POST', url, json.dumps(to_post)) return self._check_response(response) def untag(self, uuid, tag): @@ -476,7 +477,7 @@ class PyMISP(object): raise PyMISPError('Invalid UUID') url = urljoin(self.root_url, 'tags/removeTagFromObject') to_post = {'uuid': uuid, 'tag': tag} - response = self.__prepare_request('POST', url, json.dumps(to_post)) + response = self._prepare_request('POST', url, json.dumps(to_post)) return self._check_response(response) # ##### File attributes ##### @@ -519,8 +520,8 @@ class PyMISP(object): data = attributes[0].to_json() else: data = attributes.to_json() - # __prepare_request(...) returns a requests.Response Object - resp = self.__prepare_request('POST', url, json.dumps(data, cls=MISPEncode)) + # _prepare_request(...) returns a requests.Response Object + resp = self._prepare_request('POST', url, json.dumps(data, cls=MISPEncode)) try: responses.append(resp.json()) except Exception: @@ -908,7 +909,7 @@ class PyMISP(object): url = urljoin(self.root_url, 'events/upload_sample') else: url = urljoin(self.root_url, 'events/upload_sample/{}'.format(event_id)) - response = self.__prepare_request('POST', url, json.dumps(to_post)) + response = self._prepare_request('POST', url, json.dumps(to_post)) return self._check_response(response) # ############################ @@ -920,11 +921,11 @@ class PyMISP(object): url = urljoin(self.root_url, 'shadow_attributes/{}/{}'.format(path, id)) if path in ['add', 'edit']: query = {'request': {'ShadowAttribute': attribute}} - response = self.__prepare_request('POST', url, json.dumps(query, cls=MISPEncode)) + response = self._prepare_request('POST', url, json.dumps(query, cls=MISPEncode)) elif path == 'view': - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) else: # accept or discard - response = self.__prepare_request('POST', url) + response = self._prepare_request('POST', url) return self._check_response(response) def proposal_view(self, event_id=None, proposal_id=None): @@ -1000,9 +1001,9 @@ class PyMISP(object): url = urljoin(self.root_url, '{}/{}'.format(controller, path.lstrip('/'))) if ASYNC_OK and async_callback: - response = self.__prepare_request('POST', url, json.dumps(query), async_callback) + response = self._prepare_request('POST', url, json.dumps(query), async_callback) else: - response = self.__prepare_request('POST', url, json.dumps(query)) + response = self._prepare_request('POST', url, json.dumps(query)) return self._check_response(response) def search_index(self, published=None, eventid=None, tag=None, datefrom=None, @@ -1055,9 +1056,9 @@ class PyMISP(object): url = urljoin(self.root_url, buildup_url) if self.asynch and async_callback: - response = self.__prepare_request('POST', url, json.dumps(to_post), async_callback) + response = self._prepare_request('POST', url, json.dumps(to_post), async_callback) else: - response = self.__prepare_request('POST', url, json.dumps(to_post)) + response = self._prepare_request('POST', url, json.dumps(to_post)) res = self._check_response(response) if normalize: to_return = {'response': []} @@ -1188,7 +1189,7 @@ class PyMISP(object): :param attribute_id: Attribute ID to fetched """ url = urljoin(self.root_url, 'attributes/download/{}'.format(attribute_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) try: response.json() # The query fails, response contains a json blob @@ -1201,7 +1202,7 @@ class PyMISP(object): """Get the yara rules from an event""" url = urljoin(self.root_url, 'attributes/restSearch') to_post = {'request': {'eventid': event_id, 'type': 'yara'}} - response = self.__prepare_request('POST', url, data=json.dumps(to_post)) + response = self._prepare_request('POST', url, data=json.dumps(to_post)) result = self._check_response(response) if result.get('error') is not None: return False, result.get('error') @@ -1214,7 +1215,7 @@ class PyMISP(object): """Download samples, by hash or event ID. If there are multiple samples in one event, use the all_samples switch""" url = urljoin(self.root_url, 'attributes/downloadSample') to_post = {'request': {'hash': sample_hash, 'eventID': event_id, 'allSamples': all_samples}} - response = self.__prepare_request('POST', url, data=json.dumps(to_post)) + response = self._prepare_request('POST', url, data=json.dumps(to_post)) result = self._check_response(response) if result.get('error') is not None: return False, result.get('error') @@ -1281,7 +1282,7 @@ class PyMISP(object): def get_all_tags(self, quiet=False): """Get all the tags used on the instance""" url = urljoin(self.root_url, 'tags') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) r = self._check_response(response) if not quiet or r.get('errors'): return r @@ -1295,7 +1296,7 @@ class PyMISP(object): """Create a new tag""" to_post = {'Tag': {'name': name, 'colour': colour, 'exportable': exportable, 'hide_tag': hide_tag}} url = urljoin(self.root_url, 'tags/add') - response = self.__prepare_request('POST', url, json.dumps(to_post)) + response = self._prepare_request('POST', url, json.dumps(to_post)) return self._check_response(response) # ########## Version ########## @@ -1316,13 +1317,13 @@ class PyMISP(object): def get_recommended_api_version(self): """Returns the recommended API version from the server""" url = urljoin(self.root_url, 'servers/getPyMISPVersion.json') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def get_version(self): """Returns the version of the instance.""" url = urljoin(self.root_url, 'servers/getVersion.json') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def get_version_master(self): @@ -1344,7 +1345,7 @@ class PyMISP(object): url = urljoin(self.root_url, 'attributes/attributeStatistics/{}/{}'.format(context, percentage)) else: url = urljoin(self.root_url, 'attributes/attributeStatistics/{}'.format(context)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def get_tags_statistics(self, percentage=None, name_sort=None): @@ -1358,7 +1359,7 @@ class PyMISP(object): else: name_sort = 'false' url = urljoin(self.root_url, 'tags/tagStatistics/{}/{}'.format(percentage, name_sort)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) # ############## Sightings ################## @@ -1366,13 +1367,13 @@ class PyMISP(object): def sighting_per_id(self, attribute_id): """Add a sighting to an attribute (by attribute ID)""" url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_id)) - response = self.__prepare_request('POST', url) + response = self._prepare_request('POST', url) return self._check_response(response) def sighting_per_uuid(self, attribute_uuid): """Add a sighting to an attribute (by attribute UUID)""" url = urljoin(self.root_url, 'sightings/add/{}'.format(attribute_uuid)) - response = self.__prepare_request('POST', url) + response = self._prepare_request('POST', url) return self._check_response(response) def set_sightings(self, sightings): @@ -1385,7 +1386,7 @@ class PyMISP(object): elif isinstance(sighting, dict): to_post = json.dumps(sighting) url = urljoin(self.root_url, 'sightings/add/') - response = self.__prepare_request('POST', url, to_post) + response = self._prepare_request('POST', url, to_post) return self._check_response(response) def sighting_per_json(self, json_file): @@ -1435,7 +1436,7 @@ class PyMISP(object): org_id = "" uri = 'sightings/listSightings/{}/{}/{}'.format(element_id, scope, org_id) url = urljoin(self.root_url, uri) - response = self.__prepare_request('POST', url) + response = self._prepare_request('POST', url) return self._check_response(response) # ############## Sharing Groups ################## @@ -1443,7 +1444,7 @@ class PyMISP(object): def get_sharing_groups(self): """Get the existing sharing groups""" url = urljoin(self.root_url, 'sharing_groups.json') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response)['response'] # ############## Users ################## @@ -1590,14 +1591,14 @@ class PyMISP(object): push, pull, self_signed, push_rules, pull_rules, submitted_cert, submitted_client_cert, None, None) url = urljoin(self.root_url, 'servers/add') - response = self.__prepare_request('POST', url, json.dumps(new_server)) + response = self._prepare_request('POST', url, json.dumps(new_server)) return self._check_response(response) def add_server_json(self, json_file): with open(json_file, 'rb') as f: jdata = json.load(f) url = urljoin(self.root_url, 'servers/add') - response = self.__prepare_request('POST', url, json.dumps(jdata)) + response = self._prepare_request('POST', url, json.dumps(jdata)) return self._check_response(response) def edit_server(self, server_id, url=None, name=None, authkey=None, organisation=None, internal=None, push=False, @@ -1607,14 +1608,14 @@ class PyMISP(object): push, pull, self_signed, push_rules, pull_rules, submitted_cert, submitted_client_cert, delete_cert, delete_client_cert) url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id)) - response = self.__prepare_request('POST', url, json.dumps(new_server)) + response = self._prepare_request('POST', url, json.dumps(new_server)) return self._check_response(response) def edit_server_json(self, json_file, server_id): with open(json_file, 'rb') as f: jdata = json.load(f) url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id)) - response = self.__prepare_request('POST', url, json.dumps(jdata)) + response = self._prepare_request('POST', url, json.dumps(jdata)) return self._check_response(response) # ############## Roles ################## @@ -1622,7 +1623,7 @@ class PyMISP(object): def get_roles_list(self): """Get the list of existing roles""" url = urljoin(self.root_url, '/roles') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response)['response'] # ############## Tags ################## @@ -1630,43 +1631,43 @@ class PyMISP(object): def get_tags_list(self): """Get the list of existing tags""" url = urljoin(self.root_url, '/tags') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response)['Tag'] # ############## Taxonomies ################## def get_taxonomies_list(self): url = urljoin(self.root_url, '/taxonomies') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def get_taxonomy(self, taxonomy_id): url = urljoin(self.root_url, '/taxonomies/view/{}'.format(taxonomy_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) # ############## WarningLists ################## def get_warninglists(self): url = urljoin(self.root_url, '/warninglists') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def get_warninglist(self, warninglist_id): url = urljoin(self.root_url, '/warninglists/view/{}'.format(warninglist_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) # ############## Galaxies/Clusters ################## def get_galaxies(self): url = urljoin(self.root_url, '/galaxies') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def get_galaxy(self, galaxy_id): url = urljoin(self.root_url, '/galaxies/view/{}'.format(galaxy_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) # ############################################## @@ -1678,7 +1679,7 @@ class PyMISP(object): def download_all_suricata(self): """Download all suricata rules events.""" url = urljoin(self.root_url, 'events/nids/suricata/download') - response = self.__prepare_request('GET', url, output_type='rules') + response = self._prepare_request('GET', url, output_type='rules') return response def download_suricata_rule_event(self, event_id): @@ -1687,7 +1688,7 @@ class PyMISP(object): :param event_id: ID of the event to download (same as get) """ url = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id)) - response = self.__prepare_request('GET', url, output_type='rules') + response = self._prepare_request('GET', url, output_type='rules') return response # ############## Text ############### @@ -1695,7 +1696,7 @@ class PyMISP(object): def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False): """Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise.""" url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished)) - response = self.__prepare_request('GET', url, output_type='txt') + response = self._prepare_request('GET', url, output_type='txt') return response # ############## STIX ############## @@ -1708,7 +1709,7 @@ class PyMISP(object): url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format( event_id, with_attachments, tags, from_date, to_date)) logger.debug("Getting STIX event from %s", url) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def get_stix(self, **kwargs): @@ -1743,9 +1744,9 @@ class PyMISP(object): if last: to_post['last'] = last if to_post: - response = self.__prepare_request('POST', url, json.dumps(to_post), output_type='json') + response = self._prepare_request('POST', url, json.dumps(to_post), output_type='json') else: - response = self.__prepare_request('POST', url, output_type='json') + response = self._prepare_request('POST', url, output_type='json') return response.text # ####################################### @@ -1754,32 +1755,32 @@ class PyMISP(object): def _rest_list(self, urlpath): url = urljoin(self.root_url, urlpath) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def _rest_get_parameters(self, urlpath): url = urljoin(self.root_url, '{}/add'.format(urlpath)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def _rest_view(self, urlpath, rest_id): url = urljoin(self.root_url, '{}/view/{}'.format(urlpath, rest_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def _rest_add(self, urlpath, obj): url = urljoin(self.root_url, '{}/add'.format(urlpath)) - response = self.__prepare_request('POST', url, obj.to_json()) + response = self._prepare_request('POST', url, obj.to_json()) return self._check_response(response) def _rest_edit(self, urlpath, obj, rest_id): url = urljoin(self.root_url, '{}/edit/{}'.format(urlpath, rest_id)) - response = self.__prepare_request('POST', url, obj.to_json()) + response = self._prepare_request('POST', url, obj.to_json()) return self._check_response(response) def _rest_delete(self, urlpath, rest_id): url = urljoin(self.root_url, '{}/delete/{}'.format(urlpath, rest_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('POST', url) return self._check_response(response) # ########################### @@ -1817,37 +1818,37 @@ class PyMISP(object): def fetch_feed(self, feed_id): """Fetch one single feed""" url = urljoin(self.root_url, 'feeds/fetchFromFeed/{}'.format(feed_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def cache_feeds_all(self): """ Cache all the feeds""" url = urljoin(self.root_url, 'feeds/cacheFeeds/all') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def cache_feed(self, feed_id): """Cache a specific feed""" url = urljoin(self.root_url, 'feeds/cacheFeeds/{}'.format(feed_id)) - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def cache_feeds_freetext(self): """Cache all the freetext feeds""" url = urljoin(self.root_url, 'feeds/cacheFeeds/freetext') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def cache_feeds_misp(self): """Cache all the MISP feeds""" url = urljoin(self.root_url, 'feeds/cacheFeeds/misp') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) def compare_feeds(self): """Generate the comparison matrix for all the MISP feeds""" url = urljoin(self.root_url, 'feeds/compareFeeds') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response) @deprecated @@ -1877,7 +1878,7 @@ class PyMISP(object): ''' to_jsonify = {'sg_id': sharing_group, 'org_id': organisation, 'extend': extend} url = urljoin(self.root_url, '/sharingGroups/addOrg') - response = self.__prepare_request('POST', url, json.dumps(to_jsonify)) + response = self._prepare_request('POST', url, json.dumps(to_jsonify)) return self._check_response(response) def sharing_group_org_remove(self, sharing_group, organisation): @@ -1887,7 +1888,7 @@ class PyMISP(object): ''' to_jsonify = {'sg_id': sharing_group, 'org_id': organisation} url = urljoin(self.root_url, '/sharingGroups/removeOrg') - response = self.__prepare_request('POST', url, json.dumps(to_jsonify)) + response = self._prepare_request('POST', url, json.dumps(to_jsonify)) return self._check_response(response) def sharing_group_server_add(self, sharing_group, server, all_orgs=False): @@ -1898,7 +1899,7 @@ class PyMISP(object): ''' to_jsonify = {'sg_id': sharing_group, 'server_id': server, 'all_orgs': all_orgs} url = urljoin(self.root_url, '/sharingGroups/addServer') - response = self.__prepare_request('POST', url, json.dumps(to_jsonify)) + response = self._prepare_request('POST', url, json.dumps(to_jsonify)) return self._check_response(response) def sharing_group_server_remove(self, sharing_group, server): @@ -1908,7 +1909,7 @@ class PyMISP(object): ''' to_jsonify = {'sg_id': sharing_group, 'server_id': server} url = urljoin(self.root_url, '/sharingGroups/removeServer') - response = self.__prepare_request('POST', url, json.dumps(to_jsonify)) + response = self._prepare_request('POST', url, json.dumps(to_jsonify)) return self._check_response(response) # ################### @@ -1935,7 +1936,7 @@ class PyMISP(object): url = urljoin(self.root_url, 'objects/add/{}/{}'.format(event_id, template_id)) else: url = urljoin(self.root_url, 'objects/add/{}'.format(event_id)) - response = self.__prepare_request('POST', url, misp_object.to_json()) + response = self._prepare_request('POST', url, misp_object.to_json()) return self._check_response(response) def edit_object(self, misp_object, object_id=None): @@ -1949,31 +1950,31 @@ class PyMISP(object): else: raise PyMISPError('In order to update an object, you have to provide an object ID (either in the misp_object, or as a parameter)') url = urljoin(self.root_url, 'objects/edit/{}'.format(param)) - response = self.__prepare_request('POST', url, misp_object.to_json()) + response = self._prepare_request('POST', url, misp_object.to_json()) return self._check_response(response) def delete_object(self, id): """Deletes an object""" url = urljoin(self.root_url, 'objects/delete/{}'.format(id)) - response = self.__prepare_request('POST', url) + response = self._prepare_request('POST', url) return self._check_response(response) def add_object_reference(self, misp_object_reference): """Add a reference to an object""" url = urljoin(self.root_url, 'object_references/add') - response = self.__prepare_request('POST', url, misp_object_reference.to_json()) + response = self._prepare_request('POST', url, misp_object_reference.to_json()) return self._check_response(response) def delete_object_reference(self, id): """Deletes a reference to an object""" url = urljoin(self.root_url, 'object_references/delete/{}'.format(id)) - response = self.__prepare_request('POST', url) + response = self._prepare_request('POST', url) return self._check_response(response) def get_object_templates_list(self): """Returns the list of Object templates available on the MISP instance""" url = urljoin(self.root_url, 'objectTemplates') - response = self.__prepare_request('GET', url) + response = self._prepare_request('GET', url) return self._check_response(response)['response'] def get_object_template_id(self, object_uuid): @@ -2000,7 +2001,7 @@ class PyMISP(object): to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}} path = 'events/addTag' url = urljoin(self.root_url, path) - response = self.__prepare_request('POST', url, json.dumps(to_post)) + response = self._prepare_request('POST', url, json.dumps(to_post)) return self._check_response(response) @deprecated @@ -2012,5 +2013,5 @@ class PyMISP(object): to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}} path = 'events/removeTag' url = urljoin(self.root_url, path) - response = self.__prepare_request('POST', url, json.dumps(to_post)) + response = self._prepare_request('POST', url, json.dumps(to_post)) return self._check_response(response) diff --git a/pymisp/aping.py b/pymisp/aping.py new file mode 100644 index 0000000..574ef36 --- /dev/null +++ b/pymisp/aping.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from .exceptions import MISPServerError +from .api import PyMISP, everything_broken +from typing import TypeVar, Optional, Tuple, List, Dict +from datetime import date, datetime +import json + +import logging +from urllib.parse import urljoin + +SearchType = TypeVar('SearchType', str, int) +# str: string to search / list: values to search (OR) / dict: {'OR': [list], 'NOT': [list], 'AND': [list]} +SearchParameterTypes = TypeVar('SearchParameterTypes', str, List[SearchType], Dict[str, SearchType]) +DateTypes = TypeVar('DateTypes', datetime, date, SearchType, float) +DateInterval = TypeVar('DateInterval', DateTypes, Tuple[DateTypes, DateTypes]) + + +logger = logging.getLogger('pymisp') + + +class ExpandedPyMISP(PyMISP): + + def build_complex_query(self, or_parameters: Optional[List[SearchType]]=None, + and_parameters: Optional[List[SearchType]]=None, + not_parameters: Optional[List[SearchType]]=None): + to_return = {} + if and_parameters: + to_return['AND'] = and_parameters + if not_parameters: + to_return['NOT'] = not_parameters + if or_parameters: + to_return['OR'] = or_parameters + return to_return + + def make_timestamp(self, value: DateTypes): + if isinstance(value, datetime): + return datetime.timestamp() + elif isinstance(value, date): + return datetime.combine(value, datetime.max.time()).timestamp() + elif isinstance(value, str): + if value.isdigit(): + return value + else: + try: + float(value) + return value + except ValueError: + # The value can also be '1d', '10h', ... + return value + else: + return value + + def _check_response(self, response): + """Check if the response from the server is not an unexpected error""" + if response.status_code >= 500: + logger.critical(everything_broken.format(response.request.headers, response.request.body, response.text)) + raise MISPServerError('Error code 500:\n{}'.format(response.text)) + elif 400 <= response.status_code < 500: + # The server returns a json message with the error details + error_message = response.json() + logger.error(f'Something went wrong ({response.status_code}): {error_message}') + return {'errors': [(response.status_code, error_message)]} + + # At this point, we had no error. + if logger.isEnabledFor(logging.DEBUG): + logger.debug(response) + + try: + response = response.json() + if response.get('response') is not None: + # Cleanup. + return response.get('response') + return response + except Exception: + return response.text + + # TODO: Make that thing async & test it. + def search(self, controller: str='events', return_format: str='json', + value: Optional[SearchParameterTypes]=None, + type_attribute: Optional[SearchParameterTypes]=None, + category: Optional[SearchParameterTypes]=None, + org: Optional[SearchParameterTypes]=None, + tags: Optional[SearchParameterTypes]=None, + date_from: Optional[DateTypes]=None, date_to: Optional[DateTypes]=None, + eventid: Optional[SearchType]=None, + with_attachment: Optional[bool]=None, + metadata: Optional[bool]=None, + uuid: Optional[str]=None, + published: Optional[bool]=None, + searchall: Optional[bool]=None, + enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None, + sg_reference_only: Optional[bool]=None, + publish_timestamp: Optional[DateInterval]=None, + timestamp: Optional[DateInterval]=None, + **kwargs): + + if controller not in ['events', 'attributes', 'objects']: + raise ValueError('controller has to be in {}'.format(', '.join(['events', 'attributes', 'objects']))) + + # Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized. + # They are passed as-is. + query = kwargs + if return_format is not None: + query['returnFormat'] = return_format + if value is not None: + query['value'] = value + if type_attribute is not None: + query['type'] = type_attribute + if category is not None: + query['category'] = category + if org is not None: + query['org'] = org + if tags is not None: + query['tags'] = tags + if date_from is not None: + query['from'] = self.make_timestamp(date_from) + if date_to is not None: + query['to'] = self.make_timestamp(date_to) + if eventid is not None: + query['eventid'] = eventid + if with_attachment is not None: + query['withAttachments'] = with_attachment + if metadata is not None: + query['metadata'] = metadata + if uuid is not None: + query['uuid'] = uuid + if published is not None: + query['published'] = published + if searchall is not None: + query['searchall'] = searchall + if enforce_warninglist is not None: + query['enforceWarninglist'] = enforce_warninglist + if enforceWarninglist is not None: + # Alias for enforce_warninglist + query['enforceWarninglist'] = enforceWarninglist + if sg_reference_only is not None: + query['sgReferenceOnly'] = sg_reference_only + if publish_timestamp is not None: + if isinstance(publish_timestamp, (list, tuple)): + query['publish_timestamp'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1])) + else: + query['publish_timestamp'] = self.make_timestamp(publish_timestamp) + if timestamp is not None: + if isinstance(timestamp, (list, tuple)): + query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1])) + else: + query['timestamp'] = self.make_timestamp(timestamp) + + url = urljoin(self.root_url, f'{controller}/restSearch') + response = self._prepare_request('POST', url, data=json.dumps(query)) + return self._check_response(response) diff --git a/pymisp/exceptions.py b/pymisp/exceptions.py index 967e9b7..6c426db 100644 --- a/pymisp/exceptions.py +++ b/pymisp/exceptions.py @@ -47,3 +47,6 @@ class UnknownMISPObjectTemplate(MISPObjectException): class PyMISPInvalidFormat(PyMISPError): pass + +class MISPServerError(PyMISPError): + pass diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 0a916d9..d3b5c1c 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -798,12 +798,22 @@ class MISPUser(AbstractMISP): def __init__(self): super(MISPUser, self).__init__() + def from_dict(self, **kwargs): + if kwargs.get('User'): + kwargs = kwargs.get('User') + super(MISPUser, self).from_dict(**kwargs) + class MISPOrganisation(AbstractMISP): def __init__(self): super(MISPOrganisation, self).__init__() + def from_dict(self, **kwargs): + if kwargs.get('Organisation'): + kwargs = kwargs.get('Organisation') + super(MISPOrganisation, self).from_dict(**kwargs) + class MISPFeed(AbstractMISP): diff --git a/tests/test_comprehensive.py b/tests/test_comprehensive.py new file mode 100644 index 0000000..b57a5d9 --- /dev/null +++ b/tests/test_comprehensive.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import unittest + +from pymisp import ExpandedPyMISP, MISPEvent, MISPOrganisation, MISPUser, Distribution, ThreatLevel, Analysis + +# from keys import url, key_admin +from uuid import uuid4 + + +url = 'http://localhost:8080' +key_admin = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo' + + +class TestComprehensive(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.maxDiff = None + # Connect as admin + cls.admin_misp_connector = ExpandedPyMISP(url, key_admin) + # Creates an org + org = cls.admin_misp_connector.add_organisation(name='Test Org') + cls.test_org = MISPOrganisation() + cls.test_org.from_dict(**org) + # Creates a user + usr = cls.admin_misp_connector.add_user(email='testusr@user.local', org_id=cls.test_org.id, role_id=3) + cls.test_usr = MISPUser() + cls.test_usr.from_dict(**usr) + + @classmethod + def tearDownClass(cls): + # Delete user + cls.admin_misp_connector.delete_user(user_id=cls.test_usr.id) + # Delete org + cls.admin_misp_connector.delete_organisation(org_id=cls.test_org.id) + + def create_event_org_only(self): + mispevent = MISPEvent() + mispevent.info = 'This is a test' + mispevent.distribution = Distribution.your_organisation_only + mispevent.threat_level_id = ThreatLevel.low + mispevent.analysis = Analysis.completed + mispevent.set_date("2017-12-31") # test the set date method + mispevent.add_attribute('text', str(uuid4())) + return mispevent + + def create_event_with_tags(self): + mispevent = self.create_event_org_only() + mispevent.add_tag('tlp:white___test') + mispevent.attributes[0].add_tag('tlp:amber___test') + mispevent.add_attribute('text', str(uuid4())) + return mispevent + + def test_search_value_event(self): + me = self.create_event_org_only() + # Create event + created_event = self.admin_misp_connector.add_event(me) + c_me = MISPEvent() + c_me.load(created_event) + # Search as admin + response = self.admin_misp_connector.search(value=me.attributes[0].value) + self.assertEqual(len(response), 1) + # Connect as user + user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey) + # Search as user + response = user_misp_connector.search(value=me.attributes[0].value) + self.assertEqual(response, []) + # Delete event + self.admin_misp_connector.delete_event(c_me.id) + + def test_search_value_attribute(self): + me = self.create_event_org_only() + # Create event + created_event = self.admin_misp_connector.add_event(me) + c_me = MISPEvent() + c_me.load(created_event) + # Search as admin + response = self.admin_misp_connector.search(controller='attributes', value=me.attributes[0].value) + self.assertEqual(len(response), 1) + # Connect as user + user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey) + # Search as user + response = user_misp_connector.search(controller='attributes', value=me.attributes[0].value) + self.assertEqual(response, []) + # Delete event + self.admin_misp_connector.delete_event(c_me.id) + + def test_search_tag_event(self): + me = self.create_event_with_tags() + # Create event + created_event = self.admin_misp_connector.add_event(me) + c_me = MISPEvent() + c_me.load(created_event) + # Search as admin + response = self.admin_misp_connector.search(tags='tlp:white___test') + self.assertEqual(len(response), 1) + # Connect as user + user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey) + # Search as user + response = user_misp_connector.search(value='tlp:white___test') + self.assertEqual(response, []) + # Delete event + self.admin_misp_connector.delete_event(c_me.id) + + def test_search_tag_event_fancy(self): + # Create event + me = self.create_event_with_tags() + # Connect as user + user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey) + created_event = user_misp_connector.add_event(me) + to_delete = MISPEvent() + to_delete.load(created_event) + complex_query = user_misp_connector.build_complex_query(or_parameters=['tlp:white___test'], not_parameters=['tlp:amber___test']) + # Search as user + response = user_misp_connector.search(tags=complex_query) + for e in response: + to_validate = MISPEvent() + to_validate.load(e) + # FIXME Expected event without the tlp:amber attribute, broken for now + for a in to_validate.attributes: + print([t for t in a.tags if t.name == 'tlp:amber___test']) + # self.assertEqual([t for t in a.tags if t.name == 'tlp:amber___test'], []) + # Delete event + self.admin_misp_connector.delete_event(to_delete.id) + +# def test_search_tag_attribute(self): +# me = self.create_event_with_tags() +# # Create event +# created_event = self.admin_misp_connector.add_event(me) +# c_me = MISPEvent() +# c_me.load(created_event) +# # Search as admin +# response = self.admin_misp_connector.search(controller='attributes', tags='tlp:white__test') +# print(response) +# self.assertEqual(len(response), 1) + # Connect as user +# user_misp_connector = ExpandedPyMISP(url, self.test_usr.authkey) + # Search as user +# response = user_misp_connector.search(controller='attributes', value='tlp:white__test') +# self.assertEqual(response, []) + # Delete event +# self.admin_misp_connector.delete_event(c_me.id)