diff --git a/Pipfile.lock b/Pipfile.lock index bf45ecc..37b2d03 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -140,13 +140,13 @@ "pymispwarninglists": { "editable": true, "git": "https://github.com/MISP/PyMISPWarningLists.git", - "ref": "1901e2e54db829fb3c50dd034f2632874aa779db" + "ref": "52b0a0f93045861330c134385f88441f212f6421" }, "pyrsistent": { "hashes": [ - "sha256:50cffebc87ca91b9d4be2dcc2e479272bcb466b5a0487b6c271f7ddea6917e14" + "sha256:34b47fa169d6006b32e99d4b3c4031f155e6e68ebcc107d6454852e8e0ee6533" ], - "version": "==0.15.3" + "version": "==0.15.4" }, "python-dateutil": { "hashes": [ @@ -311,47 +311,48 @@ }, "coverage": { "hashes": [ - "sha256:3684fabf6b87a369017756b551cef29e505cb155ddb892a7a29277b978da88b9", - "sha256:39e088da9b284f1bd17c750ac672103779f7954ce6125fd4382134ac8d152d74", - "sha256:3c205bc11cc4fcc57b761c2da73b9b72a59f8d5ca89979afb0c1c6f9e53c7390", - "sha256:465ce53a8c0f3a7950dfb836438442f833cf6663d407f37d8c52fe7b6e56d7e8", - "sha256:48020e343fc40f72a442c8a1334284620f81295256a6b6ca6d8aa1350c763bbe", - "sha256:5296fc86ab612ec12394565c500b412a43b328b3907c0d14358950d06fd83baf", - "sha256:5f61bed2f7d9b6a9ab935150a6b23d7f84b8055524e7be7715b6513f3328138e", - "sha256:68a43a9f9f83693ce0414d17e019daee7ab3f7113a70c79a3dd4c2f704e4d741", - "sha256:6b8033d47fe22506856fe450470ccb1d8ba1ffb8463494a15cfc96392a288c09", - "sha256:7ad7536066b28863e5835e8cfeaa794b7fe352d99a8cded9f43d1161be8e9fbd", - "sha256:7bacb89ccf4bedb30b277e96e4cc68cd1369ca6841bde7b005191b54d3dd1034", - "sha256:839dc7c36501254e14331bcb98b27002aa415e4af7ea039d9009409b9d2d5420", - "sha256:8f9a95b66969cdea53ec992ecea5406c5bd99c9221f539bca1e8406b200ae98c", - "sha256:932c03d2d565f75961ba1d3cec41ddde00e162c5b46d03f7423edcb807734eab", - "sha256:988529edadc49039d205e0aa6ce049c5ccda4acb2d6c3c5c550c17e8c02c05ba", - "sha256:998d7e73548fe395eeb294495a04d38942edb66d1fa61eb70418871bc621227e", - "sha256:9de60893fb447d1e797f6bf08fdf0dbcda0c1e34c1b06c92bd3a363c0ea8c609", - "sha256:9e80d45d0c7fcee54e22771db7f1b0b126fb4a6c0a2e5afa72f66827207ff2f2", - "sha256:a545a3dfe5082dc8e8c3eb7f8a2cf4f2870902ff1860bd99b6198cfd1f9d1f49", - "sha256:a5d8f29e5ec661143621a8f4de51adfb300d7a476224156a39a392254f70687b", - "sha256:aca06bfba4759bbdb09bf52ebb15ae20268ee1f6747417837926fae990ebc41d", - "sha256:bb23b7a6fd666e551a3094ab896a57809e010059540ad20acbeec03a154224ce", - "sha256:bfd1d0ae7e292105f29d7deaa9d8f2916ed8553ab9d5f39ec65bcf5deadff3f9", - "sha256:c62ca0a38958f541a73cf86acdab020c2091631c137bd359c4f5bddde7b75fd4", - "sha256:c709d8bda72cf4cd348ccec2a4881f2c5848fd72903c185f363d361b2737f773", - "sha256:c968a6aa7e0b56ecbd28531ddf439c2ec103610d3e2bf3b75b813304f8cb7723", - "sha256:df785d8cb80539d0b55fd47183264b7002077859028dfe3070cf6359bf8b2d9c", - "sha256:f406628ca51e0ae90ae76ea8398677a921b36f0bd71aab2099dfed08abd0322f", - "sha256:f46087bbd95ebae244a0eda01a618aff11ec7a069b15a3ef8f6b520db523dcf1", - "sha256:f8019c5279eb32360ca03e9fac40a12667715546eed5c5eb59eb381f2f501260", - "sha256:fc5f4d209733750afd2714e9109816a29500718b32dd9a5db01c0cb3a019b96a" + "sha256:08907593569fe59baca0bf152c43f3863201efb6113ecb38ce7e97ce339805a6", + "sha256:0be0f1ed45fc0c185cfd4ecc19a1d6532d72f86a2bac9de7e24541febad72650", + "sha256:141f08ed3c4b1847015e2cd62ec06d35e67a3ac185c26f7635f4406b90afa9c5", + "sha256:19e4df788a0581238e9390c85a7a09af39c7b539b29f25c89209e6c3e371270d", + "sha256:23cc09ed395b03424d1ae30dcc292615c1372bfba7141eb85e11e50efaa6b351", + "sha256:245388cda02af78276b479f299bbf3783ef0a6a6273037d7c60dc73b8d8d7755", + "sha256:331cb5115673a20fb131dadd22f5bcaf7677ef758741312bee4937d71a14b2ef", + "sha256:386e2e4090f0bc5df274e720105c342263423e77ee8826002dcffe0c9533dbca", + "sha256:3a794ce50daee01c74a494919d5ebdc23d58873747fa0e288318728533a3e1ca", + "sha256:60851187677b24c6085248f0a0b9b98d49cba7ecc7ec60ba6b9d2e5574ac1ee9", + "sha256:63a9a5fc43b58735f65ed63d2cf43508f462dc49857da70b8980ad78d41d52fc", + "sha256:6b62544bb68106e3f00b21c8930e83e584fdca005d4fffd29bb39fb3ffa03cb5", + "sha256:6ba744056423ef8d450cf627289166da65903885272055fb4b5e113137cfa14f", + "sha256:7494b0b0274c5072bddbfd5b4a6c6f18fbbe1ab1d22a41e99cd2d00c8f96ecfe", + "sha256:826f32b9547c8091679ff292a82aca9c7b9650f9fda3e2ca6bf2ac905b7ce888", + "sha256:93715dffbcd0678057f947f496484e906bf9509f5c1c38fc9ba3922893cda5f5", + "sha256:9a334d6c83dfeadae576b4d633a71620d40d1c379129d587faa42ee3e2a85cce", + "sha256:af7ed8a8aa6957aac47b4268631fa1df984643f07ef00acd374e456364b373f5", + "sha256:bf0a7aed7f5521c7ca67febd57db473af4762b9622254291fbcbb8cd0ba5e33e", + "sha256:bf1ef9eb901113a9805287e090452c05547578eaab1b62e4ad456fcc049a9b7e", + "sha256:c0afd27bc0e307a1ffc04ca5ec010a290e49e3afbe841c5cafc5c5a80ecd81c9", + "sha256:dd579709a87092c6dbee09d1b7cfa81831040705ffa12a1b248935274aee0437", + "sha256:df6712284b2e44a065097846488f66840445eb987eb81b3cc6e4149e7b6982e1", + "sha256:e07d9f1a23e9e93ab5c62902833bf3e4b1f65502927379148b6622686223125c", + "sha256:e2ede7c1d45e65e209d6093b762e98e8318ddeff95317d07a27a2140b80cfd24", + "sha256:e4ef9c164eb55123c62411f5936b5c2e521b12356037b6e1c2617cef45523d47", + "sha256:eca2b7343524e7ba246cab8ff00cab47a2d6d54ada3b02772e908a45675722e2", + "sha256:eee64c616adeff7db37cc37da4180a3a5b6177f5c46b187894e633f088fb5b28", + "sha256:ef824cad1f980d27f26166f86856efe11eff9912c4fed97d3804820d43fa550c", + "sha256:efc89291bd5a08855829a3c522df16d856455297cf35ae827a37edac45f466a7", + "sha256:fa964bae817babece5aa2e8c1af841bebb6d0b9add8e637548809d040443fee0", + "sha256:ff37757e068ae606659c28c3bd0d923f9d29a85de79bf25b2b34b148473b5025" ], - "version": "==4.5.3" + "version": "==4.5.4" }, "coveralls": { "hashes": [ - "sha256:d3d49234bffd41e91b241a69f0ebb9f64d7f0515711a76134d53d4647e7eb509", - "sha256:dafabcff87425fa2ab3122dee21229afbb4d6692cfdacc6bb895f7dfa8b2c849" + "sha256:9bc5a1f92682eef59f688a8f280207190d9a6afb84cef8f567fa47631a784060", + "sha256:fb51cddef4bc458de347274116df15d641a735d3f0a580a9472174e2e62f408c" ], "index": "pypi", - "version": "==1.8.1" + "version": "==1.8.2" }, "decorator": { "hashes": [ @@ -488,10 +489,10 @@ }, "packaging": { "hashes": [ - "sha256:0c98a5d0be38ed775798ece1b9727178c4469d9c3b4ada66e8e6b7849f8732af", - "sha256:9e1cbf8c12b1f1ce0bb5344b8d7ecf66a6f8a6e91bcb0c84593ed6d3ab5c4ab3" + "sha256:a7ac867b97fdc07ee80a8058fe4435ccd274ecc3b0ed61d852d7d53055528cf9", + "sha256:c491ca87294da7cc01902edbe30a5bc6c4c28172b5138ab4e4aa1b9d7bfaeafe" ], - "version": "==19.0" + "version": "==19.1" }, "pillow": { "hashes": [ @@ -563,16 +564,16 @@ }, "pyparsing": { "hashes": [ - "sha256:530d8bf8cc93a34019d08142593cf4d78a05c890da8cf87ffa3120af53772238", - "sha256:f78e99616b6f1a4745c0580e170251ef1bbafc0d0513e270c4bd281bf29d2800" + "sha256:6f98a7b9397e206d78cc01df10131398f1c8b8510a2f4d97d9abd82e1aacdd80", + "sha256:d9338df12903bbf5d65a0e4e87c2161968b10d2e489652bb47001d82a9b028b4" ], - "version": "==2.4.1" + "version": "==2.4.2" }, "pyrsistent": { "hashes": [ - "sha256:50cffebc87ca91b9d4be2dcc2e479272bcb466b5a0487b6c271f7ddea6917e14" + "sha256:34b47fa169d6006b32e99d4b3c4031f155e6e68ebcc107d6454852e8e0ee6533" ], - "version": "==0.15.3" + "version": "==0.15.4" }, "python-dateutil": { "hashes": [ @@ -590,10 +591,10 @@ }, "pytz": { "hashes": [ - "sha256:303879e36b721603cc54604edcac9d20401bdbe31e1e4fdee5b9f98d5d31dfda", - "sha256:d747dd3d23d77ef44c6a3526e274af6efeb0a6f1afd5a69ba4d5be4098c8e141" + "sha256:26c0b32e437e54a18161324a2fca3c4b9846b74a8dccddd843113109e1116b32", + "sha256:c894d57500a4cd2d5c71114aaab77dbab5eabd9022308ce5ac9bb93a60a6f0c7" ], - "version": "==2019.1" + "version": "==2019.2" }, "recommonmark": { "hashes": [ @@ -679,10 +680,10 @@ }, "sphinx-autodoc-typehints": { "hashes": [ - "sha256:19fe0b426b7c008181f67f816060da7f046bd8a42723f67a685d26d875bcefd7", - "sha256:f9c06acfec80766fe8f542a6d6a042e751fcf6ce2e2711a7dc00d8b6daf8aa36" + "sha256:8eb1e2bc248d316a9faeca086c6133623f6d45770e342738158249356989b95c", + "sha256:cedf37dde99096e3024ffcd498ee917c2ccf667e04e23d868d481eae2cb84910" ], - "version": "==1.6.0" + "version": "==1.7.0" }, "sphinxcontrib-applehelp": { "hashes": [ diff --git a/pymisp/aping.py b/pymisp/aping.py index f3486c1..4ec63d5 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -12,6 +12,8 @@ import requests from requests.auth import AuthBase import re from uuid import UUID +import warnings +import sys from . import __version__ from .exceptions import MISPServerError, PyMISPUnexpectedResponse, PyMISPNotImplementedYet, PyMISPError, NoURL, NoKey @@ -78,6 +80,9 @@ class ExpandedPyMISP(PyMISP): elif pymisp_version_tup[:3] < recommended_version_tup: logger.warning(f"The version of PyMISP recommended by the MI)SP instance ({response['version']}) is newer than the one you're using now ({__version__}). Please upgrade PyMISP.") + misp_version = self.misp_instance_version + if 'version' in misp_version: + self._misp_version = tuple(int(v) for v in misp_version['version'].split('.')) except Exception as e: raise PyMISPError(f'Unable to connect to MISP ({self.root_url}). Please make sure the API key and the URL are correct (http/https is required): {e}') @@ -149,8 +154,31 @@ class ExpandedPyMISP(PyMISP): def toggle_global_pythonify(self): self.global_pythonify = not self.global_pythonify + def _old_misp(self, minimal_version_required: tuple, removal_date: Union[str, date, datetime], method: str=None, message: str=None): + if self._misp_version >= minimal_version_required: + return False + if isinstance(removal_date, (datetime, date)): + removal_date = removal_date.isoformat() + to_print = f'The instance of MISP you are using is outdated. Unless you update your MISP instance, {method} will stop working after {removal_date}.' + if message: + to_print += f' {message}' + warnings.warn(to_print, DeprecationWarning) + return True + # ## BEGIN Event ## + def events(self, pythonify: bool=False): + events = self._prepare_request('GET', 'events') + events = self._check_response(events, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in events: + return events + to_return = [] + for event in events: + e = MISPEvent() + e.from_dict(**event) + to_return.append(e) + return to_return + def get_event(self, event: Union[MISPEvent, int, str, UUID], pythonify: bool=False): '''Get an event from a MISP instance''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) @@ -240,7 +268,6 @@ class ExpandedPyMISP(PyMISP): def delete_object(self, misp_object: Union[MISPObject, int, str, UUID]): '''Delete an object from a MISP instance''' - # FIXME: MISP doesn't support DELETE on this endpoint object_id = self.__get_uuid_or_id_from_abstract_misp(misp_object) response = self._prepare_request('POST', f'objects/delete/{object_id}') return self._check_response(response, expect_json=True) @@ -296,6 +323,18 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Attribute ### + def attributes(self, pythonify: bool=False): + attributes = self._prepare_request('GET', f'attributes/index') + attributes = self._check_response(attributes, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in attributes: + return attributes + to_return = [] + for attribute in attributes: + a = MISPAttribute() + a.from_dict(**attribute) + to_return.append(a) + return to_return + def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=False): '''Get an attribute from a MISP instance''' attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute) @@ -358,6 +397,22 @@ class ExpandedPyMISP(PyMISP): # ## BEGIN Attribute Proposal ### + def attribute_proposals(self, event: Union[MISPEvent, int, str, UUID]=None, pythonify: bool=False): + if event: + event_id = self.__get_uuid_or_id_from_abstract_misp(event) + attribute_proposals = self._prepare_request('GET', f'shadow_attributes/index/{event_id}') + else: + attribute_proposals = self._prepare_request('GET', f'shadow_attributes') + attribute_proposals = self._check_response(attribute_proposals, expect_json=True) + if not (self.global_pythonify or pythonify) or 'errors' in attribute_proposals: + return attribute_proposals + to_return = [] + for attribute_proposal in attribute_proposals: + a = MISPShadowAttribute() + a.from_dict(**attribute_proposal) + to_return.append(a) + return to_return + def get_attribute_proposal(self, proposal: Union[MISPShadowAttribute, int, str, UUID], pythonify: bool=False): proposal_id = self.__get_uuid_or_id_from_abstract_misp(proposal) attribute_proposal = self._prepare_request('GET', f'shadow_attributes/view/{proposal_id}') @@ -373,7 +428,6 @@ class ExpandedPyMISP(PyMISP): def add_attribute_proposal(self, event: Union[MISPEvent, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False): '''Propose a new attribute in an event''' event_id = self.__get_uuid_or_id_from_abstract_misp(event) - # FIXME: attribute needs to be a complete MISPAttribute: https://github.com/MISP/MISP/issues/4868 new_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/add/{event_id}', data=attribute) new_attribute_proposal = self._check_response(new_attribute_proposal, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in new_attribute_proposal: @@ -384,9 +438,11 @@ class ExpandedPyMISP(PyMISP): def update_attribute_proposal(self, initial_attribute: Union[MISPAttribute, int, str, UUID], attribute: MISPAttribute, pythonify: bool=False): '''Propose a change for an attribute''' - # FIXME: inconsistency in MISP: https://github.com/MISP/MISP/issues/4857 initial_attribute_id = self.__get_uuid_or_id_from_abstract_misp(initial_attribute) - attribute = {'ShadowAttribute': attribute} + if self._old_misp((2, 4, 112), '2020-01-01', sys._getframe().f_code.co_name): + # Inconsistency in MISP: https://github.com/MISP/MISP/issues/4857 + # Fix: https://github.com/MISP/MISP/commit/d6a15438f7a53f589ddeabe2b14e65c92baf43d3 + attribute = {'ShadowAttribute': attribute} update_attribute_proposal = self._prepare_request('POST', f'shadow_attributes/edit/{initial_attribute_id}', data=attribute) update_attribute_proposal = self._check_response(update_attribute_proposal, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in update_attribute_proposal: @@ -421,19 +477,28 @@ class ExpandedPyMISP(PyMISP): def sightings(self, misp_entity: AbstractMISP, org: Union[MISPOrganisation, int, str, UUID]=None, pythonify: bool=False): """Get the list of sighting related to a MISPEvent or a MISPAttribute (depending on type of misp_entity)""" - # FIXME: https://github.com/MISP/MISP/issues/4875 if isinstance(misp_entity, MISPEvent): - scope = 'event' + context = 'event' elif isinstance(misp_entity, MISPAttribute): - scope = 'attribute' + context = 'attribute' else: raise PyMISPError('misp_entity can only be a MISPEvent or a MISPAttribute') if org is not None: org_id = self.__get_uuid_or_id_from_abstract_misp(org) - url = f'sightings/listSightings/{misp_entity.id}/{scope}/{org_id}' else: - url = f'sightings/listSightings/{misp_entity.id}/{scope}' - sightings = self._prepare_request('POST', url) + org_id = None + + if self._old_misp((2, 4, 112), '2020-01-01', sys._getframe().f_code.co_name): + url = f'sightings/listSightings/{misp_entity.id}/{context}' + if org_id: + url = f'{url}/{org_id}' + sightings = self._prepare_request('POST', url) + else: + to_post = {'id': misp_entity.id, 'context': context} + if org_id: + to_post['org_id'] = org_id + sightings = self._prepare_request('POST', 'sightings/listSightings', data=to_post) + sightings = self._check_response(sightings, expect_json=True) if not (self.global_pythonify or pythonify) or 'errors' in sightings: return sightings @@ -926,7 +991,6 @@ class ExpandedPyMISP(PyMISP): def server_pull(self, server: Union[MISPServer, int, str, UUID], event: Union[MISPEvent, int, str, UUID]=None): '''Initialize a pull from a sync server''' server_id = self.__get_uuid_or_id_from_abstract_misp(server) - # FIXME: POST & data if event: event_id = self.__get_uuid_or_id_from_abstract_misp(event) url = f'servers/pull/{server_id}/{event_id}' @@ -939,7 +1003,6 @@ class ExpandedPyMISP(PyMISP): def server_push(self, server: Union[MISPServer, int, str, UUID], event: Union[MISPEvent, int, str, UUID]=None): '''Initialize a push to a sync server''' server_id = self.__get_uuid_or_id_from_abstract_misp(server) - # FIXME: POST & data if event: event_id = self.__get_uuid_or_id_from_abstract_misp(event) url = f'servers/push/{server_id}/{event_id}' @@ -970,8 +1033,10 @@ class ExpandedPyMISP(PyMISP): """Add a new sharing group""" sharing_group = self._prepare_request('POST', f'sharing_groups/add', data=sharing_group) sharing_group = self._check_response(sharing_group, expect_json=True) - # FIXME: https://github.com/MISP/MISP/issues/4882 - sharing_group = sharing_group[0] + if self._old_misp((2, 4, 112), '2020-01-01', sys._getframe().f_code.co_name) and isinstance(sharing_group, list): + # https://github.com/MISP/MISP/issues/4882 + # https://github.com/MISP/MISP/commit/d75c6c9e3b7874fd0f083445126743873e5c53c4 + sharing_group = sharing_group[0] if not (self.global_pythonify or pythonify) or 'errors' in sharing_group: return sharing_group s = MISPSharingGroup() @@ -1539,16 +1604,16 @@ class ExpandedPyMISP(PyMISP): response = self._prepare_request('POST', f'events/pushEventToZMQ/{event_id}.json') return self._check_response(response, expect_json=True) - def direct_call(self, url: str, data: dict=None, params: dict={}): + def direct_call(self, url: str, data: dict=None, params: dict={}, kw_params: dict={}): '''Very lightweight call that posts a data blob (python dictionary or json string) on the URL''' if data is None: - response = self._prepare_request('GET', url, params=params) + response = self._prepare_request('GET', url, params=params, kw_params=kw_params) else: - response = self._prepare_request('POST', url, data=data, params=params) + response = self._prepare_request('POST', url, data=data, params=params, kw_params=kw_params) return self._check_response(response, lenient_response_type=True) def freetext(self, event: Union[MISPEvent, int, str, UUID], string: str, adhereToWarninglists: Union[bool, str]=False, - distribution: int=None, returnMetaAttributes: bool=False, pythonify: bool=False): + distribution: int=None, returnMetaAttributes: bool=False, pythonify: bool=False, **kwargs): """Pass a text to the freetext importer""" event_id = self.__get_uuid_or_id_from_abstract_misp(event) query = {"value": string} @@ -1561,7 +1626,7 @@ class ExpandedPyMISP(PyMISP): query['distribution'] = distribution if returnMetaAttributes: query['returnMetaAttributes'] = returnMetaAttributes - attributes = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query) + attributes = self._prepare_request('POST', f'events/freeTextImport/{event_id}', data=query, **kwargs) attributes = self._check_response(attributes, expect_json=True) if returnMetaAttributes or not (self.global_pythonify or pythonify) or 'errors' in attributes: return attributes @@ -1756,7 +1821,8 @@ class ExpandedPyMISP(PyMISP): def __repr__(self): return f'<{self.__class__.__name__}(url={self.root_url})' - def _prepare_request(self, request_type: str, url: str, data: dict={}, params: dict={}, output_type: str='json'): + def _prepare_request(self, request_type: str, url: str, data: dict={}, params: dict={}, + kw_params: dict={}, output_type: str='json'): '''Prepare a request for python-requests''' url = urljoin(self.root_url, url) if logger.isEnabledFor(logging.DEBUG): @@ -1770,6 +1836,10 @@ class ExpandedPyMISP(PyMISP): data = {k: v for k, v in data.items() if v is not None} data = json.dumps(data, cls=MISPEncode) + if kw_params: + # CakePHP params in URL + to_append_url = '/'.join([f'{k}:{v}' for k, v in kw_params.items()]) + url = f'{url}/{to_append_url}' req = requests.Request(request_type, url, data=data, params=params) with requests.Session() as s: user_agent = 'PyMISP {__version__} - Python {".".join(str(x) for x in sys.version_info[:2])}' diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index df30ee2..06445e8 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -616,9 +616,9 @@ class MISPEvent(AbstractMISP): return misp_shadow_attribute def get_attribute_tag(self, attribute_identifier): - '''Return the tags associated to an attribute or an object attribute. + """Return the tags associated to an attribute or an object attribute. :attribute_identifier: can be an ID, UUID, or the value. - ''' + """ tags = [] for a in self.attributes + [attribute for o in self.objects for attribute in o.attributes]: if ((hasattr(a, 'id') and a.id == attribute_identifier) @@ -629,10 +629,10 @@ class MISPEvent(AbstractMISP): return tags def add_attribute_tag(self, tag, attribute_identifier): - '''Add a tag to an existing attribute, raise an Exception if the attribute doesn't exists. + """Add a tag to an existing attribute, raise an Exception if the attribute doesn't exists. :tag: Tag name as a string, MISPTag instance, or dictionary :attribute_identifier: can be an ID, UUID, or the value. - ''' + """ attributes = [] for a in self.attributes + [attribute for o in self.objects for attribute in o.attributes]: if ((hasattr(a, 'id') and a.id == attribute_identifier) diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 571976e..15d3ebb 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -1152,8 +1152,7 @@ class TestComprehensive(unittest.TestCase): r = self.admin_misp_connector.enable_taxonomy(tax.id) self.assertEqual(r['message'], 'Taxonomy enabled') r = self.admin_misp_connector.enable_taxonomy_tags(tax.id) - # FIXME: https://github.com/MISP/MISP/issues/4865 - # self.assertEqual(r, []) + self.assertEqual(r['name'], 'The tag(s) has been saved.') r = self.admin_misp_connector.disable_taxonomy(tax.id) self.assertEqual(r['message'], 'Taxonomy disabled') @@ -1292,6 +1291,7 @@ class TestComprehensive(unittest.TestCase): second.distribution = Distribution.all_communities try: first = self.user_misp_connector.add_event(first) + second = self.admin_misp_connector.add_event(second, pythonify=True) # Get attribute attribute = self.user_misp_connector.get_attribute(first.attributes[0].id) self.assertEqual(first.attributes[0].uuid, attribute.uuid) @@ -1321,6 +1321,20 @@ class TestComprehensive(unittest.TestCase): # Get attribute proposal temp_new_proposal = self.user_misp_connector.get_attribute_proposal(new_proposal.id) self.assertEqual(temp_new_proposal.uuid, new_proposal.uuid) + # Get attribute proposal*S* + proposals = self.user_misp_connector.attribute_proposals() + self.assertTrue(isinstance(proposals, list)) + self.assertEqual(len(proposals), 3) + self.assertEqual(proposals[0].value, '5.2.3.4') + # Get proposals on a specific event + self.admin_misp_connector.add_attribute_proposal(second.id, {'type': 'ip-src', 'value': '123.123.123.1'}) + proposals = self.admin_misp_connector.attribute_proposals(pythonify=True) + self.assertTrue(isinstance(proposals, list)) + self.assertEqual(len(proposals), 4) + proposals = self.admin_misp_connector.attribute_proposals(second, pythonify=True) + self.assertTrue(isinstance(proposals, list)) + self.assertEqual(len(proposals), 1) + self.assertEqual(proposals[0].value, '123.123.123.1') # Accept attribute proposal - New attribute self.user_misp_connector.accept_attribute_proposal(new_proposal.id) first = self.user_misp_connector.get_event(first.id) @@ -1338,13 +1352,14 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(attribute.to_ids, False) # Test fallback to proposal if the user doesn't own the event - second = self.admin_misp_connector.add_event(second, pythonify=True) - # FIXME: attribute needs to be a complete MISPAttribute: https://github.com/MISP/MISP/issues/4868 prop_attr = MISPAttribute() prop_attr.from_dict(**{'type': 'ip-dst', 'value': '123.43.32.21'}) # Add attribute on event owned by someone else attribute = self.user_misp_connector.add_attribute(second.id, prop_attr) self.assertTrue(isinstance(attribute, MISPShadowAttribute)) + # Test if add proposal without category works - https://github.com/MISP/MISP/issues/4868 + attribute = self.user_misp_connector.add_attribute(second.id, {'type': 'ip-dst', 'value': '123.43.32.22'}) + self.assertTrue(isinstance(attribute, MISPShadowAttribute)) # Add attribute with the same value as an existing proposal prop_attr.uuid = str(uuid4()) attribute = self.admin_misp_connector.add_attribute(second.id, prop_attr, pythonify=True) @@ -1362,6 +1377,17 @@ class TestComprehensive(unittest.TestCase): # Delete attribute owned by user response = self.admin_misp_connector.delete_attribute(second.attributes[1].id) self.assertEqual(response['message'], 'Attribute deleted.') + + # Test attribute*S* + attributes = self.admin_misp_connector.attributes() + self.assertEqual(len(attributes), 5) + # attributes = self.user_misp_connector.attributes() + # self.assertEqual(len(attributes), 5) + # Test event*S* + events = self.admin_misp_connector.events() + self.assertEqual(len(events), 2) + events = self.user_misp_connector.events() + self.assertEqual(len(events), 2) finally: # Delete event self.admin_misp_connector.delete_event(first.id) @@ -1445,11 +1471,9 @@ class TestComprehensive(unittest.TestCase): users_stats = self.admin_misp_connector.users_statistics(context='tags') self.assertEqual(list(users_stats.keys()), ['flatData', 'treemap']) - # FIXME: https://github.com/MISP/MISP/issues/4880 - # users_stats = self.admin_misp_connector.users_statistics(context='attributehistogram') + users_stats = self.admin_misp_connector.users_statistics(context='attributehistogram') + self.assertTrue(isinstance(users_stats, dict)) - # NOTE Not supported yet - # self.user_misp_connector.add_sighting({'value': first.attributes[0].value}) self.user_misp_connector.add_sighting({'value': first.attributes[0].value}) users_stats = self.user_misp_connector.users_statistics(context='sightings') self.assertEqual(list(users_stats.keys()), ['toplist', 'eventids']) @@ -1480,23 +1504,36 @@ class TestComprehensive(unittest.TestCase): try: self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%', force_enable=True) first = self.user_misp_connector.add_event(first) + # disable_background_processing => returns the parsed data, before insertion r = self.user_misp_connector.freetext(first.id, '1.1.1.1 foo@bar.de', adhereToWarninglists=False, - distribution=2, returnMetaAttributes=False, pythonify=True) + distribution=2, returnMetaAttributes=False, pythonify=True, + kw_params={'disable_background_processing': 1}) self.assertTrue(isinstance(r, list)) self.assertEqual(r[0].value, '1.1.1.1') - - # FIXME: https://github.com/MISP/MISP/issues/4881 - # r_wl = self.user_misp_connector.freetext(first.id, '8.8.8.8 foo@bar.de', adhereToWarninglists=True, - # distribution=2, returnMetaAttributes=False) - # print(r_wl) + r = self.user_misp_connector.freetext(first.id, '9.9.9.9 foo@bar.com', adhereToWarninglists='soft', + distribution=2, returnMetaAttributes=False, pythonify=True, + kw_params={'disable_background_processing': 1}) + self.assertTrue(isinstance(r, list)) + self.assertEqual(r[0].value, '9.9.9.9') + event = self.user_misp_connector.get_event(first.id, pythonify=True) + self.assertEqual(event.attributes[3].value, '9.9.9.9') + self.assertFalse(event.attributes[3].to_ids) + # keep disable_background_processing enabled => returns the same ???? FIXME + r_wl = self.user_misp_connector.freetext(first.id, '8.8.8.8 foo@bar.de', adhereToWarninglists=True, + distribution=2, returnMetaAttributes=False, + kw_params={'disable_background_processing': 0}) + self.assertEqual(r_wl[0].value, '8.8.8.8') + event = self.user_misp_connector.get_event(first.id, pythonify=True) + for attribute in event.attributes: + self.assertFalse(attribute.value == '8.8.8.8') r = self.user_misp_connector.freetext(first.id, '1.1.1.1 foo@bar.de', adhereToWarninglists=True, distribution=2, returnMetaAttributes=True) self.assertTrue(isinstance(r, list)) self.assertTrue(isinstance(r[0]['types'], dict)) + finally: # NOTE: required, or the attributes are inserted *after* the event is deleted # FIXME: https://github.com/MISP/MISP/issues/4886 time.sleep(10) - finally: # Delete event self.admin_misp_connector.delete_event(first.id) @@ -1509,15 +1546,15 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(sharing_group.name, 'Testcases SG') self.assertEqual(sharing_group.releasability, 'Testing') # add org - # FIXME: https://github.com/MISP/MISP/issues/4884 - # r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group.id, - # self.test_org.id, extend=True) + r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group.id, + self.test_org.id, extend=True) + self.assertEqual(r['name'], 'Organisation added to the sharing group.') # delete org # FIXME: https://github.com/MISP/MISP/issues/4884 # r = self.admin_misp_connector.remove_org_from_sharing_group(sharing_group.id, - # self.test_org.id) - + # self.test_org.id) + # self.assertEqual(r['name'], 'Organisation deleted from the sharing group.', r) # Get list sharing_groups = self.admin_misp_connector.sharing_groups(pythonify=True) self.assertTrue(isinstance(sharing_groups, list)) @@ -1568,7 +1605,6 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(botvrij.url, "http://www.botvrij.eu/data/feed-osint") # Enable # MISP OSINT - print(feeds[0].id) feed = self.admin_misp_connector.enable_feed(feeds[0].id, pythonify=True) self.assertTrue(feed.enabled) feed = self.admin_misp_connector.enable_feed_cache(feeds[0].id, pythonify=True) @@ -1622,8 +1658,8 @@ class TestComprehensive(unittest.TestCase): servers = self.admin_misp_connector.servers(pythonify=True) self.assertEqual(servers[0].name, 'Updated name') # Delete - server = self.admin_misp_connector.delete_server(server.id) - # FIXME: https://github.com/MISP/MISP/issues/4889 + r = self.admin_misp_connector.delete_server(server.id) + self.assertEqual(r['name'], 'Server deleted') @unittest.skipIf(sys.version_info < (3, 6), 'Not supported on python < 3.6') def test_expansion(self):