add: Specific error message for misp_standard format expansion modules

- Checking if the input format is respected and
  displaying an error message if it is not
pull/420/head
chrisr3d 2020-07-28 11:47:53 +02:00
parent 6d528628c7
commit 3b7a5c4dc2
No known key found for this signature in database
GPG Key ID: 6BBED1B63A6D639F
20 changed files with 107 additions and 24 deletions

View File

@ -19,3 +19,12 @@ __all__ = ['cuckoo_submit', 'vmray_submit', 'bgpranking', 'circl_passivedns', 'c
'assemblyline_submit', 'assemblyline_query', 'ransomcoindb', 'malwarebazaar',
'lastline_query', 'lastline_submit', 'sophoslabs_intelix', 'cytomic_orion', 'censys_enrich',
'trustar_enrich', 'recordedfuture']
minimum_required_fields = ('type', 'uuid', 'value')
checking_error = 'containing at least a "type" field and a "value" field'
standard_error_message = 'This module requires an "attribute" field as input'
def check_input_attribute(attribute, requirements=minimum_required_fields):
return all(feature in attribute for feature in requirements)

View File

@ -1,5 +1,6 @@
import json
import requests
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
misperrors = {'error': 'Error'}
@ -74,7 +75,11 @@ def handler(q=False):
request = json.loads(q)
if not request.get('config', {}).get('apikey'):
return {'error': 'An API key for APIVoid is required.'}
attribute = request.get('attribute')
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request['attribute']
if attribute['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
apikey = request['config']['apikey']
apivoid_parser = APIVoidParser(attribute)
apivoid_parser.parse_domain(apikey)

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import json
from . import check_input_attribute, standard_error_message
from assemblyline_client import Client, ClientError
from collections import defaultdict
from pymisp import MISPAttribute, MISPEvent, MISPObject
@ -139,6 +140,10 @@ def handler(q=False):
if q is False:
return False
request = json.loads(q)
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
if request['attribute']['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
if not request.get('config'):
return {"error": "Missing configuration."}
if not request['config'].get('apiurl'):

View File

@ -3,6 +3,7 @@ import json
import base64
import codecs
from dateutil.parser import isoparse
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
try:
import censys.base
@ -36,11 +37,11 @@ def handler(q=False):
api_id = request['config']['api_id']
api_secret = request['config']['api_secret']
if not request.get('attribute'):
return {'error': 'Unsupported input.'}
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request['attribute']
if not any(input_type == attribute['type'] for input_type in mispattributes['input']):
return {'error': 'Unsupported attributes type'}
return {'error': 'Unsupported attribute type.'}
attribute = MISPAttribute()
attribute.from_dict(**request['attribute'])

View File

@ -1,5 +1,6 @@
import json
import pypdns
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
mispattributes = {'input': ['hostname', 'domain', 'ip-src', 'ip-dst', 'ip-src|port', 'ip-dst|port'], 'format': 'misp_standard'}
@ -58,11 +59,11 @@ def handler(q=False):
if not request['config'].get('username') or not request['config'].get('password'):
return {'error': 'CIRCL Passive DNS authentication is incomplete, please provide your username and password.'}
authentication = (request['config']['username'], request['config']['password'])
if not request.get('attribute'):
return {'error': 'Unsupported input.'}
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request['attribute']
if not any(input_type == attribute['type'] for input_type in mispattributes['input']):
return {'error': 'Unsupported attributes type'}
return {'error': 'Unsupported attribute type.'}
pdns_parser = PassiveDNSParser(attribute, authentication)
pdns_parser.parse()
return pdns_parser.get_results()

View File

@ -1,5 +1,6 @@
import json
import pypssl
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
mispattributes = {'input': ['ip-src', 'ip-dst', 'ip-src|port', 'ip-dst|port'], 'format': 'misp_standard'}
@ -83,11 +84,11 @@ def handler(q=False):
if not request['config'].get('username') or not request['config'].get('password'):
return {'error': 'CIRCL Passive SSL authentication is incomplete, please provide your username and password.'}
authentication = (request['config']['username'], request['config']['password'])
if not request.get('attribute'):
return {'error': 'Unsupported input.'}
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request['attribute']
if not any(input_type == attribute['type'] for input_type in mispattributes['input']):
return {'error': 'Unsupported attributes type'}
return {'error': 'Unsupported attribute type.'}
pssl_parser = PassiveSSLParser(attribute, authentication)
pssl_parser.parse()
return pssl_parser.get_results()

View File

@ -1,7 +1,8 @@
from collections import defaultdict
from pymisp import MISPEvent, MISPObject
import json
import requests
from . import check_input_attribute, standard_error_message
from collections import defaultdict
from pymisp import MISPEvent, MISPObject
misperrors = {'error': 'Error'}
mispattributes = {'input': ['vulnerability'], 'format': 'misp_standard'}
@ -108,7 +109,8 @@ def handler(q=False):
if q is False:
return False
request = json.loads(q)
attribute = request.get('attribute')
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
if attribute.get('type') != 'vulnerability':
misperrors['error'] = 'Vulnerability id missing.'
return misperrors

View File

@ -7,6 +7,7 @@ An expansion module to enrich attributes in MISP and share indicators of comprom
'''
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
import json
import requests
@ -146,9 +147,10 @@ def handler(q=False):
if not request.get('attribute'):
return {'error': 'Unsupported input.'}
attribute = request['attribute']
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
if not any(input_type == attribute['type'] for input_type in mispattributes['input']):
return {'error': 'Unsupported attributes type'}
return {'error': 'Unsupported attribute type.'}
if not request.get('config'):
return {'error': 'Missing configuration'}

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import json
from . import check_input_attribute, standard_error_message
from pyipasnhistory import IPASNHistory
from pymisp import MISPAttribute, MISPEvent, MISPObject
@ -34,11 +35,11 @@ def handler(q=False):
if q is False:
return False
request = json.loads(q)
if request.get('attribute') and request['attribute'].get('type') in mispattributes['input']:
toquery = request['attribute']['value']
else:
misperrors['error'] = "Unsupported attributes type"
return misperrors
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
if request['attribute']['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
toquery = request['attribute']['value']
ipasn = IPASNHistory()
values = ipasn.query(toquery)

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import jbxapi
import json
from . import check_input_attribute, checking_error, standard_error_message
from joe_parser import JoeParser
misperrors = {'error': 'Error'}
@ -27,6 +28,10 @@ def handler(q=False):
if not apikey:
return {'error': 'No API key provided'}
if not request.get('attribute') or not check_input_attribute(request['attribute'], requirements=('type', 'value')):
return {'error': f'{standard_error_message}, {checking_error} that is the link to the Joe Sandbox report.'}
if request['attribute']['type'] != 'link':
return {'error': 'Unsupported attribute type.'}
url = request['attribute']['value']
if "/submissions/" not in url:
return {'error': "The URL does not point to a Joe Sandbox analysis."}

View File

@ -3,8 +3,8 @@
Module (type "expansion") to query a Lastline report from an analysis link.
"""
import json
import lastline_api
from . import check_input_attribute, checking_error, standard_error_message
misperrors = {
@ -52,6 +52,8 @@ def handler(q=False):
try:
config = request["config"]
auth_data = lastline_api.LastlineAbstractClient.get_login_params_from_dict(config)
if not request.get('attribute') or not request['attribute'].get('value'):
return {'error': f'{standard_error_message}, {checking_error} that is the link to a Lastline analysis.'}
analysis_link = request['attribute']['value']
# The API url changes based on the analysis link host name
api_url = lastline_api.get_portal_url_from_task_link(analysis_link)

View File

@ -1,5 +1,6 @@
import json
import requests
from . import check_input_attribute, checking_error, standard_error_message
from pymisp import MISPEvent, MISPObject
mispattributes = {'input': ['md5', 'sha1', 'sha256'],
@ -34,7 +35,11 @@ def handler(q=False):
if q is False:
return False
request = json.loads(q)
if not request.get('attribute') or not check_input_attribute(request['attribute'], requirements=('type', 'value')):
return {'error': f'{standard_error_message}, {checking_error} that is the hash to submit to Malware Bazaar.'}
attribute = request['attribute']
if attribute['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
url = 'https://mb-api.abuse.ch/api/v1/'
response = requests.post(url, data={'query': 'get_info', 'hash': attribute['value']}).json()
query_status = response['query_status']

View File

@ -1,4 +1,5 @@
import json
from . import check_input_attribute, checking_error, standard_error_message
from ._ransomcoindb import ransomcoindb
from pymisp import MISPObject
@ -28,6 +29,10 @@ def handler(q=False):
q = json.loads(q)
if "config" not in q or "api-key" not in q["config"]:
return {"error": "Ransomcoindb API key is missing"}
if not q.get('attribute') or not check_input_attribute(attribute, requirements=('type', 'value')):
return {'error': f'{standard_error_message}, {checking_error}.'}
if q['attribute']['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
api_key = q["config"]["api-key"]
r = {"results": []}

View File

@ -1,6 +1,7 @@
import json
import logging
import requests
from . import check_input_attribute, checking_error, standard_error_message
from urllib.parse import quote
from pymisp import MISPAttribute, MISPEvent, MISPTag, MISPObject
@ -257,6 +258,10 @@ def handler(q=False):
else:
misperrors['error'] = 'Missing Recorded Future token.'
return misperrors
if not request.get('attribute') or not check_input_attribute(request['atttribute'], requirements=('type', 'value')):
return {'error': f'{standard_error_message}, {checking_error}.'}
if request['attribute']['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
input_attribute = request.get('attribute')
rf_enricher = RFEnricher(token, input_attribute)

View File

@ -1,3 +1,4 @@
from. import check_input_attribute, checking_error, standard_error_message
from pymisp import MISPEvent, MISPObject
import json
import requests
@ -105,6 +106,12 @@ def handler(q=False):
misperrors['error'] = "Missing client_id or client_secret value for SOPHOSLabs Intelix. \
It's free to sign up here https://aws.amazon.com/marketplace/pp/B07SLZPMCS."
return misperrors
to_check = (('type', 'value'), ('type', 'value1'))
if not request.get('attribute') or not any(check_input_attribute(request['attribute'], requirements=check) for check in to_check):
return {'error': f'{standard_error_message}, {checking_error}.'}
attribute = request['attribute']
if attribute['type'] not in misp_types_in:
return {'error': 'Unsupported attribute type.'}
client = SophosLabsApi(j['config']['client_id'], j['config']['client_secret'])
if j['attribute']['type'] == "sha256":
client.hash_lookup(j['attribute']['value1'])

View File

@ -1,5 +1,6 @@
import json
import pymisp
from . import check_input_attribute, checking_error, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
from trustar import TruStar
@ -110,7 +111,11 @@ def handler(q=False):
misperrors['error'] = "Your TruSTAR API key and secret are required for indicator enrichment."
return misperrors
if not request.get('attribute') or not check_input_attribute(request['attribute'], requirements=('type', 'value')):
return {'error': f'{standard_error_message}, {checking_error}.'}
attribute = request['attribute']
if attribute['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
trustar_parser = TruSTARParser(attribute, config)
try:

View File

@ -1,6 +1,8 @@
from pymisp import MISPAttribute, MISPEvent, MISPObject
# -*- coding: utf-8 -*-
import json
import requests
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
misperrors = {'error': 'Error'}
mispattributes = {'input': ['domain', 'hostname', 'ip-src', 'ip-dst', 'md5', 'sha256', 'url'],
@ -134,7 +136,11 @@ def handler(q=False):
if q is False:
return False
request = json.loads(q)
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request['attribute']
if attribute['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
urlhaus_parser = _misp_type_mapping[attribute['type']](attribute)
return urlhaus_parser.query_api()

View File

@ -1,6 +1,7 @@
from pymisp import MISPAttribute, MISPEvent, MISPObject
import json
import requests
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
misperrors = {'error': 'Error'}
mispattributes = {'input': ['hostname', 'domain', "ip-src", "ip-dst", "md5", "sha1", "sha256", "url"],
@ -195,6 +196,11 @@ def handler(q=False):
if not request.get('config') or not request['config'].get('apikey'):
misperrors['error'] = "A VirusTotal api key is required for this module."
return misperrors
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
if request['attribute']['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
event_limit = request['config'].get('event_limit')
if not isinstance(event_limit, int):
event_limit = 5

View File

@ -1,6 +1,7 @@
from pymisp import MISPAttribute, MISPEvent, MISPObject
import json
import requests
from . import check_input_attribute, standard_error_message
from pymisp import MISPAttribute, MISPEvent, MISPObject
misperrors = {'error': 'Error'}
mispattributes = {'input': ['hostname', 'domain', "ip-src", "ip-dst", "md5", "sha1", "sha256", "url"],
@ -174,7 +175,11 @@ def handler(q=False):
if not request.get('config') or not request['config'].get('apikey'):
misperrors['error'] = "A VirusTotal api key is required for this module."
return misperrors
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
attribute = request['attribute']
if attribute['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
query_type, to_call = misp_type_mapping[attribute['type']]
parser = to_call(request['config']['apikey'], attribute)
query_result = parser.get_query_result(query_type)

View File

@ -1,6 +1,7 @@
import requests
import json
import sys
from . import check_input_attribute, standard_error_message
from collections import defaultdict
from pymisp import MISPAttribute, MISPEvent, MISPObject
from requests.auth import HTTPBasicAuth
@ -160,6 +161,10 @@ def handler(q=False):
return misperrors
key = request["config"]["apikey"]
password = request['config']['apipassword']
if not request.get('attribute') or not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message} which should contain at least a type, a value and an uuid.'}
if request['attribute']['type'] not in mispattributes['input']:
return {'error': 'Unsupported attribute type.'}
parser = XforceExchange(request['attribute'], key, password)
parser.parse()
return parser.get_result()