Merge remote-tracking branch 'upstream/master'

pull/312/head
Steve Clement 2018-11-06 10:41:53 +09:00
commit d37b340d39
37 changed files with 634 additions and 240 deletions

5
.gitignore vendored
View File

@ -3,8 +3,11 @@
*.pyc
examples/keys.py
examples/cudeso.py
examples/feed-generator/output/*.json
examples/feed-generator/output/*\.json
examples/feed-generator/output/hashes\.csv
examples/feed-generator/settings\.py
build/*
dist/*
pymisp.egg-info/*
.idea

View File

@ -2,6 +2,97 @@ Changelog
=========
v2.4.96 (2018-10-12)
--------------------
New
~~~
- [freedFromRedis] try to create an object/attribute out of the incoming
data even if not added with the helper. [Sami Mokaddem]
- Direct_call without data means GET. [Raphaël Vinot]
- Add direct call to just post data on a URL. [Raphaël Vinot]
- Tests for update modules. [Raphaël Vinot]
- Tests for upload_sample. [Raphaël Vinot]
- Add more test cases. [Raphaël Vinot]
- Update warninglists. [Raphaël Vinot]
- Add test for warninglists. [Raphaël Vinot]
- Toggle warning list, add test case. [Raphaël Vinot]
- Add lots of test cases, find lots of bugs. [Raphaël Vinot]
- Use new CSV interface, add test cases. [Raphaël Vinot]
Changes
~~~~~~~
- Bump version. [Raphaël Vinot]
- Bump misp-objects. [Raphaël Vinot]
- Allow to pass a json string to direct_call. [Raphaël Vinot]
- More test cases. [Raphaël Vinot]
- Update order parameters & doc. [Raphaël Vinot]
- Add an extra IP from the warninglists. [Raphaël Vinot]
- Test for event UUID in attribute. [Raphaël Vinot]
Fix
~~~
- Prevent checking length on a integer. [Sami Mokaddem]
- Direct call & add example. [Raphaël Vinot]
- Disable test for travis, take 2. [Raphaël Vinot]
- Disable test for travis. [Raphaël Vinot]
- Skip tests that fail on travis for no reason... [Raphaël Vinot]
- Tentative to fix tests on travis. [Raphaël Vinot]
- Disable test warning lists. Enabling is not deterministic. [Raphaël
Vinot]
- Use proper dependency (enum34) [Raphaël Vinot]
- Make travis happy again. [Raphaël Vinot]
- Python2 support. [Raphaël Vinot]
Fix #274
Other
~~~~~
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #284 from mokaddem/fixFeedGenerator. [Sami
Mokaddem]
fix: prevent checking length on a integer
- Merge pull request #283 from mokaddem/updateFromRedis. [Raphaël Vinot]
new: [freedFromRedis] try to create an object/attribute out of the in…
- Merge branch 'IFX-CDC-master' [Raphaël Vinot]
- Fixed leaked taxonomy tags problem. [netjinho]
- Added some getters and setters for taxonomies, warninglists,
noticelists and tags & documentation. [netjinho]
- Merge branch 'netjinho-master' [Raphaël Vinot]
- Merge branch 'master' of https://github.com/netjinho/PyMISP into
netjinho-master. [Raphaël Vinot]
- Added update_galaxies and update_taxonomies. [netjinho]
- Merge branch 'DragonDev1906-master' [Raphaël Vinot]
- Merge branch 'master' of
https://github.com/DragonDev1906/PyMISP_upload_sample into
DragonDev1906-master. [Raphaël Vinot]
- Add: Advanced Extraction to upload_sample. [root]
- Add: update noticelists and object templates. [Raphaël Vinot]
- Add: Add __eq__ to AbstractMISP. [Raphaël Vinot]
Allow to discard duplicate tags.
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Add: more test cases. [Raphaël Vinot]
- Merge branch 'master' of github.com:MISP/PyMISP. [Raphaël Vinot]
- Merge pull request #277 from GOVCERT-LU/pypi_fixes. [Raphaël Vinot]
- Add description from README.md as long-description -> displayed on …
- Fix invalid py2 keyword. [Georges Toth]
- - Add description from README.md as long-description -> displayed on
pypi. - Add project related URLs to be displayed on pypi. [Georges
Toth]
v2.4.95.1 (2018-09-06)
----------------------
Changes
~~~~~~~
- Bump changelog. [Raphaël Vinot]
v2.4.95 (2018-09-06)
--------------------

View File

@ -328,6 +328,80 @@
"print('Event ID', event_id)\n",
"print(response)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Direct call, no validation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The URL of the MISP instance to connect to\n",
"misp_url = 'http://127.0.0.1:8080/'\n",
"# Can be found in the MISP web interface under \n",
"# http://+MISP_URL+/users/view/me -> Authkey\n",
"misp_key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'\n",
"# Should PyMISP verify the MISP certificate\n",
"misp_verifycert = False\n",
"\n",
"from pymisp import PyMISP\n",
"\n",
"misp = PyMISP(misp_url, misp_key, misp_verifycert)\n",
"misp.direct_call('attributes/add/2167', {'type': 'ip-dst', 'value': '8.8.8.8'})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The URL of the MISP instance to connect to\n",
"misp_url = 'http://127.0.0.1:8080/'\n",
"# Can be found in the MISP web interface under \n",
"# http://+MISP_URL+/users/view/me -> Authkey\n",
"misp_key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'\n",
"# Should PyMISP verify the MISP certificate\n",
"misp_verifycert = False\n",
"\n",
"from pymisp import PyMISP\n",
"\n",
"misp = PyMISP(misp_url, misp_key, misp_verifycert)\n",
"misp.direct_call('attributes/add/2167', '{\"type\": \"ip-dst\", \"value\": \"8.8.8.9\"}')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The URL of the MISP instance to connect to\n",
"misp_url = 'http://127.0.0.1:8080/'\n",
"# Can be found in the MISP web interface under \n",
"# http://+MISP_URL+/users/view/me -> Authkey\n",
"misp_key = 'fk5BodCZw8owbscW8pQ4ykMASLeJ4NYhuAbshNjo'\n",
"# Should PyMISP verify the MISP certificate\n",
"misp_verifycert = False\n",
"\n",
"from pymisp import PyMISP\n",
"\n",
"misp = PyMISP(misp_url, misp_key, misp_verifycert)\n",
"misp.direct_call('events')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
@ -346,7 +420,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
"version": "3.6.5"
}
},
"nbformat": 4,

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json', debug=True)
return PyMISP(url, key, misp_verifycert, 'json', debug=True)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Add an attribute to an event')

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, misp_verifycert, 'json')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Add a new user by setting the mandory fields.')

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, misp_verifycert, 'json')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Add the user described in the given json. If no file is provided, returns a json listing all the fields used to describe a user.')

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json', debug=True)
return PyMISP(url, key, misp_verifycert, 'json', debug=True)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Create an event on MISP.')

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, misp_verifycert, 'json')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Delete the user with the given id. Keep in mind that disabling users (by setting the disabled flag via an edit) is always prefered to keep user associations to events intact.')

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, misp_verifycert, 'json')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Edit the email of the user designed by the user_id.')

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, misp_verifycert, 'json')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Edit the user designed by the user_id. If no file is provided, returns a json listing all the fields used to describe a user.')

View File

@ -9,14 +9,14 @@
import sys, json, time, requests
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
et_url = 'https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt'
et_str = 'Emerging Threats '
def init_misp():
global mymisp
mymisp = PyMISP(misp_url, misp_key)
mymisp = PyMISP(misp_url, misp_key, misp_verifycert)
def load_misp_event(eid):
global et_attr

View File

@ -17,9 +17,9 @@ class CowrieMISPObject(AbstractMISPObjectGenerator):
self.generate_attributes()
def generate_attributes(self):
skip_list = ['time', 'duration', 'isError', 'ttylog']
valid_object_attributes = self._definition['attributes'].keys()
for object_relation, value in self._dico_val.items():
if object_relation in skip_list or 'log_' in object_relation:
if object_relation not in valid_object_attributes:
continue
if object_relation == 'timestamp':
@ -29,4 +29,7 @@ class CowrieMISPObject(AbstractMISPObjectGenerator):
if isinstance(value, dict):
self.add_attribute(object_relation, **value)
else:
# uniformize value, sometimes empty array
if isinstance(value, list) and len(value) == 0:
value = ''
self.add_attribute(object_relation, value=value)

View File

@ -27,7 +27,8 @@ class RedisToMISPFeed:
SUFFIX_SIGH = '_sighting'
SUFFIX_ATTR = '_attribute'
SUFFIX_OBJ = '_object'
SUFFIX_LIST = [SUFFIX_SIGH, SUFFIX_ATTR, SUFFIX_OBJ]
SUFFIX_NO = ''
SUFFIX_LIST = [SUFFIX_SIGH, SUFFIX_ATTR, SUFFIX_OBJ, SUFFIX_NO]
def __init__(self):
self.host = settings.host
@ -100,8 +101,33 @@ class RedisToMISPFeed:
self.update_last_action("Error while adding object")
else:
# Suffix not valid
self.update_last_action("Redis key suffix not supported")
# Suffix not provided, try to add anyway
if settings.fallback_MISP_type == 'attribute':
new_key = key + self.SUFFIX_ATTR
# Add atribute type from the config
if 'type' not in data and settings.fallback_attribute_type:
data['type'] = settings.fallback_attribute_type
else:
new_key = None
elif settings.fallback_MISP_type == 'object':
new_key = key + self.SUFFIX_OBJ
# Add object template name from the config
if 'name' not in data and settings.fallback_object_template_name:
data['name'] = settings.fallback_object_template_name
else:
new_key = None
elif settings.fallback_MISP_type == 'sighting':
new_key = key + self.SUFFIX_SIGH
else:
new_key = None
if new_key is None:
self.update_last_action("Redis key suffix not supported and automatic not configured")
else:
self.perform_action(new_key, data)
# OTHERS
def update_last_action(self, action):

View File

@ -4,10 +4,15 @@ host='127.0.0.1'
port=6379
db=0
## The keynames to POP element from
#keyname_pop='misp_feed_generator_key'
keyname_pop=['cowrie']
# OTHERS
## If key prefix not provided, data will be added as either object, attribute or sighting
fallback_MISP_type = 'object'
### How to handle the fallback
fallback_object_template_name = 'cowrie' # MISP-Object only
fallback_attribute_category = 'comment' # MISP-Attribute only
## How frequent the event should be written on disk
flushing_interval=5*60
## The redis list keyname in which to put items that generated an error

View File

@ -5,9 +5,26 @@ This python script can be used to generate a MISP feed based on an existing MISP
# Installation
````
git clone https://github.com/CIRCL/PyMISP
git clone https://github.com/MISP/PyMISP.git
cd examples/feed-generator
cp settings-default.py settings.py
vi settings.py #adjust your settings
python3 generate.py
````
# Output
The generated feed will be stored in your `outputdir`.
It contains the files:
- `manifest.json` - containing the feed manifest (generic event information)
- `hashes.csv` - listing the hashes of the attribute values
- `*.json` - a large amount of `json` files
# Importing in MISP
To import this feed into your MISP instance:
- Sync Actions > List Feeds > Add feed
- Fill in the form while ensuring the 'source format' is set to 'MISP Feed'
For more information about feeds please read: https://misp.gitbooks.io/misp-book/content/managing-feeds/

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
@ -79,15 +79,17 @@ valid_attribute_distributions = []
attributeHashes = []
def init():
# If we have an old settings.py file then this variable won't exist
global valid_attribute_distributions
try:
valid_attribute_distributions = valid_attribute_distribution_levels
except:
except Exception:
valid_attribute_distributions = ['0', '1', '2', '3', '4', '5']
return PyMISP(url, key, ssl)
def recursiveExtract(container, containerType, leaf, eventUuid):
temp = {}
if containerType in ['Attribute', 'Object']:
@ -118,8 +120,8 @@ def recursiveExtract(container, containerType, leaf, eventUuid):
temp[childType].append(processed)
return temp
def saveEvent(misp, uuid):
result = {}
event = misp.get_event(uuid)
if not event.get('Event'):
print('Error while fetching event: {}'.format(event['message']))
@ -130,11 +132,13 @@ def saveEvent(misp, uuid):
eventFile.write(event)
eventFile.close()
def __blockByDistribution(element):
if element['distribution'] not in valid_attribute_distributions:
return True
return False
def saveHashes():
if not attributeHashes:
return False
@ -148,7 +152,6 @@ def saveHashes():
sys.exit('Could not create the quick hash lookup file.')
def saveManifest(manifest):
try:
manifestFile = open(os.path.join(outputdir, 'manifest.json'), 'w')

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
from pymisp import PyMISP
@ -12,7 +12,7 @@ except NameError:
pass
def init(url, key):
return PyMISP(url, key, False, 'json', debug=False)
return PyMISP(url, key, misp_verifycert, 'json', debug=False)
if __name__ == '__main__':

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
from io import open
@ -15,7 +15,7 @@ if __name__ == '__main__':
args = parser.parse_args()
pymisp = PyMISP(misp_url, misp_key)
pymisp = PyMISP(misp_url, misp_key, misp_verifycert)
with open(args.input, 'r') as f:
result = pymisp.freetext(args.event, f.read())

View File

@ -7,7 +7,7 @@
import sys
import datetime
from pymisp import PyMISP, MISPAttribute
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
cefconfig = {"Default_Severity":1, "Device_Vendor":"MISP", "Device_Product":"MISP", "Device_Version":1}
@ -45,7 +45,7 @@ def make_cef(event):
def init_misp():
global mymisp
mymisp = PyMISP(misp_url, misp_key)
mymisp = PyMISP(misp_url, misp_key, misp_verifycert)
def echeck(r):

View File

@ -6,12 +6,12 @@
import sys
from pymisp import PyMISP, MISPAttribute
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
def init_misp():
global mymisp
mymisp = PyMISP(misp_url, misp_key)
mymisp = PyMISP(misp_url, misp_key, misp_verifycert)
def echeck(r):

View File

@ -0,0 +1,42 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key, misp_verifycert
import argparse
import os
import json
def init(url, key):
return PyMISP(url, key, misp_verifycert, 'json')
def search_sighting(m, context, out=None, **kwargs):
result = m.sighting_search(context, **kwargs)
if out is None:
print(json.dumps(result['response']))
else:
with open(out, 'w') as f:
f.write(json.dumps(result['response']))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Get all the events matching a value.')
parser.add_argument("-c", "--context", default="", help="Context in which to search. Could be empty, attribute or event")
parser.add_argument("-i", "--id", type=int, help="If context is set, the ID in which the search should be done")
parser.add_argument("-o", "--output", help="Output file")
args = parser.parse_args()
if args.output is not None and os.path.exists(args.output):
print('Output file already exists, abord.')
exit(0)
misp = init(misp_url, misp_key)
kwargs = {}
if len(args.context) > 0:
kwargs['id'] = args.id
search_sighting(misp, args.context, args.output, **kwargs)

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, misp_verifycert, 'json')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Get a list of the sharing groups from the MISP instance.')

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, misp_verifycert, 'json')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Add sighting.')

View File

@ -2,12 +2,12 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
def init(url, key):
return PyMISP(url, key, True)
return PyMISP(url, key, misp_verifycert)
def fetch(m, all_events, event):

View File

@ -2,13 +2,13 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
import json
def init(url, key):
return PyMISP(url, key, True, 'json', True)
return PyMISP(url, key, misp_verifycert, 'json', True)
def get_tags(m):

View File

@ -2,13 +2,13 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
from io import open
def init(url, key):
return PyMISP(url, key, True, 'json', debug=True)
return PyMISP(url, key, misp_verifycert, 'json', debug=True)
def up_event(m, event, content):
with open(content, 'r') as f:

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from pymisp import PyMISP
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
import argparse
# For python2 & 3 compat, a bit dirty, but it seems to be the least bad one
@ -13,7 +13,7 @@ except NameError:
def init(url, key):
return PyMISP(url, key, True, 'json')
return PyMISP(url, key, misp_verifycert, 'json')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Get a list of the sharing groups from the MISP instance.')

View File

@ -4,7 +4,7 @@
from pymisp import PyMISP
from pymisp.tools import load_warninglists
import argparse
from keys import misp_url, misp_key
from keys import misp_url, misp_key, misp_verifycert
if __name__ == '__main__':
@ -18,5 +18,5 @@ if __name__ == '__main__':
if args.package:
print(load_warninglists.from_package())
elif args.remote:
pm = PyMISP(misp_url, misp_key)
pm = PyMISP(misp_url, misp_key, misp_verifycert)
print(load_warninglists.from_instance(pm))

View File

@ -1,4 +1,4 @@
__version__ = '2.4.95'
__version__ = '2.4.96'
import logging
import functools
import warnings

View File

@ -401,9 +401,15 @@ class PyMISP(object):
response = self._prepare_request('POST', url)
return self._check_response(response)
def direct_call(self, url, data):
'''Very lightweight call that posts a data blob (python dictionary) on the URL'''
response = self._prepare_request('POST', url, data)
def direct_call(self, url, data=None):
'''Very lightweight call that posts a data blob (python dictionary or json string) on the URL'''
url = urljoin(self.root_url, url)
if not data:
response = self._prepare_request('GET', url)
else:
if isinstance(data, dict):
data = json.dumps(data)
response = self._prepare_request('POST', url, data)
return self._check_response(response)
# ##############################################
@ -1015,8 +1021,8 @@ class PyMISP(object):
"""Helper to prepare a search query"""
if query.get('error') is not None:
return query
if controller not in ['events', 'attributes', 'objects']:
raise ValueError('Invalid controller. Can only be {}'.format(', '.join(['events', 'attributes', 'objects'])))
if controller not in ['events', 'attributes', 'objects', 'sightings']:
raise ValueError('Invalid controller. Can only be {}'.format(', '.join(['events', 'attributes', 'objects', 'sightings'])))
url = urljoin(self.root_url, '{}/{}'.format(controller, path.lstrip('/')))
if ASYNC_OK and async_callback:
@ -1428,7 +1434,7 @@ class PyMISP(object):
:value: Value of the attribute the sighting is related too. Pushing this object
will update the sighting count of each attriutes with thifs value on the instance
:uuid: UUID of the attribute to update
:id: ID of the attriute to update
:id: ID of the attribute to update
:source: Source of the sighting
:type: Type of the sighting
:timestamp: Timestamp associated to the sighting
@ -1467,6 +1473,53 @@ class PyMISP(object):
response = self._prepare_request('POST', url)
return self._check_response(response)
def search_sightings(self, context='', async_callback=None, **kwargs):
"""Search sightings via the REST API
:context: The context of the search, could be attribute, event or False
:param context_id: ID of the attribute or event if context is specified
:param type_sighting: Type of the sighting
:param date_from: From date
:param date_to: To date
:param publish_timestamp: Last published sighting (e.g. 5m, 3h, 7d)
:param org_id: The org_id
:param source: The source of the sighting
:param include_attribute: Should the result include attribute data
:param include_event: Should the result include event data
:param async_callback: The function to run when results are returned
:Example:
>>> misp.search_sightings({'publish_timestamp': '30d'}) # search sightings for the last 30 days on the instance
[ ... ]
>>> misp.search_sightings('attribute', {'id': 6, 'include_attribute': 1}) # return list of sighting for attribute 6 along with the attribute itself
[ ... ]
>>> misp.search_sightings('event', {'id': 17, 'include_event': 1, 'org_id': 2}) # return list of sighting for event 17 filtered with org id 2
"""
if context not in ['', 'attribute', 'event']:
raise Exception('Context parameter must be empty, "attribute" or "event"')
query = {}
# Sighting: array('id', 'type', 'from', 'to', 'last', 'org_id', 'includeAttribute', 'includeEvent');
query['returnFormat'] = kwargs.pop('returnFormat', 'json')
query['id'] = kwargs.pop('context_id', None)
query['type'] = kwargs.pop('type_sighting', None)
query['from'] = kwargs.pop('date_from', None)
query['to'] = kwargs.pop('date_to', None)
query['last'] = kwargs.pop('publish_timestamp', None)
query['org_id'] = kwargs.pop('org_id', None)
query['source'] = kwargs.pop('source', None)
query['includeAttribute'] = kwargs.pop('include_attribute', None)
query['includeEvent'] = kwargs.pop('include_event', None)
# Cleanup
query = {k: v for k, v in query.items() if v is not None}
if kwargs:
raise SearchError('Unused parameter: {}'.format(', '.join(kwargs.keys())))
# Create a session, make it async if and only if we have a callback
controller = 'sightings'
return self.__query('restSearch/' + context, query, controller, async_callback)
# ############## Sharing Groups ##################
def get_sharing_groups(self):

View File

@ -1,8 +1,8 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet, PyMISPUnexpectedResponse
from .api import PyMISP, everything_broken, MISPEvent, MISPAttribute
from .exceptions import MISPServerError, NewEventError, UpdateEventError, UpdateAttributeError, PyMISPNotImplementedYet
from .api import PyMISP, everything_broken, MISPEvent, MISPAttribute, MISPSighting
from typing import TypeVar, Optional, Tuple, List, Dict
from datetime import date, datetime
import json
@ -116,8 +116,94 @@ class ExpandedPyMISP(PyMISP):
a.from_dict(**updated_attribute)
return a
# TODO: Make that thing async & test it.
def search_sightings(self, context: Optional[str]=None,
context_id: Optional[SearchType]=None,
type_sighting: Optional[str]=None,
date_from: Optional[DateTypes]=None,
date_to: Optional[DateTypes]=None,
publish_timestamp: Optional[DateInterval]=None, last: Optional[DateInterval]=None,
org: Optional[SearchType]=None,
source: Optional[str]=None,
include_attribute: Optional[bool]=None,
include_event_meta: Optional[bool]=None,
pythonify: Optional[bool]=False
):
'''Search sightings
:param context: The context of the search. Can be either "attribute", "event", or nothing (will then match on events and attributes).
:param context_id: Only relevant if context is either "attribute" or "event". Then it is the relevant ID.
:param type_sighting: Type of sighting
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
:param publish_timestamp: Restrict the results by the last publish timestamp (newer than).
:param org: Search by the creator organisation by supplying the organisation identifier.
:param source: Source of the sighting
:param include_attribute: Include the attribute.
:param include_event_meta: Include the meta information of the event.
Deprecated:
:param last: synonym for publish_timestamp
:Example:
>>> misp.search_sightings(publish_timestamp='30d') # search sightings for the last 30 days on the instance
[ ... ]
>>> misp.search_sightings(context='attribute', context_id=6, include_attribute=True) # return list of sighting for attribute 6 along with the attribute itself
[ ... ]
>>> misp.search_sightings(context='event', context_id=17, include_event_meta=True, org=2) # return list of sighting for event 17 filtered with org id 2
'''
query = {'returnFormat': 'json'}
if context is not None:
if context not in ['attribute', 'event']:
raise ValueError('context has to be in {}'.format(', '.join(['attribute', 'event'])))
url_path = f'sightings/restSearch/{context}'
else:
url_path = 'sightings/restSearch'
query['id'] = context_id
query['type'] = type_sighting
query['from'] = date_from
query['to'] = date_to
query['last'] = publish_timestamp
query['org_id'] = org
query['source'] = source
query['includeAttribute'] = include_attribute
query['includeEvent'] = include_event_meta
url = urljoin(self.root_url, url_path)
# Remove None values.
# TODO: put that in self._prepare_request
query = {k: v for k, v in query.items() if v is not None}
response = self._prepare_request('POST', url, data=json.dumps(query))
normalized_response = self._check_response(response)
if isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and
normalized_response.get('errors')):
return normalized_response
elif pythonify:
to_return = []
for s in normalized_response:
entries = {}
s_data = s['Sighting']
if include_event_meta:
e = s_data.pop('Event')
me = MISPEvent()
me.from_dict(**e)
entries['event'] = me
if include_attribute:
a = s_data.pop('Attribute')
ma = MISPAttribute()
ma.from_dict(**a)
entries['attribute'] = ma
ms = MISPSighting()
ms.from_dict(**s_data)
entries['sighting'] = ms
to_return.append(entries)
return to_return
else:
return normalized_response
def search(self, controller: str='events', return_format: str='json',
limit: Optional[int]=None, page: Optional[int]=None,
value: Optional[SearchParameterTypes]=None,
type_attribute: Optional[SearchParameterTypes]=None,
category: Optional[SearchParameterTypes]=None,
@ -141,18 +227,22 @@ class ExpandedPyMISP(PyMISP):
sg_reference_only: Optional[bool]=None,
eventinfo: Optional[str]=None,
searchall: Optional[bool]=None,
requested_attributes: Optional[str]=None,
include_context: Optional[bool]=None, includeContext: Optional[bool]=None,
headerless: Optional[bool]=None,
pythonify: Optional[bool]=False,
**kwargs):
'''
Search in the MISP instance
'''Search in the MISP instance
:param returnFormat: Set the return format of the search (Currently supported: json, xml, openioc, suricata, snort - more formats are being moved to restSearch with the goal being that all searches happen through this API). Can be passed as the first parameter after restSearch or via the JSON payload.
:param limit: Limit the number of results returned, depending on the scope (for example 10 attributes or 10 full events).
:param page: If a limit is set, sets the page to be returned. page 3, limit 100 will return records 201->300).
:param value: Search for the given value in the attributes' value field.
:param type_attribute: The attribute type, any valid MISP attribute type is accepted.
:param category: The attribute category, any valid MISP attribute category is accepted.
:param org: Search by the creator organisation by supplying the organisation identifier.
:param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
:param quickfilter: If set it makes the search ignore all of the other arguments, except for the auth key and value. MISP will return all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value fields, or in the attribute comment.
:param quickfilter: Enabling this (by passing "1" as the argument) will make the search ignore all of the other arguments, except for the auth key and value. MISP will return an xml / json (depending on the header sent) of all events that have a sub-string match on value in the event info, event orgc, or any of the attribute value1 / value2 fields, or in the attribute comment.
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
:param eventid: The events that should be included / excluded from the search
@ -170,6 +260,9 @@ class ExpandedPyMISP(PyMISP):
:param sg_reference_only: If this flag is set, sharing group objects will not be included, instead only the sharing group ID is set.
:param eventinfo: Filter on the event's info field.
:param searchall: Search for a full or a substring (delimited by % for substrings) in the event info, event tags, attribute tags, attribute values or attribute comment fields.
:param requested_attributes: [CSV only] Select the fields that you wish to include in the CSV export. By setting event level fields additionally, includeContext is not required to get event metadata.
:param include_context: [CSV Only] Include the event data with each attribute.
:param headerless: [CSV Only] The CSV created when this setting is set to true will not contain the header row.
:param pythonify: Returns a list of PyMISP Objects the the plain json output. Warning: it might use a lot of RAM
Deprecated:
@ -178,10 +271,13 @@ class ExpandedPyMISP(PyMISP):
:param last: synonym for publish_timestamp
:param enforceWarninglist: synonym for enforce_warninglist
:param includeEventUuid: synonym for include_event_uuid
:param includeContext: synonym for include_context
'''
if controller not in ['events', 'attributes', 'objects']:
return_formats = ['openioc', 'json', 'xml', 'suricata', 'snort', 'text', 'rpz', 'csv', 'cache']
if controller not in ['events', 'attributes', 'objects', 'sightings']:
raise ValueError('controller has to be in {}'.format(', '.join(['events', 'attributes', 'objects'])))
# Deprecated stuff / synonyms
@ -193,38 +289,31 @@ class ExpandedPyMISP(PyMISP):
enforce_warninglist = enforceWarninglist
if includeEventUuid is not None:
include_event_uuid = includeEventUuid
if includeContext is not None:
include_context = includeContext
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
# They are passed as-is.
query = kwargs
if return_format is not None:
if return_format not in ['json', 'xml', 'openioc', 'suricata', 'snort']:
raise ValueError('return_format has to be in {}'.format(', '.join(['json', 'xml', 'openioc', 'suricata', 'snort'])))
query['returnFormat'] = return_format
if value is not None:
query['value'] = value
if type_attribute is not None:
query['type'] = type_attribute
if category is not None:
query['category'] = category
if org is not None:
query['org'] = org
if tags is not None:
query['tags'] = tags
if quickfilter is not None:
query['quickfilter'] = quickfilter
if date_from is not None:
query['from'] = self.make_timestamp(date_from)
if date_to is not None:
query['to'] = self.make_timestamp(date_to)
if eventid is not None:
query['eventid'] = eventid
if with_attachments is not None:
query['withAttachments'] = with_attachments
if metadata is not None:
query['metadata'] = metadata
if uuid is not None:
query['uuid'] = uuid
if return_format not in return_formats:
raise ValueError('return_format has to be in {}'.format(', '.join(return_formats)))
query['returnFormat'] = return_format
query['page'] = page
query['limit'] = limit
query['value'] = value
query['type'] = type_attribute
query['category'] = category
query['org'] = org
query['tags'] = tags
query['quickfilter'] = quickfilter
query['from'] = self.make_timestamp(date_from)
query['to'] = self.make_timestamp(date_to)
query['eventid'] = eventid
query['withAttachments'] = with_attachments
query['metadata'] = metadata
query['uuid'] = uuid
if publish_timestamp is not None:
if isinstance(publish_timestamp, (list, tuple)):
query['publish_timestamp'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1]))
@ -235,35 +324,36 @@ class ExpandedPyMISP(PyMISP):
query['timestamp'] = (self.make_timestamp(timestamp[0]), self.make_timestamp(timestamp[1]))
else:
query['timestamp'] = self.make_timestamp(timestamp)
if published is not None:
query['published'] = published
if enforce_warninglist is not None:
query['enforceWarninglist'] = enforce_warninglist
query['published'] = published
query['enforceWarninglist'] = enforce_warninglist
if to_ids is not None:
if str(to_ids) not in ['0', '1', 'exclude']:
raise ValueError('to_ids has to be in {}'.format(', '.join(['0', '1', 'exclude'])))
query['to_ids'] = to_ids
if deleted is not None:
query['deleted'] = deleted
if include_event_uuid is not None:
query['includeEventUuid'] = include_event_uuid
query['deleted'] = deleted
query['includeEventUuid'] = include_event_uuid
if event_timestamp is not None:
if isinstance(event_timestamp, (list, tuple)):
query['event_timestamp'] = (self.make_timestamp(event_timestamp[0]), self.make_timestamp(event_timestamp[1]))
else:
query['event_timestamp'] = self.make_timestamp(event_timestamp)
if sg_reference_only is not None:
query['sgReferenceOnly'] = sg_reference_only
if eventinfo is not None:
query['eventinfo'] = eventinfo
if searchall is not None:
query['searchall'] = searchall
query['sgReferenceOnly'] = sg_reference_only
query['eventinfo'] = eventinfo
query['searchall'] = searchall
query['requested_attributes'] = requested_attributes
query['includeContext'] = include_context
query['headerless'] = headerless
url = urljoin(self.root_url, f'{controller}/restSearch')
# Remove None values.
# TODO: put that in self._prepare_request
query = {k: v for k, v in query.items() if v is not None}
response = self._prepare_request('POST', url, data=json.dumps(query))
normalized_response = self._check_response(response)
if isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and
normalized_response.get('errors')):
if return_format == 'csv' and pythonify and not headerless:
return self._csv_to_dict(normalized_response)
elif isinstance(normalized_response, str) or (isinstance(normalized_response, dict) and
normalized_response.get('errors')):
return normalized_response
elif return_format == 'json' and pythonify:
# The response is in json, we can convert it to a list of pythonic MISP objects
@ -284,90 +374,12 @@ class ExpandedPyMISP(PyMISP):
else:
return normalized_response
def get_csv(self,
eventid: Optional[SearchType]=None,
ignore: Optional[bool]=None,
tags: Optional[SearchParameterTypes]=None,
category: Optional[SearchParameterTypes]=None,
type_attribute: Optional[SearchParameterTypes]=None,
include_context: Optional[bool]=None, includeContext: Optional[bool]=None,
date_from: Optional[DateTypes]=None, date_to: Optional[DateTypes]=None,
publish_timestamp: Optional[DateInterval]=None, # converted internally to last (consistent with search)
headerless: Optional[bool]=None,
enforce_warninglist: Optional[bool]=None, enforceWarninglist: Optional[bool]=None,
pythonify: Optional[bool]=False,
**kwargs):
'''
Get MISP data in CSV format.
:param eventid: Restrict the download to a single event
:param ignore: If true, the response includes attributes without the to_ids flag
:param tags: Tags to search or to exclude. You can pass a list, or the output of `build_complex_query`
:param category: The attribute category, any valid MISP attribute category is accepted.
:param type_attribute: The attribute type, any valid MISP attribute type is accepted.
:param include_context: Include the event data with each attribute.
:param date_from: Events with the date set to a date after the one specified. This filter will use the date of the event.
:param date_to: Events with the date set to a date before the one specified. This filter will use the date of the event.
:param publish_timestamp: Events published within the last x amount of time. This filter will use the published timestamp of the event.
:param headerless: The CSV created when this setting is set to true will not contain the header row.
:param enforceWarninglist: All attributes that have a hit on a warninglist will be excluded.
:param pythonify: Returns a list of dictionaries instead of the plain CSV
'''
# Deprecated stuff / synonyms
if includeContext is not None:
include_context = includeContext
if enforceWarninglist is not None:
enforce_warninglist = enforceWarninglist
# Add all the parameters in kwargs are aimed at modules, or other 3rd party components, and cannot be sanitized.
# They are passed as-is.
query = kwargs
if eventid is not None:
query['eventid'] = eventid
if ignore is not None:
query['ignore'] = ignore
if tags is not None:
query['tags'] = tags
if category is not None:
query['category'] = category
if type_attribute is not None:
query['type'] = type_attribute
if include_context is not None:
query['includeContext'] = include_context
if date_from is not None:
query['from'] = self.make_timestamp(date_from)
if date_to is not None:
query['to'] = self.make_timestamp(date_to)
if publish_timestamp is not None:
if isinstance(publish_timestamp, (list, tuple)):
query['last'] = (self.make_timestamp(publish_timestamp[0]), self.make_timestamp(publish_timestamp[1]))
else:
query['last'] = self.make_timestamp(publish_timestamp)
if headerless is not None:
query['headerless'] = headerless
if enforce_warninglist is not None:
query['enforceWarninglist'] = enforce_warninglist
url = urljoin(self.root_url, '/events/csv/download/')
response = self._prepare_request('POST', url, data=json.dumps(query))
normalized_response = self._check_response(response)
if isinstance(normalized_response, str):
if pythonify and not headerless:
# Make it a list of dict
fieldnames, lines = normalized_response.split('\n', 1)
fieldnames = fieldnames.split(',')
to_return = []
for line in csv.reader(lines.split('\n')):
if line:
to_return.append({fname: value for fname, value in zip(fieldnames, line)})
return to_return
return normalized_response
elif isinstance(normalized_response, dict):
# The server returned a dictionary, it contains the error message.
logger.critical(f'The server should have returned a CSV file as text. instead it returned an error message:\n{normalized_response}')
return normalized_response
else:
# Should not happen...
raise PyMISPUnexpectedResponse(f'The server should have returned a CSV file as text. instead it returned:\n{normalized_response}')
def _csv_to_dict(self, csv_content):
'''Makes a list of dict out of a csv file (requires headers)'''
fieldnames, lines = csv_content.split('\n', 1)
fieldnames = fieldnames.split(',')
to_return = []
for line in csv.reader(lines.split('\n')):
if line:
to_return.append({fname: value for fname, value in zip(fieldnames, line)})
return to_return

@ -1 +1 @@
Subproject commit 38071f4bd9e3de1138a096cbbf66089f5105d798
Subproject commit 6e03108fb104ae90617701aa5d0749cb932c821f

View File

@ -31,7 +31,7 @@
"name": "file",
"sharing_group_id": "0",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": "13",
"template_version": "15",
"uuid": "a"
},
{
@ -51,7 +51,7 @@
"name": "url",
"sharing_group_id": "0",
"template_uuid": "60efb77b-40b5-4c46-871b-ed1ed999fce5",
"template_version": "6",
"template_version": "7",
"uuid": "b"
}
]

View File

@ -23,7 +23,7 @@
"name": "file",
"sharing_group_id": "0",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": "13",
"template_version": "15",
"uuid": "a"
},
{
@ -48,7 +48,7 @@
"name": "file",
"sharing_group_id": "0",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": "13",
"template_version": "15",
"uuid": "b"
}
]

View File

@ -39,7 +39,7 @@
"meta-category": "file",
"description": "File object describing a file with meta-information",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": "7",
"template_version": "6",
"event_id": "6719",
"uuid": "5a4e4ffe-4cb8-48b1-bd5c-48fb950d210f",
"timestamp": "1515081726",

View File

@ -112,7 +112,7 @@
"name": "file",
"sharing_group_id": "0",
"template_uuid": "688c46fb-5edb-40a3-8273-1af7923e2215",
"template_version": "7",
"template_version": "8",
"timestamp": "1514975928",
"uuid": "5a4cb2b8-7958-4323-852c-4d2a950d210f"
}

View File

@ -516,8 +516,10 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(events[0].id, first.id)
# quickfilter
events = self.user_misp_connector.search(timestamp=timeframe, quickfilter='%bar%', pythonify=True)
events = self.user_misp_connector.search(timestamp=timeframe,
quickfilter='%bar%', pythonify=True)
# FIXME: should return one event
# print(events)
# self.assertEqual(len(events), 1)
# self.assertEqual(events[0].id, second.id)
@ -561,19 +563,17 @@ class TestComprehensive(unittest.TestCase):
self.assertEqual(attributes[0].event_uuid, second.uuid)
# event_timestamp
time.sleep(1)
second.add_attribute('ip-src', '8.8.8.9')
second = self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(event_timestamp=second.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(events), 1)
# searchall
# FIXME: searchall doesn't seem to do anything
# second.add_attribute('text', 'This is a test for the full text search', comment='Test stuff comment')
# second = self.user_misp_connector.update_event(second)
# events = self.user_misp_connector.search(value='%for the full text%', searchall=True, pythonify=True)
# self.assertEqual(len(events), 1)
# events = self.user_misp_connector.search(value='stuff', searchall=True, pythonify=True)
# self.assertEqual(len(events), 1)
second.add_attribute('text', 'This is a test for the full text search', comment='Test stuff comment')
second = self.user_misp_connector.update_event(second)
events = self.user_misp_connector.search(value='%for the full text%', searchall=True, pythonify=True)
self.assertEqual(len(events), 1)
# warninglist
self.admin_misp_connector.update_warninglists()
@ -586,22 +586,29 @@ class TestComprehensive(unittest.TestCase):
events = self.user_misp_connector.search(eventid=second.id, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 4)
self.assertEqual(len(events[0].attributes), 5)
events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=False, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 4)
self.assertEqual(len(events[0].attributes), 5)
if not travis_run:
# FIXME: This is fialing on travis for no discernable reason...
# FIXME: This is failing on travis for no discernable reason...
events = self.user_misp_connector.search(eventid=second.id, enforce_warninglist=True, pythonify=True)
self.assertEqual(len(events), 1)
self.assertEqual(events[0].id, second.id)
self.assertEqual(len(events[0].attributes), 2)
self.assertEqual(len(events[0].attributes), 3)
response = self.admin_misp_connector.toggle_warninglist(warninglist_name='%dns resolv%') # disable ipv4 DNS.
self.assertDictEqual(response, {'saved': True, 'success': '3 warninglist(s) toggled'})
# Page / limit
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=1, limit=3, pythonify=True)
self.assertEqual(len(attributes), 3)
attributes = self.user_misp_connector.search(controller='attributes', eventid=second.id, page=2, limit=3, pythonify=True)
self.assertEqual(len(attributes), 2)
time.sleep(1) # make sure the next attribute is added one at least one second later
# attachments
@ -637,73 +644,131 @@ class TestComprehensive(unittest.TestCase):
# Delete event
self.admin_misp_connector.delete_event(first.id)
def test_get_csv(self):
def test_sightings(self):
first = self.create_simple_event()
second = self.create_simple_event()
try:
first = self.user_misp_connector.add_event(first)
second = self.user_misp_connector.add_event(second)
current_ts = int(time.time())
self.user_misp_connector.sighting(value=first.attributes[0].value)
self.user_misp_connector.sighting(value=second.attributes[0].value,
source='Testcases',
type='1')
s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts, include_attribute=True,
include_event_meta=True, pythonify=True)
self.assertEqual(len(s), 2)
self.assertEqual(s[0]['event'].id, first.id)
self.assertEqual(s[0]['attribute'].id, first.attributes[0].id)
s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts,
source='Testcases',
include_attribute=True,
include_event_meta=True,
pythonify=True)
self.assertEqual(len(s), 1)
self.assertEqual(s[0]['event'].id, second.id)
self.assertEqual(s[0]['attribute'].id, second.attributes[0].id)
s = self.user_misp_connector.search_sightings(publish_timestamp=current_ts,
type_sighting='1',
include_attribute=True,
include_event_meta=True,
pythonify=True)
self.assertEqual(len(s), 1)
self.assertEqual(s[0]['event'].id, second.id)
self.assertEqual(s[0]['attribute'].id, second.attributes[0].id)
s = self.user_misp_connector.search_sightings(context='event',
context_id=first.id,
pythonify=True)
self.assertEqual(len(s), 1)
self.assertEqual(s[0]['sighting'].event_id, str(first.id))
s = self.user_misp_connector.search_sightings(context='attribute',
context_id=second.attributes[0].id,
pythonify=True)
self.assertEqual(len(s), 1)
self.assertEqual(s[0]['sighting'].attribute_id, str(second.attributes[0].id))
finally:
# Delete event
self.admin_misp_connector.delete_event(first.id)
self.admin_misp_connector.delete_event(second.id)
def test_search_csv(self):
first = self.create_simple_event()
first.attributes[0].comment = 'This is the original comment'
second = self.create_simple_event()
second.info = 'foo blah'
second.set_date('2018-09-01')
second.add_attribute('ip-src', '8.8.8.8')
try:
first.attributes[0].comment = 'This is the original comment'
first = self.user_misp_connector.add_event(first)
second = self.user_misp_connector.add_event(second)
response = self.user_misp_connector.fast_publish(first.id, alert=False)
self.assertEqual(response['errors'][0][1]['message'], 'You do not have permission to use this functionality.')
# default search, all attributes with to_ids == False
self.admin_misp_connector.fast_publish(first.id, alert=False)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
# FIXME: Should not return anything (to_ids is False)
# self.assertEqual(len(csv), 0)
# Also export attributes with to_ids set to false
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, ignore=True, pythonify=True)
self.assertEqual(len(csv), 1)
# Default search, attribute with to_ids == True
first.attributes[0].to_ids = True
first = self.user_misp_connector.update_event(first)
self.admin_misp_connector.fast_publish(first.id, alert=False)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp() - 5, pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
# eventid
csv = self.user_misp_connector.get_csv(eventid=first.id, pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', eventid=first.id, pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
# category
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Other', pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), category='Person', pythonify=True)
self.assertEqual(len(csv), 0)
# type_attribute
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='text', pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), type_attribute='ip-src', pythonify=True)
self.assertEqual(len(csv), 0)
# context
csv = self.user_misp_connector.get_csv(publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', publish_timestamp=first.timestamp.timestamp(), include_context=True, pythonify=True)
self.assertEqual(len(csv), 1)
# print(csv[0])
# FIXME: there is no context.
self.assertTrue('event_info' in csv[0])
# date_from date_to
second = self.user_misp_connector.add_event(second)
csv = self.user_misp_connector.get_csv(date_from=date.today().isoformat(), pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', date_from=date.today().isoformat(), pythonify=True)
self.assertEqual(len(csv), 1)
self.assertEqual(csv[0]['value'], first.attributes[0].value)
csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', pythonify=True)
self.assertEqual(len(csv), 2)
# headerless
csv = self.user_misp_connector.get_csv(date_from='2018-09-01', date_to='2018-09-02', headerless=True)
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', headerless=True)
# FIXME: The header is here.
# print(csv)
# Expects 2 lines after removing the empty ones.
# self.assertEqual(len(csv.strip().split('\n')), 2)
# include_context
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', include_context=True, pythonify=True)
event_context_keys = ['event_info', 'event_member_org', 'event_source_org', 'event_distribution', 'event_threat_level_id', 'event_analysis', 'event_date', 'event_tag', 'event_timestamp']
for k in event_context_keys:
self.assertTrue(k in csv[0])
# requested_attributes
columns = ['value', 'event_id']
csv = self.user_misp_connector.search(return_format='csv', date_from='2018-09-01', date_to='2018-09-02', requested_attributes=columns, pythonify=True)
self.assertEqual(len(csv[0].keys()), 2)
for k in columns:
self.assertTrue(k in csv[0])
finally:
# Delete event