diff --git a/examples/up.py b/examples/up.py index cdca33e..4b3c429 100755 --- a/examples/up.py +++ b/examples/up.py @@ -5,6 +5,7 @@ from pymisp import PyMISP from keys import misp_url, misp_key import argparse +from io import open # Usage for pipe masters: ./last.py -l 5h | jq . diff --git a/pymisp/api.py b/pymisp/api.py index e34a6de..e319f0e 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -10,13 +10,15 @@ import os import base64 import re import warnings +import functools + try: from urllib.parse import urljoin except ImportError: from urlparse import urljoin warnings.warn("You're using python 2, it is strongly recommended to use python >=3.4") -from io import BytesIO +from io import BytesIO, open import zipfile try: @@ -40,28 +42,21 @@ except NameError: unicode = str -class distributions(object): - """Enumeration of the available distributions.""" - your_organization = 0 - this_community = 1 - connected_communities = 2 - all_communities = 3 - sharing_group = 4 +def deprecated(func): + '''This is a decorator which can be used to mark functions + as deprecated. It will result in a warning being emitted + when the function is used.''' - -class threat_level(object): - """Enumeration of the available threat levels.""" - high = 1 - medium = 2 - low = 3 - undefined = 4 - - -class analysis(object): - """Enumeration of the available analysis statuses.""" - initial = 0 - ongoing = 1 - completed = 2 + @functools.wraps(func) + def new_func(*args, **kwargs): + warnings.warn_explicit( + "Call to deprecated function {}.".format(func.__name__), + category=DeprecationWarning, + filename=func.func_code.co_filename, + lineno=func.func_code.co_firstlineno + 1 + ) + return func(*args, **kwargs) + return new_func class PyMISP(object): @@ -79,11 +74,6 @@ class PyMISP(object): :param cert: Client certificate, as described there: http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification """ - # So it can may be accessed from the misp object. - distributions = distributions - threat_level = threat_level - analysis = analysis - def __init__(self, url, key, ssl=True, out_type='json', debug=False, proxies=None, cert=None): if not url: raise NoURL('Please provide the URL of your MISP instance.') @@ -155,6 +145,10 @@ class PyMISP(object): 'User-Agent': 'PyMISP {} - Python {}.{}.{}'.format(__version__, *sys.version_info)}) return session + # ##################### + # ### Core helpers #### + # ##################### + def flatten_error_messages(self, response): messages = [] if response.get('error'): @@ -221,6 +215,44 @@ class PyMISP(object): print(json.dumps(to_return, indent=4)) return to_return + def _one_or_more(self, value): + """Returns a list/tuple of one or more items, regardless of input.""" + return value if isinstance(value, (tuple, list)) else (value,) + + def _make_mispevent(self, event): + if not isinstance(event, MISPEvent): + e = MISPEvent(self.describe_types) + e.load(event) + else: + e = event + return e + + def _prepare_full_event(self, distribution, threat_level_id, analysis, info, date=None, published=False, orgc_id=None, org_id=None, sharing_group_id=None): + misp_event = MISPEvent(self.describe_types) + misp_event.set_all_values(info=info, distribution=distribution, threat_level_id=threat_level_id, + analysis=analysis, date=date, orgc_id=orgc_id, org_id=org_id, sharing_group_id=sharing_group_id) + if published: + misp_event.publish() + return misp_event + + def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=5, **kwargs): + misp_attribute = MISPAttribute(self.describe_types) + misp_attribute.set_all_values(type=type_value, value=value, category=category, + to_ids=to_ids, comment=comment, distribution=distribution, **kwargs) + return misp_attribute + + def _valid_uuid(self, uuid): + """Test if uuid is valid + Will test against CakeText's RFC 4122, i.e + "the third group must start with a 4, + and the fourth group must start with 8, 9, a or b." + + :param uuid: an uuid + """ + regex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}\Z', re.I) + match = regex.match(uuid) + return bool(match) + # ################################################ # ############### Simple REST API ################ # ################################################ @@ -249,20 +281,6 @@ class PyMISP(object): response = session.get(url) return self._check_response(response) - def get_stix_event(self, event_id=None, with_attachments=False, from_date=False, to_date=False, tags=False): - """Get an event/events in STIX format""" - if tags: - if isinstance(tags, list): - tags = "&&".join(tags) - - session = self.__prepare_session() - url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format( - event_id, with_attachments, tags, from_date, to_date)) - if self.debug: - print("Getting STIX event from {}".format(url)) - response = session.get(url) - return self._check_response(response) - def add_event(self, event): """Add a new event @@ -307,43 +325,12 @@ class PyMISP(object): return self._check_response(response) # ############################################## - # ######### Event handling (Json only) ######### + # ############### Event handling ############### # ############################################## - def _prepare_full_event(self, distribution, threat_level_id, analysis, info, date=None, published=False, orgc_id=None, org_id=None, sharing_group_id=None): - misp_event = MISPEvent(self.describe_types) - misp_event.set_all_values(info=info, distribution=distribution, threat_level_id=threat_level_id, - analysis=analysis, date=date, orgc_id=orgc_id, org_id=org_id, sharing_group_id=sharing_group_id) - if published: - misp_event.publish() - return misp_event - - def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=5, **kwargs): - misp_attribute = MISPAttribute(self.describe_types) - misp_attribute.set_all_values(type=type_value, value=value, category=category, - to_ids=to_ids, comment=comment, distribution=distribution, **kwargs) - return misp_attribute - - def _one_or_more(self, value): - """Returns a list/tuple of one or more items, regardless of input.""" - return value if isinstance(value, (tuple, list)) else (value,) - - # ########## Helpers ########## - - def _make_mispevent(self, event): - if not isinstance(event, MISPEvent): - e = MISPEvent(self.describe_types) - e.load(event) - else: - e = event - return e - def get(self, eid): return self.get_event(eid) - def get_stix(self, **kwargs): - return self.get_stix_event(**kwargs) - def update(self, event): e = self._make_mispevent(event) if e.uuid: @@ -386,50 +373,22 @@ class PyMISP(object): response = session.post(urljoin(self.root_url, path)) return self._check_response(response) - def add_tag(self, event, tag, attribute=False): - # FIXME: this is dirty, this function needs to be deprecated with something tagging a UUID - session = self.__prepare_session() - if attribute: - to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}} - path = 'attributes/addTag' - else: - # Allow for backwards-compat with old style - if "Event" in event: - event = event["Event"] - to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}} - path = 'events/addTag' - response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post)) - return self._check_response(response) - - def remove_tag(self, event, tag, attribute=False): - # FIXME: this is dirty, this function needs to be deprecated with something removing the tag to a UUID - session = self.__prepare_session() - if attribute: - to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}} - path = 'attributes/addTag' - else: - to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}} - path = 'events/addTag' - response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post)) - return self._check_response(response) - - def _valid_uuid(self, uuid): - """Test if uuid is valid - Will test against CakeText's RFC 4122, i.e - "the third group must start with a 4, - and the fourth group must start with 8, 9, a or b." - - :param uuid: an uuid - """ - regex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}\Z', re.I) - match = regex.match(uuid) - return bool(match) - # ##### File attributes ##### def _send_attributes(self, event, attributes, proposal=False): - if proposal: - response = self.proposal_add(event['Event']['id'], attributes) + # FIXME: unable to send a proposal if we have a full event. + if isinstance(event, MISPEvent): + event.attributes += attributes + response = self.update(event) + elif isinstance(event, int) or (isinstance(event, str) and (event.isdigit() or self._valid_uuid(event))): + # No full event, just an ID + session = self.__prepare_session() + url = urljoin(self.root_url, 'attributes/add/{}'.format(event)) + for a in attributes: + if proposal: + response = self.proposal_add(event, json.dumps(a, cls=EncodeUpdate)) + else: + response = session.post(url, data=json.dumps(a, cls=EncodeUpdate)) else: e = MISPEvent(self.describe_types) e.load(event) @@ -471,32 +430,41 @@ class PyMISP(object): def add_filename(self, event, filename, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False): return self.add_named_attribute(event, 'filename', filename, category, to_ids, comment, distribution, proposal) - def add_attachment(self, event, filename, attachment=None, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False): + def add_attachment(self, event, attachment, category='Artifacts dropped', to_ids=False, comment=None, distribution=None, proposal=False): """Add an attachment to the MISP event :param event: The event to add an attachment to - :param filename: The name you want to store the file under :param attachment: Either a file handle or a path to a file - will be uploaded """ - - if hasattr(attachment, "read"): - # It's a file handle - we can read it + if isinstance(attachment, basestring) and os.path.isfile(attachment): + # We have a file to open + filename = os.path.basename(attachment) + with open(attachment, "rb") as f: + fileData = f.read() + elif hasattr(attachment, "read"): + # It's a file handle - we can read it but it has no filename fileData = attachment.read() - - elif isinstance(attachment, basestring): - # It can either be the b64 encoded data or a file path - if os.path.exists(attachment): - # It's a path! - with open(attachment, "r") as f: - fileData = f.read() + filename = 'attachment' + elif isinstance(attachment, (tuple, list)): + # tuple/list (filename, pseudofile) + filename = attachment[0] + if hasattr(attachment[1], "read"): + # Pseudo file + fileData = attachment[1].read() else: - # We have to assume it's the actual data - fileData = attachment + fileData = attachment[1] + else: + # Plain file content, no filename + filename = 'attachment' + fileData = attachment + + if not isinstance(fileData, bytes): + fileData = fileData.encode() # by now we have a string for the file # we just need to b64 encode it and send it on its way # also, just decode it to utf-8 to avoid the b'string' format - encodedData = base64.b64encode(fileData.encode("utf-8")).decode("utf-8") + encodedData = base64.b64encode(fileData).decode("utf-8") # Send it on its way return self.add_named_attribute(event, 'attachment', filename, category, to_ids, comment, distribution, proposal, data=encodedData) @@ -679,7 +647,7 @@ class PyMISP(object): def _encode_file_to_upload(self, path): with open(path, 'rb') as f: - return str(base64.b64encode(f.read())) + return base64.b64encode(f.read()).decode() def upload_sample(self, filename, filepath, event_id, distribution=None, to_ids=True, category=None, comment=None, info=None, @@ -985,10 +953,10 @@ class PyMISP(object): zipped = BytesIO(decoded) try: archive = zipfile.ZipFile(zipped) - try: + if f.get('md5'): # New format unzipped = BytesIO(archive.open(f['md5'], pwd=b'infected').read()) - except KeyError: + else: # Old format unzipped = BytesIO(archive.open(f['filename'], pwd=b'infected').read()) details.append([f['event_id'], f['filename'], unzipped]) @@ -1005,25 +973,6 @@ class PyMISP(object): """ return self.search(last=last) - # ############## Suricata ############### - - def download_all_suricata(self): - """Download all suricata rules events.""" - suricata_rules = urljoin(self.root_url, 'events/nids/suricata/download') - session = self.__prepare_session('rules') - response = session.get(suricata_rules) - return response - - def download_suricata_rule_event(self, event_id): - """Download one suricata rule event. - - :param event_id: ID of the event to download (same as get) - """ - template = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id)) - session = self.__prepare_session('rules') - response = session.get(template) - return response - # ########## Tags ########## def get_all_tags(self, quiet=False): @@ -1084,15 +1033,6 @@ class PyMISP(object): else: return {'error': 'Impossible to retrieve the version of the master branch.'} - # ############## Export Attributes in text #################################### - - def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False): - """Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise.""" - session = self.__prepare_session('txt') - url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished)) - response = session.get(url) - return response - # ############## Statistics ################## def get_attributes_statistics(self, context='type', percentage=None): @@ -1145,7 +1085,7 @@ class PyMISP(object): return self._check_response(response) def sighting_per_json(self, json_file): - with open(json_file) as f: + with open(json_file, 'r') as f: jdata = json.load(f) return self.set_sightings(jdata) @@ -1220,7 +1160,7 @@ class PyMISP(object): def add_user_json(self, json_file): session = self.__prepare_session() - with open(json_file) as f: + with open(json_file, 'r') as f: jdata = json.load(f) url = urljoin(self.root_url, 'admin/users/add/') response = session.post(url, data=json.dumps(jdata)) @@ -1241,7 +1181,7 @@ class PyMISP(object): def edit_user_json(self, json_file, user_id): session = self.__prepare_session() - with open(json_file) as f: + with open(json_file, 'r') as f: jdata = json.load(f) url = urljoin(self.root_url, 'admin/users/edit/{}'.format(user_id)) response = session.post(url, data=json.dumps(jdata)) @@ -1298,7 +1238,7 @@ class PyMISP(object): def add_organisation_json(self, json_file): session = self.__prepare_session() - with open(json_file) as f: + with open(json_file, 'r') as f: jdata = json.load(f) url = urljoin(self.root_url, 'admin/organisations/add/') response = session.post(url, data=json.dumps(jdata)) @@ -1319,7 +1259,7 @@ class PyMISP(object): def edit_organisation_json(self, json_file, org_id): session = self.__prepare_session() - with open(json_file) as f: + with open(json_file, 'r') as f: jdata = json.load(f) url = urljoin(self.root_url, 'admin/organisations/edit/{}'.format(org_id)) response = session.post(url, data=json.dumps(jdata)) @@ -1393,7 +1333,8 @@ class PyMISP(object): def add_server_json(self, json_file): session = self.__prepare_session() - jdata = json.load(open(json_file)) + with open(json_file, 'r') as f: + jdata = json.load(f) url = urljoin(self.root_url, 'servers/add') response = session.post(url, data=json.dumps(jdata)) return self._check_response(response) @@ -1411,7 +1352,92 @@ class PyMISP(object): def edit_server_json(self, json_file, server_id): session = self.__prepare_session() - jdata = json.load(open(json_file)) + with open(json_file, 'r') as f: + jdata = json.load(f) url = urljoin(self.root_url, 'servers/edit/{}'.format(server_id)) response = session.post(url, data=json.dumps(jdata)) return self._check_response(response) + + # ############################################## + # ############### Non-JSON output ############## + # ############################################## + + # ############## Suricata ############## + + def download_all_suricata(self): + """Download all suricata rules events.""" + suricata_rules = urljoin(self.root_url, 'events/nids/suricata/download') + session = self.__prepare_session('rules') + response = session.get(suricata_rules) + return response + + def download_suricata_rule_event(self, event_id): + """Download one suricata rule event. + + :param event_id: ID of the event to download (same as get) + """ + template = urljoin(self.root_url, 'events/nids/suricata/download/{}'.format(event_id)) + session = self.__prepare_session('rules') + response = session.get(template) + return response + + # ############## Text ############### + + def get_all_attributes_txt(self, type_attr, tags=False, eventId=False, allowNonIDS=False, date_from=False, date_to=False, last=False, enforceWarninglist=False, allowNotPublished=False): + """Get all attributes from a specific type as plain text. Only published and IDS flagged attributes are exported, except if stated otherwise.""" + session = self.__prepare_session('txt') + url = urljoin(self.root_url, 'attributes/text/download/%s/%s/%s/%s/%s/%s/%s/%s/%s' % (type_attr, tags, eventId, allowNonIDS, date_from, date_to, last, enforceWarninglist, allowNotPublished)) + response = session.get(url) + return response + + # ############## STIX ############## + + def get_stix_event(self, event_id=None, with_attachments=False, from_date=False, to_date=False, tags=False): + """Get an event/events in STIX format""" + if tags: + if isinstance(tags, list): + tags = "&&".join(tags) + + session = self.__prepare_session() + url = urljoin(self.root_url, "/events/stix/download/{}/{}/{}/{}/{}".format( + event_id, with_attachments, tags, from_date, to_date)) + if self.debug: + print("Getting STIX event from {}".format(url)) + response = session.get(url) + return self._check_response(response) + + def get_stix(self, **kwargs): + return self.get_stix_event(**kwargs) + + # ########################### + # ####### Deprecated ######## + # ########################### + + @deprecated + def add_tag(self, event, tag, attribute=False): + # FIXME: this is dirty, this function needs to be deprecated with something tagging a UUID + session = self.__prepare_session() + if attribute: + to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}} + path = 'attributes/addTag' + else: + # Allow for backwards-compat with old style + if "Event" in event: + event = event["Event"] + to_post = {'request': {'Event': {'id': event['id'], 'tag': tag}}} + path = 'events/addTag' + response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post)) + return self._check_response(response) + + @deprecated + def remove_tag(self, event, tag, attribute=False): + # FIXME: this is dirty, this function needs to be deprecated with something removing the tag to a UUID + session = self.__prepare_session() + if attribute: + to_post = {'request': {'Attribute': {'id': event['id'], 'tag': tag}}} + path = 'attributes/addTag' + else: + to_post = {'request': {'Event': {'id': event['Event']['id'], 'tag': tag}}} + path = 'events/addTag' + response = session.post(urljoin(self.root_url, path), data=json.dumps(to_post)) + return self._check_response(response)