diff --git a/pyintel471/api.py b/pyintel471/api.py index 15fd28e..244b495 100644 --- a/pyintel471/api.py +++ b/pyintel471/api.py @@ -1,10 +1,10 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- - import requests import logging import base64 +from datetime import datetime logger = logging.getLogger('pyintel471') @@ -33,56 +33,83 @@ class PyIntel471: # TODO? handle the status codes return s.send(prepped) - def detailled_filters(self, text: str=None, ipAddress: str=None, url: str=None, contactInfoEmail: str=None, - post: str=None, privateMessage: str=None, actor: str=None, entity: str=None, forum: str=None, - ioc: str=None, report: str=None, reportTag: str=None, reportLocation: str=None, reportAdmiraltyCode: str=None, - event: str=None, indicator: str=None, yara: str=None, nids: str=None, malwareReport: str=None, eventType: str=None, - indicatorType: str=None, nidsType: str=None, threatType: str=None, threatUid: str=None, malwareFamily: str=None, - malwareFamilyProfileUid: str=None, confidence: str=None, intelRequirement: str=None): + def url_parameters(self, createdFrom: datetime=None, createdUntil: datetime=None, lastUpdatedFrom: datetime=None, lastUpdatedUntil: datetime=None, + sort: str='relevance', offset: int=0, count: int=10, prettyPrint: bool=True, responseFormat: str=None): + '''Create a string with the parameters to append to the URL. + + :param createdFrom:Search reports starting from given creation time (including). Object field: created. Empty indicates unbounded. + :param createdUntil:Search reports starting from given creation time (including). Object field: created. Empty indicates unbounded. + :param lastUpdatedFrom: Search results starting from given last updated time (including). Empty indicates unbounded. + :param lastUpdatedUntil: Search results ending before given last updated time (excluding). Empty indicates unbounded. + :param sort: Sort results by relevance or an object native time. Allowed values: "relevance" (default), "earliest" or "latest" + :param offset: Skip leading number of records. Default: 0 + :param count: Returns given number of records starting from offset position. Default value: 10. Size range: 0-100 + :param prettyPrint: Formats output json in human-readable form if present. Empty indicates Json. Allowed values: "csv". + ''' f = locals() f.pop('self') - return self.prepare_filters(f) + # Some parameters have to be renamed due to reserved words in Python. + if createdFrom: + f['from'] = f.pop('createdFrom') + if createdUntil: + f['until'] = f.pop('createdUntil') + f['format'] = f.pop('responseFormat') + return self.__prepare_url_path(f) - def prepare_filters(self, filters: dict): - '''filters example: {'url': 'injectsview.com', 'contactInfoEmail': 'santinosunny1@gmail.com'}''' - authorized_filter_types = ['text', 'ipAddress', 'url', 'contactInfoEmail', 'post', 'privateMessage', 'actor', 'entity', 'forum', 'ioc', 'report', 'reportTag', - 'reportLocation', 'reportAdmiraltyCode', 'event', 'indicator', 'yara', 'nids', 'malwareReport', 'eventType', 'indicatorType', - 'nidsType', 'threatType', 'threatUid', 'malwareFamily', 'malwareFamilyProfileUid', 'confidence', 'intelRequirement'] - to_return = '' - for f, value in filters.items(): - if f not in authorized_filter_types: - raise Exception('filter_type ({}) can only be in {}'.format(f, ', '.join(authorized_filter_types))) - if value is not None: - to_return += f'{f}={value}&' - if not to_return: - raise Exception('You have to pass at least one filter.') - return to_return + def search_filters(self, text: str=None, ipAddress: str=None, url: str=None, contactInfoEmail: str=None, + post: str=None, privateMessage: str=None, actor: str=None, entity: str=None, forum: str=None, + ioc: str=None, report: str=None, reportTag: str=None, reportLocation: str=None, reportAdmiraltyCode: str=None, + event: str=None, indicator: str=None, yara: str=None, nids: str=None, malwareReport: str=None, eventType: str=None, + indicatorType: str=None, nidsType: str=None, threatType: str=None, threatUid: str=None, malwareFamily: str=None, + malwareFamilyProfileUid: str=None, confidence: str=None, intelRequirement: str=None): - def search(self, prepared_filters: str, created_from: int=None, created_until: int=None, last_updated_from: int=None, last_updated_until: int=None, - sort: str='relevance', offset: int=0, count: int=10, pretty_print: bool=True, response_format: str=None): - url_path = prepared_filters - if created_from is not None: - url_path += f'from={created_from}&' - if created_until is not None: - url_path += f'until={created_until}&' - if last_updated_from is not None: - url_path += f'lastUpdatedFrom={last_updated_from}&' - if last_updated_until is not None: - url_path += f'lastUpdatedUntil={last_updated_until}&' - if sort: - if sort not in ['relevance', 'earliest', 'latest']: - raise Exception('sort ({}) can only be in {}'.format(sort, ', '.join(['relevance', 'earliest', 'latest']))) - url_path += f'sort={sort}&' - if offset is not None: - url_path += f'offset={offset}&' - if count is not None: - if not (0 <= count <= 100): - raise Exception(f'count ({count}) has to be between 0 and 100') - url_path += f'count={count}&' - if pretty_print: - url_path += f'prettyPrint={pretty_print}&' - if response_format: - url_path += f'format={response_format}&' - url_path = url_path.rstrip('&') - full_url = f'https://api.intel471.com/v1/search?{url_path}' + '''Returns selection of results matching filter criteria. + + :param text: Search text everywhere + :param ipAddress: IP address search + :param url: URL search + :param contactInfoEmail: E-mail address search + :param post: Forum post search + :param privateMessage: Forum private message search + :param actor: Actor search + :param entity: Entity search + :param forum: Search posts in specific forum + :param ioc: Indicators of compromise search + :param report: Report search + :param reportTag: Search reports by tag + :param reportLocation: Search reports by location + :param reportAdmiraltyCode: Search reports by admiralty code + :param event: Free text event search + :param indicator: Free text indicator search + :param yara: Free text YARAs search + :param nids: Free text NIDS search + :param malwareReport: Free text malware reports search + :param eventType: Search events by type + :param indicatorType: Search indicators by type + :param nidsType: Search NIDS by type + :param threatType: Search events, indicators, YARAs, NIDS and malware reports by threat type. + :param threatUid: Search events, indicators, YARAs, NIDS and malware reports by threat uid. + :param malwareFamily: Search events, indicators, YARAs, NIDS and malware reports by malware family + :param malwareFamilyProfileUid: Search events, indicators, YARAs, NIDS and malware reports by malware family profile UID + ''' + f = locals() + f.pop('self') + return self.__prepare_url_path(f) + + def __prepare_url_path(self, params: dict): + to_return = [] + for key, value in params.items(): + if value is None: + continue + if isinstance(value, datetime): + # Date and time entries are UNIX timestamp multiplied by 1000 + value = int(value.timestamp() * 1000) + to_return.append(f'{key}={value}') + return '&'.join(to_return) + + def search(self, filters: str, parameters: str=None): + if parameters is None: + full_url = f'https://api.intel471.com/v1/search?{filters}' + else: + full_url = f'https://api.intel471.com/v1/search?{filters}&{parameters}' return self._prepare_request('GET', full_url) diff --git a/tests/test.py b/tests/test.py index f745aa0..8047f21 100644 --- a/tests/test.py +++ b/tests/test.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import unittest import json +from datetime import datetime from pyintel471 import PyIntel471 from .keys import email, authkey @@ -13,6 +14,7 @@ class TestBasic(unittest.TestCase): self.intel = PyIntel471(email, authkey) def test_search(self): - f = self.intel.detailled_filters(malwareFamily='lokibot') - r = self.intel.search(f) + f = self.intel.search_filters(malwareFamily='lokibot') + p = self.intel.url_parameters(createdFrom=datetime(2018, 11, 11), createdUntil=datetime(2018, 11, 12)) + r = self.intel.search(filters=f, parameters=p) print(json.dumps(r.json(), indent=2))