diff --git a/pymisp/api.py b/pymisp/api.py index 294c679..3011ec0 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -1473,43 +1473,43 @@ class PyMISP(object): response = self._prepare_request('POST', url) return self._check_response(response) - def sighting_search(self, context='', async_callback=None, **kwargs): + def search_sightings(self, context='', async_callback=None, **kwargs): """Search sightings via the REST API :context: The context of the search, could be attribute, event or False - :param id: ID of the attribute or event if context is specified - :param type: Type of the sighting - :param from: From date - :param to: To date - :param last: Last published sighting (e.g. 5m, 3h, 7d) + :param context_id: ID of the attribute or event if context is specified + :param type_sighting: Type of the sighting + :param date_from: From date + :param date_to: To date + :param publish_timestamp: Last published sighting (e.g. 5m, 3h, 7d) :param org_id: The org_id :param source: The source of the sighting - :param includeAttribute: Should the result include attribute data - :param includeEvent: Should the result include event data + :param include_attribute: Should the result include attribute data + :param include_event: Should the result include event data :param async_callback: The function to run when results are returned :Example: - >>> misp.sighting_search({'last': '30d'}) # search sightings for the last 30 days on the instance + >>> misp.search_sightings({'publish_timestamp': '30d'}) # search sightings for the last 30 days on the instance [ ... ] - >>> misp.sighting_search('attribute', {'id': 6, 'includeAttribute': 1}) # return list of sighting for attribute 6 along with the attribute itself + >>> misp.search_sightings('attribute', {'id': 6, 'include_attribute': 1}) # return list of sighting for attribute 6 along with the attribute itself [ ... ] - >>> misp.sighting_search('event', {'id': 17, 'includeEvent': 1, 'org_id': 2}) # return list of sighting for event 17 filtered with org id 2 + >>> misp.search_sightings('event', {'id': 17, 'include_event': 1, 'org_id': 2}) # return list of sighting for event 17 filtered with org id 2 """ if context not in ['', 'attribute', 'event']: raise Exception('Context parameter must be empty, "attribute" or "event"') query = {} # Sighting: array('id', 'type', 'from', 'to', 'last', 'org_id', 'includeAttribute', 'includeEvent'); query['returnFormat'] = kwargs.pop('returnFormat', 'json') - query['id'] = kwargs.pop('id', None) - query['type'] = kwargs.pop('type', None) - query['from'] = kwargs.pop('from', None) - query['to'] = kwargs.pop('to', None) - query['last'] = kwargs.pop('last', None) + query['id'] = kwargs.pop('context_id', None) + query['type'] = kwargs.pop('type_sighting', None) + query['from'] = kwargs.pop('date_from', None) + query['to'] = kwargs.pop('date_to', None) + query['last'] = kwargs.pop('publish_timestamp', None) query['org_id'] = kwargs.pop('org_id', None) query['source'] = kwargs.pop('source', None) - query['includeAttribute'] = kwargs.pop('includeAttribute', None) - query['includeEvent'] = kwargs.pop('includeEvent', None) - + query['includeAttribute'] = kwargs.pop('include_attribute', None) + query['includeEvent'] = kwargs.pop('include_event', None) + # Cleanup query = {k: v for k, v in query.items() if v is not None} @@ -1518,7 +1518,7 @@ class PyMISP(object): # Create a session, make it async if and only if we have a callback controller = 'sightings' - return self.__query('restSearch/'+context, query, controller, async_callback) + return self.__query('restSearch/' + context, query, controller, async_callback) # ############## Sharing Groups ################## diff --git a/pymisp/aping.py b/pymisp/aping.py index 954099e..8cb855e 100644 --- a/pymisp/aping.py +++ b/pymisp/aping.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse -from .api import PyMISP, everything_broken, MISPEvent, MISPAttribute +from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet +from .api import PyMISP, everything_broken, MISPEvent, MISPAttribute, MISPSighting from typing import TypeVar, Optional, Tuple, List, Dict from datetime import date, datetime import json @@ -116,8 +116,94 @@ class ExpandedPyMISP(PyMISP): a.from_dict(**updated_attribute) return a + def search_sightings(self, context: Optional[str]=None, + context_id: Optional[SearchType]=None, + type_sighting: Optional[str]=None, + date_from: Optional[DateTypes]=None, + date_to: Optional[DateTypes]=None, + publish_timestamp: Optional[DateInterval]=None, last: Optional[DateInterval]=None, + org: Optional[SearchType]=None, + source: Optional[str]=None, + include_attribute: Optional[bool]=None, + include_event_meta: Optional[bool]=None, + pythonify: Optional[bool]=False + ): + '''Search sightings + + :param context: The context of the search. Can be either "attribute", "event", or nothing (will then match on events and attributes). + :param context_id: Only relevant if context is either "attribute" or "event". Then it is the relevant ID. + :param type_sighting: Type of sighting + :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event. + :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event. + :param publish_timestamp: Restrict the results by the last publish timestamp (newer than). + :param org: Search by the creator organisation by supplying the organisation identifier. + :param source: Source of the sighting + :param include_attribute: Include the attribute. + :param include_event_meta: Include the meta information of the event. + + Deprecated: + + :param last: synonym for publish_timestamp + + :Example: + + >>> misp.search_sightings(publish_timestamp='30d') # search sightings for the last 30 days on the instance + [ ... ] + >>> misp.search_sightings(context='attribute', context_id=6, include_attribute=True) # return list of sighting for attribute 6 along with the attribute itself + [ ... ] + >>> misp.search_sightings(context='event', context_id=17, include_event_meta=True, org=2) # return list of sighting for event 17 filtered with org id 2 + ''' + query = {'returnFormat': 'json'} + if context is not None: + if context not in ['attribute', 'event']: + raise ValueError('context has to be in {}'.format(', '.join(['attribute', 'event']))) + url_path = f'sightings/restSearch/{context}' + else: + url_path = 'sightings/restSearch' + query['id'] = context_id + query['type'] = type_sighting + query['from'] = date_from + query['to'] = date_to + query['last'] = publish_timestamp + query['org_id'] = org + query['source'] = source + query['includeAttribute'] = include_attribute + query['includeEvent'] = include_event_meta + + url = urljoin(self.root_url, url_path) + # Remove None values. + # TODO: put that in self._prepare_request + query = {k: v for k, v in query.items() if v is not None} + response = self._prepare_request('POST', url, data=json.dumps(query)) + normalized_response = self._check_response(response) + if isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and + normalized_response.get('errors')): + return normalized_response + elif pythonify: + to_return = [] + for s in normalized_response: + entries = {} + s_data = s['Sighting'] + if include_event_meta: + e = s_data.pop('Event') + me = MISPEvent() + me.from_dict(**e) + entries['event'] = me + if include_attribute: + a = s_data.pop('Attribute') + ma = MISPAttribute() + ma.from_dict(**a) + entries['attribute'] = ma + ms = MISPSighting() + ms.from_dict(**s_data) + entries['sighting'] = ms + to_return.append(entries) + return to_return + else: + return normalized_response + def search(self, controller: str='events', return_format: str='json', - page: Optional[int]=None, limit: Optional[int]=None, + limit: Optional[int]=None, page: Optional[int]=None, value: Optional[SearchParameterTypes]=None, type_attribute: Optional[SearchParameterTypes]=None, category: Optional[SearchParameterTypes]=None, @@ -141,21 +227,22 @@ class ExpandedPyMISP(PyMISP): sg_reference_only: Optional[bool]=None, eventinfo: Optional[str]=None, searchall: Optional[bool]=None, + requested_attributes: Optional[str]=None, include_context: Optional[bool]=None, includeContext: Optional[bool]=None, + headerless: Optional[bool]=None, pythonify: Optional[bool]=False, **kwargs): - ''' - Search in the MISP instance + '''Search in the MISP instance :param returnFormat: Set the return format of the search (Currently supported: json, xml, openioc, suricata, snort - more formats are being moved to restSearch with the goal being that all searches happen through this API). Can be passed as the first parameter after restSearch or via the JSON payload. - :param page: Page number (depending on the limit). - :param limit: Upper limit of elements to return per page. + :param limit: Limit the number of results returned, depending on the scope (for example 10 attributes or 10 full events). + :param page: If a limit is set, sets the page to be returned. page 3, limit 100 will return records 201->300). :param value: Search for the given value in the attributes' value field. :param type_attribute: The attribute type, any valid MISP attribute type is accepted. :param category: The attribute category, any valid MISP attribute category is accepted. :param org: Search by the creator organisation by supplying the organisation identifier. :param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query` - :param quickfilter: If set it makes the search ignore all of the other arguments, except for the auth key and value. MISP will return all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value fields, or in the attribute comment. + :param quickfilter: Enabling this (by passing "1" as the argument) will make the search ignore all of the other arguments, except for the auth key and value. MISP will return an xml / json (depending on the header sent) of all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value1 / value2 fields, or in the attribute comment. :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event. :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event. :param eventid: The events that should be included / excluded from the search @@ -173,7 +260,9 @@ class ExpandedPyMISP(PyMISP): :param sg_reference_only: If this flag is set, sharing group objects will not be included, instead only the sharing group ID is set. :param eventinfo: Filter on the event's info field. :param searchall: Search for a full or a substring (delimited by % for substrings) in the event info, event tags, attribute tags, attribute values or attribute comment fields. + :param requested_attributes: [CSV only] Select the fields that you wish to include in the CSV export. By setting event level fields additionally, includeContext is not required to get event metadata. :param include_context: [CSV Only] Include the event data with each attribute. + :param headerless: [CSV Only] The CSV created when this setting is set to true will not contain the header row. :param pythonify: Returns a list of PyMISP Objects the the plain json output. Warning: it might use a lot of RAM Deprecated: @@ -188,7 +277,7 @@ class ExpandedPyMISP(PyMISP): return_formats = ['openioc', 'json', 'xml', 'suricata', 'snort', 'text', 'rpz', 'csv', 'cache'] - if controller not in ['events', 'attributes', 'objects']: + if controller not in ['events', 'attributes', 'objects', 'sightings']: raise ValueError('controller has to be in {}'.format(', '.join(['events', 'attributes', 'objects']))) # Deprecated stuff / synonyms @@ -206,38 +295,25 @@ class ExpandedPyMISP(PyMISP): # Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized. # They are passed as-is. query = kwargs - if return_format is not None: - if return_format not in return_formats: - raise ValueError('return_format has to be in {}'.format(', '.join(return_formats))) - query['returnFormat'] = return_format - if page is not None: - query['page'] = page - if limit is not None: - query['limit'] = limit - if value is not None: - query['value'] = value - if type_attribute is not None: - query['type'] = type_attribute - if category is not None: - query['category'] = category - if org is not None: - query['org'] = org - if tags is not None: - query['tags'] = tags - if quickfilter is not None: - query['quickfilter'] = quickfilter - if date_from is not None: - query['from'] = self.make_timestamp(date_from) - if date_to is not None: - query['to'] = self.make_timestamp(date_to) - if eventid is not None: - query['eventid'] = eventid - if with_attachments is not None: - query['withAttachments'] = with_attachments - if metadata is not None: - query['metadata'] = metadata - if uuid is not None: - query['uuid'] = uuid + + if return_format not in return_formats: + raise ValueError('return_format has to be in {}'.format(', '.join(return_formats))) + query['returnFormat'] = return_format + + query['page'] = page + query['limit'] = limit + query['value'] = value + query['type'] = type_attribute + query['category'] = category + query['org'] = org + query['tags'] = tags + query['quickfilter'] = quickfilter + query['from'] = self.make_timestamp(date_from) + query['to'] = self.make_timestamp(date_to) + query['eventid'] = eventid + query['withAttachments'] = with_attachments + query['metadata'] = metadata + query['uuid'] = uuid if publish_timestamp is not None: if isinstance(publish_timestamp, (list, tuple)): query['publish_timestamp'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1])) @@ -248,36 +324,33 @@ class ExpandedPyMISP(PyMISP): query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1])) else: query['timestamp'] = self.make_timestamp(timestamp) - if published is not None: - query['published'] = published - if enforce_warninglist is not None: - query['enforceWarninglist'] = enforce_warninglist + query['published'] = published + query['enforceWarninglist'] = enforce_warninglist if to_ids is not None: if str(to_ids) not in ['0', '1', 'exclude']: raise ValueError('to_ids has to be in {}'.format(', '.join(['0', '1', 'exclude']))) query['to_ids'] = to_ids - if deleted is not None: - query['deleted'] = deleted - if include_event_uuid is not None: - query['includeEventUuid'] = include_event_uuid + query['deleted'] = deleted + query['includeEventUuid'] = include_event_uuid if event_timestamp is not None: if isinstance(event_timestamp, (list, tuple)): query['event_timestamp'] = (self.make_timestamp(event_timestamp[0]), self.make_timestamp(event_timestamp[1])) else: query['event_timestamp'] = self.make_timestamp(event_timestamp) - if sg_reference_only is not None: - query['sgReferenceOnly'] = sg_reference_only - if eventinfo is not None: - query['eventinfo'] = eventinfo - if searchall is not None: - query['searchall'] = searchall - if include_context is not None: - query['includeContext'] = include_context + query['sgReferenceOnly'] = sg_reference_only + query['eventinfo'] = eventinfo + query['searchall'] = searchall + query['requested_attributes'] = requested_attributes + query['includeContext'] = include_context + query['headerless'] = headerless url = urljoin(self.root_url, f'{controller}/restSearch') + # Remove None values. + # TODO: put that in self._prepare_request + query = {k: v for k, v in query.items() if v is not None} response = self._prepare_request('POST', url, data=json.dumps(query)) normalized_response = self._check_response(response) - if return_format == 'csv' and pythonify: + if return_format == 'csv' and pythonify and not headerless: return self._csv_to_dict(normalized_response) elif isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and normalized_response.get('errors')): @@ -301,87 +374,6 @@ class ExpandedPyMISP(PyMISP): else: return normalized_response - def get_csv(self, - eventid: Optional[SearchType]=None, - ignore: Optional[bool]=None, - tags: Optional[SearchParameterTypes]=None, - category: Optional[SearchParameterTypes]=None, - type_attribute: Optional[SearchParameterTypes]=None, - include_context: Optional[bool]=None, includeContext: Optional[bool]=None, - date_from: Optional[DateTypes]=None, date_to: Optional[DateTypes]=None, - publish_timestamp: Optional[DateInterval]=None, # converted internally to last (consistent with search) - headerless: Optional[bool]=None, - enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None, - pythonify: Optional[bool]=False, - **kwargs): - ''' - Get MISP data in CSV format. - - :param eventid: Restrict the download to a single event - :param ignore: If true, the response includes attributes without the to_ids flag - :param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query` - :param category: The attribute category, any valid MISP attribute category is accepted. - :param type_attribute: The attribute type, any valid MISP attribute type is accepted. - :param include_context: Include the event data with each attribute. - :param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event. - :param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event. - :param publish_timestamp: Events published within the last x amount of time. This filter will use the published timestamp of the event. - :param headerless: The CSV created when this setting is set to true will not contain the header row. - :param enforceWarninglist: All attributes that have a hit on a warninglist will be excluded. - :param pythonify: Returns a list of dictionaries instead of the plain CSV - ''' - - # Deprecated stuff / synonyms - if includeContext is not None: - include_context = includeContext - if enforceWarninglist is not None: - enforce_warninglist = enforceWarninglist - - # Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized. - # They are passed as-is. - query = kwargs - if eventid is not None: - query['eventid'] = eventid - if ignore is not None: - query['ignore'] = ignore - if tags is not None: - query['tags'] = tags - if category is not None: - query['category'] = category - if type_attribute is not None: - query['type'] = type_attribute - if include_context is not None: - query['includeContext'] = include_context - if date_from is not None: - query['from'] = self.make_timestamp(date_from) - if date_to is not None: - query['to'] = self.make_timestamp(date_to) - if publish_timestamp is not None: - if isinstance(publish_timestamp, (list, tuple)): - query['last'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1])) - else: - query['last'] = self.make_timestamp(publish_timestamp) - if headerless is not None: - query['headerless'] = headerless - if enforce_warninglist is not None: - query['enforceWarninglist'] = enforce_warninglist - - url = urljoin(self.root_url, '/events/csv/download/') - response = self._prepare_request('POST', url, data=json.dumps(query)) - normalized_response = self._check_response(response) - if isinstance(normalized_response, str): - if pythonify and not headerless: - return self._csv_to_dict(normalized_response) - - return normalized_response - elif isinstance(normalized_response, dict): - # The server returned a dictionary, it contains the error message. - logger.critical(f'The server should have returned a CSV file as text. instead it returned an error message:\n{normalized_response}') - return normalized_response - else: - # Should not happen... - raise PyMISPUnexpectedResponse(f'The server should have returned a CSV file as text. instead it returned:\n{normalized_response}') - def _csv_to_dict(self, csv_content): '''Makes a list of dict out of a csv file (requires headers)''' fieldnames, lines = csv_content.split('\n', 1) diff --git a/pymisp/data/misp-objects b/pymisp/data/misp-objects index 141a0c8..6e03108 160000 --- a/pymisp/data/misp-objects +++ b/pymisp/data/misp-objects @@ -1 +1 @@ -Subproject commit 141a0c8d4152c1be5d9872ee70d888cd63c737d5 +Subproject commit 6e03108fb104ae90617701aa5d0749cb932c821f diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index 0a48361..1991ef0 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -18,7 +18,7 @@ except ImportError as e: from uuid import uuid4 -travis_run = True +travis_run = False class TestComprehensive(unittest.TestCase): @@ -516,8 +516,10 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(events[0].id, first.id) # quickfilter - events = self.user_misp_connector.search(timestamp=timeframe, quickfilter='%bar%', pythonify=True) + events = self.user_misp_connector.search(timestamp=timeframe, + quickfilter='%bar%', pythonify=True) # FIXME: should return one event + # print(events) # self.assertEqual(len(events), 1) # self.assertEqual(events[0].id, second.id) @@ -567,13 +569,10 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(len(events), 1) # searchall - # FIXME: searchall doesn't seem to do anything - # second.add_attribute('text', 'This is a test for the full text search', comment='Test stuff comment') - # second = self.user_misp_connector.update_event(second) - # events = self.user_misp_connector.search(value='%for the full text%', searchall=True, pythonify=True) - # self.assertEqual(len(events), 1) - # events = self.user_misp_connector.search(value='stuff', searchall=True, pythonify=True) - # self.assertEqual(len(events), 1) + second.add_attribute('text', 'This is a test for the full text search', comment='Test stuff comment') + second = self.user_misp_connector.update_event(second) + events = self.user_misp_connector.search(value='%for the full text%', searchall=True, pythonify=True) + self.assertEqual(len(events), 1) # warninglist self.admin_misp_connector.update_warninglists() @@ -586,19 +585,19 @@ class TestComprehensive(unittest.TestCase): events = self.user_misp_connector.search(eventid=second.id, pythonify=True) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) - self.assertEqual(len(events[0].attributes), 4) + self.assertEqual(len(events[0].attributes), 5) events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=False, pythonify=True) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) - self.assertEqual(len(events[0].attributes), 4) + self.assertEqual(len(events[0].attributes), 5) if not travis_run: - # FIXME: This is fialing on travis for no discernable reason... + # FIXME: This is failing on travis for no discernable reason... events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=True, pythonify=True) self.assertEqual(len(events), 1) self.assertEqual(events[0].id, second.id) - self.assertEqual(len(events[0].attributes), 2) + self.assertEqual(len(events[0].attributes), 3) response = self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%') # disable ipv4 DNS. self.assertDictEqual(response, {'saved': True, 'success': '3 warninglist(s) toggled'}) @@ -607,7 +606,7 @@ class TestComprehensive(unittest.TestCase): self.assertEqual(len(attributes), 3) attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=2, limit=3, pythonify=True) - self.assertEqual(len(attributes), 1) + self.assertEqual(len(attributes), 2) time.sleep(1) # make sure the next attribute is added one at least one second later @@ -644,74 +643,54 @@ class TestComprehensive(unittest.TestCase): # Delete event self.admin_misp_connector.delete_event(first.id) - def test_get_csv(self): + def test_sightings(self): first = self.create_simple_event() second = self.create_simple_event() - second.info = 'foo blah' - second.set_date('2018-09-01') - second.add_attribute('ip-src', '8.8.8.8') try: - first.attributes[0].comment = 'This is the original comment' first = self.user_misp_connector.add_event(first) - response = self.user_misp_connector.fast_publish(first.id, alert=False) - self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.') - - # default search, all attributes with to_ids == False - self.admin_misp_connector.fast_publish(first.id, alert=False) - csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True) - # FIXME: Should not return anything (to_ids is False) - # self.assertEqual(len(csv), 0) - - # Also export attributes with to_ids set to false - csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, ignore=True, pythonify=True) - self.assertEqual(len(csv), 1) - - # Default search, attribute with to_ids == True - first.attributes[0].to_ids = True - first = self.user_misp_connector.update_event(first) - self.admin_misp_connector.fast_publish(first.id, alert=False) - csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True) - self.assertEqual(len(csv), 1) - self.assertEqual(csv[0]['value'], first.attributes[0].value) - - # eventid - csv = self.user_misp_connector.get_csv(eventid=first.id, pythonify=True) - self.assertEqual(len(csv), 1) - self.assertEqual(csv[0]['value'], first.attributes[0].value) - - # category - csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True) - self.assertEqual(len(csv), 1) - self.assertEqual(csv[0]['value'], first.attributes[0].value) - csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True) - self.assertEqual(len(csv), 0) - - # type_attribute - csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True) - self.assertEqual(len(csv), 1) - self.assertEqual(csv[0]['value'], first.attributes[0].value) - csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True) - self.assertEqual(len(csv), 0) - - # context - csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True) - self.assertEqual(len(csv), 1) - # print(csv[0]) - # FIXME: there is no context. - - # date_from date_to second = self.user_misp_connector.add_event(second) - csv = self.user_misp_connector.get_csv(date_from=date.today().isoformat(), pythonify=True) - self.assertEqual(len(csv), 1) - self.assertEqual(csv[0]['value'], first.attributes[0].value) - csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', pythonify=True) - self.assertEqual(len(csv), 2) - # headerless - csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', headerless=True) - # FIXME: The header is here. - # print(csv) + current_ts = int(time.time()) + self.user_misp_connector.sighting(value=first.attributes[0].value) + self.user_misp_connector.sighting(value=second.attributes[0].value, + source='Testcases', + type='1') + s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts, include_attribute=True, + include_event_meta=True, pythonify=True) + self.assertEqual(len(s), 2) + self.assertEqual(s[0]['event'].id, first.id) + self.assertEqual(s[0]['attribute'].id, first.attributes[0].id) + + s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts, + source='Testcases', + include_attribute=True, + include_event_meta=True, + pythonify=True) + self.assertEqual(len(s), 1) + self.assertEqual(s[0]['event'].id, second.id) + self.assertEqual(s[0]['attribute'].id, second.attributes[0].id) + + s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts, + type_sighting='1', + include_attribute=True, + include_event_meta=True, + pythonify=True) + self.assertEqual(len(s), 1) + self.assertEqual(s[0]['event'].id, second.id) + self.assertEqual(s[0]['attribute'].id, second.attributes[0].id) + + s = self.user_misp_connector.search_sightings(context='event', + context_id=first.id, + pythonify=True) + self.assertEqual(len(s), 1) + self.assertEqual(s[0]['sighting'].event_id, str(first.id)) + + s = self.user_misp_connector.search_sightings(context='attribute', + context_id=second.attributes[0].id, + pythonify=True) + self.assertEqual(len(s), 1) + self.assertEqual(s[0]['sighting'].attribute_id, str(second.attributes[0].id)) finally: # Delete event self.admin_misp_connector.delete_event(first.id) @@ -725,8 +704,8 @@ class TestComprehensive(unittest.TestCase): second.set_date('2018-09-01') second.add_attribute('ip-src', '8.8.8.8') try: - second = self.user_misp_connector.add_event(second) first = self.user_misp_connector.add_event(first) + second = self.user_misp_connector.add_event(second) response = self.user_misp_connector.fast_publish(first.id, alert=False) self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.') @@ -774,6 +753,21 @@ class TestComprehensive(unittest.TestCase): csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', headerless=True) # FIXME: The header is here. # print(csv) + # Expects 2 lines after removing the empty ones. + # self.assertEqual(len(csv.strip().split('\n')), 2) + + # include_context + csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', include_context=True, pythonify=True) + event_context_keys = ['event_info', 'event_member_org', 'event_source_org', 'event_distribution', 'event_threat_level_id', 'event_analysis', 'event_date', 'event_tag', 'event_timestamp'] + for k in event_context_keys: + self.assertTrue(k in csv[0]) + + # requested_attributes + columns = ['value', 'event_id'] + csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', requested_attributes=columns, pythonify=True) + self.assertEqual(len(csv[0].keys()), 2) + for k in columns: + self.assertTrue(k in csv[0]) finally: # Delete event