diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index e2297fe8..a414fc84 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -19,16 +19,12 @@ jobs: - name: Install packages run: | sudo apt-get install libpoppler-cpp-dev libzbar0 tesseract-ocr - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - - name: Cache Python dependencies - uses: actions/cache@v2 - with: - path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ matrix.python-version }}-${{ hashFiles('REQUIREMENTS') }} + cache: 'pip' - name: Install dependencies run: | python -m pip install --upgrade pip @@ -42,12 +38,17 @@ jobs: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Run server in background + run: | + misp-modules -l 127.0.0.1 -s 2>error.log & + sleep 5 + - name: Check if server is running + run: | + curl -sS localhost:6666/healthcheck - name: Test with pytest run: | - # Run server in background - misp-modules -l 127.0.0.1 -s & - sleep 5 - # Check if modules are running - curl -sS localhost:6666/modules - # Run tests pytest tests + - name: Show error log + if: always() + run: | + cat error.log diff --git a/misp_modules/__init__.py b/misp_modules/__init__.py index b068d8a1..b628d78f 100644 --- a/misp_modules/__init__.py +++ b/misp_modules/__init__.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -# -*- coding: utf-8 -*- # # Core MISP expansion modules loader and web service # @@ -23,7 +22,6 @@ import os import signal import sys import importlib -import json import logging import fnmatch import argparse @@ -31,6 +29,11 @@ import re import datetime import psutil +try: + import orjson as json +except ImportError: + import json + import tornado.web import tornado.process from tornado.ioloop import IOLoop @@ -58,17 +61,13 @@ def handle_signal(sig, frame): IOLoop.instance().add_callback_from_signal(IOLoop.instance().stop) -def init_logger(level=False): +def init_logger(debug=False): formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler = logging.StreamHandler(stream=sys.stdout) + handler = logging.StreamHandler() handler.setFormatter(formatter) - handler.setLevel(logging.INFO) - if level: - handler.setLevel(logging.DEBUG) log.addHandler(handler) - log.setLevel(logging.INFO) - if level: - log.setLevel(logging.DEBUG) + log.propagate = False + log.setLevel(logging.DEBUG if debug else logging.INFO) return log @@ -89,28 +88,28 @@ def load_helpers(helpersdir): selftest = hhandlers[helpername].selftest() if selftest is None: helpers.append(helpername) - log.info('Helpers loaded {} '.format(filename)) + log.info(f'Helpers loaded {filename}') else: - log.info('Helpers failed {} due to {}'.format(filename, selftest)) + log.warning(f'Helpers failed {filename} due to {selftest}') def load_package_helpers(): if not HAS_PACKAGE_HELPERS: - log.info('Unable to load MISP helpers from package.') - sys.exit() + log.error('Unable to load MISP helpers from package.') + sys.exit(1) mhandlers = {} helpers = [] for path, helper in sys.modules.items(): if not path.startswith('misp_modules.helpers.'): continue - helpername = path.replace('misp_modules.helpers.', '') - mhandlers[helpername] = helper - selftest = mhandlers[helpername].selftest() + helper_name = path.replace('misp_modules.helpers.', '') + mhandlers[helper_name] = helper + selftest = mhandlers[helper_name].selftest() if selftest is None: - helpers.append(helpername) - log.info('Helper loaded {}'.format(helpername)) + helpers.append(helper_name) + log.info(f'Helper loaded {helper_name}') else: - log.info('Helpers failed {} due to {}'.format(helpername, selftest)) + log.warning(f'Helpers failed {helper_name} due to {selftest}') return mhandlers, helpers @@ -128,49 +127,54 @@ def load_modules(mod_dir): continue if filename == '__init__.py': continue - modulename = filename.split(".")[0] - moduletype = os.path.split(mod_dir)[1] + module_name = filename.split(".")[0] + module_type = os.path.split(mod_dir)[1] try: - mhandlers[modulename] = importlib.import_module(os.path.basename(root) + '.' + modulename) + mhandlers[module_name] = importlib.import_module(os.path.basename(root) + '.' + module_name) except Exception as e: - log.warning('MISP modules {0} failed due to {1}'.format(modulename, e)) + log.warning(f'MISP modules {module_name} failed due to {e}') continue - modules.append(modulename) - log.info('MISP modules {0} imported'.format(modulename)) - mhandlers['type:' + modulename] = moduletype + modules.append(module_name) + log.info(f'MISP modules {module_name} imported') + mhandlers['type:' + module_name] = module_type return mhandlers, modules def load_package_modules(): if not HAS_PACKAGE_MODULES: - log.info('Unable to load MISP modules from package.') - sys.exit() + log.error('Unable to load MISP modules from package.') + sys.exit(1) mhandlers = {} modules = [] for path, module in sys.modules.items(): r = re.findall(r"misp_modules[.]modules[.](\w+)[.]([^_]\w+)", path) if r and len(r[0]) == 2: - moduletype, modulename = r[0] - mhandlers[modulename] = module - modules.append(modulename) - log.info('MISP modules {0} imported'.format(modulename)) - mhandlers['type:' + modulename] = moduletype + module_type, module_name = r[0] + mhandlers[module_name] = module + modules.append(module_name) + log.info(f'MISP modules {module_name} imported') + mhandlers['type:' + module_name] = module_type return mhandlers, modules +class Healthcheck(tornado.web.RequestHandler): + def get(self): + self.write(b'{"status": true}') + + class ListModules(tornado.web.RequestHandler): global loaded_modules global mhandlers def get(self): ret = [] - for module in loaded_modules: - x = {} - x['name'] = module - x['type'] = mhandlers['type:' + module] - x['mispattributes'] = mhandlers[module].introspection() - x['meta'] = mhandlers[module].version() - ret.append(x) + for module_name in loaded_modules: + ret.append({ + 'name': module_name, + 'type': mhandlers['type:' + module_name], + 'mispattributes': mhandlers[module_name].introspection(), + 'meta': mhandlers[module_name].version() + }) log.debug('MISP ListModules request') self.write(json.dumps(ret)) @@ -183,28 +187,34 @@ class QueryModule(tornado.web.RequestHandler): executor = ThreadPoolExecutor(nb_threads) @run_on_executor - def run_request(self, module, jsonpayload): - log.debug('MISP QueryModule request {0}'.format(jsonpayload)) - response = mhandlers[module].handler(q=jsonpayload) + def run_request(self, module_name, json_payload, dict_payload): + log.debug('MISP QueryModule %s request %s', module_name, json_payload) + module = mhandlers[module_name] + if getattr(module, "dict_handler", None): + # New method that avoids double JSON decoding, new modules should define dict_handler + response = module.dict_handler(request=dict_payload) + else: + response = module.handler(q=json_payload) return json.dumps(response) @tornado.gen.coroutine def post(self): try: - jsonpayload = self.request.body.decode('utf-8') - dict_payload = json.loads(jsonpayload) + json_payload = self.request.body + dict_payload = json.loads(json_payload) if dict_payload.get('timeout'): timeout = datetime.timedelta(seconds=int(dict_payload.get('timeout'))) else: timeout = datetime.timedelta(seconds=300) - response = yield tornado.gen.with_timeout(timeout, self.run_request(dict_payload['module'], jsonpayload)) + future = self.run_request(dict_payload['module'], json_payload, dict_payload) + response = yield tornado.gen.with_timeout(timeout, future) self.write(response) except tornado.gen.TimeoutError: - log.warning('Timeout on {} '.format(dict_payload['module'])) + log.warning('Timeout on {}'.format(dict_payload['module'])) self.write(json.dumps({'error': 'Timeout.'})) except Exception: self.write(json.dumps({'error': 'Something went wrong, look in the server logs for details'})) - log.exception('Something went wrong:') + log.exception('Something went wrong when processing query request') finally: self.finish() @@ -223,20 +233,22 @@ def main(): global loaded_modules signal.signal(signal.SIGINT, handle_signal) signal.signal(signal.SIGTERM, handle_signal) - argParser = argparse.ArgumentParser(description='misp-modules server', formatter_class=argparse.RawTextHelpFormatter) - argParser.add_argument('-t', default=False, action='store_true', help='Test mode') - argParser.add_argument('-s', default=False, action='store_true', help='Run a system install (package installed via pip)') - argParser.add_argument('-d', default=False, action='store_true', help='Enable debugging') - argParser.add_argument('-p', default=6666, help='misp-modules TCP port (default 6666)') - argParser.add_argument('-l', default='localhost', help='misp-modules listen address (default localhost)') - argParser.add_argument('-m', default=[], action='append', help='Register a custom module') - argParser.add_argument('--devel', default=False, action='store_true', help='''Start in development mode, enable debug, start only the module(s) listed in -m.\nExample: -m misp_modules.modules.expansion.bgpranking''') - args = argParser.parse_args() + + arg_parser = argparse.ArgumentParser(description='misp-modules server', formatter_class=argparse.RawTextHelpFormatter) + arg_parser.add_argument('-t', default=False, action='store_true', help='Test mode') + arg_parser.add_argument('-s', default=False, action='store_true', help='Run a system install (package installed via pip)') + arg_parser.add_argument('-d', default=False, action='store_true', help='Enable debugging') + arg_parser.add_argument('-p', default=6666, help='misp-modules TCP port (default 6666)') + arg_parser.add_argument('-l', default='localhost', help='misp-modules listen address (default localhost)') + arg_parser.add_argument('-m', default=[], action='append', help='Register a custom module') + arg_parser.add_argument('--devel', default=False, action='store_true', help='''Start in development mode, enable debug, start only the module(s) listed in -m.\nExample: -m misp_modules.modules.expansion.bgpranking''') + args = arg_parser.parse_args() + port = args.p listen = args.l if args.devel: - log = init_logger(level=True) - log.info('Launch MISP modules server in developement mode. Enable debug, load a list of modules is -m is used.') + log = init_logger(debug=True) + log.info('Launch MISP modules server in development mode. Enable debug, load a list of modules is -m is used.') if args.m: mhandlers = {} modules = [] @@ -247,11 +259,11 @@ def main(): mhandlers[modulename] = importlib.import_module(module) mhandlers['type:' + modulename] = moduletype modules.append(modulename) - log.info('MISP modules {0} imported'.format(modulename)) + log.info(f'MISP modules {modulename} imported') else: mhandlers, loaded_modules = _launch_from_current_dir() else: - log = init_logger(level=args.d) + log = init_logger(debug=args.d) if args.s: log.info('Launch MISP modules server from package.') load_package_helpers() @@ -263,7 +275,11 @@ def main(): mispmod = importlib.import_module(module) mispmod.register(mhandlers, loaded_modules) - service = [(r'/modules', ListModules), (r'/query', QueryModule)] + service = [ + (r'/modules', ListModules), + (r'/query', QueryModule), + (r'/healthcheck', Healthcheck), + ] application = tornado.web.Application(service) try: @@ -279,14 +295,14 @@ def main(): print("\nmisp-modules is still running as PID: {}\n".format(pid)) print("Please kill accordingly:") print("sudo kill {}".format(pid)) - sys.exit(-1) + return 1 print(e) print("misp-modules might still be running.") - log.info('MISP modules server started on {0} port {1}'.format(listen, port)) + log.info(f'MISP modules server started on {listen} port {port}') if args.t: log.info('MISP modules started in test-mode, quitting immediately.') - sys.exit() + return 0 try: IOLoop.instance().start() finally: diff --git a/misp_modules/modules/expansion/apiosintds.py b/misp_modules/modules/expansion/apiosintds.py index 0eb82081..4dddf0d7 100644 --- a/misp_modules/modules/expansion/apiosintds.py +++ b/misp_modules/modules/expansion/apiosintds.py @@ -127,10 +127,11 @@ def handler(q=False): try: response = apiosintDS.request(entities=tosubmit, stix=submit_stix, cache=submitcache, cachedirectory=submitcache_directory, cachetimeout=submitcache_timeout, verbose=True, localdirectory=sumbit_localdirectory) r["results"] += apiosintParserHover(persistent, response, import_related, submit_stix) - except ValueError as e: - log.debug(str(e)) - misperrors['error'] = str(e) - return r + return r + except Exception as e: + log.exception("Could not process apiosintDS") + return {'error': str(e)} + def apiosintParserHover(ispersistent, response, import_related, stix): apiosinttype = ['hash', 'ip', 'url', 'domain'] diff --git a/misp_modules/modules/expansion/circl_passivedns.py b/misp_modules/modules/expansion/circl_passivedns.py index 5f98314b..eca78c8b 100755 --- a/misp_modules/modules/expansion/circl_passivedns.py +++ b/misp_modules/modules/expansion/circl_passivedns.py @@ -1,4 +1,3 @@ -import json import pypdns from . import check_input_attribute, standard_error_message from pymisp import MISPAttribute, MISPEvent, MISPObject @@ -10,7 +9,7 @@ moduleinfo = {'version': '0.2', 'author': 'Alexandre Dulaunoy', moduleconfig = ['username', 'password'] -class PassiveDNSParser(): +class PassiveDNSParser: def __init__(self, attribute, authentication): self.misp_event = MISPEvent() self.attribute = MISPAttribute() @@ -21,7 +20,7 @@ class PassiveDNSParser(): def get_results(self): if hasattr(self, 'result'): return self.result - event = json.loads(self.misp_event.to_json()) + event = self.misp_event.to_dict() results = {key: event[key] for key in ('Attribute', 'Object')} return {'results': results} @@ -50,10 +49,7 @@ class PassiveDNSParser(): self.misp_event.add_object(**pdns_object) -def handler(q=False): - if q is False: - return False - request = json.loads(q) +def dict_handler(request: dict): if not request.get('config'): return {'error': 'CIRCL Passive DNS authentication is missing.'} if not request['config'].get('username') or not request['config'].get('password'): diff --git a/misp_modules/modules/expansion/clamav.py b/misp_modules/modules/expansion/clamav.py index 1582409c..bdff3b51 100644 --- a/misp_modules/modules/expansion/clamav.py +++ b/misp_modules/modules/expansion/clamav.py @@ -1,6 +1,5 @@ import base64 import io -import json import logging import sys import zipfile @@ -43,7 +42,7 @@ def create_response(original_attribute: dict, software: str, signature: Optional av_signature_object.add_reference(original_attribute["uuid"], "belongs-to") misp_event.add_object(av_signature_object) - event = json.loads(misp_event.to_json()) + event = misp_event.to_dict() results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])} return {"results": results} @@ -58,12 +57,7 @@ def connect_to_clamav(connection_string: str) -> clamd.ClamdNetworkSocket: raise Exception("ClamAV connection string is invalid. It must be unix socket path with 'unix://' prefix or IP:PORT.") -def handler(q=False): - if q is False: - return False - - request = json.loads(q) - +def dict_handler(request: dict): connection_string: str = request["config"].get("connection") if not connection_string: return {"error": "No ClamAV connection string provided"} diff --git a/misp_modules/modules/expansion/virustotal.py b/misp_modules/modules/expansion/virustotal.py index 93d09666..29f05500 100644 --- a/misp_modules/modules/expansion/virustotal.py +++ b/misp_modules/modules/expansion/virustotal.py @@ -1,4 +1,3 @@ -import json from urllib.parse import urlparse import vt from . import check_input_attribute, standard_error_message @@ -45,7 +44,7 @@ class VirusTotalParser: self.input_types_mapping[self.attribute.type](self.attribute.value) def get_result(self) -> dict: - event = json.loads(self.misp_event.to_json()) + event = self.misp_event.to_dict() results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])} return {'results': results} @@ -257,10 +256,7 @@ def parse_error(status_code: int) -> str: return "VirusTotal may not be accessible." -def handler(q=False): - if q is False: - return False - request = json.loads(q) +def dict_handler(request: dict): if not request.get('config') or not request['config'].get('apikey'): misperrors['error'] = 'A VirusTotal api key is required for this module.' return misperrors diff --git a/misp_modules/modules/import_mod/csvimport.py b/misp_modules/modules/import_mod/csvimport.py index 6bd79b71..8f4a643d 100644 --- a/misp_modules/modules/import_mod/csvimport.py +++ b/misp_modules/modules/import_mod/csvimport.py @@ -1,10 +1,6 @@ -# -*- coding: utf-8 -*- from pymisp import MISPEvent, MISPObject -from pymisp import __path__ as pymisp_path import csv import io -import json -import os import base64 misperrors = {'error': 'Error'} @@ -33,7 +29,7 @@ misp_context_additional_fields = ['event_info', 'event_member_org', 'event_sourc misp_extended_csv_header = misp_standard_csv_header + misp_context_additional_fields -class CsvParser(): +class CsvParser: def __init__(self, header, has_header, delimiter, data, from_misp, MISPtypes, categories): self.misp_event = MISPEvent() self.header = header @@ -77,7 +73,7 @@ class CsvParser(): return {'error': 'In order to import MISP objects, an object relation for each attribute contained in an object is required.'} self.__build_misp_event(attribute_indexes, object_indexes) else: - attribute_fields = attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9] + attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9] attribute_indexes = [] types_indexes = [] for i in range(len(self.header)): @@ -236,7 +232,7 @@ class CsvParser(): return score def __finalize_results(self): - event = json.loads(self.misp_event.to_json()) + event = self.misp_event.to_dict() self.results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])} @@ -252,10 +248,7 @@ def __standard_parsing(data): return list(tuple(part.strip() for part in line) for line in csv.reader(io.TextIOWrapper(io.BytesIO(data.encode()), encoding='utf-8')) if line and not line[0].startswith('#')) -def handler(q=False): - if q is False: - return False - request = json.loads(q) +def dict_handler(request: dict): if request.get('data'): try: data = base64.b64decode(request['data']).decode('utf-8') @@ -282,12 +275,11 @@ def handler(q=False): del data[0] if header == misp_standard_csv_header or header == misp_extended_csv_header: header = misp_standard_csv_header - descFilename = os.path.join(pymisp_path[0], 'data/describeTypes.json') - with open(descFilename, 'r') as f: - description = json.loads(f.read())['result'] - MISPtypes = description['types'] + + description = MISPEvent().describe_types + misp_types = description['types'] for h in header: - if not any((h in MISPtypes, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))): + if not any((h in misp_types, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))): misperrors['error'] = 'Wrong header field: {}. Please use a header value that can be recognized by MISP (or alternatively skip it using a whitespace).'.format(h) return misperrors from_misp = all((h in misp_extended_csv_header or h in ('', ' ', '_', 'object_id') for h in header)) @@ -300,7 +292,7 @@ def handler(q=False): wrong_types = tuple(wrong_type for wrong_type in ('type', 'value') if wrong_type in header) misperrors['error'] = 'Error with the following header: {}. It contains the following field(s): {}, which is(are) already provided by the usage of at least on MISP attribute type in the header.'.format(header, 'and'.join(wrong_types)) return misperrors - csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, MISPtypes, description['categories']) + csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, misp_types, description['categories']) # build the attributes result = csv_parser.parse_csv() if 'error' in result: diff --git a/misp_modules/modules/import_mod/email_import.py b/misp_modules/modules/import_mod/email_import.py index 3ebf3a27..bad4f6a6 100644 --- a/misp_modules/modules/import_mod/email_import.py +++ b/misp_modules/modules/import_mod/email_import.py @@ -1,6 +1,4 @@ #!/usr/bin/env python3 -# -*- coding: utf-8 -*- - import json import base64 import zipfile @@ -33,12 +31,7 @@ moduleconfig = ["unzip_attachments", "extract_urls"] -def handler(q=False): - if q is False: - return False - - # Decode and parse email - request = json.loads(q) +def dict_handler(request: dict): # request data is always base 64 byte encoded data = base64.b64decode(request["data"]) @@ -51,18 +44,18 @@ def handler(q=False): # Do we unzip attachments we find? unzip = config.get("unzip_attachments", None) - if (unzip is not None and unzip.lower() in acceptable_config_yes): + if unzip is not None and unzip.lower() in acceptable_config_yes: unzip = True # Do we try to find passwords for protected zip files? zip_pass_crack = config.get("guess_zip_attachment_passwords", None) - if (zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes): + if zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes: zip_pass_crack = True password_list = get_zip_passwords(email_object.email) # Do we extract URL's from the email. extract_urls = config.get("extract_urls", None) - if (extract_urls is not None and extract_urls.lower() in acceptable_config_yes): + if extract_urls is not None and extract_urls.lower() in acceptable_config_yes: extract_urls = True file_objects = [] # All possible file objects @@ -81,12 +74,12 @@ def handler(q=False): # Attempt to unzip the attachment and return its files if unzip and temp_filename.suffix[1:] not in zipped_files: try: - unzip_attachement(attachment_name, attachment, email_object, file_objects) + unzip_attachment(attachment_name, attachment, email_object, file_objects) except RuntimeError: # File is encrypted with a password if zip_pass_crack is True: password = test_zip_passwords(attachment, password_list) if password: - unzip_attachement(attachment_name, attachment, email_object, file_objects, password) + unzip_attachment(attachment_name, attachment, email_object, file_objects, password) else: # Inform the analyst that we could not crack password f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False) f_object.comment = "Encrypted Zip: Password could not be cracked from message" @@ -125,14 +118,14 @@ def handler(q=False): file_objects.append(url_object) email_object.add_reference(url_object.uuid, 'includes', 'URL in email body') - objects = [email_object.to_json()] + objects = [email_object.to_dict()] if file_objects: - objects += [o.to_json() for o in file_objects if o] - r = {'results': {'Object': [json.loads(o) for o in objects]}} + objects += [o.to_dict() for o in file_objects if o] + r = {'results': {'Object': objects}} return r -def unzip_attachement(filename, data, email_object, file_objects, password=None): +def unzip_attachment(filename, data, email_object, file_objects, password=None): """Extract the contents of a zipfile. Args: @@ -289,4 +282,4 @@ def version(): if __name__ == '__main__': with open('tests/test_no_attach.eml', 'r') as email_file: - handler(q=email_file.read()) + dict_handler(json.loads(email_file.read())) diff --git a/tests/test_expansions.py b/tests/test_expansions.py index 17563b63..8099a62c 100644 --- a/tests/test_expansions.py +++ b/tests/test_expansions.py @@ -28,12 +28,15 @@ class TestExpansions(unittest.TestCase): return requests.post(urljoin(self.url, "query"), json=query) @staticmethod - def get_attribute(response): + def get_attribute_types(response): data = response.json() if not isinstance(data, dict): print(json.dumps(data, indent=2)) return data - return data['results']['Attribute'][0]['type'] + types = [] + for attribute in data['results']['Attribute']: + types.append(attribute['type']) + return types @staticmethod def get_data(response): @@ -52,7 +55,18 @@ class TestExpansions(unittest.TestCase): return data['error'] @staticmethod - def get_object(response): + def get_object_types(response): + data = response.json() + if not isinstance(data, dict): + print(json.dumps(data, indent=2)) + return data + names = [] + for obj in data['results']['Object']: + names.append(obj['name']) + return names + + @staticmethod + def get_first_object_type(response): data = response.json() if not isinstance(data, dict): print(json.dumps(data, indent=2)) @@ -74,6 +88,8 @@ class TestExpansions(unittest.TestCase): return data['results'][0]['values'] def test_apiosintds(self): + self.skipTest("apiosintds is probably broken") + query = {'module': 'apiosintds', 'ip-dst': '10.10.10.10'} response = self.misp_modules_post(query) @@ -93,7 +109,7 @@ class TestExpansions(unittest.TestCase): query['config'] = self.configs[module_name] response = self.misp_modules_post(query) try: - self.assertEqual(self.get_object(response), 'dns-record') + self.assertEqual(self.get_first_object_type(response), 'dns-record') except Exception: self.assertTrue(self.get_errors(response).startswith('You do not have enough APIVoid credits')) else: @@ -110,7 +126,7 @@ class TestExpansions(unittest.TestCase): } } response = self.misp_modules_post(query) - self.assertEqual(self.get_object(response), 'asn') + self.assertEqual(self.get_first_object_type(response), 'asn') def test_btc_steroids(self): if LiveCI: @@ -140,7 +156,7 @@ class TestExpansions(unittest.TestCase): query['config'] = self.configs[module_name] response = self.misp_modules_post(query) try: - self.assertEqual(self.get_object(response), 'passive-dns') + self.assertEqual(self.get_first_object_type(response), 'passive-dns') except Exception: self.assertTrue(self.get_errors(response).startswith('There is an authentication error')) else: @@ -158,7 +174,7 @@ class TestExpansions(unittest.TestCase): query['config'] = self.configs[module_name] response = self.misp_modules_post(query) try: - self.assertEqual(self.get_object(response), 'x509') + self.assertEqual(self.get_first_object_type(response), 'x509') except Exception: self.assertTrue(self.get_errors(response).startswith('There is an authentication error')) else: @@ -188,7 +204,7 @@ class TestExpansions(unittest.TestCase): "config": {}} response = self.misp_modules_post(query) try: - self.assertEqual(self.get_object(response), 'vulnerability') + self.assertEqual(self.get_first_object_type(response), 'vulnerability') except Exception: print(self.get_errors(response)) @@ -307,7 +323,7 @@ class TestExpansions(unittest.TestCase): "value": "149.13.33.14", "uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}} response = self.misp_modules_post(query) - self.assertEqual(self.get_object(response), 'asn') + self.assertEqual(self.get_first_object_type(response), 'asn') def test_ipqs_fraud_and_risk_scoring(self): module_name = "ipqs_fraud_and_risk_scoring" @@ -506,7 +522,7 @@ class TestExpansions(unittest.TestCase): if module_name in self.configs: query['config'] = self.configs[module_name] response = self.misp_modules_post(query) - self.assertEqual(self.get_object(response), 'ip-api-address') + self.assertEqual(self.get_first_object_type(response), 'ip-api-address') else: response = self.misp_modules_post(query) self.assertEqual(self.get_errors(response), 'Shodan authentication is missing') @@ -579,6 +595,7 @@ class TestExpansions(unittest.TestCase): 'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3', 'http://79.118.195.239:1924/.i') results = ('url', 'url', 'file', 'virustotal-report') + for query_type, query_value, result in zip(query_types[:2], query_values[:2], results[:2]): query = {"module": "urlhaus", "attribute": {"type": query_type, @@ -586,7 +603,8 @@ class TestExpansions(unittest.TestCase): "uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}} response = self.misp_modules_post(query) print(response.json()) - self.assertEqual(self.get_attribute(response), result) + self.assertIn(result, self.get_attribute_types(response)) + for query_type, query_value, result in zip(query_types[2:], query_values[2:], results[2:]): query = {"module": "urlhaus", "attribute": {"type": query_type, @@ -594,7 +612,7 @@ class TestExpansions(unittest.TestCase): "uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}} response = self.misp_modules_post(query) print(response.json()) - self.assertEqual(self.get_object(response), result) + self.assertIn(result, self.get_object_types(response)) def test_urlscan(self): module_name = "urlscan" @@ -639,7 +657,7 @@ class TestExpansions(unittest.TestCase): "config": self.configs[module_name]} response = self.misp_modules_post(query) try: - self.assertEqual(self.get_object(response), result) + self.assertEqual(self.get_first_object_type(response), result) except Exception: self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.") else: @@ -682,7 +700,7 @@ class TestExpansions(unittest.TestCase): "config": self.configs[module_name]} response = self.misp_modules_post(query) try: - self.assertEqual(self.get_object(response), result) + self.assertEqual(self.get_first_object_type(response), result) except Exception: self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.") else: @@ -728,7 +746,7 @@ class TestExpansions(unittest.TestCase): "uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}, "config": self.configs[module_name]} response = self.misp_modules_post(query) - self.assertEqual(self.get_object(response), result) + self.assertEqual(self.get_first_object_type(response), result) else: query = {"module": module_name, "attribute": {"type": query_types[0],