|
|
|
@ -1,10 +1,10 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
# -*- coding: utf-8 -*- |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import requests |
|
|
|
|
import logging |
|
|
|
|
import base64 |
|
|
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger('pyintel471') |
|
|
|
@ -33,56 +33,83 @@ class PyIntel471: |
|
|
|
|
# TODO? handle the status codes |
|
|
|
|
return s.send(prepped) |
|
|
|
|
|
|
|
|
|
def detailled_filters(self, text: str=None, ipAddress: str=None, url: str=None, contactInfoEmail: str=None, |
|
|
|
|
post: str=None, privateMessage: str=None, actor: str=None, entity: str=None, forum: str=None, |
|
|
|
|
ioc: str=None, report: str=None, reportTag: str=None, reportLocation: str=None, reportAdmiraltyCode: str=None, |
|
|
|
|
event: str=None, indicator: str=None, yara: str=None, nids: str=None, malwareReport: str=None, eventType: str=None, |
|
|
|
|
indicatorType: str=None, nidsType: str=None, threatType: str=None, threatUid: str=None, malwareFamily: str=None, |
|
|
|
|
malwareFamilyProfileUid: str=None, confidence: str=None, intelRequirement: str=None): |
|
|
|
|
def url_parameters(self, createdFrom: datetime=None, createdUntil: datetime=None, lastUpdatedFrom: datetime=None, lastUpdatedUntil: datetime=None, |
|
|
|
|
sort: str='relevance', offset: int=0, count: int=10, prettyPrint: bool=True, responseFormat: str=None): |
|
|
|
|
'''Create a string with the parameters to append to the URL. |
|
|
|
|
|
|
|
|
|
:param createdFrom:Search reports starting from given creation time (including). Object field: created. Empty indicates unbounded. |
|
|
|
|
:param createdUntil:Search reports starting from given creation time (including). Object field: created. Empty indicates unbounded. |
|
|
|
|
:param lastUpdatedFrom: Search results starting from given last updated time (including). Empty indicates unbounded. |
|
|
|
|
:param lastUpdatedUntil: Search results ending before given last updated time (excluding). Empty indicates unbounded. |
|
|
|
|
:param sort: Sort results by relevance or an object native time. Allowed values: "relevance" (default), "earliest" or "latest" |
|
|
|
|
:param offset: Skip leading number of records. Default: 0 |
|
|
|
|
:param count: Returns given number of records starting from offset position. Default value: 10. Size range: 0-100 |
|
|
|
|
:param prettyPrint: Formats output json in human-readable form if present. Empty indicates Json. Allowed values: "csv". |
|
|
|
|
''' |
|
|
|
|
f = locals() |
|
|
|
|
f.pop('self') |
|
|
|
|
# Some parameters have to be renamed due to reserved words in Python. |
|
|
|
|
if createdFrom: |
|
|
|
|
f['from'] = f.pop('createdFrom') |
|
|
|
|
if createdUntil: |
|
|
|
|
f['until'] = f.pop('createdUntil') |
|
|
|
|
f['format'] = f.pop('responseFormat') |
|
|
|
|
return self.__prepare_url_path(f) |
|
|
|
|
|
|
|
|
|
def search_filters(self, text: str=None, ipAddress: str=None, url: str=None, contactInfoEmail: str=None, |
|
|
|
|
post: str=None, privateMessage: str=None, actor: str=None, entity: str=None, forum: str=None, |
|
|
|
|
ioc: str=None, report: str=None, reportTag: str=None, reportLocation: str=None, reportAdmiraltyCode: str=None, |
|
|
|
|
event: str=None, indicator: str=None, yara: str=None, nids: str=None, malwareReport: str=None, eventType: str=None, |
|
|
|
|
indicatorType: str=None, nidsType: str=None, threatType: str=None, threatUid: str=None, malwareFamily: str=None, |
|
|
|
|
malwareFamilyProfileUid: str=None, confidence: str=None, intelRequirement: str=None): |
|
|
|
|
|
|
|
|
|
'''Returns selection of results matching filter criteria. |
|
|
|
|
|
|
|
|
|
:param text: Search text everywhere |
|
|
|
|
:param ipAddress: IP address search |
|
|
|
|
:param url: URL search |
|
|
|
|
:param contactInfoEmail: E-mail address search |
|
|
|
|
:param post: Forum post search |
|
|
|
|
:param privateMessage: Forum private message search |
|
|
|
|
:param actor: Actor search |
|
|
|
|
:param entity: Entity search |
|
|
|
|
:param forum: Search posts in specific forum |
|
|
|
|
:param ioc: Indicators of compromise search |
|
|
|
|
:param report: Report search |
|
|
|
|
:param reportTag: Search reports by tag |
|
|
|
|
:param reportLocation: Search reports by location |
|
|
|
|
:param reportAdmiraltyCode: Search reports by admiralty code |
|
|
|
|
:param event: Free text event search |
|
|
|
|
:param indicator: Free text indicator search |
|
|
|
|
:param yara: Free text YARAs search |
|
|
|
|
:param nids: Free text NIDS search |
|
|
|
|
:param malwareReport: Free text malware reports search |
|
|
|
|
:param eventType: Search events by type |
|
|
|
|
:param indicatorType: Search indicators by type |
|
|
|
|
:param nidsType: Search NIDS by type |
|
|
|
|
:param threatType: Search events, indicators, YARAs, NIDS and malware reports by threat type. |
|
|
|
|
:param threatUid: Search events, indicators, YARAs, NIDS and malware reports by threat uid. |
|
|
|
|
:param malwareFamily: Search events, indicators, YARAs, NIDS and malware reports by malware family |
|
|
|
|
:param malwareFamilyProfileUid: Search events, indicators, YARAs, NIDS and malware reports by malware family profile UID |
|
|
|
|
''' |
|
|
|
|
f = locals() |
|
|
|
|
f.pop('self') |
|
|
|
|
return self.prepare_filters(f) |
|
|
|
|
return self.__prepare_url_path(f) |
|
|
|
|
|
|
|
|
|
def prepare_filters(self, filters: dict): |
|
|
|
|
'''filters example: {'url': 'injectsview.com', 'contactInfoEmail': 'santinosunny1@gmail.com'}''' |
|
|
|
|
authorized_filter_types = ['text', 'ipAddress', 'url', 'contactInfoEmail', 'post', 'privateMessage', 'actor', 'entity', 'forum', 'ioc', 'report', 'reportTag', |
|
|
|
|
'reportLocation', 'reportAdmiraltyCode', 'event', 'indicator', 'yara', 'nids', 'malwareReport', 'eventType', 'indicatorType', |
|
|
|
|
'nidsType', 'threatType', 'threatUid', 'malwareFamily', 'malwareFamilyProfileUid', 'confidence', 'intelRequirement'] |
|
|
|
|
to_return = '' |
|
|
|
|
for f, value in filters.items(): |
|
|
|
|
if f not in authorized_filter_types: |
|
|
|
|
raise Exception('filter_type ({}) can only be in {}'.format(f, ', '.join(authorized_filter_types))) |
|
|
|
|
if value is not None: |
|
|
|
|
to_return += f'{f}={value}&' |
|
|
|
|
if not to_return: |
|
|
|
|
raise Exception('You have to pass at least one filter.') |
|
|
|
|
return to_return |
|
|
|
|
def __prepare_url_path(self, params: dict): |
|
|
|
|
to_return = [] |
|
|
|
|
for key, value in params.items(): |
|
|
|
|
if value is None: |
|
|
|
|
continue |
|
|
|
|
if isinstance(value, datetime): |
|
|
|
|
# Date and time entries are UNIX timestamp multiplied by 1000 |
|
|
|
|
value = int(value.timestamp() * 1000) |
|
|
|
|
to_return.append(f'{key}={value}') |
|
|
|
|
return '&'.join(to_return) |
|
|
|
|
|
|
|
|
|
def search(self, prepared_filters: str, created_from: int=None, created_until: int=None, last_updated_from: int=None, last_updated_until: int=None, |
|
|
|
|
sort: str='relevance', offset: int=0, count: int=10, pretty_print: bool=True, response_format: str=None): |
|
|
|
|
url_path = prepared_filters |
|
|
|
|
if created_from is not None: |
|
|
|
|
url_path += f'from={created_from}&' |
|
|
|
|
if created_until is not None: |
|
|
|
|
url_path += f'until={created_until}&' |
|
|
|
|
if last_updated_from is not None: |
|
|
|
|
url_path += f'lastUpdatedFrom={last_updated_from}&' |
|
|
|
|
if last_updated_until is not None: |
|
|
|
|
url_path += f'lastUpdatedUntil={last_updated_until}&' |
|
|
|
|
if sort: |
|
|
|
|
if sort not in ['relevance', 'earliest', 'latest']: |
|
|
|
|
raise Exception('sort ({}) can only be in {}'.format(sort, ', '.join(['relevance', 'earliest', 'latest']))) |
|
|
|
|
url_path += f'sort={sort}&' |
|
|
|
|
if offset is not None: |
|
|
|
|
url_path += f'offset={offset}&' |
|
|
|
|
if count is not None: |
|
|
|
|
if not (0 <= count <= 100): |
|
|
|
|
raise Exception(f'count ({count}) has to be between 0 and 100') |
|
|
|
|
url_path += f'count={count}&' |
|
|
|
|
if pretty_print: |
|
|
|
|
url_path += f'prettyPrint={pretty_print}&' |
|
|
|
|
if response_format: |
|
|
|
|
url_path += f'format={response_format}&' |
|
|
|
|
url_path = url_path.rstrip('&') |
|
|
|
|
full_url = f'https://api.intel471.com/v1/search?{url_path}' |
|
|
|
|
def search(self, filters: str, parameters: str=None): |
|
|
|
|
if parameters is None: |
|
|
|
|
full_url = f'https://api.intel471.com/v1/search?{filters}' |
|
|
|
|
else: |
|
|
|
|
full_url = f'https://api.intel471.com/v1/search?{filters}&{parameters}' |
|
|
|
|
return self._prepare_request('GET', full_url) |
|
|
|
|