mirror of https://github.com/MISP/PyMISP
Reorganisation, make add attribute more flexible
parent
c348fcc7dc
commit
1da447abf2
|
@ -5,6 +5,7 @@ from pymisp import PyMISP
|
||||||
from keys import misp_url, misp_key
|
from keys import misp_url, misp_key
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
|
from io import open
|
||||||
|
|
||||||
# Usage for pipe masters: ./last.py -l 5h | jq .
|
# Usage for pipe masters: ./last.py -l 5h | jq .
|
||||||
|
|
||||||
|
|
360
pymisp/api.py
360
pymisp/api.py
|
@ -10,13 +10,15 @@ import os
|
||||||
import base64
|
import base64
|
||||||
import re
|
import re
|
||||||
import warnings
|
import warnings
|
||||||
|
import functools
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from urllib.parse import urljoin
|
from urllib.parse import urljoin
|
||||||
except ImportError:
|
except ImportError:
|
||||||
from urlparse import urljoin
|
from urlparse import urljoin
|
||||||
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.4")
|
warnings.warn("You're using python 2, it is strongly recommended to use python >=3.4")
|
||||||
from io import BytesIO
|
from io import BytesIO, open
|
||||||
import zipfile
|
import zipfile
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -40,28 +42,21 @@ except NameError:
|
||||||
unicode = str
|
unicode = str
|
||||||
|
|
||||||
|
|
||||||
class distributions(object):
|
def deprecated(func):
|
||||||
"""Enumeration of the available distributions."""
|
'''This is a decorator which can be used to mark functions
|
||||||
your_organization = 0
|
as deprecated. It will result in a warning being emitted
|
||||||
this_community = 1
|
when the function is used.'''
|
||||||
connected_communities = 2
|
|
||||||
all_communities = 3
|
|
||||||
sharing_group = 4
|
|
||||||
|
|
||||||
|
@functools.wraps(func)
|
||||||
class threat_level(object):
|
def new_func(*args, **kwargs):
|
||||||
"""Enumeration of the available threat levels."""
|
warnings.warn_explicit(
|
||||||
high = 1
|
"Call to deprecated function {}.".format(func.__name__),
|
||||||
medium = 2
|
category=DeprecationWarning,
|
||||||
low = 3
|
filename=func.func_code.co_filename,
|
||||||
undefined = 4
|
lineno=func.func_code.co_firstlineno + 1
|
||||||
|
)
|
||||||
|
return func(*args, **kwargs)
|
||||||
class analysis(object):
|
return new_func
|
||||||
"""Enumeration of the available analysis statuses."""
|
|
||||||
initial = 0
|
|
||||||
ongoing = 1
|
|
||||||
completed = 2
|
|
||||||
|
|
||||||
|
|
||||||
class PyMISP(object):
|
class PyMISP(object):
|
||||||
|
@ -79,11 +74,6 @@ class PyMISP(object):
|
||||||
:param cert: Client certificate, as described there: http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification
|
:param cert: Client certificate, as described there: http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# So it can may be accessed from the misp object.
|
|
||||||
distributions = distributions
|
|
||||||
threat_level = threat_level
|
|
||||||
analysis = analysis
|
|
||||||
|
|
||||||
def __init__(self, url, key, ssl=True, out_type='json', debug=False, proxies=None, cert=None):
|
def __init__(self, url, key, ssl=True, out_type='json', debug=False, proxies=None, cert=None):
|
||||||
if not url:
|
if not url:
|
||||||
raise NoURL('Please provide the URL of your MISP instance.')
|
raise NoURL('Please provide the URL of your MISP instance.')
|
||||||
|
@ -155,6 +145,10 @@ class PyMISP(object):
|
||||||
'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)})
|
'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)})
|
||||||
return session
|
return session
|
||||||
|
|
||||||
|
# #####################
|
||||||
|
# ### Core helpers ####
|
||||||
|
# #####################
|
||||||
|
|
||||||
def flatten_error_messages(self, response):
|
def flatten_error_messages(self, response):
|
||||||
messages = []
|
messages = []
|
||||||
if response.get('error'):
|
if response.get('error'):
|
||||||
|
@ -221,6 +215,44 @@ class PyMISP(object):
|
||||||
print(json.dumps(to_return, indent=4))
|
print(json.dumps(to_return, indent=4))
|
||||||
return to_return
|
return to_return
|
||||||
|
|
||||||
|
def _one_or_more(self, value):
|
||||||
|
"""Returns a list/tuple of one or more items, regardless of input."""
|
||||||
|
return value if isinstance(value, (tuple, list)) else (value,)
|
||||||
|
|
||||||
|
def _make_mispevent(self, event):
|
||||||
|
if not isinstance(event, MISPEvent):
|
||||||
|
e = MISPEvent(self.describe_types)
|
||||||
|
e.load(event)
|
||||||
|
else:
|
||||||
|
e = event
|
||||||
|
return e
|
||||||
|
|
||||||
|
def _prepare_full_event(self, distribution, threat_level_id, analysis, info, date=None, published=False, orgc_id=None, org_id=None, sharing_group_id=None):
|
||||||
|
misp_event = MISPEvent(self.describe_types)
|
||||||
|
misp_event.set_all_values(info=info, distribution=distribution, threat_level_id=threat_level_id,
|
||||||
|
analysis=analysis, date=date, orgc_id=orgc_id, org_id=org_id, sharing_group_id=sharing_group_id)
|
||||||
|
if published:
|
||||||
|
misp_event.publish()
|
||||||
|
return misp_event
|
||||||
|
|
||||||
|
def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=5, **kwargs):
|
||||||
|
misp_attribute = MISPAttribute(self.describe_types)
|
||||||
|
misp_attribute.set_all_values(type=type_value, value=value, category=category,
|
||||||
|
to_ids=to_ids, comment=comment, distribution=distribution, **kwargs)
|
||||||
|
return misp_attribute
|
||||||
|
|
||||||
|
def _valid_uuid(self, uuid):
|
||||||
|
"""Test if uuid is valid
|
||||||
|
Will test against CakeText's RFC 4122, i.e
|
||||||
|
"the third group must start with a 4,
|
||||||
|
and the fourth group must start with 8, 9, a or b."
|
||||||
|
|
||||||
|
:param uuid: an uuid
|
||||||
|
"""
|
||||||
|
regex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}\Z', re.I)
|
||||||
|
match = regex.match(uuid)
|
||||||
|
return bool(match)
|
||||||
|
|
||||||
# ################################################
|
# ################################################
|
||||||
# ############### Simple REST API ################
|
# ############### Simple REST API ################
|
||||||
# ################################################
|
# ################################################
|
||||||
|
@ -249,20 +281,6 @@ class PyMISP(object):
|
||||||
response = session.get(url)
|
response = session.get(url)
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def get_stix_event(self, event_id=None, with_attachments=False, from_date=False, to_date=False, tags=False):
|
|
||||||
"""Get an event/events in STIX format"""
|
|
||||||
if tags:
|
|
||||||
if isinstance(tags, list):
|
|
||||||
tags = "&&".join(tags)
|
|
||||||
|
|
||||||
session = self.__prepare_session()
|
|
||||||
url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format(
|
|
||||||
event_id, with_attachments, tags, from_date, to_date))
|
|
||||||
if self.debug:
|
|
||||||
print("Getting STIX event from {}".format(url))
|
|
||||||
response = session.get(url)
|
|
||||||
return self._check_response(response)
|
|
||||||
|
|
||||||
def add_event(self, event):
|
def add_event(self, event):
|
||||||
"""Add a new event
|
"""Add a new event
|
||||||
|
|
||||||
|
@ -307,43 +325,12 @@ class PyMISP(object):
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
# ##############################################
|
# ##############################################
|
||||||
# ######### Event handling (Json only) #########
|
# ############### Event handling ###############
|
||||||
# ##############################################
|
# ##############################################
|
||||||
|
|
||||||
def _prepare_full_event(self, distribution, threat_level_id, analysis, info, date=None, published=False, orgc_id=None, org_id=None, sharing_group_id=None):
|
|
||||||
misp_event = MISPEvent(self.describe_types)
|
|
||||||
misp_event.set_all_values(info=info, distribution=distribution, threat_level_id=threat_level_id,
|
|
||||||
analysis=analysis, date=date, orgc_id=orgc_id, org_id=org_id, sharing_group_id=sharing_group_id)
|
|
||||||
if published:
|
|
||||||
misp_event.publish()
|
|
||||||
return misp_event
|
|
||||||
|
|
||||||
def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=5, **kwargs):
|
|
||||||
misp_attribute = MISPAttribute(self.describe_types)
|
|
||||||
misp_attribute.set_all_values(type=type_value, value=value, category=category,
|
|
||||||
to_ids=to_ids, comment=comment, distribution=distribution, **kwargs)
|
|
||||||
return misp_attribute
|
|
||||||
|
|
||||||
def _one_or_more(self, value):
|
|
||||||
"""Returns a list/tuple of one or more items, regardless of input."""
|
|
||||||
return value if isinstance(value, (tuple, list)) else (value,)
|
|
||||||
|
|
||||||
# ########## Helpers ##########
|
|
||||||
|
|
||||||
def _make_mispevent(self, event):
|
|
||||||
if not isinstance(event, MISPEvent):
|
|
||||||
e = MISPEvent(self.describe_types)
|
|
||||||
e.load(event)
|
|
||||||
else:
|
|
||||||
e = event
|
|
||||||
return e
|
|
||||||
|
|
||||||
def get(self, eid):
|
def get(self, eid):
|
||||||
return self.get_event(eid)
|
return self.get_event(eid)
|
||||||
|
|
||||||
def get_stix(self, **kwargs):
|
|
||||||
return self.get_stix_event(**kwargs)
|
|
||||||
|
|
||||||
def update(self, event):
|
def update(self, event):
|
||||||
e = self._make_mispevent(event)
|
e = self._make_mispevent(event)
|
||||||
if e.uuid:
|
if e.uuid:
|
||||||
|
@ -386,50 +373,22 @@ class PyMISP(object):
|
||||||
response = session.post(urljoin(self.root_url, path))
|
response = session.post(urljoin(self.root_url, path))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def add_tag(self, event, tag, attribute=False):
|
|
||||||
# FIXME: this is dirty, this function needs to be deprecated with something tagging a UUID
|
|
||||||
session = self.__prepare_session()
|
|
||||||
if attribute:
|
|
||||||
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
|
|
||||||
path = 'attributes/addTag'
|
|
||||||
else:
|
|
||||||
# Allow for backwards-compat with old style
|
|
||||||
if "Event" in event:
|
|
||||||
event = event["Event"]
|
|
||||||
to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}}
|
|
||||||
path = 'events/addTag'
|
|
||||||
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
|
||||||
return self._check_response(response)
|
|
||||||
|
|
||||||
def remove_tag(self, event, tag, attribute=False):
|
|
||||||
# FIXME: this is dirty, this function needs to be deprecated with something removing the tag to a UUID
|
|
||||||
session = self.__prepare_session()
|
|
||||||
if attribute:
|
|
||||||
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
|
|
||||||
path = 'attributes/addTag'
|
|
||||||
else:
|
|
||||||
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
|
||||||
path = 'events/addTag'
|
|
||||||
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
|
||||||
return self._check_response(response)
|
|
||||||
|
|
||||||
def _valid_uuid(self, uuid):
|
|
||||||
"""Test if uuid is valid
|
|
||||||
Will test against CakeText's RFC 4122, i.e
|
|
||||||
"the third group must start with a 4,
|
|
||||||
and the fourth group must start with 8, 9, a or b."
|
|
||||||
|
|
||||||
:param uuid: an uuid
|
|
||||||
"""
|
|
||||||
regex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}\Z', re.I)
|
|
||||||
match = regex.match(uuid)
|
|
||||||
return bool(match)
|
|
||||||
|
|
||||||
# ##### File attributes #####
|
# ##### File attributes #####
|
||||||
|
|
||||||
def _send_attributes(self, event, attributes, proposal=False):
|
def _send_attributes(self, event, attributes, proposal=False):
|
||||||
if proposal:
|
# FIXME: unable to send a proposal if we have a full event.
|
||||||
response = self.proposal_add(event['Event']['id'], attributes)
|
if isinstance(event, MISPEvent):
|
||||||
|
event.attributes += attributes
|
||||||
|
response = self.update(event)
|
||||||
|
elif isinstance(event, int) or (isinstance(event, str) and (event.isdigit() or self._valid_uuid(event))):
|
||||||
|
# No full event, just an ID
|
||||||
|
session = self.__prepare_session()
|
||||||
|
url = urljoin(self.root_url, 'attributes/add/{}'.format(event))
|
||||||
|
for a in attributes:
|
||||||
|
if proposal:
|
||||||
|
response = self.proposal_add(event, json.dumps(a, cls=EncodeUpdate))
|
||||||
|
else:
|
||||||
|
response = session.post(url, data=json.dumps(a, cls=EncodeUpdate))
|
||||||
else:
|
else:
|
||||||
e = MISPEvent(self.describe_types)
|
e = MISPEvent(self.describe_types)
|
||||||
e.load(event)
|
e.load(event)
|
||||||
|
@ -471,32 +430,41 @@ class PyMISP(object):
|
||||||
def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
|
def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
return self.add_named_attribute(event, 'filename', filename, category, to_ids, comment, distribution, proposal)
|
return self.add_named_attribute(event, 'filename', filename, category, to_ids, comment, distribution, proposal)
|
||||||
|
|
||||||
def add_attachment(self, event, filename, attachment=None, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
|
def add_attachment(self, event, attachment, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False):
|
||||||
"""Add an attachment to the MISP event
|
"""Add an attachment to the MISP event
|
||||||
|
|
||||||
:param event: The event to add an attachment to
|
:param event: The event to add an attachment to
|
||||||
:param filename: The name you want to store the file under
|
|
||||||
:param attachment: Either a file handle or a path to a file - will be uploaded
|
:param attachment: Either a file handle or a path to a file - will be uploaded
|
||||||
"""
|
"""
|
||||||
|
if isinstance(attachment, basestring) and os.path.isfile(attachment):
|
||||||
if hasattr(attachment, "read"):
|
# We have a file to open
|
||||||
# It's a file handle - we can read it
|
filename = os.path.basename(attachment)
|
||||||
|
with open(attachment, "rb") as f:
|
||||||
|
fileData = f.read()
|
||||||
|
elif hasattr(attachment, "read"):
|
||||||
|
# It's a file handle - we can read it but it has no filename
|
||||||
fileData = attachment.read()
|
fileData = attachment.read()
|
||||||
|
filename = 'attachment'
|
||||||
elif isinstance(attachment, basestring):
|
elif isinstance(attachment, (tuple, list)):
|
||||||
# It can either be the b64 encoded data or a file path
|
# tuple/list (filename, pseudofile)
|
||||||
if os.path.exists(attachment):
|
filename = attachment[0]
|
||||||
# It's a path!
|
if hasattr(attachment[1], "read"):
|
||||||
with open(attachment, "r") as f:
|
# Pseudo file
|
||||||
fileData = f.read()
|
fileData = attachment[1].read()
|
||||||
else:
|
else:
|
||||||
# We have to assume it's the actual data
|
fileData = attachment[1]
|
||||||
fileData = attachment
|
else:
|
||||||
|
# Plain file content, no filename
|
||||||
|
filename = 'attachment'
|
||||||
|
fileData = attachment
|
||||||
|
|
||||||
|
if not isinstance(fileData, bytes):
|
||||||
|
fileData = fileData.encode()
|
||||||
|
|
||||||
# by now we have a string for the file
|
# by now we have a string for the file
|
||||||
# we just need to b64 encode it and send it on its way
|
# we just need to b64 encode it and send it on its way
|
||||||
# also, just decode it to utf-8 to avoid the b'string' format
|
# also, just decode it to utf-8 to avoid the b'string' format
|
||||||
encodedData = base64.b64encode(fileData.encode("utf-8")).decode("utf-8")
|
encodedData = base64.b64encode(fileData).decode("utf-8")
|
||||||
|
|
||||||
# Send it on its way
|
# Send it on its way
|
||||||
return self.add_named_attribute(event, 'attachment', filename, category, to_ids, comment, distribution, proposal, data=encodedData)
|
return self.add_named_attribute(event, 'attachment', filename, category, to_ids, comment, distribution, proposal, data=encodedData)
|
||||||
|
@ -679,7 +647,7 @@ class PyMISP(object):
|
||||||
|
|
||||||
def _encode_file_to_upload(self, path):
|
def _encode_file_to_upload(self, path):
|
||||||
with open(path, 'rb') as f:
|
with open(path, 'rb') as f:
|
||||||
return str(base64.b64encode(f.read()))
|
return base64.b64encode(f.read()).decode()
|
||||||
|
|
||||||
def upload_sample(self, filename, filepath, event_id, distribution=None,
|
def upload_sample(self, filename, filepath, event_id, distribution=None,
|
||||||
to_ids=True, category=None, comment=None, info=None,
|
to_ids=True, category=None, comment=None, info=None,
|
||||||
|
@ -985,10 +953,10 @@ class PyMISP(object):
|
||||||
zipped = BytesIO(decoded)
|
zipped = BytesIO(decoded)
|
||||||
try:
|
try:
|
||||||
archive = zipfile.ZipFile(zipped)
|
archive = zipfile.ZipFile(zipped)
|
||||||
try:
|
if f.get('md5'):
|
||||||
# New format
|
# New format
|
||||||
unzipped = BytesIO(archive.open(f['md5'], pwd=b'infected').read())
|
unzipped = BytesIO(archive.open(f['md5'], pwd=b'infected').read())
|
||||||
except KeyError:
|
else:
|
||||||
# Old format
|
# Old format
|
||||||
unzipped = BytesIO(archive.open(f['filename'], pwd=b'infected').read())
|
unzipped = BytesIO(archive.open(f['filename'], pwd=b'infected').read())
|
||||||
details.append([f['event_id'], f['filename'], unzipped])
|
details.append([f['event_id'], f['filename'], unzipped])
|
||||||
|
@ -1005,25 +973,6 @@ class PyMISP(object):
|
||||||
"""
|
"""
|
||||||
return self.search(last=last)
|
return self.search(last=last)
|
||||||
|
|
||||||
# ############## Suricata ###############
|
|
||||||
|
|
||||||
def download_all_suricata(self):
|
|
||||||
"""Download all suricata rules events."""
|
|
||||||
suricata_rules = urljoin(self.root_url, 'events/nids/suricata/download')
|
|
||||||
session = self.__prepare_session('rules')
|
|
||||||
response = session.get(suricata_rules)
|
|
||||||
return response
|
|
||||||
|
|
||||||
def download_suricata_rule_event(self, event_id):
|
|
||||||
"""Download one suricata rule event.
|
|
||||||
|
|
||||||
:param event_id: ID of the event to download (same as get)
|
|
||||||
"""
|
|
||||||
template = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id))
|
|
||||||
session = self.__prepare_session('rules')
|
|
||||||
response = session.get(template)
|
|
||||||
return response
|
|
||||||
|
|
||||||
# ########## Tags ##########
|
# ########## Tags ##########
|
||||||
|
|
||||||
def get_all_tags(self, quiet=False):
|
def get_all_tags(self, quiet=False):
|
||||||
|
@ -1084,15 +1033,6 @@ class PyMISP(object):
|
||||||
else:
|
else:
|
||||||
return {'error': 'Impossible to retrieve the version of the master branch.'}
|
return {'error': 'Impossible to retrieve the version of the master branch.'}
|
||||||
|
|
||||||
# ############## Export Attributes in text ####################################
|
|
||||||
|
|
||||||
def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False):
|
|
||||||
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise."""
|
|
||||||
session = self.__prepare_session('txt')
|
|
||||||
url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished))
|
|
||||||
response = session.get(url)
|
|
||||||
return response
|
|
||||||
|
|
||||||
# ############## Statistics ##################
|
# ############## Statistics ##################
|
||||||
|
|
||||||
def get_attributes_statistics(self, context='type', percentage=None):
|
def get_attributes_statistics(self, context='type', percentage=None):
|
||||||
|
@ -1145,7 +1085,7 @@ class PyMISP(object):
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
def sighting_per_json(self, json_file):
|
def sighting_per_json(self, json_file):
|
||||||
with open(json_file) as f:
|
with open(json_file, 'r') as f:
|
||||||
jdata = json.load(f)
|
jdata = json.load(f)
|
||||||
return self.set_sightings(jdata)
|
return self.set_sightings(jdata)
|
||||||
|
|
||||||
|
@ -1220,7 +1160,7 @@ class PyMISP(object):
|
||||||
|
|
||||||
def add_user_json(self, json_file):
|
def add_user_json(self, json_file):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
with open(json_file) as f:
|
with open(json_file, 'r') as f:
|
||||||
jdata = json.load(f)
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'admin/users/add/')
|
url = urljoin(self.root_url, 'admin/users/add/')
|
||||||
response = session.post(url, data=json.dumps(jdata))
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
|
@ -1241,7 +1181,7 @@ class PyMISP(object):
|
||||||
|
|
||||||
def edit_user_json(self, json_file, user_id):
|
def edit_user_json(self, json_file, user_id):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
with open(json_file) as f:
|
with open(json_file, 'r') as f:
|
||||||
jdata = json.load(f)
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id))
|
||||||
response = session.post(url, data=json.dumps(jdata))
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
|
@ -1298,7 +1238,7 @@ class PyMISP(object):
|
||||||
|
|
||||||
def add_organisation_json(self, json_file):
|
def add_organisation_json(self, json_file):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
with open(json_file) as f:
|
with open(json_file, 'r') as f:
|
||||||
jdata = json.load(f)
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'admin/organisations/add/')
|
url = urljoin(self.root_url, 'admin/organisations/add/')
|
||||||
response = session.post(url, data=json.dumps(jdata))
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
|
@ -1319,7 +1259,7 @@ class PyMISP(object):
|
||||||
|
|
||||||
def edit_organisation_json(self, json_file, org_id):
|
def edit_organisation_json(self, json_file, org_id):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
with open(json_file) as f:
|
with open(json_file, 'r') as f:
|
||||||
jdata = json.load(f)
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id))
|
||||||
response = session.post(url, data=json.dumps(jdata))
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
|
@ -1393,7 +1333,8 @@ class PyMISP(object):
|
||||||
|
|
||||||
def add_server_json(self, json_file):
|
def add_server_json(self, json_file):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
jdata = json.load(open(json_file))
|
with open(json_file, 'r') as f:
|
||||||
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'servers/add')
|
url = urljoin(self.root_url, 'servers/add')
|
||||||
response = session.post(url, data=json.dumps(jdata))
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
@ -1411,7 +1352,92 @@ class PyMISP(object):
|
||||||
|
|
||||||
def edit_server_json(self, json_file, server_id):
|
def edit_server_json(self, json_file, server_id):
|
||||||
session = self.__prepare_session()
|
session = self.__prepare_session()
|
||||||
jdata = json.load(open(json_file))
|
with open(json_file, 'r') as f:
|
||||||
|
jdata = json.load(f)
|
||||||
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
|
url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id))
|
||||||
response = session.post(url, data=json.dumps(jdata))
|
response = session.post(url, data=json.dumps(jdata))
|
||||||
return self._check_response(response)
|
return self._check_response(response)
|
||||||
|
|
||||||
|
# ##############################################
|
||||||
|
# ############### Non-JSON output ##############
|
||||||
|
# ##############################################
|
||||||
|
|
||||||
|
# ############## Suricata ##############
|
||||||
|
|
||||||
|
def download_all_suricata(self):
|
||||||
|
"""Download all suricata rules events."""
|
||||||
|
suricata_rules = urljoin(self.root_url, 'events/nids/suricata/download')
|
||||||
|
session = self.__prepare_session('rules')
|
||||||
|
response = session.get(suricata_rules)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def download_suricata_rule_event(self, event_id):
|
||||||
|
"""Download one suricata rule event.
|
||||||
|
|
||||||
|
:param event_id: ID of the event to download (same as get)
|
||||||
|
"""
|
||||||
|
template = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id))
|
||||||
|
session = self.__prepare_session('rules')
|
||||||
|
response = session.get(template)
|
||||||
|
return response
|
||||||
|
|
||||||
|
# ############## Text ###############
|
||||||
|
|
||||||
|
def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False):
|
||||||
|
"""Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise."""
|
||||||
|
session = self.__prepare_session('txt')
|
||||||
|
url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished))
|
||||||
|
response = session.get(url)
|
||||||
|
return response
|
||||||
|
|
||||||
|
# ############## STIX ##############
|
||||||
|
|
||||||
|
def get_stix_event(self, event_id=None, with_attachments=False, from_date=False, to_date=False, tags=False):
|
||||||
|
"""Get an event/events in STIX format"""
|
||||||
|
if tags:
|
||||||
|
if isinstance(tags, list):
|
||||||
|
tags = "&&".join(tags)
|
||||||
|
|
||||||
|
session = self.__prepare_session()
|
||||||
|
url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format(
|
||||||
|
event_id, with_attachments, tags, from_date, to_date))
|
||||||
|
if self.debug:
|
||||||
|
print("Getting STIX event from {}".format(url))
|
||||||
|
response = session.get(url)
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
|
def get_stix(self, **kwargs):
|
||||||
|
return self.get_stix_event(**kwargs)
|
||||||
|
|
||||||
|
# ###########################
|
||||||
|
# ####### Deprecated ########
|
||||||
|
# ###########################
|
||||||
|
|
||||||
|
@deprecated
|
||||||
|
def add_tag(self, event, tag, attribute=False):
|
||||||
|
# FIXME: this is dirty, this function needs to be deprecated with something tagging a UUID
|
||||||
|
session = self.__prepare_session()
|
||||||
|
if attribute:
|
||||||
|
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
|
||||||
|
path = 'attributes/addTag'
|
||||||
|
else:
|
||||||
|
# Allow for backwards-compat with old style
|
||||||
|
if "Event" in event:
|
||||||
|
event = event["Event"]
|
||||||
|
to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}}
|
||||||
|
path = 'events/addTag'
|
||||||
|
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
||||||
|
return self._check_response(response)
|
||||||
|
|
||||||
|
@deprecated
|
||||||
|
def remove_tag(self, event, tag, attribute=False):
|
||||||
|
# FIXME: this is dirty, this function needs to be deprecated with something removing the tag to a UUID
|
||||||
|
session = self.__prepare_session()
|
||||||
|
if attribute:
|
||||||
|
to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}}
|
||||||
|
path = 'attributes/addTag'
|
||||||
|
else:
|
||||||
|
to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}}
|
||||||
|
path = 'events/addTag'
|
||||||
|
response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post))
|
||||||
|
return self._check_response(response)
|
||||||
|
|
Loading…
Reference in New Issue