From c0c163b47769dbc3c78c91fe5ada32837b43126c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Mon, 12 Nov 2018 14:13:05 +0100 Subject: [PATCH] new: Initial commit, only supports search --- .gitignore | 4 ++ pyintel471/__init__.py | 4 ++ pyintel471/api.py | 88 ++++++++++++++++++++++++++++++++++++++++++ setup.py | 27 +++++++++++++ tests/__init__.py | 0 tests/test.py | 18 +++++++++ 6 files changed, 141 insertions(+) create mode 100644 pyintel471/__init__.py create mode 100644 pyintel471/api.py create mode 100644 setup.py create mode 100644 tests/__init__.py create mode 100644 tests/test.py diff --git a/.gitignore b/.gitignore index 894a44c..bd9d865 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,7 @@ venv.bak/ # mypy .mypy_cache/ + + +# Perso +tests/keys.py diff --git a/pyintel471/__init__.py b/pyintel471/__init__.py new file mode 100644 index 0000000..7eaffcf --- /dev/null +++ b/pyintel471/__init__.py @@ -0,0 +1,4 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from .api import PyIntel471 diff --git a/pyintel471/api.py b/pyintel471/api.py new file mode 100644 index 0000000..15fd28e --- /dev/null +++ b/pyintel471/api.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + + +import requests +import logging +import base64 + + +logger = logging.getLogger('pyintel471') +logger.setLevel(logging.DEBUG) + + +class PyIntel471: + + def __init__(self, email: str, authkey: str): + self.auth = base64.b64encode(f'{email}:{authkey}'.encode()).decode() + + def _prepare_request(self, request_type: str, url: str, data=None): + if logger.isEnabledFor(logging.DEBUG): + logger.debug('{} - {}'.format(request_type, url)) + if data is not None: + logger.debug(data) + if data is None: + req = requests.Request(request_type, url) + else: + req = requests.Request(request_type, url, data=data) + with requests.Session() as s: + prepped = s.prepare_request(req) + prepped.headers.update({'Authorization': f'Basic {self.auth}'}) + if logger.isEnabledFor(logging.DEBUG): + logger.debug(prepped.headers) + # TODO? handle the status codes + return s.send(prepped) + + def detailled_filters(self, text: str=None, ipAddress: str=None, url: str=None, contactInfoEmail: str=None, + post: str=None, privateMessage: str=None, actor: str=None, entity: str=None, forum: str=None, + ioc: str=None, report: str=None, reportTag: str=None, reportLocation: str=None, reportAdmiraltyCode: str=None, + event: str=None, indicator: str=None, yara: str=None, nids: str=None, malwareReport: str=None, eventType: str=None, + indicatorType: str=None, nidsType: str=None, threatType: str=None, threatUid: str=None, malwareFamily: str=None, + malwareFamilyProfileUid: str=None, confidence: str=None, intelRequirement: str=None): + f = locals() + f.pop('self') + return self.prepare_filters(f) + + def prepare_filters(self, filters: dict): + '''filters example: {'url': 'injectsview.com', 'contactInfoEmail': 'santinosunny1@gmail.com'}''' + authorized_filter_types = ['text', 'ipAddress', 'url', 'contactInfoEmail', 'post', 'privateMessage', 'actor', 'entity', 'forum', 'ioc', 'report', 'reportTag', + 'reportLocation', 'reportAdmiraltyCode', 'event', 'indicator', 'yara', 'nids', 'malwareReport', 'eventType', 'indicatorType', + 'nidsType', 'threatType', 'threatUid', 'malwareFamily', 'malwareFamilyProfileUid', 'confidence', 'intelRequirement'] + to_return = '' + for f, value in filters.items(): + if f not in authorized_filter_types: + raise Exception('filter_type ({}) can only be in {}'.format(f, ', '.join(authorized_filter_types))) + if value is not None: + to_return += f'{f}={value}&' + if not to_return: + raise Exception('You have to pass at least one filter.') + return to_return + + def search(self, prepared_filters: str, created_from: int=None, created_until: int=None, last_updated_from: int=None, last_updated_until: int=None, + sort: str='relevance', offset: int=0, count: int=10, pretty_print: bool=True, response_format: str=None): + url_path = prepared_filters + if created_from is not None: + url_path += f'from={created_from}&' + if created_until is not None: + url_path += f'until={created_until}&' + if last_updated_from is not None: + url_path += f'lastUpdatedFrom={last_updated_from}&' + if last_updated_until is not None: + url_path += f'lastUpdatedUntil={last_updated_until}&' + if sort: + if sort not in ['relevance', 'earliest', 'latest']: + raise Exception('sort ({}) can only be in {}'.format(sort, ', '.join(['relevance', 'earliest', 'latest']))) + url_path += f'sort={sort}&' + if offset is not None: + url_path += f'offset={offset}&' + if count is not None: + if not (0 <= count <= 100): + raise Exception(f'count ({count}) has to be between 0 and 100') + url_path += f'count={count}&' + if pretty_print: + url_path += f'prettyPrint={pretty_print}&' + if response_format: + url_path += f'format={response_format}&' + url_path = url_path.rstrip('&') + full_url = f'https://api.intel471.com/v1/search?{url_path}' + return self._prepare_request('GET', full_url) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..24bbb7b --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from setuptools import setup + + +setup( + name='pyintel471', + version='0.1', + author='Raphaël Vinot', + author_email='raphael.vinot@circl.lu', + maintainer='Raphaël Vinot', + url='https://github.com/MISP/PyIntel471', + description='Python client for Intel471', + packages=['pyintel471'], + classifiers=[ + 'License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)', + 'Development Status :: 3 - Alpha', + 'Environment :: Console', + 'Operating System :: POSIX :: Linux', + 'Intended Audience :: Science/Research', + 'Intended Audience :: Telecommunications Industry', + 'Intended Audience :: Information Technology', + 'Programming Language :: Python :: 3', + 'Topic :: Security', + 'Topic :: Internet', + ] +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test.py b/tests/test.py new file mode 100644 index 0000000..f745aa0 --- /dev/null +++ b/tests/test.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import unittest +import json + +from pyintel471 import PyIntel471 +from .keys import email, authkey + + +class TestBasic(unittest.TestCase): + + def setUp(self): + self.intel = PyIntel471(email, authkey) + + def test_search(self): + f = self.intel.detailled_filters(malwareFamily='lokibot') + r = self.intel.search(f) + print(json.dumps(r.json(), indent=2))