Merge pull request #648 from JakubOnderka/orjson

chg: [internal] Add support for orjson
pull/650/head
Jakub Onderka 2024-01-08 14:45:36 +01:00 committed by GitHub
commit 9dc3fbe10c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 161 additions and 154 deletions

View File

@ -19,16 +19,12 @@ jobs:
- name: Install packages
run: |
sudo apt-get install libpoppler-cpp-dev libzbar0 tesseract-ocr
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Cache Python dependencies
uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ matrix.python-version }}-${{ hashFiles('REQUIREMENTS') }}
cache: 'pip'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
@ -42,12 +38,17 @@ jobs:
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Run server in background
run: |
misp-modules -l 127.0.0.1 -s 2>error.log &
sleep 5
- name: Check if server is running
run: |
curl -sS localhost:6666/healthcheck
- name: Test with pytest
run: |
# Run server in background
misp-modules -l 127.0.0.1 -s &
sleep 5
# Check if modules are running
curl -sS localhost:6666/modules
# Run tests
pytest tests
- name: Show error log
if: always()
run: |
cat error.log

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Core MISP expansion modules loader and web service
#
@ -23,7 +22,6 @@ import os
import signal
import sys
import importlib
import json
import logging
import fnmatch
import argparse
@ -31,6 +29,11 @@ import re
import datetime
import psutil
try:
import orjson as json
except ImportError:
import json
import tornado.web
import tornado.process
from tornado.ioloop import IOLoop
@ -58,17 +61,13 @@ def handle_signal(sig, frame):
IOLoop.instance().add_callback_from_signal(IOLoop.instance().stop)
def init_logger(level=False):
def init_logger(debug=False):
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler(stream=sys.stdout)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)
if level:
handler.setLevel(logging.DEBUG)
log.addHandler(handler)
log.setLevel(logging.INFO)
if level:
log.setLevel(logging.DEBUG)
log.propagate = False
log.setLevel(logging.DEBUG if debug else logging.INFO)
return log
@ -89,28 +88,28 @@ def load_helpers(helpersdir):
selftest = hhandlers[helpername].selftest()
if selftest is None:
helpers.append(helpername)
log.info('Helpers loaded {} '.format(filename))
log.info(f'Helpers loaded {filename}')
else:
log.info('Helpers failed {} due to {}'.format(filename, selftest))
log.warning(f'Helpers failed {filename} due to {selftest}')
def load_package_helpers():
if not HAS_PACKAGE_HELPERS:
log.info('Unable to load MISP helpers from package.')
sys.exit()
log.error('Unable to load MISP helpers from package.')
sys.exit(1)
mhandlers = {}
helpers = []
for path, helper in sys.modules.items():
if not path.startswith('misp_modules.helpers.'):
continue
helpername = path.replace('misp_modules.helpers.', '')
mhandlers[helpername] = helper
selftest = mhandlers[helpername].selftest()
helper_name = path.replace('misp_modules.helpers.', '')
mhandlers[helper_name] = helper
selftest = mhandlers[helper_name].selftest()
if selftest is None:
helpers.append(helpername)
log.info('Helper loaded {}'.format(helpername))
helpers.append(helper_name)
log.info(f'Helper loaded {helper_name}')
else:
log.info('Helpers failed {} due to {}'.format(helpername, selftest))
log.warning(f'Helpers failed {helper_name} due to {selftest}')
return mhandlers, helpers
@ -128,49 +127,54 @@ def load_modules(mod_dir):
continue
if filename == '__init__.py':
continue
modulename = filename.split(".")[0]
moduletype = os.path.split(mod_dir)[1]
module_name = filename.split(".")[0]
module_type = os.path.split(mod_dir)[1]
try:
mhandlers[modulename] = importlib.import_module(os.path.basename(root) + '.' + modulename)
mhandlers[module_name] = importlib.import_module(os.path.basename(root) + '.' + module_name)
except Exception as e:
log.warning('MISP modules {0} failed due to {1}'.format(modulename, e))
log.warning(f'MISP modules {module_name} failed due to {e}')
continue
modules.append(modulename)
log.info('MISP modules {0} imported'.format(modulename))
mhandlers['type:' + modulename] = moduletype
modules.append(module_name)
log.info(f'MISP modules {module_name} imported')
mhandlers['type:' + module_name] = module_type
return mhandlers, modules
def load_package_modules():
if not HAS_PACKAGE_MODULES:
log.info('Unable to load MISP modules from package.')
sys.exit()
log.error('Unable to load MISP modules from package.')
sys.exit(1)
mhandlers = {}
modules = []
for path, module in sys.modules.items():
r = re.findall(r"misp_modules[.]modules[.](\w+)[.]([^_]\w+)", path)
if r and len(r[0]) == 2:
moduletype, modulename = r[0]
mhandlers[modulename] = module
modules.append(modulename)
log.info('MISP modules {0} imported'.format(modulename))
mhandlers['type:' + modulename] = moduletype
module_type, module_name = r[0]
mhandlers[module_name] = module
modules.append(module_name)
log.info(f'MISP modules {module_name} imported')
mhandlers['type:' + module_name] = module_type
return mhandlers, modules
class Healthcheck(tornado.web.RequestHandler):
def get(self):
self.write(b'{"status": true}')
class ListModules(tornado.web.RequestHandler):
global loaded_modules
global mhandlers
def get(self):
ret = []
for module in loaded_modules:
x = {}
x['name'] = module
x['type'] = mhandlers['type:' + module]
x['mispattributes'] = mhandlers[module].introspection()
x['meta'] = mhandlers[module].version()
ret.append(x)
for module_name in loaded_modules:
ret.append({
'name': module_name,
'type': mhandlers['type:' + module_name],
'mispattributes': mhandlers[module_name].introspection(),
'meta': mhandlers[module_name].version()
})
log.debug('MISP ListModules request')
self.write(json.dumps(ret))
@ -183,28 +187,34 @@ class QueryModule(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(nb_threads)
@run_on_executor
def run_request(self, module, jsonpayload):
log.debug('MISP QueryModule request {0}'.format(jsonpayload))
response = mhandlers[module].handler(q=jsonpayload)
def run_request(self, module_name, json_payload, dict_payload):
log.debug('MISP QueryModule %s request %s', module_name, json_payload)
module = mhandlers[module_name]
if getattr(module, "dict_handler", None):
# New method that avoids double JSON decoding, new modules should define dict_handler
response = module.dict_handler(request=dict_payload)
else:
response = module.handler(q=json_payload)
return json.dumps(response)
@tornado.gen.coroutine
def post(self):
try:
jsonpayload = self.request.body.decode('utf-8')
dict_payload = json.loads(jsonpayload)
json_payload = self.request.body
dict_payload = json.loads(json_payload)
if dict_payload.get('timeout'):
timeout = datetime.timedelta(seconds=int(dict_payload.get('timeout')))
else:
timeout = datetime.timedelta(seconds=300)
response = yield tornado.gen.with_timeout(timeout, self.run_request(dict_payload['module'], jsonpayload))
future = self.run_request(dict_payload['module'], json_payload, dict_payload)
response = yield tornado.gen.with_timeout(timeout, future)
self.write(response)
except tornado.gen.TimeoutError:
log.warning('Timeout on {} '.format(dict_payload['module']))
log.warning('Timeout on {}'.format(dict_payload['module']))
self.write(json.dumps({'error': 'Timeout.'}))
except Exception:
self.write(json.dumps({'error': 'Something went wrong, look in the server logs for details'}))
log.exception('Something went wrong:')
log.exception('Something went wrong when processing query request')
finally:
self.finish()
@ -223,20 +233,22 @@ def main():
global loaded_modules
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
argParser = argparse.ArgumentParser(description='misp-modules server', formatter_class=argparse.RawTextHelpFormatter)
argParser.add_argument('-t', default=False, action='store_true', help='Test mode')
argParser.add_argument('-s', default=False, action='store_true', help='Run a system install (package installed via pip)')
argParser.add_argument('-d', default=False, action='store_true', help='Enable debugging')
argParser.add_argument('-p', default=6666, help='misp-modules TCP port (default 6666)')
argParser.add_argument('-l', default='localhost', help='misp-modules listen address (default localhost)')
argParser.add_argument('-m', default=[], action='append', help='Register a custom module')
argParser.add_argument('--devel', default=False, action='store_true', help='''Start in development mode, enable debug, start only the module(s) listed in -m.\nExample: -m misp_modules.modules.expansion.bgpranking''')
args = argParser.parse_args()
arg_parser = argparse.ArgumentParser(description='misp-modules server', formatter_class=argparse.RawTextHelpFormatter)
arg_parser.add_argument('-t', default=False, action='store_true', help='Test mode')
arg_parser.add_argument('-s', default=False, action='store_true', help='Run a system install (package installed via pip)')
arg_parser.add_argument('-d', default=False, action='store_true', help='Enable debugging')
arg_parser.add_argument('-p', default=6666, help='misp-modules TCP port (default 6666)')
arg_parser.add_argument('-l', default='localhost', help='misp-modules listen address (default localhost)')
arg_parser.add_argument('-m', default=[], action='append', help='Register a custom module')
arg_parser.add_argument('--devel', default=False, action='store_true', help='''Start in development mode, enable debug, start only the module(s) listed in -m.\nExample: -m misp_modules.modules.expansion.bgpranking''')
args = arg_parser.parse_args()
port = args.p
listen = args.l
if args.devel:
log = init_logger(level=True)
log.info('Launch MISP modules server in developement mode. Enable debug, load a list of modules is -m is used.')
log = init_logger(debug=True)
log.info('Launch MISP modules server in development mode. Enable debug, load a list of modules is -m is used.')
if args.m:
mhandlers = {}
modules = []
@ -247,11 +259,11 @@ def main():
mhandlers[modulename] = importlib.import_module(module)
mhandlers['type:' + modulename] = moduletype
modules.append(modulename)
log.info('MISP modules {0} imported'.format(modulename))
log.info(f'MISP modules {modulename} imported')
else:
mhandlers, loaded_modules = _launch_from_current_dir()
else:
log = init_logger(level=args.d)
log = init_logger(debug=args.d)
if args.s:
log.info('Launch MISP modules server from package.')
load_package_helpers()
@ -263,7 +275,11 @@ def main():
mispmod = importlib.import_module(module)
mispmod.register(mhandlers, loaded_modules)
service = [(r'/modules', ListModules), (r'/query', QueryModule)]
service = [
(r'/modules', ListModules),
(r'/query', QueryModule),
(r'/healthcheck', Healthcheck),
]
application = tornado.web.Application(service)
try:
@ -279,14 +295,14 @@ def main():
print("\nmisp-modules is still running as PID: {}\n".format(pid))
print("Please kill accordingly:")
print("sudo kill {}".format(pid))
sys.exit(-1)
return 1
print(e)
print("misp-modules might still be running.")
log.info('MISP modules server started on {0} port {1}'.format(listen, port))
log.info(f'MISP modules server started on {listen} port {port}')
if args.t:
log.info('MISP modules started in test-mode, quitting immediately.')
sys.exit()
return 0
try:
IOLoop.instance().start()
finally:

View File

@ -127,10 +127,11 @@ def handler(q=False):
try:
response = apiosintDS.request(entities=tosubmit, stix=submit_stix, cache=submitcache, cachedirectory=submitcache_directory, cachetimeout=submitcache_timeout, verbose=True, localdirectory=sumbit_localdirectory)
r["results"] += apiosintParserHover(persistent, response, import_related, submit_stix)
except ValueError as e:
log.debug(str(e))
misperrors['error'] = str(e)
return r
return r
except Exception as e:
log.exception("Could not process apiosintDS")
return {'error': str(e)}
def apiosintParserHover(ispersistent, response, import_related, stix):
apiosinttype = ['hash', 'ip', 'url', 'domain']

View File

@ -1,4 +1,3 @@
import json
import pypdns
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
@ -10,7 +9,7 @@ moduleinfo = {'version': '0.2', 'author': 'Alexandre Dulaunoy',
moduleconfig = ['username', 'password']
class PassiveDNSParser():
class PassiveDNSParser:
def __init__(self, attribute, authentication):
self.misp_event = MISPEvent()
self.attribute = MISPAttribute()
@ -21,7 +20,7 @@ class PassiveDNSParser():
def get_results(self):
if hasattr(self, 'result'):
return self.result
event = json.loads(self.misp_event.to_json())
event = self.misp_event.to_dict()
results = {key: event[key] for key in ('Attribute', 'Object')}
return {'results': results}
@ -50,10 +49,7 @@ class PassiveDNSParser():
self.misp_event.add_object(**pdns_object)
def handler(q=False):
if q is False:
return False
request = json.loads(q)
def dict_handler(request: dict):
if not request.get('config'):
return {'error': 'CIRCL Passive DNS authentication is missing.'}
if not request['config'].get('username') or not request['config'].get('password'):

View File

@ -1,6 +1,5 @@
import base64
import io
import json
import logging
import sys
import zipfile
@ -43,7 +42,7 @@ def create_response(original_attribute: dict, software: str, signature: Optional
av_signature_object.add_reference(original_attribute["uuid"], "belongs-to")
misp_event.add_object(av_signature_object)
event = json.loads(misp_event.to_json())
event = misp_event.to_dict()
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
return {"results": results}
@ -58,12 +57,7 @@ def connect_to_clamav(connection_string: str) -> clamd.ClamdNetworkSocket:
raise Exception("ClamAV connection string is invalid. It must be unix socket path with 'unix://' prefix or IP:PORT.")
def handler(q=False):
if q is False:
return False
request = json.loads(q)
def dict_handler(request: dict):
connection_string: str = request["config"].get("connection")
if not connection_string:
return {"error": "No ClamAV connection string provided"}

View File

@ -1,4 +1,3 @@
import json
from urllib.parse import urlparse
import vt
from . import check_input_attribute, standard_error_message
@ -45,7 +44,7 @@ class VirusTotalParser:
self.input_types_mapping[self.attribute.type](self.attribute.value)
def get_result(self) -> dict:
event = json.loads(self.misp_event.to_json())
event = self.misp_event.to_dict()
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
return {'results': results}
@ -257,10 +256,7 @@ def parse_error(status_code: int) -> str:
return "VirusTotal may not be accessible."
def handler(q=False):
if q is False:
return False
request = json.loads(q)
def dict_handler(request: dict):
if not request.get('config') or not request['config'].get('apikey'):
misperrors['error'] = 'A VirusTotal api key is required for this module.'
return misperrors

View File

@ -1,10 +1,6 @@
# -*- coding: utf-8 -*-
from pymisp import MISPEvent, MISPObject
from pymisp import __path__ as pymisp_path
import csv
import io
import json
import os
import base64
misperrors = {'error': 'Error'}
@ -33,7 +29,7 @@ misp_context_additional_fields = ['event_info', 'event_member_org', 'event_sourc
misp_extended_csv_header = misp_standard_csv_header + misp_context_additional_fields
class CsvParser():
class CsvParser:
def __init__(self, header, has_header, delimiter, data, from_misp, MISPtypes, categories):
self.misp_event = MISPEvent()
self.header = header
@ -77,7 +73,7 @@ class CsvParser():
return {'error': 'In order to import MISP objects, an object relation for each attribute contained in an object is required.'}
self.__build_misp_event(attribute_indexes, object_indexes)
else:
attribute_fields = attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9]
attribute_fields = misp_standard_csv_header[:1] + misp_standard_csv_header[2:9]
attribute_indexes = []
types_indexes = []
for i in range(len(self.header)):
@ -236,7 +232,7 @@ class CsvParser():
return score
def __finalize_results(self):
event = json.loads(self.misp_event.to_json())
event = self.misp_event.to_dict()
self.results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
@ -252,10 +248,7 @@ def __standard_parsing(data):
return list(tuple(part.strip() for part in line) for line in csv.reader(io.TextIOWrapper(io.BytesIO(data.encode()), encoding='utf-8')) if line and not line[0].startswith('#'))
def handler(q=False):
if q is False:
return False
request = json.loads(q)
def dict_handler(request: dict):
if request.get('data'):
try:
data = base64.b64decode(request['data']).decode('utf-8')
@ -282,12 +275,11 @@ def handler(q=False):
del data[0]
if header == misp_standard_csv_header or header == misp_extended_csv_header:
header = misp_standard_csv_header
descFilename = os.path.join(pymisp_path[0], 'data/describeTypes.json')
with open(descFilename, 'r') as f:
description = json.loads(f.read())['result']
MISPtypes = description['types']
description = MISPEvent().describe_types
misp_types = description['types']
for h in header:
if not any((h in MISPtypes, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))):
if not any((h in misp_types, h in misp_extended_csv_header, h in ('', ' ', '_', 'object_id'))):
misperrors['error'] = 'Wrong header field: {}. Please use a header value that can be recognized by MISP (or alternatively skip it using a whitespace).'.format(h)
return misperrors
from_misp = all((h in misp_extended_csv_header or h in ('', ' ', '_', 'object_id') for h in header))
@ -300,7 +292,7 @@ def handler(q=False):
wrong_types = tuple(wrong_type for wrong_type in ('type', 'value') if wrong_type in header)
misperrors['error'] = 'Error with the following header: {}. It contains the following field(s): {}, which is(are) already provided by the usage of at least on MISP attribute type in the header.'.format(header, 'and'.join(wrong_types))
return misperrors
csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, MISPtypes, description['categories'])
csv_parser = CsvParser(header, has_header, delimiter, data, from_misp, misp_types, description['categories'])
# build the attributes
result = csv_parser.parse_csv()
if 'error' in result:

View File

@ -1,6 +1,4 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import base64
import zipfile
@ -33,12 +31,7 @@ moduleconfig = ["unzip_attachments",
"extract_urls"]
def handler(q=False):
if q is False:
return False
# Decode and parse email
request = json.loads(q)
def dict_handler(request: dict):
# request data is always base 64 byte encoded
data = base64.b64decode(request["data"])
@ -51,18 +44,18 @@ def handler(q=False):
# Do we unzip attachments we find?
unzip = config.get("unzip_attachments", None)
if (unzip is not None and unzip.lower() in acceptable_config_yes):
if unzip is not None and unzip.lower() in acceptable_config_yes:
unzip = True
# Do we try to find passwords for protected zip files?
zip_pass_crack = config.get("guess_zip_attachment_passwords", None)
if (zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes):
if zip_pass_crack is not None and zip_pass_crack.lower() in acceptable_config_yes:
zip_pass_crack = True
password_list = get_zip_passwords(email_object.email)
# Do we extract URL's from the email.
extract_urls = config.get("extract_urls", None)
if (extract_urls is not None and extract_urls.lower() in acceptable_config_yes):
if extract_urls is not None and extract_urls.lower() in acceptable_config_yes:
extract_urls = True
file_objects = [] # All possible file objects
@ -81,12 +74,12 @@ def handler(q=False):
# Attempt to unzip the attachment and return its files
if unzip and temp_filename.suffix[1:] not in zipped_files:
try:
unzip_attachement(attachment_name, attachment, email_object, file_objects)
unzip_attachment(attachment_name, attachment, email_object, file_objects)
except RuntimeError: # File is encrypted with a password
if zip_pass_crack is True:
password = test_zip_passwords(attachment, password_list)
if password:
unzip_attachement(attachment_name, attachment, email_object, file_objects, password)
unzip_attachment(attachment_name, attachment, email_object, file_objects, password)
else: # Inform the analyst that we could not crack password
f_object, main_object, sections = make_binary_objects(pseudofile=attachment, filename=attachment_name, standalone=False)
f_object.comment = "Encrypted Zip: Password could not be cracked from message"
@ -125,14 +118,14 @@ def handler(q=False):
file_objects.append(url_object)
email_object.add_reference(url_object.uuid, 'includes', 'URL in email body')
objects = [email_object.to_json()]
objects = [email_object.to_dict()]
if file_objects:
objects += [o.to_json() for o in file_objects if o]
r = {'results': {'Object': [json.loads(o) for o in objects]}}
objects += [o.to_dict() for o in file_objects if o]
r = {'results': {'Object': objects}}
return r
def unzip_attachement(filename, data, email_object, file_objects, password=None):
def unzip_attachment(filename, data, email_object, file_objects, password=None):
"""Extract the contents of a zipfile.
Args:
@ -289,4 +282,4 @@ def version():
if __name__ == '__main__':
with open('tests/test_no_attach.eml', 'r') as email_file:
handler(q=email_file.read())
dict_handler(json.loads(email_file.read()))

View File

@ -28,12 +28,15 @@ class TestExpansions(unittest.TestCase):
return requests.post(urljoin(self.url, "query"), json=query)
@staticmethod
def get_attribute(response):
def get_attribute_types(response):
data = response.json()
if not isinstance(data, dict):
print(json.dumps(data, indent=2))
return data
return data['results']['Attribute'][0]['type']
types = []
for attribute in data['results']['Attribute']:
types.append(attribute['type'])
return types
@staticmethod
def get_data(response):
@ -52,7 +55,18 @@ class TestExpansions(unittest.TestCase):
return data['error']
@staticmethod
def get_object(response):
def get_object_types(response):
data = response.json()
if not isinstance(data, dict):
print(json.dumps(data, indent=2))
return data
names = []
for obj in data['results']['Object']:
names.append(obj['name'])
return names
@staticmethod
def get_first_object_type(response):
data = response.json()
if not isinstance(data, dict):
print(json.dumps(data, indent=2))
@ -74,6 +88,8 @@ class TestExpansions(unittest.TestCase):
return data['results'][0]['values']
def test_apiosintds(self):
self.skipTest("apiosintds is probably broken")
query = {'module': 'apiosintds', 'ip-dst': '10.10.10.10'}
response = self.misp_modules_post(query)
@ -93,7 +109,7 @@ class TestExpansions(unittest.TestCase):
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), 'dns-record')
self.assertEqual(self.get_first_object_type(response), 'dns-record')
except Exception:
self.assertTrue(self.get_errors(response).startswith('You do not have enough APIVoid credits'))
else:
@ -110,7 +126,7 @@ class TestExpansions(unittest.TestCase):
}
}
response = self.misp_modules_post(query)
self.assertEqual(self.get_object(response), 'asn')
self.assertEqual(self.get_first_object_type(response), 'asn')
def test_btc_steroids(self):
if LiveCI:
@ -140,7 +156,7 @@ class TestExpansions(unittest.TestCase):
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), 'passive-dns')
self.assertEqual(self.get_first_object_type(response), 'passive-dns')
except Exception:
self.assertTrue(self.get_errors(response).startswith('There is an authentication error'))
else:
@ -158,7 +174,7 @@ class TestExpansions(unittest.TestCase):
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), 'x509')
self.assertEqual(self.get_first_object_type(response), 'x509')
except Exception:
self.assertTrue(self.get_errors(response).startswith('There is an authentication error'))
else:
@ -188,7 +204,7 @@ class TestExpansions(unittest.TestCase):
"config": {}}
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), 'vulnerability')
self.assertEqual(self.get_first_object_type(response), 'vulnerability')
except Exception:
print(self.get_errors(response))
@ -307,7 +323,7 @@ class TestExpansions(unittest.TestCase):
"value": "149.13.33.14",
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
response = self.misp_modules_post(query)
self.assertEqual(self.get_object(response), 'asn')
self.assertEqual(self.get_first_object_type(response), 'asn')
def test_ipqs_fraud_and_risk_scoring(self):
module_name = "ipqs_fraud_and_risk_scoring"
@ -506,7 +522,7 @@ class TestExpansions(unittest.TestCase):
if module_name in self.configs:
query['config'] = self.configs[module_name]
response = self.misp_modules_post(query)
self.assertEqual(self.get_object(response), 'ip-api-address')
self.assertEqual(self.get_first_object_type(response), 'ip-api-address')
else:
response = self.misp_modules_post(query)
self.assertEqual(self.get_errors(response), 'Shodan authentication is missing')
@ -579,6 +595,7 @@ class TestExpansions(unittest.TestCase):
'a04ac6d98ad989312783d4fe3456c53730b212c79a426fb215708b6c6daa3de3',
'http://79.118.195.239:1924/.i')
results = ('url', 'url', 'file', 'virustotal-report')
for query_type, query_value, result in zip(query_types[:2], query_values[:2], results[:2]):
query = {"module": "urlhaus",
"attribute": {"type": query_type,
@ -586,7 +603,8 @@ class TestExpansions(unittest.TestCase):
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
response = self.misp_modules_post(query)
print(response.json())
self.assertEqual(self.get_attribute(response), result)
self.assertIn(result, self.get_attribute_types(response))
for query_type, query_value, result in zip(query_types[2:], query_values[2:], results[2:]):
query = {"module": "urlhaus",
"attribute": {"type": query_type,
@ -594,7 +612,7 @@ class TestExpansions(unittest.TestCase):
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"}}
response = self.misp_modules_post(query)
print(response.json())
self.assertEqual(self.get_object(response), result)
self.assertIn(result, self.get_object_types(response))
def test_urlscan(self):
module_name = "urlscan"
@ -639,7 +657,7 @@ class TestExpansions(unittest.TestCase):
"config": self.configs[module_name]}
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), result)
self.assertEqual(self.get_first_object_type(response), result)
except Exception:
self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.")
else:
@ -682,7 +700,7 @@ class TestExpansions(unittest.TestCase):
"config": self.configs[module_name]}
response = self.misp_modules_post(query)
try:
self.assertEqual(self.get_object(response), result)
self.assertEqual(self.get_first_object_type(response), result)
except Exception:
self.assertEqual(self.get_errors(response), "VirusTotal request rate limit exceeded.")
else:
@ -728,7 +746,7 @@ class TestExpansions(unittest.TestCase):
"uuid": "ea89a33b-4ab7-4515-9f02-922a0bee333d"},
"config": self.configs[module_name]}
response = self.misp_modules_post(query)
self.assertEqual(self.get_object(response), result)
self.assertEqual(self.get_first_object_type(response), result)
else:
query = {"module": module_name,
"attribute": {"type": query_types[0],