Merge remote-tracking branch 'upstream/master'

pull/312/head
Steve Clement 2018-11-24 15:03:23 +09:00
commit 54fa1771a7
14 changed files with 654 additions and 64 deletions

View File

@ -26,7 +26,37 @@ pip3 install pymisp
```
git clone https://github.com/MISP/PyMISP.git && cd PyMISP
git submodule update --init
pip3 install -I .
pip3 install -I .[fileobjects,neo,openioc,virustotal]
```
## Installing it with virtualenv
It is recommended to use virtualenv to not polute your OS python envirenment.
```
pip3 install virtualenv
git clone https://github.com/MISP/PyMISP.git && cd PyMISP
python3 -m venv ./
source venv/bin/activate
git submodule update --init
pip3 install -I .[fileobjects,neo,openioc,virustotal]
```
## Running the tests
```bash
pip3 install -U nose pip setuptools coveralls codecov requests-mock
pip3 install git+https://github.com/kbandla/pydeep.git
git clone https://github.com/viper-framework/viper-test-files.git tests/viper-test-files
nosetests-3.4 --with-coverage --cover-package=pymisp,tests --cover-tests tests/test_*.py
```
If you have a MISP instance to test against, you can also run the live ones:
**Note**: You need to update the key in `tests/testlive_comprehensive.py` to the automation key of your admin account.
```bash
nosetests-3.4 --with-coverage --cover-package=pymisp,tests --cover-tests tests/testlive_comprehensive.py
```
## Samples and how to use PyMISP

View File

@ -47,7 +47,7 @@
"\n",
"\n",
"```bash\n",
"pip install jupyter\n",
"pip3 install jupyter\n",
"cd docs/tutorial\n",
"jupyter-notebook\n",
"```"

View File

@ -100,6 +100,19 @@
"print(\"Event id: %s\" % event.id)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"event = misp_old.new_event(distribution=1,\n",
" threat_level_id=1,\n",
" analysis=1,\n",
" info=\"Event from notebook\")\n",
"print(\"Event id: %s\" % event['Event']['id'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
@ -143,6 +156,17 @@
"print(event)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Fetch by ID\n",
"event = misp_old.get_event(event_id)\n",
"print(event)"
]
},
{
"cell_type": "markdown",
"metadata": {},
@ -297,6 +321,15 @@
" print(event['id'], ':', event['info'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"results[0]"
]
},
{
"cell_type": "markdown",
"metadata": {},
@ -387,17 +420,17 @@
"outputs": [],
"source": [
"# The URL of the MISP instance to connect to\n",
"misp_url = 'http://127.0.0.1:8080/'\n",
"#misp_url = 'http://127.0.0.1:8080/'\n",
"# Can be found in the MISP web interface under \n",
"# http://+MISP_URL+/users/view/me -> Authkey\n",
"misp_key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'\n",
"#misp_key = 'BSip0zVadeFDeolkX2g7MHx8mrlr0uE04hh6CQj0'\n",
"# Should PyMISP verify the MISP certificate\n",
"misp_verifycert = False\n",
"#misp_verifycert = False\n",
"\n",
"from pymisp import PyMISP\n",
"\n",
"misp = PyMISP(misp_url, misp_key, misp_verifycert)\n",
"misp.direct_call('attributes/add/2167', {'type': 'ip-dst', 'value': '8.8.8.8'})"
"misp.direct_call('attributes/add/58', {'type': 'ip-dst', 'value': '8.11.8.8'})"
]
},
{
@ -427,12 +460,12 @@
"outputs": [],
"source": [
"# The URL of the MISP instance to connect to\n",
"misp_url = 'http://127.0.0.1:8080/'\n",
"#misp_url = 'http://127.0.0.1:8080/'\n",
"# Can be found in the MISP web interface under \n",
"# http://+MISP_URL+/users/view/me -> Authkey\n",
"misp_key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'\n",
"#misp_key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'\n",
"# Should PyMISP verify the MISP certificate\n",
"misp_verifycert = False\n",
"#misp_verifycert = False\n",
"\n",
"from pymisp import PyMISP\n",
"\n",

View File

@ -0,0 +1,447 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The URL of the MISP instance to connect to\n",
"misp_url = 'http://127.0.0.1:8080'\n",
"# Can be found in the MISP web interface under ||\n",
"# http://+MISP_URL+/users/view/me -> Authkey\n",
"misp_key = 'LBelWqKY9SQyG0huZzAMqiEBl6FODxpgRRXMsZFu'\n",
"# Should PyMISP verify the MISP certificate\n",
"misp_verifycert = False"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Getting the API key (automatically generated on the trainig VM)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"\n",
"api_file = Path('apikey')\n",
"if api_file.exists():\n",
" misp_url = 'http://127.0.0.1'\n",
" misp_verifycert = False\n",
" with open(api_file) as f:\n",
" misp_key = f.read().strip()\n",
" print(misp_key)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Initialize PyMISP - NG"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pymisp import ExpandedPyMISP\n",
"\n",
"misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert, debug=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Index Search (fast, only returns events metadata)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search unpublished events\n",
"\n",
"**WARNING**: By default, the search query will only return all the events listed on teh index page"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(published=False)\n",
"print(r)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Get the meta data of events"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(eventid=[17217, 1717, 1721, 17218])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search Tag & mix with other parameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(tags=['tlp:white'], pythonify=True)\n",
"for e in r:\n",
" print(e)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(tag='TODO:VT-ENRICHMENT', published=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(tag=['!TODO:VT-ENRICHMENT', 'tlp:white'], published=False) # ! means \"not this tag\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Full text search on event info field"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(eventinfo='circl')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search by org"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(org='CIRCL')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search updated events"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(timestamp='1h')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Search full events (Slower, returns full events)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Getting timestamps"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime, date, timedelta\n",
"from dateutil.parser import parse\n",
"\n",
"int(datetime.now().timestamp())\n",
"\n",
"d = parse('2018-03-24')\n",
"int(d.timestamp())\n",
"\n",
"today = int(datetime.today().timestamp())\n",
"yesterday = int((datetime.today() - timedelta(days=1)).timestamp())\n",
"\n",
"print(today, yesterday)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"complex_query = misp.build_complex_query(or_parameters=['uibo.lembit@mail.ee', '103.195.185.222'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(value=complex_query, pythonify=True)\n",
"print(r)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(category='Payload delivery')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(value='uibo.lembit@mail.ee', metadata=True, pythonify=True) # no attributes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(timestamp=['2h', '1h'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(value='8.8.8.8', enforceWarninglist=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(value='8.8.8.8', deleted=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(value='8.8.8.8', publish_timestamp=1521846000) # everything published since that timestamp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(value='8.8.8.8', last='1d') # everything published in the last <interval>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(value='8.8.8.8', timestamp=[yesterday, today]) # everything updated since that timestamp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(value='8.8.8.8', withAttachments=True) # Return attachments"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Search for attributes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(controller='attributes', value='8.8.8.9')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(controller='attributes', value='wrapper.no', event_timestamp='5d') # only consider events updated since this timestamp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"r"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Because reason"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tag_to_remove = 'foo'\n",
"\n",
"events = misp.search(tags=tag_to_remove, pythonify=True)\n",
"\n",
"for event in events:\n",
" for tag in event.tags:\n",
" if tag.name == tag_to_remove:\n",
" print(f'Got {tag_to_remove} in {event.info}')\n",
" misp.untag(event.uuid, tag_to_remove)\n",
" break\n",
" for attribute in event.attributes:\n",
" for tag in attribute.tags:\n",
" if tag.name == tag_to_remove:\n",
" print(f'Got {tag_to_remove} in {attribute.value}')\n",
" misp.untag(attribute.uuid, tag_to_remove)\n",
" break"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"logs = misp.search_logs(model='Tag', title='tlp:white')\n",
"print(logs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"logs = misp.search_logs(model='Event', pythonify=True)\n",
"#print(logs)\n",
"for l in logs:\n",
" print(l.title)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"log = misp.search_logs(model='Tag', title=tag_to_remove)[0]\n",
"roles = misp.get_roles_list()\n",
"for r in roles:\n",
" if r['Role']['name'] == 'User':\n",
" new_role = r['Role']['id']\n",
" break\n",
"user = misp.get_user(log['Log']['user_id'])\n",
"user['User']['role_id'] = new_role\n",
"misp.edit_user(user['User']['id'], **user['User'])"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -10,7 +10,7 @@
"misp_url = 'http://127.0.0.1:8080'\n",
"# Can be found in the MISP web interface under \n",
"# http://+MISP_URL+/users/view/me -> Authkey\n",
"misp_key = 'BSip0zVadeFDeolkX2g7MHx8mrlr0uE04hh6CQj0'\n",
"misp_key = 'LBelWqKY9SQyG0huZzAMqiEBl6FODxpgRRXMsZFu'\n",
"# Should PyMISP verify the MISP certificate\n",
"misp_verifycert = False"
]
@ -52,9 +52,9 @@
"metadata": {},
"outputs": [],
"source": [
"from pymisp import ExpandedPyMISP\n",
"from pymisp import PyMISP\n",
"\n",
"misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert, debug=False)"
"misp = PyMISP(misp_url, misp_key, misp_verifycert, debug=False)"
]
},
{
@ -112,7 +112,9 @@
"metadata": {},
"outputs": [],
"source": [
"r = misp.search_index(tag='TODO:VT-ENRICHMENT')"
"r = misp.search_index(tags=['tlp:white'])\n",
"for e in r:\n",
" print(e)"
]
},
{
@ -347,7 +349,7 @@
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(controller='attributes', values='8.8.8.8')"
"r = misp.search(controller='attributes', value='8.8.8.9')"
]
},
{
@ -356,7 +358,7 @@
"metadata": {},
"outputs": [],
"source": [
"r = misp.search(controller='attributes', values='wrapper.no', event_timestamp='5d') # only consider events updated since this timestamp"
"r = misp.search(controller='attributes', value='wrapper.no', event_timestamp='5d') # only consider events updated since this timestamp"
]
},
{
@ -399,6 +401,28 @@
" break"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"logs = misp.search_logs(model='Tag', title='tlp:white')\n",
"print(logs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"logs = misp.search_logs(model='Event', pythonify=True)\n",
"#print(logs)\n",
"for l in logs:\n",
" print(l.title)"
]
},
{
"cell_type": "code",
"execution_count": null,

View File

@ -17,13 +17,13 @@
"metadata": {},
"outputs": [],
"source": [
"from pymisp import PyMISP, MISPEvent, MISPAttribute\n",
"from pymisp import ExpandedPyMISP, MISPEvent, MISPAttribute\n",
"\n",
"# The URL of the MISP instance to connect to\n",
"misp_url = 'http://127.0.0.1:8080'\n",
"# Can be found in the MISP web interface under \n",
"# http://+MISP_URL+/users/view/me -> Authkey\n",
"misp_key = 'yB8DMS8LkfYYpcVX8bN2v7xwDZDMp4bpW0sNqNGj'\n",
"misp_key = 'LBelWqKY9SQyG0huZzAMqiEBl6FODxpgRRXMsZFu'\n",
"# Should PyMISP verify the MISP certificate\n",
"misp_verifycert = False"
]
@ -65,7 +65,7 @@
"metadata": {},
"outputs": [],
"source": [
"misp = PyMISP(misp_url, misp_key, misp_verifycert)"
"misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert)"
]
},
{
@ -87,14 +87,14 @@
},
"outputs": [],
"source": [
"response = misp.search(last='1d')\n",
"response = misp.search(publish_timestamp='2h')\n",
"\n",
"events = []\n",
"for event in response['response']:\n",
" me = MISPEvent()\n",
" me.load(event)\n",
" events.append(me)\n",
"\n",
" \n",
"for e in events:\n",
" print(e)"
]
@ -105,13 +105,7 @@
"metadata": {},
"outputs": [],
"source": [
"response = misp.search(last=['3d', '2d'])\n",
"\n",
"events = []\n",
"for event in response['response']:\n",
" me = MISPEvent()\n",
" me.load(event)\n",
" events.append(me)\n",
"events = misp.search(publish_timestamp=['120m', '100m'], pythonify=True)\n",
"\n",
"for e in events:\n",
" print(e)"
@ -161,22 +155,13 @@
"metadata": {},
"outputs": [],
"source": [
"misp = PyMISP(misp_url, misp_key, misp_verifycert, debug=True)\n",
"\n",
"from datetime import datetime\n",
"ts = int(datetime.now().timestamp())\n",
"\n",
"response = misp.search(timestamp=[ts-3600, ts])\n",
"\n",
"events = []\n",
"for event in response['response']:\n",
" me = MISPEvent()\n",
" me.load(event)\n",
" events.append(me)\n",
"events = misp.search(timestamp=[ts-3600, ts], pythonify=True)\n",
"\n",
"for e in events:\n",
" print(e)\n",
" \n",
"misp = PyMISP(misp_url, misp_key, misp_verifycert) # TODO: remove when fixed"
" print(e)"
]
},
{
@ -194,7 +179,7 @@
"metadata": {},
"outputs": [],
"source": [
"response = misp.search(controller='attributes', last='1h')\n",
"response = misp.search(controller='attributes', publish_timestamp='1h')\n",
"\n",
"attributes = []\n",
"for attribute in response['response']['Attribute']:\n",
@ -212,7 +197,7 @@
"metadata": {},
"outputs": [],
"source": [
"response = misp.search(controller='attributes', last=['2h', '1h'])\n",
"response = misp.search(controller='attributes', publish_timestamp=['2h', '1h'])\n",
"\n",
"attributes = []\n",
"for attribute in response['response']['Attribute']:\n",
@ -360,13 +345,8 @@
"metadata": {},
"outputs": [],
"source": [
"response = misp.search(values=['59.157.4.2', 'hotfixmsupload.com'])\n",
"\n",
"events = []\n",
"for event in response['response']:\n",
" me = MISPEvent()\n",
" me.load(event)\n",
" events.append(me)\n",
"complex_query = misp.build_complex_query(or_parameters=['59.157.4.2', 'hotfixmsupload.com'])\n",
"events = misp.search(value=complex_query, pythonify=True)\n",
"\n",
"for e in events:\n",
" print(e)"

View File

@ -35,7 +35,7 @@ try:
from .exceptions import PyMISPError, NewEventError, NewAttributeError, MissingDependency, NoURL, NoKey, InvalidMISPObject, UnknownMISPObjectTemplate, PyMISPInvalidFormat, MISPServerError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse # noqa
from .api import PyMISP # noqa
from .abstract import AbstractMISP, MISPEncode, MISPTag, Distribution, ThreatLevel, Analysis # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting # noqa
from .mispevent import MISPEvent, MISPAttribute, MISPObjectReference, MISPObjectAttribute, MISPObject, MISPUser, MISPOrganisation, MISPSighting, MISPLog # noqa
from .tools import AbstractMISPObjectGenerator # noqa
from .tools import Neo4j # noqa
from .tools import stix # noqa

View File

@ -1532,7 +1532,7 @@ class PyMISP(object):
"""Get the existing sharing groups"""
url = urljoin(self.root_url, 'sharing_groups.json')
response = self._prepare_request('GET', url)
return self._check_response(response)['response']
return self._check_response(response)
# ############## Users ##################

View File

@ -2,7 +2,8 @@
# -*- coding: utf-8 -*-
from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet
from .api import PyMISP, everything_broken, MISPEvent, MISPAttribute, MISPSighting
from .api import PyMISP, everything_broken
from .mispevent import MISPEvent, MISPAttribute, MISPSighting, MISPLog
from typing import TypeVar, Optional, Tuple, List, Dict
from datetime import date, datetime
import json
@ -263,7 +264,7 @@ class ExpandedPyMISP(PyMISP):
:param requested_attributes: [CSV only] Select the fields that you wish to include in the CSV export. By setting event level fields additionally, includeContext is not required to get event metadata.
:param include_context: [CSV Only] Include the event data with each attribute.
:param headerless: [CSV Only] The CSV created when this setting is set to true will not contain the header row.
:param pythonify: Returns a list of PyMISP Objects the the plain json output. Warning: it might use a lot of RAM
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
Deprecated:
@ -346,7 +347,6 @@ class ExpandedPyMISP(PyMISP):
query['requested_attributes'] = requested_attributes
query['includeContext'] = include_context
query['headerless'] = headerless
url = urljoin(self.root_url, f'{controller}/restSearch')
# Remove None values.
# TODO: put that in self._prepare_request
@ -393,7 +393,7 @@ class ExpandedPyMISP(PyMISP):
action: Optional[str]=None, user_id: Optional[int]=None,
change: Optional[str]=None, email: Optional[str]=None,
org: Optional[str]=None, description: Optional[str]=None,
ip: Optional[str]=None):
ip: Optional[str]=None, pythonify: Optional[bool]=False):
'''Search in logs
Note: to run substring queries simply append/prepend/encapsulate the search term with %
@ -411,9 +411,11 @@ class ExpandedPyMISP(PyMISP):
:param org: Organisation of the User doing the action
:param description: Description of the action
:param ip: Origination IP of the User doing the action
:param pythonify: Returns a list of PyMISP Objects instead or the plain json output. Warning: it might use a lot of RAM
'''
query = locals()
query.pop('self')
query.pop('pythonify')
if log_id is not None:
query['id'] = query.pop('log_id')
@ -423,4 +425,69 @@ class ExpandedPyMISP(PyMISP):
query = {k: v for k, v in query.items() if v is not None}
response = self._prepare_request('POST', url, data=json.dumps(query))
normalized_response = self._check_response(response)
return normalized_response
if not pythonify:
return normalized_response
to_return = []
for l in normalized_response:
ml = MISPLog()
ml.from_dict(**l['Log'])
to_return.append(ml)
return to_return
def search_index(self, published: Optional[bool]=None, eventid: Optional[SearchType]=None,
tags: Optional[SearchParameterTypes]=None,
date_from: Optional[DateTypes]=None,
date_to: Optional[DateTypes]=None,
eventinfo: Optional[str]=None,
threatlevel: Optional[List[SearchType]]=None,
distribution: Optional[List[SearchType]]=None,
analysis: Optional[List[SearchType]]=None,
org: Optional[SearchParameterTypes]=None,
timestamp: Optional[DateInterval]=None,
pythonify: Optional[bool]=None):
"""Search only at the index level. Using ! in front of a value means NOT (default is OR)
:param published: Set whether published or unpublished events should be returned. Do not set the parameter if you want both.
:param eventid: The events that should be included / excluded from the search
:param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
:param eventinfo: Filter on the event's info field.
:param threatlevel: Threat level(s) (1,2,3,4) | list
:param distribution: Distribution level(s) (0,1,2,3) | list
:param analysis: Analysis level(s) (0,1,2) | list
:param org: Search by the creator organisation by supplying the organisation identifier.
:param timestamp: Restrict the results by the timestamp (last edit). Any event with a timestamp newer than the given timestamp will be returned. In case you are dealing with /attributes as scope, the attribute's timestamp will be used for the lookup.
:param pythonify: Returns a list of PyMISP Objects instead or the plain json output. Warning: it might use a lot of RAM
"""
query = locals()
query.pop('self')
query.pop('pythonify')
if query.get('date_from'):
query['datefrom'] = self.make_timestamp(query.pop('date_from'))
if query.get('date_to'):
query['dateuntil'] = self.make_timestamp(query.pop('date_to'))
if query.get('timestamp') is not None:
timestamp = query.pop('timestamp')
if isinstance(timestamp, (list, tuple)):
query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1]))
else:
query['timestamp'] = self.make_timestamp(timestamp)
url = urljoin(self.root_url, 'events/index')
# Remove None values.
# TODO: put that in self._prepare_request
query = {k: v for k, v in query.items() if v is not None}
response = self._prepare_request('POST', url, data=json.dumps(query))
normalized_response = self._check_response(response)
if not pythonify:
return normalized_response
to_return = []
for e_meta in normalized_response:
me = MISPEvent()
me.from_dict(**e_meta)
to_return.append(me)
return to_return

@ -1 +1 @@
Subproject commit 6e03108fb104ae90617701aa5d0749cb932c821f
Subproject commit 7fe77c02affc0abe14cc67fe9f14400e8b72561c

View File

@ -820,6 +820,15 @@ class MISPFeed(AbstractMISP):
super(MISPFeed, self).__init__()
class MISPLog(AbstractMISP):
def __init__(self):
super(MISPLog, self).__init__()
def __repr__(self):
return '<{self.__class__.__name__}({self.model}, {self.action}, {self.title})'.format(self=self)
class MISPSighting(AbstractMISP):
def __init__(self):

View File

@ -1,5 +1,4 @@
{
"response": [
[
{
"SharingGroup": {
"id": "1",
@ -96,5 +95,4 @@
],
"editable": true
}
]
}
]

View File

@ -112,7 +112,8 @@ class TestOffline(unittest.TestCase):
self.initURI(m)
pymisp = PyMISP(self.domain, self.key)
sharing_groups = pymisp.get_sharing_groups()
self.assertEqual(sharing_groups[0], self.sharing_groups['response'][0])
print(sharing_groups)
self.assertEqual(sharing_groups['response'][0], self.sharing_groups[0])
def test_auth_error(self, m):
self.initURI(m)

View File

@ -14,7 +14,7 @@ try:
except ImportError as e:
print(e)
url = 'http://localhost:8080'
key = 'y0rs3LNOP0Y3v6dfSMMdhxj5Oxx9MfaInpRP2pBC'
key = 'BSip0zVadeFDeolkX2g7MHx8mrlr0uE04hh6CQj0'
from uuid import uuid4
@ -920,8 +920,9 @@ class TestComprehensive(unittest.TestCase):
first = self.create_simple_event()
try:
first = self.user_misp_connector.add_event(first)
r = self.admin_misp_connector.pushEventToZMQ(first.id)
self.assertEqual(r['message'], 'Event published to ZMQ')
if not travis_run:
r = self.admin_misp_connector.pushEventToZMQ(first.id)
self.assertEqual(r['message'], 'Event published to ZMQ')
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)